Skip to main content

rust_supervisor/control/
handle.rs

1//! Public runtime control handle.
2//!
3//! The handle owns the command sender side and exposes asynchronous control
4//! methods. It keeps command construction separate from runtime execution.
5
6use crate::child_runner::runner::ChildRunReport;
7use crate::control::command::{CommandMeta, CommandResult, ControlCommand};
8use crate::dashboard::runtime::DashboardIpcRuntimeGuard;
9use crate::error::types::SupervisorError;
10use crate::id::types::{ChildId, SupervisorPath};
11use crate::observe::pipeline::{ObservabilityPipeline, TestRecorder};
12use crate::runtime::lifecycle::{RuntimeControlPlane, RuntimeExitReport, RuntimeHealthReport};
13use crate::runtime::message::{ControlPlaneMessage, RuntimeLoopMessage};
14use std::sync::Arc;
15use std::sync::Mutex;
16use tokio::sync::{broadcast, mpsc, oneshot};
17
/// Cloneable handle used to control a running supervisor.
///
/// The handle stores the sending halves of the runtime channels together
/// with reference-counted shared state. The observability pipeline and the
/// optional dashboard guard are held behind [`Arc`], so cloned handles refer
/// to the same pipeline and the same guard.
#[derive(Debug, Clone)]
pub struct SupervisorHandle {
    /// Sender side used to submit runtime control commands.
    command_sender: mpsc::Sender<RuntimeLoopMessage>,
    /// Broadcast sender used to create lifecycle event subscriptions.
    ///
    /// Events are carried as plain `String` text.
    event_sender: broadcast::Sender<String>,
    /// Runtime control plane lifecycle state.
    ///
    /// Consulted for liveness, health, and the final exit report.
    control_plane: RuntimeControlPlane,
    /// Shared typed observability pipeline.
    ///
    /// Guarded by a standard-library [`Mutex`].
    observability: Arc<Mutex<ObservabilityPipeline>>,
    /// Optional dashboard IPC runtime guard.
    ///
    /// `None` until a guard is attached with `with_dashboard_runtime`.
    dashboard_runtime: Option<Arc<DashboardIpcRuntimeGuard>>,
}
32
33impl SupervisorHandle {
34    /// Creates a runtime handle from channel senders.
35    ///
36    /// # Arguments
37    ///
38    /// - `command_sender`: Sender used to submit runtime commands.
39    /// - `event_sender`: Sender used to subscribe to lifecycle events.
40    ///
41    /// # Returns
42    ///
43    /// Returns a [`SupervisorHandle`].
44    pub fn new(
45        command_sender: mpsc::Sender<RuntimeLoopMessage>,
46        event_sender: broadcast::Sender<String>,
47        control_plane: RuntimeControlPlane,
48    ) -> Self {
49        Self::new_with_observability(
50            command_sender,
51            event_sender,
52            control_plane,
53            Arc::new(Mutex::new(ObservabilityPipeline::new(16, 16))),
54        )
55    }
56
57    /// Creates a runtime handle with a shared observability pipeline.
58    ///
59    /// # Arguments
60    ///
61    /// - `command_sender`: Sender used to submit runtime commands.
62    /// - `event_sender`: Sender used to subscribe to lifecycle event text.
63    /// - `control_plane`: Runtime control plane lifecycle state.
64    /// - `observability`: Shared typed observability pipeline.
65    ///
66    /// # Returns
67    ///
68    /// Returns a [`SupervisorHandle`].
69    pub(crate) fn new_with_observability(
70        command_sender: mpsc::Sender<RuntimeLoopMessage>,
71        event_sender: broadcast::Sender<String>,
72        control_plane: RuntimeControlPlane,
73        observability: Arc<Mutex<ObservabilityPipeline>>,
74    ) -> Self {
75        Self {
76            command_sender,
77            event_sender,
78            control_plane,
79            observability,
80            dashboard_runtime: None,
81        }
82    }
83
84    /// Attaches a dashboard IPC runtime guard to this handle.
85    ///
86    /// # Arguments
87    ///
88    /// - `dashboard_runtime`: Guard that owns dashboard IPC runtime tasks.
89    ///
90    /// # Returns
91    ///
92    /// Returns this handle with dashboard runtime lifecycle attached.
93    pub(crate) fn with_dashboard_runtime(
94        mut self,
95        dashboard_runtime: Arc<DashboardIpcRuntimeGuard>,
96    ) -> Self {
97        self.dashboard_runtime = Some(dashboard_runtime);
98        self
99    }
100
101    /// Adds a child manifest under a supervisor path.
102    ///
103    /// # Arguments
104    ///
105    /// - `target`: Supervisor path that should receive the child.
106    /// - `child_manifest`: Child manifest text supplied by the caller.
107    /// - `requested_by`: Caller that requested the command.
108    /// - `reason`: Human-readable command reason.
109    ///
110    /// # Child Manifest Example
111    ///
112    /// The runtime expects a YAML child declaration. The smallest useful
113    /// manifest names the child and selects a task kind:
114    ///
115    /// ```yaml
116    /// name: worker
117    /// kind: async_worker
118    /// ```
119    ///
120    /// Optional fields can be added when the child needs dependencies,
121    /// lifecycle policy, resource limits, command permissions, environment
122    /// variables, or secret references:
123    ///
124    /// ```yaml
125    /// name: worker
126    /// kind: async_worker
127    /// criticality: optional
128    /// restart_policy: transient
129    /// dependencies:
130    ///   - cache
131    /// health_check:
132    ///   check_interval_secs: 10
133    ///   timeout_secs: 5
134    ///   max_retries: 3
135    /// readiness:
136    ///   check_interval_secs: 5
137    ///   timeout_secs: 3
138    /// resource_limits:
139    ///   max_memory_mb: 256
140    ///   max_cpu_percent: 80
141    ///   max_file_descriptors: 1024
142    /// command_permissions:
143    ///   allow_shutdown: false
144    ///   allow_restart: true
145    ///   allowed_signals:
146    ///     - SIGTERM
147    /// environment:
148    ///   - name: WORKER_MODE
149    ///     value: queue
150    ///   - name: API_TOKEN
151    ///     secret_ref: ${API_TOKEN}
152    /// secrets:
153    ///   - name: API_TOKEN
154    ///     key: workers/api_token
155    ///     required: true
156    /// ```
157    ///
158    /// # Example
159    ///
160    /// ```no_run
161    /// # async fn add_child_example() -> Result<(), rust_supervisor::error::types::SupervisorError> {
162    /// use rust_supervisor::control::command::CommandResult;
163    /// use rust_supervisor::id::types::SupervisorPath;
164    /// use rust_supervisor::runtime::supervisor::Supervisor;
165    /// use rust_supervisor::spec::supervisor::SupervisorSpec;
166    ///
167    /// let handle = Supervisor::start(SupervisorSpec::root(Vec::new())).await?;
168    /// let result = handle
169    ///     .add_child(
170    ///         SupervisorPath::root(),
171    ///         "name: worker\nkind: async_worker\n",
172    ///         "operator",
173    ///         "attach worker during runtime update",
174    ///     )
175    ///     .await?;
176    ///
177    /// assert!(matches!(result, CommandResult::ChildAdded { .. }));
178    /// # Ok(())
179    /// # }
180    /// ```
181    ///
182    /// # Returns
183    ///
184    /// Returns a [`CommandResult`] after the runtime accepts the command.
185    pub async fn add_child(
186        &self,
187        target: SupervisorPath,
188        child_manifest: impl Into<String>,
189        requested_by: impl Into<String>,
190        reason: impl Into<String>,
191    ) -> Result<CommandResult, SupervisorError> {
192        self.send(ControlCommand::AddChild {
193            meta: CommandMeta::new(requested_by, reason),
194            target,
195            child_manifest: child_manifest.into(),
196        })
197        .await
198    }
199
200    /// Removes a child from runtime governance.
201    ///
202    /// # Arguments
203    ///
204    /// - `child_id`: Target child identifier.
205    /// - `requested_by`: Caller that requested the command.
206    /// - `reason`: Human-readable command reason.
207    ///
208    /// # Returns
209    ///
210    /// Returns a [`CommandResult`] after removal or idempotent reuse.
211    pub async fn remove_child(
212        &self,
213        child_id: ChildId,
214        requested_by: impl Into<String>,
215        reason: impl Into<String>,
216    ) -> Result<CommandResult, SupervisorError> {
217        self.child_command(child_id, requested_by, reason, |meta, child_id| {
218            ControlCommand::RemoveChild { meta, child_id }
219        })
220        .await
221    }
222
223    /// Restarts a child explicitly.
224    ///
225    /// # Arguments
226    ///
227    /// - `child_id`: Target child identifier.
228    /// - `requested_by`: Caller that requested the command.
229    /// - `reason`: Human-readable command reason.
230    ///
231    /// # Returns
232    ///
233    /// Returns a [`CommandResult`] after restart dispatch.
234    pub async fn restart_child(
235        &self,
236        child_id: ChildId,
237        requested_by: impl Into<String>,
238        reason: impl Into<String>,
239    ) -> Result<CommandResult, SupervisorError> {
240        self.child_command(child_id, requested_by, reason, |meta, child_id| {
241            ControlCommand::RestartChild { meta, child_id }
242        })
243        .await
244    }
245
246    /// Pauses a child idempotently.
247    ///
248    /// # Arguments
249    ///
250    /// - `child_id`: Target child identifier.
251    /// - `requested_by`: Caller that requested the command.
252    /// - `reason`: Human-readable command reason.
253    ///
254    /// # Returns
255    ///
256    /// Returns the current child state after the command.
257    pub async fn pause_child(
258        &self,
259        child_id: ChildId,
260        requested_by: impl Into<String>,
261        reason: impl Into<String>,
262    ) -> Result<CommandResult, SupervisorError> {
263        self.child_command(child_id, requested_by, reason, |meta, child_id| {
264            ControlCommand::PauseChild { meta, child_id }
265        })
266        .await
267    }
268
269    /// Resumes a child idempotently.
270    ///
271    /// # Arguments
272    ///
273    /// - `child_id`: Target child identifier.
274    /// - `requested_by`: Caller that requested the command.
275    /// - `reason`: Human-readable command reason.
276    ///
277    /// # Returns
278    ///
279    /// Returns the current child state after the command.
280    pub async fn resume_child(
281        &self,
282        child_id: ChildId,
283        requested_by: impl Into<String>,
284        reason: impl Into<String>,
285    ) -> Result<CommandResult, SupervisorError> {
286        self.child_command(child_id, requested_by, reason, |meta, child_id| {
287            ControlCommand::ResumeChild { meta, child_id }
288        })
289        .await
290    }
291
292    /// Quarantines a child idempotently.
293    ///
294    /// # Arguments
295    ///
296    /// - `child_id`: Target child identifier.
297    /// - `requested_by`: Caller that requested the command.
298    /// - `reason`: Human-readable command reason.
299    ///
300    /// # Returns
301    ///
302    /// Returns the current child state after the command.
303    pub async fn quarantine_child(
304        &self,
305        child_id: ChildId,
306        requested_by: impl Into<String>,
307        reason: impl Into<String>,
308    ) -> Result<CommandResult, SupervisorError> {
309        self.child_command(child_id, requested_by, reason, |meta, child_id| {
310            ControlCommand::QuarantineChild { meta, child_id }
311        })
312        .await
313    }
314
315    /// Shuts down the supervisor tree idempotently.
316    ///
317    /// # Arguments
318    ///
319    /// - `requested_by`: Caller that requested shutdown.
320    /// - `reason`: Human-readable shutdown reason.
321    ///
322    /// # Returns
323    ///
324    /// Returns the current shutdown result.
325    pub async fn shutdown_tree(
326        &self,
327        requested_by: impl Into<String>,
328        reason: impl Into<String>,
329    ) -> Result<CommandResult, SupervisorError> {
330        self.send(ControlCommand::ShutdownTree {
331            meta: CommandMeta::new(requested_by, reason),
332        })
333        .await
334    }
335
336    /// Queries the current runtime state.
337    ///
338    /// # Arguments
339    ///
340    /// This function has no arguments.
341    ///
342    /// # Returns
343    ///
344    /// Returns a [`CommandResult::CurrentState`] value.
345    pub async fn current_state(&self) -> Result<CommandResult, SupervisorError> {
346        self.send(ControlCommand::CurrentState {
347            meta: CommandMeta::new("system", "current_state"),
348        })
349        .await
350    }
351
    /// Reports whether the runtime control loop is alive.
    ///
    /// The answer is read from the control plane state held by this handle;
    /// no message is sent to the runtime loop.
    ///
    /// # Arguments
    ///
    /// This function has no arguments.
    ///
    /// # Returns
    ///
    /// Returns `true` when ordinary control commands may still be accepted.
    pub fn is_alive(&self) -> bool {
        self.control_plane.is_alive()
    }
364
    /// Returns a runtime control plane health report.
    ///
    /// The report is obtained from the control plane state held by this
    /// handle; no message is sent to the runtime loop.
    ///
    /// # Arguments
    ///
    /// This function has no arguments.
    ///
    /// # Returns
    ///
    /// Returns a [`RuntimeHealthReport`] value for the current observation.
    pub fn health(&self) -> RuntimeHealthReport {
        self.control_plane.health()
    }
377
378    /// Waits until the runtime control plane reaches a final state.
379    ///
380    /// # Arguments
381    ///
382    /// This function has no arguments.
383    ///
384    /// # Returns
385    ///
386    /// Returns the cached [`RuntimeExitReport`].
387    pub async fn join(&self) -> Result<RuntimeExitReport, SupervisorError> {
388        let report = self.control_plane.join().await;
389        let _ignored = self.event_sender.send(format!(
390            "runtime_control_loop_join_completed:{}:{}:{}",
391            report.state.as_str(),
392            report.phase,
393            report.reason
394        ));
395        Ok(report)
396    }
397
398    /// Requests shutdown for the runtime control plane itself.
399    ///
400    /// # Arguments
401    ///
402    /// - `requested_by`: Caller that requested shutdown.
403    /// - `reason`: Human-readable shutdown reason.
404    ///
405    /// # Returns
406    ///
407    /// Returns the final [`RuntimeExitReport`].
408    pub async fn shutdown(
409        &self,
410        requested_by: impl Into<String>,
411        reason: impl Into<String>,
412    ) -> Result<RuntimeExitReport, SupervisorError> {
413        let meta = CommandMeta::new(requested_by, reason);
414        if let Some(report) = self
415            .control_plane
416            .mark_shutdown_requested(meta.requested_by.clone(), meta.reason.clone())?
417        {
418            return Ok(report);
419        }
420
421        let (reply_sender, reply_receiver) = oneshot::channel();
422        if self
423            .command_sender
424            .send(RuntimeLoopMessage::ControlPlane(
425                ControlPlaneMessage::Shutdown { meta, reply_sender },
426            ))
427            .await
428            .is_err()
429        {
430            return Err(self
431                .closed_runtime_error_after_join("runtime control loop is closed")
432                .await);
433        }
434        match reply_receiver.await {
435            Ok(result) => {
436                result?;
437            }
438            Err(_) => {
439                return Err(self
440                    .closed_runtime_error_after_join("runtime control loop dropped shutdown reply")
441                    .await);
442            }
443        }
444        self.join().await
445    }
446
447    /// Subscribes to runtime event text emitted by the control loop.
448    ///
449    /// # Arguments
450    ///
451    /// This function has no arguments.
452    ///
453    /// # Returns
454    ///
455    /// Returns a broadcast receiver for event text.
456    pub fn subscribe_events(&self) -> broadcast::Receiver<String> {
457        let receiver = self.event_sender.subscribe();
458        if self.control_plane.is_alive() {
459            let _ignored = self
460                .event_sender
461                .send("runtime_control_loop_started:startup".to_owned());
462        }
463        receiver
464    }
465
466    /// Returns a copy of the typed observability test recorder.
467    ///
468    /// # Arguments
469    ///
470    /// This function has no arguments.
471    ///
472    /// # Returns
473    ///
474    /// Returns the currently retained [`TestRecorder`] contents.
475    pub fn observability_recorder(&self) -> TestRecorder {
476        self.observability
477            .lock()
478            .map(|pipeline| pipeline.test_recorder.clone())
479            .unwrap_or_default()
480    }
481
482    /// Hidden integration-test hook that feeds a synthetic [`ChildRunReport`] through the mailbox.
483    ///
484    /// Production callers must not rely on this hook.
485    #[doc(hidden)]
486    pub async fn generation_fencing_replay_child_exit_for_test(
487        &self,
488        report: ChildRunReport,
489    ) -> Result<(), SupervisorError> {
490        if let Some(report_final) = self.control_plane.final_report() {
491            return Err(runtime_exit_error(&report_final));
492        }
493        if !self.control_plane.is_alive() {
494            return Err(SupervisorError::InvalidTransition {
495                message: format!(
496                    "runtime control loop is not alive: state={}",
497                    self.control_plane.health().state.as_str()
498                ),
499            });
500        }
501        self.command_sender
502            .send(RuntimeLoopMessage::ControlPlane(
503                ControlPlaneMessage::ReplayChildExitForTest {
504                    report: Box::new(report),
505                },
506            ))
507            .await
508            .map_err(|_| SupervisorError::InvalidTransition {
509                message: "runtime control loop is closed".to_owned(),
510            })
511    }
512
513    /// Sends one control command and waits for the result.
514    ///
515    /// # Arguments
516    ///
517    /// - `command`: Command that should be executed by the runtime loop.
518    ///
519    /// # Returns
520    ///
521    /// Returns a command result or a supervisor error when the runtime is gone.
522    async fn send(&self, command: ControlCommand) -> Result<CommandResult, SupervisorError> {
523        if let Some(report) = self.control_plane.final_report() {
524            return Err(runtime_exit_error(&report));
525        }
526        if !self.control_plane.is_alive() {
527            return Err(SupervisorError::InvalidTransition {
528                message: format!(
529                    "runtime control loop is not alive: state={}",
530                    self.control_plane.health().state.as_str()
531                ),
532            });
533        }
534        command.validate_audit_metadata()?;
535        let (reply_sender, reply_receiver) = oneshot::channel();
536        if self
537            .command_sender
538            .send(RuntimeLoopMessage::Control {
539                command,
540                reply_sender,
541            })
542            .await
543            .is_err()
544        {
545            return Err(self
546                .closed_runtime_error_after_join("runtime control loop is closed")
547                .await);
548        }
549        match reply_receiver.await {
550            Ok(result) => result,
551            Err(_) => Err(self
552                .closed_runtime_error_after_join("runtime control loop dropped command reply")
553                .await),
554        }
555    }
556
557    /// Builds and sends a child-targeted command.
558    ///
559    /// # Arguments
560    ///
561    /// - `child_id`: Target child identifier.
562    /// - `requested_by`: Caller that requested the command.
563    /// - `reason`: Human-readable command reason.
564    /// - `builder`: Command builder for the child operation.
565    ///
566    /// # Returns
567    ///
568    /// Returns a command result from the runtime loop.
569    async fn child_command<F>(
570        &self,
571        child_id: ChildId,
572        requested_by: impl Into<String>,
573        reason: impl Into<String>,
574        builder: F,
575    ) -> Result<CommandResult, SupervisorError>
576    where
577        F: FnOnce(CommandMeta, ChildId) -> ControlCommand,
578    {
579        let meta = CommandMeta::new(requested_by, reason);
580        self.send(builder(meta, child_id)).await
581    }
582
583    /// Builds an error after waiting for the runtime exit report when possible.
584    async fn closed_runtime_error_after_join(&self, fallback: &str) -> SupervisorError {
585        if let Some(report) = self.control_plane.final_report() {
586            return runtime_exit_error(&report);
587        }
588        if self.command_sender.is_closed() {
589            let report = self.control_plane.join().await;
590            return runtime_exit_error(&report);
591        }
592        SupervisorError::InvalidTransition {
593            message: fallback.to_owned(),
594        }
595    }
596}
597
598/// Builds a control command error from a runtime exit report.
599fn runtime_exit_error(report: &RuntimeExitReport) -> SupervisorError {
600    SupervisorError::InvalidTransition {
601        message: format!(
602            "runtime control loop already exited: state={}, phase={}, reason={}",
603            report.state.as_str(),
604            report.phase,
605            report.reason
606        ),
607    }
608}