use std::task::Poll;
use libsystemd::daemon::NotifyState;
use pin_project_lite::pin_project;
use tokio::task::{AbortHandle, JoinHandle};
use crate::{Accept, IntoAccept};
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Notify<T>(T);
impl<T> Notify<T> {
#[inline]
pub const fn new(inner: T) -> Self {
Self(inner)
}
}
impl<I, S, T> IntoAccept<I, S> for Notify<T>
where
T: IntoAccept<I, S>,
{
type Accept = NotifyAcceptor<T::Accept>;
type Future = NotifyFuture<T::Future>;
fn into_accept(self) -> Self::Future {
fn notify() -> std::io::Result<Option<JoinHandle<()>>> {
if !libsystemd::daemon::booted() {
return Ok(None);
}
libsystemd::daemon::notify(false, &[NotifyState::Ready])
.map_err(|err| std::io::Error::other(err))?;
if let Some(duration) = libsystemd::daemon::watchdog_enabled(false) {
Ok(Some(tokio::task::spawn(async move {
while libsystemd::daemon::notify(false, &[NotifyState::Watchdog]).is_ok() {
tokio::time::sleep(duration).await;
}
})))
} else {
Ok(None)
}
}
NotifyFuture {
inner: match notify() {
Ok(task) => NotifyFutureInner::Future {
future: self.0.into_accept(),
task: task.map(|x| x.abort_handle()),
},
Err(err) => NotifyFutureInner::Err { err: Some(err) },
},
}
}
}
pin_project! {
#[doc(hidden)]
pub struct NotifyFuture<F> {
#[pin] inner: NotifyFutureInner<F>
}
}
impl<F, T> Future for NotifyFuture<F>
where
F: Future<Output = std::io::Result<T>>,
{
type Output = std::io::Result<NotifyAcceptor<T>>;
#[inline]
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx)
}
}
pin_project! {
#[project = NotifyFutureInnerProj]
enum NotifyFutureInner<F> {
Err {
err: Option<std::io::Error>,
},
Future {
#[pin] future: F,
task: Option<AbortHandle>,
},
}
}
impl<F, T> Future for NotifyFutureInner<F>
where
F: Future<Output = std::io::Result<T>>,
{
type Output = std::io::Result<NotifyAcceptor<T>>;
#[inline]
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
match self.project() {
NotifyFutureInnerProj::Err { err } => Poll::Ready(Err(err.take().unwrap())),
NotifyFutureInnerProj::Future { future, task } => match future.poll(cx) {
Poll::Ready(Ok(inner)) => Poll::Ready(Ok(NotifyAcceptor {
inner,
task: task.take(),
})),
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Pending => Poll::Pending,
},
}
}
}
pub struct NotifyAcceptor<T> {
inner: T,
task: Option<AbortHandle>,
}
impl<I, S, T> Accept<I, S> for NotifyAcceptor<T>
where
T: Accept<I, S>,
{
type Stream = T::Stream;
type Service = T::Service;
type Future = T::Future;
#[inline]
fn accept(&self, stream: I, service: S) -> Self::Future {
self.inner.accept(stream, service)
}
}
impl<T> Drop for NotifyAcceptor<T> {
#[inline]
fn drop(&mut self) {
if let Some(task) = self.task.take() {
task.abort();
}
}
}