use super::{SUPERVISOR, Supervisor};
use crate::Result;
use crate::ipc::server::IpcServer;
use crate::ipc::{IpcRequest, IpcResponse};
impl Supervisor {
pub(crate) async fn conn_watch(&self, mut ipc: IpcServer) -> ! {
loop {
let (msg, send) = match ipc.read().await {
Ok(msg) => msg,
Err(e) => {
error!("failed to accept connection: {e:?}");
continue;
}
};
debug!("received message: {msg:?}");
tokio::spawn(async move {
let rsp = SUPERVISOR
.handle_ipc(msg)
.await
.unwrap_or_else(|err| IpcResponse::Error(err.to_string()));
if let Err(err) = send.send(rsp).await {
debug!("failed to send message: {err:?}");
}
});
}
}
pub(crate) async fn handle_ipc(&self, req: IpcRequest) -> Result<IpcResponse> {
let rsp = match req {
IpcRequest::Invalid { error } => {
warn!("Invalid IPC request: {error}");
return Ok(IpcResponse::Error(format!("Invalid request: {error}")));
}
IpcRequest::Connect => {
debug!("received connect message");
IpcResponse::Ok
}
IpcRequest::Stop { id } => {
self.stop(&id).await?
}
IpcRequest::Run(opts) => {
self.run(opts).await?
}
IpcRequest::Enable { id } => {
if self.enable(&id).await? {
IpcResponse::Yes
} else {
IpcResponse::No
}
}
IpcRequest::Disable { id } => {
if self.disable(&id).await? {
IpcResponse::Yes
} else {
IpcResponse::No
}
}
IpcRequest::GetActiveDaemons => {
let daemons = self.active_daemons().await;
IpcResponse::ActiveDaemons(daemons)
}
IpcRequest::GetNotifications => {
let notifications = self.get_notifications().await;
IpcResponse::Notifications(notifications)
}
IpcRequest::UpdateShellDir { shell_pid, dir } => {
let prev = self.get_shell_dir(shell_pid).await;
self.set_shell_dir(shell_pid, dir.clone()).await?;
self.cancel_pending_autostops_for_dir(&dir).await;
if let Some(prev) = prev {
self.leave_dir(&prev).await?;
}
self.refresh().await?;
IpcResponse::Ok
}
IpcRequest::Clean => {
self.clean().await?;
IpcResponse::Ok
}
IpcRequest::GetDisabledDaemons => {
let disabled = self.state_file.lock().await.disabled.clone();
IpcResponse::DisabledDaemons(disabled.into_iter().collect())
}
};
Ok(rsp)
}
}