use crate::control::command::{CommandMeta, CommandResult, ControlCommand};
use crate::error::types::SupervisorError;
use crate::id::types::{ChildId, SupervisorPath};
use crate::runtime::control_loop::RuntimeCommand;
use tokio::sync::{broadcast, mpsc, oneshot};
#[derive(Debug, Clone)]
pub struct SupervisorHandle {
command_sender: mpsc::Sender<RuntimeCommand>,
event_sender: broadcast::Sender<String>,
}
impl SupervisorHandle {
pub fn new(
command_sender: mpsc::Sender<RuntimeCommand>,
event_sender: broadcast::Sender<String>,
) -> Self {
Self {
command_sender,
event_sender,
}
}
pub async fn add_child(
&self,
target: SupervisorPath,
child_manifest: impl Into<String>,
requested_by: impl Into<String>,
reason: impl Into<String>,
) -> Result<CommandResult, SupervisorError> {
self.send(ControlCommand::AddChild {
meta: CommandMeta::new(requested_by, reason),
target,
child_manifest: child_manifest.into(),
})
.await
}
pub async fn remove_child(
&self,
child_id: ChildId,
requested_by: impl Into<String>,
reason: impl Into<String>,
) -> Result<CommandResult, SupervisorError> {
self.child_command(child_id, requested_by, reason, |meta, child_id| {
ControlCommand::RemoveChild { meta, child_id }
})
.await
}
pub async fn restart_child(
&self,
child_id: ChildId,
requested_by: impl Into<String>,
reason: impl Into<String>,
) -> Result<CommandResult, SupervisorError> {
self.child_command(child_id, requested_by, reason, |meta, child_id| {
ControlCommand::RestartChild { meta, child_id }
})
.await
}
pub async fn pause_child(
&self,
child_id: ChildId,
requested_by: impl Into<String>,
reason: impl Into<String>,
) -> Result<CommandResult, SupervisorError> {
self.child_command(child_id, requested_by, reason, |meta, child_id| {
ControlCommand::PauseChild { meta, child_id }
})
.await
}
pub async fn resume_child(
&self,
child_id: ChildId,
requested_by: impl Into<String>,
reason: impl Into<String>,
) -> Result<CommandResult, SupervisorError> {
self.child_command(child_id, requested_by, reason, |meta, child_id| {
ControlCommand::ResumeChild { meta, child_id }
})
.await
}
pub async fn quarantine_child(
&self,
child_id: ChildId,
requested_by: impl Into<String>,
reason: impl Into<String>,
) -> Result<CommandResult, SupervisorError> {
self.child_command(child_id, requested_by, reason, |meta, child_id| {
ControlCommand::QuarantineChild { meta, child_id }
})
.await
}
pub async fn shutdown_tree(
&self,
requested_by: impl Into<String>,
reason: impl Into<String>,
) -> Result<CommandResult, SupervisorError> {
self.send(ControlCommand::ShutdownTree {
meta: CommandMeta::new(requested_by, reason),
})
.await
}
pub async fn current_state(&self) -> Result<CommandResult, SupervisorError> {
self.send(ControlCommand::CurrentState {
meta: CommandMeta::new("system", "current_state"),
})
.await
}
pub fn subscribe_events(&self) -> broadcast::Receiver<String> {
self.event_sender.subscribe()
}
async fn send(&self, command: ControlCommand) -> Result<CommandResult, SupervisorError> {
let (reply_sender, reply_receiver) = oneshot::channel();
self.command_sender
.send(RuntimeCommand::Control {
command,
reply_sender,
})
.await
.map_err(|_| SupervisorError::InvalidTransition {
message: "runtime control loop is closed".to_owned(),
})?;
reply_receiver
.await
.map_err(|_| SupervisorError::InvalidTransition {
message: "runtime control loop dropped command reply".to_owned(),
})?
}
async fn child_command<F>(
&self,
child_id: ChildId,
requested_by: impl Into<String>,
reason: impl Into<String>,
builder: F,
) -> Result<CommandResult, SupervisorError>
where
F: FnOnce(CommandMeta, ChildId) -> ControlCommand,
{
let meta = CommandMeta::new(requested_by, reason);
self.send(builder(meta, child_id)).await
}
}