use super::{SUPERVISOR, Supervisor};
use crate::Result;
use crate::ipc::server::IpcServer;
use crate::ipc::{IpcRequest, IpcResponse};
use miette::IntoDiagnostic;
/// Version of this crate, captured at compile time from the `CARGO_PKG_VERSION`
/// environment variable.
///
/// It is compared against the version a client sends in `IpcRequest::ConnectV2`
/// and echoed back to that client in `IpcResponse::ConnectOk`.
const VERSION: &str = env!("CARGO_PKG_VERSION");
impl Supervisor {
pub(crate) async fn conn_watch(&self, mut ipc: IpcServer) -> ! {
loop {
let (msg, send) = match ipc.read().await {
Ok(msg) => msg,
Err(e) => {
error!("failed to accept connection: {e:?}");
continue;
}
};
debug!("received message: {msg:?}");
tokio::spawn(async move {
let rsp = SUPERVISOR
.handle_ipc(msg)
.await
.unwrap_or_else(|err| IpcResponse::Error(err.to_string()));
if let Err(err) = send.send(rsp).await {
debug!("failed to send message: {err:?}");
}
});
}
}
pub(crate) async fn handle_ipc(&self, req: IpcRequest) -> Result<IpcResponse> {
let rsp = match req {
IpcRequest::Invalid { error } => {
warn!("Invalid IPC request: {error}");
return Ok(IpcResponse::Error(format!("Invalid request: {error}")));
}
IpcRequest::Connect => {
debug!("received connect message (legacy, no version info)");
IpcResponse::Ok
}
IpcRequest::ConnectV2 {
version: client_version,
} => {
debug!("received connect message (client version: {client_version})");
if client_version != VERSION {
warn!(
"Client version {client_version} differs from supervisor version {VERSION}. \
Restart the supervisor with: pitchfork supervisor start --force"
);
}
IpcResponse::ConnectOk {
version: VERSION.to_string(),
}
}
IpcRequest::Stop { id } => {
self.stop(&id).await?
}
IpcRequest::Run(opts) => {
self.run(opts).await?
}
IpcRequest::Enable { id } => {
if self.enable(&id).await? {
IpcResponse::Yes
} else {
IpcResponse::No
}
}
IpcRequest::Disable { id } => {
if self.disable(&id).await? {
IpcResponse::Yes
} else {
IpcResponse::No
}
}
IpcRequest::GetActiveDaemons => {
let daemons = self.active_daemons().await;
IpcResponse::ActiveDaemons(daemons)
}
IpcRequest::GetNotifications => {
let notifications = self.get_notifications().await;
IpcResponse::Notifications(notifications)
}
IpcRequest::UpdateShellDir { shell_pid, dir } => {
let prev = self.get_shell_dir(shell_pid).await;
self.set_shell_dir(shell_pid, dir.clone()).await?;
self.cancel_pending_autostops_for_dir(&dir).await;
if let Some(prev) = prev {
self.leave_dir(&prev).await?;
}
self.refresh().await?;
IpcResponse::Ok
}
IpcRequest::Clean => {
self.clean().await?;
IpcResponse::Ok
}
IpcRequest::GetDisabledDaemons => {
let disabled = self.state_file.lock().await.disabled.clone();
IpcResponse::DisabledDaemons(disabled.into_iter().collect())
}
IpcRequest::SyncMdns => {
self.sync_mdns().await;
IpcResponse::MdnsSynced
}
IpcRequest::ReloadConfig => {
tokio::task::spawn_blocking(|| {
crate::settings::reload_settings();
crate::logger::apply_settings();
})
.await
.into_diagnostic()?;
IpcResponse::ConfigReloaded
}
};
self.flush_state().await;
Ok(rsp)
}
}