use crate::utils::feature_use;
use crate::BootstrapResult;
use futures_util::future::BoxFuture;
use futures_util::stream::{FuturesUnordered, Stream};
use futures_util::FutureExt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
feature_use!(cfg(feature = "telemetry-server"), {
use super::server::TelemetryServerFuture;
use anyhow::anyhow;
use hyper::Server;
use std::net::SocketAddr;
});
/// A future that drives a set of boxed telemetry futures and, when the
/// `telemetry-server` feature is enabled, an optional telemetry server future.
///
/// The type implements [`Future`]; polling it polls the server future (if one
/// is still held) and the next entry of the internal futures set.
pub struct TelemetryDriver {
    /// Address obtained from the server future at construction time via
    /// `Server::local_addr`; `None` when no server future was supplied.
    #[cfg(feature = "telemetry-server")]
    server_addr: Option<SocketAddr>,

    /// The server future, if any. `with_graceful_shutdown` takes it out of
    /// this slot (leaving `None`) and moves it into `tele_futures`.
    #[cfg(feature = "telemetry-server")]
    server_fut: Option<TelemetryServerFuture>,

    /// Unordered set of boxed futures, each resolving to a
    /// `BootstrapResult<()>`, polled as a stream by the `Future` impl.
    tele_futures: FuturesUnordered<BoxFuture<'static, BootstrapResult<()>>>,
}
impl TelemetryDriver {
    /// Builds a driver from an optional server future (only with the
    /// `telemetry-server` feature) and a set of boxed telemetry futures.
    ///
    /// With the feature enabled, the server's local address is read from the
    /// server future up front and stored, so it stays available after the
    /// server future has been moved elsewhere.
    pub(super) fn new(
        #[cfg(feature = "telemetry-server")] server_fut: Option<TelemetryServerFuture>,
        tele_futures: FuturesUnordered<BoxFuture<'static, BootstrapResult<()>>>,
    ) -> Self {
        // Capture the address before `server_fut` is moved into the struct.
        #[cfg(feature = "telemetry-server")]
        let server_addr = server_fut.as_ref().map(Server::local_addr);

        Self {
            #[cfg(feature = "telemetry-server")]
            server_addr,
            #[cfg(feature = "telemetry-server")]
            server_fut,
            tele_futures,
        }
    }

    /// Returns the address captured from the server future at construction,
    /// or `None` if the driver was created without a server future.
    #[cfg(feature = "telemetry-server")]
    pub fn server_addr(&self) -> Option<SocketAddr> {
        self.server_addr
    }

    /// Registers `signal` as a shutdown trigger.
    ///
    /// With the `telemetry-server` feature enabled and a server future still
    /// held, the server future is taken out of the driver, wrapped with its
    /// own `with_graceful_shutdown(signal)`, and pushed into the set of
    /// telemetry futures.
    ///
    /// Otherwise (feature disabled, or no server future left), a future that
    /// simply awaits `signal` and then yields `Ok(())` is pushed instead.
    pub fn with_graceful_shutdown(
        &mut self,
        signal: impl Future<Output = ()> + Send + Sync + 'static,
    ) {
        #[cfg(feature = "telemetry-server")]
        if let Some(server) = self.server_fut.take() {
            let graceful_server: BoxFuture<'static, BootstrapResult<()>> = async move {
                server.with_graceful_shutdown(signal).await?;
                Ok(())
            }
            .boxed();

            self.tele_futures.push(graceful_server);
            // `signal` has been consumed by the server wrapper; nothing more to do.
            return;
        }

        let wait_for_signal: BoxFuture<'static, BootstrapResult<()>> = async move {
            signal.await;
            Ok(())
        }
        .boxed();

        self.tele_futures.push(wait_for_signal);
    }
}
impl Future for TelemetryDriver {
    type Output = BootstrapResult<()>;

    /// Polls the server future (when the `telemetry-server` feature is on and
    /// one is still held) and then the next item of the telemetry futures set.
    ///
    /// Returns `Poll::Pending` if neither produced a result during this call.
    /// Otherwise the results gathered in this call are collected into a single
    /// `BootstrapResult<()>`: the first error in (server, telemetry) order is
    /// returned, or `Ok(())` if all gathered results succeeded.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut outcomes: Vec<BootstrapResult<()>> = Vec::new();

        #[cfg(feature = "telemetry-server")]
        if let Some(server) = self.server_fut.as_mut() {
            if let Poll::Ready(served) = Pin::new(server).poll(cx) {
                outcomes.push(served.map_err(|err| anyhow!(err)));
            }
        }

        // An exhausted set (`Ready(None)`) contributes nothing.
        if let Poll::Ready(Some(finished)) = Pin::new(&mut self.tele_futures).poll_next(cx) {
            outcomes.push(finished);
        }

        if outcomes.is_empty() {
            return Poll::Pending;
        }

        Poll::Ready(outcomes.into_iter().collect())
    }
}