use futures::{Stream, StreamExt};
use std::{
convert::Infallible,
future::{ready, Ready},
task::{Poll, Waker},
};
use crate::{
extract::{
context::{ConfigureContext, FilterContext},
Exclusive, FromContextOnce,
},
handler::{Handler, IntoHandler},
reactor::{http::HttpReactor, root::RootReactor},
};
use std::rc::Rc;
pub struct Launcher {
context: ConfigureContext,
}
#[derive(thiserror::Error, Debug)]
#[error("Launch Error")]
pub struct LaunchError {}
impl Launcher {
pub(crate) fn new(context: ConfigureContext) -> Self {
Self { context }
}
pub async fn launch<F, I>(self, filter: F) -> Result<(), LaunchError>
where
F: IntoHandler<FilterContext, I, Output = ()>,
{
let configure_context = Rc::new(self.context);
let filter = filter.into_handler();
let contexts = ContextCreateStream::new(configure_context.root_reactor.clone());
contexts
.map(|http_reactor| (&filter, &configure_context, http_reactor))
.for_each_concurrent(
None,
|(filter, configure_context, http_reactor)| async move {
let context =
FilterContext::new(configure_context.clone(), http_reactor.clone());
match filter.call(context).await {
Ok(()) => {}
Err(extraction_error) => {
log::error!("Extraction problem in filter: {extraction_error}");
}
}
},
)
.await;
Ok(())
}
}
impl FromContextOnce<ConfigureContext> for Launcher {
type Error = Infallible;
type Future<'c> = Ready<Result<Self, Self::Error>>;
fn from_context_once(context: Exclusive<ConfigureContext>) -> Self::Future<'_> {
let context = ConfigureContext {
host: context.host.clone(),
clock: context.clock.clone(),
grpc_host: context.grpc_host.clone(),
shared_data: context.shared_data.clone(),
#[cfg(feature = "experimental_metrics")]
metrics: context.metrics.clone(),
root_reactor: context.root_reactor.clone(),
unique_extractions: Default::default(),
};
ready(Ok(Launcher::new(context)))
}
}
struct ContextCreateStream {
reactor: Rc<RootReactor>,
waker: Option<Waker>,
}
impl ContextCreateStream {
fn new(reactor: Rc<RootReactor>) -> Self {
Self {
reactor,
waker: None,
}
}
}
impl Unpin for ContextCreateStream {}
impl Stream for ContextCreateStream {
type Item = Rc<HttpReactor>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
if self.reactor.done() {
self.reactor.take_create_waker();
return Poll::Ready(None);
}
if let Some(http_reactor) = self.reactor.take_new_http_reactor() {
return Poll::Ready(Some(http_reactor));
}
match &self.waker {
None => {
self.reactor.insert_create_waker(cx.waker().clone());
self.waker = Some(cx.waker().clone());
}
Some(waker) if !waker.will_wake(cx.waker()) => {
self.reactor.insert_create_waker(waker.clone());
self.waker = Some(cx.waker().clone());
}
Some(_) => {}
}
Poll::Pending
}
}