use std::{
io,
path::PathBuf,
pin::Pin,
task::{Context, Poll},
};
use futures::{ready, Stream, StreamExt};
use tokio::{net::UnixStream, signal::unix::SignalKind};
use tokio_stream::wrappers::{SignalStream, UnixListenerStream};
use crate::tokio_ecdysis::StoppableStream;
pub(crate) enum Trigger {
Signal(SignalKind, SignalStream),
Uds(PathBuf, StoppableStream<UnixListenerStream>),
}
#[derive(Debug)]
pub(crate) enum TriggerReason {
Signal(SignalKind),
UnixStream(PathBuf, UnixStream),
}
impl Trigger {
pub(crate) async fn cleanup(self) -> Result<(), String> {
match self {
Trigger::Signal(_kind, _stream) => Ok(()),
Trigger::Uds(path, _listener_stream) => {
log::debug!("Removing {path:?}");
tokio::fs::remove_file(path.clone())
.await
.map_err(|e| e.to_string())
}
}
}
}
impl Stream for Trigger {
type Item = io::Result<TriggerReason>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.get_mut() {
Self::Signal(kind, signal_stream) => match ready!(signal_stream.poll_next_unpin(cx)) {
None => unreachable!(), Some(()) => Poll::Ready(Some(Ok(TriggerReason::Signal(*kind)))),
},
Self::Uds(path, listener_stream) => match ready!(listener_stream.poll_next_unpin(cx)) {
None => Poll::Ready(Some(Err(io::Error::other(
"Socket shut down unexpectedly.",
)))),
Some(Err(e)) => Poll::Ready(Some(Err(io::Error::other(format!(
"Unexpected error accepting connection on socket: {e}"
))))),
Some(Ok(stream)) => {
Poll::Ready(Some(Ok(TriggerReason::UnixStream(path.clone(), stream))))
}
},
}
}
}