use crate::path::AbsolutePath;
#[cfg(any(feature = "exec", feature = "compose"))]
use crate::{env::EnvError, process::ProcessError};
use crate::{provider::ProviderError, secrets::SecretError};
use async_trait::async_trait;
use futures::future::BoxFuture;
use indexmap::IndexMap;
use std::process::ExitStatus;
use thiserror::Error;
use tokio::signal::unix::{SignalKind, signal};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info};
#[derive(Debug, Error)]
pub enum HandlerError {
#[cfg(any(feature = "exec", feature = "compose"))]
#[error(transparent)]
Env(#[from] EnvError),
#[error(transparent)]
Secret(#[from] SecretError),
#[error(transparent)]
Provider(#[from] ProviderError),
#[cfg(any(feature = "exec", feature = "compose"))]
#[error(transparent)]
Process(#[from] ProcessError),
#[error("Process I/O error")]
Io(#[from] std::io::Error),
#[error("Process exited with {0}")]
Exited(ExitStatus),
#[error("Process terminated by signal")]
Signaled,
#[error("Operation interrupted")]
Interrupted,
}
impl HandlerError {
pub fn from_status(status: ExitStatus) -> Result<(), Self> {
if status.success() {
Ok(())
} else {
Err(Self::Exited(status))
}
}
}
#[async_trait]
pub trait EventHandler: Send + Sync {
fn paths(&self) -> Vec<AbsolutePath>;
async fn handle(&mut self, events: Vec<FsEvent>) -> Result<(), HandlerError>;
fn wait(&self) -> BoxFuture<'static, Result<(), HandlerError>> {
Box::pin(async move {
wait_for_signal(false).await;
Ok(())
})
}
async fn cleanup(&mut self) {}
}
pub struct StoppableHandler<H> {
inner: H,
token: CancellationToken,
}
impl<H: EventHandler> StoppableHandler<H> {
pub fn new(inner: H, token: CancellationToken) -> Self {
Self { inner, token }
}
}
#[async_trait]
impl<H: EventHandler> EventHandler for StoppableHandler<H> {
fn paths(&self) -> Vec<crate::path::AbsolutePath> {
self.inner.paths()
}
async fn handle(&mut self, events: Vec<FsEvent>) -> Result<(), HandlerError> {
self.inner.handle(events).await
}
fn wait(&self) -> BoxFuture<'static, Result<(), HandlerError>> {
let token = self.token.clone();
let inner_wait = self.inner.wait();
Box::pin(async move {
tokio::select! {
res = inner_wait => {
res
}
_ = token.cancelled() => {
Ok(())
}
}
})
}
async fn cleanup(&mut self) {
self.inner.cleanup().await
}
}
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Clone)]
pub enum FsEvent {
Write(AbsolutePath),
Remove(AbsolutePath),
Move {
from: AbsolutePath,
to: AbsolutePath,
},
}
impl FsEvent {
pub fn affects(&self, check: impl Fn(&AbsolutePath) -> bool) -> bool {
match self {
Self::Write(p) | Self::Remove(p) => check(p),
Self::Move { from, to } => check(from) || check(to),
}
}
}
pub struct FsEventRegistry {
map: IndexMap<AbsolutePath, FsEvent>,
}
impl FsEventRegistry {
pub fn new() -> Self {
Self {
map: IndexMap::new(),
}
}
pub fn register(&mut self, event: FsEvent) {
match event {
FsEvent::Write(ref path) => {
self.update(path.clone(), event);
}
FsEvent::Remove(ref path) => {
self.update(path.clone(), event);
}
FsEvent::Move { ref from, ref to } => {
self.handle_move(from.clone(), to.clone());
}
}
}
fn handle_move(&mut self, from: AbsolutePath, to: AbsolutePath) {
let event = match self.map.get(&from) {
Some(FsEvent::Write(_)) => FsEvent::Write(to.clone()),
Some(FsEvent::Move { from: origin, .. }) => FsEvent::Move {
from: origin.clone(),
to: to.clone(),
},
_ => FsEvent::Move {
from: from.clone(),
to: to.clone(),
},
};
self.map.shift_remove(&from);
self.update(to, event);
}
fn update(&mut self, path: AbsolutePath, new_event: FsEvent) {
if let FsEvent::Remove(_) = &new_event
&& !self.map.contains_key(&path)
{
let is_move_source = self
.map
.values()
.any(|e| matches!(e, FsEvent::Move { from, .. } if from == &path));
if is_move_source {
debug!(
?path,
"FsEventRegistry: ignoring redundant Remove event for moved source"
);
return;
}
}
match (self.map.get(&path), &new_event) {
(Some(FsEvent::Write(_)), FsEvent::Remove(_)) => {
self.map.shift_remove(&path);
}
(Some(FsEvent::Move { .. }), FsEvent::Remove(_)) => {
self.map.insert(path, new_event);
}
(Some(FsEvent::Remove(_)), FsEvent::Write(_)) => {
self.map.insert(path, new_event);
}
_ => {
self.map.insert(path, new_event);
}
}
}
pub fn drain(&mut self) -> impl Iterator<Item = FsEvent> + '_ {
self.map.drain(..).map(|(_, event)| event)
}
pub fn is_empty(&self) -> bool {
self.map.is_empty()
}
}
impl Default for FsEventRegistry {
fn default() -> Self {
Self::new()
}
}
pub async fn wait_for_signal(interactive: bool) {
let mut sigterm = signal(SignalKind::terminate()).expect("failed to install SIGTERM handler");
if interactive {
let mut sigint = signal(SignalKind::interrupt()).expect("failed to install SIGINT handler");
let mut sigquit = signal(SignalKind::quit()).expect("failed to install SIGQUIT handler");
loop {
tokio::select! {
_ = sigterm.recv() => {
info!("Received SIGTERM, shutting down...");
return;
}
_ = sigint.recv() => {
debug!("Received SIGINT. Ignored in interactive mode.");
}
_ = sigquit.recv() => {
debug!("Received SIGQUIT. Ignored in interactive mode.");
}
}
}
} else {
let mut sigint = signal(SignalKind::interrupt()).expect("failed to install SIGINT handler");
let mut sigquit = signal(SignalKind::quit()).expect("failed to install SIGQUIT handler");
tokio::select! {
_ = sigterm.recv() => info!("Received SIGTERM, shutting down..."),
_ = sigint.recv() => info!("Received SIGINT, shutting down..."),
_ = sigquit.recv() => info!("Received SIGQUIT, shutting down..."),
}
}
}