use crate::BootstrapResult;
#[cfg(feature = "telemetry-server")]
use crate::addr::ListenAddr;
use crate::utils::feature_use;
use futures_util::future::BoxFuture;
use futures_util::stream::FuturesUnordered;
use futures_util::{FutureExt, Stream};
#[cfg(feature = "logging")]
use slog_async::AsyncGuard;
use std::future::Future;
#[cfg(feature = "logging")]
use std::mem::ManuallyDrop;
use std::pin::Pin;
use std::task::{Context, Poll, ready};
feature_use!(cfg(feature = "telemetry-server"), {
use super::server::TelemetryServerFuture;
});
pub struct TelemetryDriver {
#[cfg(feature = "telemetry-server")]
server_addr: Option<ListenAddr>,
#[cfg(feature = "telemetry-server")]
server_fut: Option<TelemetryServerFuture>,
#[cfg(feature = "logging")]
logging_guard: Option<ManuallyDrop<AsyncGuard>>,
tele_futures: FuturesUnordered<BoxFuture<'static, BootstrapResult<()>>>,
}
impl TelemetryDriver {
pub(super) fn new(
#[cfg(feature = "telemetry-server")] server_fut: Option<TelemetryServerFuture>,
tele_futures: FuturesUnordered<BoxFuture<'static, BootstrapResult<()>>>,
) -> Self {
Self {
#[cfg(feature = "telemetry-server")]
server_addr: server_fut.as_ref().and_then(|fut| fut.local_addr().ok()),
#[cfg(feature = "telemetry-server")]
server_fut,
#[cfg(feature = "logging")]
logging_guard: None,
tele_futures,
}
}
#[cfg(feature = "logging")]
pub(super) fn set_logging_guard(&mut self, logging_async_guard: Option<AsyncGuard>) {
self.logging_guard = logging_async_guard.map(ManuallyDrop::new);
}
#[cfg(feature = "telemetry-server")]
pub fn server_addr(&self) -> Option<&ListenAddr> {
self.server_addr.as_ref()
}
pub fn with_graceful_shutdown(
&mut self,
signal: impl Future<Output = ()> + Send + Sync + 'static,
) {
#[cfg(feature = "telemetry-server")]
{
if let Some(server_fut) = self.server_fut.take() {
self.tele_futures.push(Box::pin(async move {
server_fut.with_graceful_shutdown(signal).await;
Ok(())
}));
return;
}
}
self.tele_futures.push(
async move {
signal.await;
Ok(())
}
.boxed(),
);
}
#[cfg(feature = "logging")]
pub fn shutdown_logger(&mut self) {
if let Some(guard) = self.logging_guard.take() {
drop(ManuallyDrop::into_inner(guard))
}
}
}
impl Future for TelemetryDriver {
type Output = BootstrapResult<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
#[cfg_attr(not(feature = "telemetry-server"), allow(unused_mut))]
let mut server_res = Poll::Ready(Ok(()));
#[cfg(feature = "telemetry-server")]
if let Some(server_fut) = &mut self.server_fut {
let Poll::Pending = Pin::new(server_fut).poll(cx);
server_res = Poll::Pending;
}
loop {
let tele_res = ready!(Pin::new(&mut self.tele_futures).poll_next(cx)?);
if tele_res.is_none() {
return server_res;
}
}
}
}