1use futures::{Stream, StreamExt};
8use std::{
9 convert::Infallible,
10 future::{ready, Ready},
11 task::{Poll, Waker},
12};
13
14use crate::{
15 extract::{
16 context::{ConfigureContext, FilterContext},
17 Exclusive, FromContextOnce,
18 },
19 handler::{Handler, IntoHandler},
20 reactor::{http::HttpReactor, root::RootReactor},
21};
22use std::rc::Rc;
23
24pub struct Launcher {
27 context: ConfigureContext,
28}
29
30#[derive(thiserror::Error, Debug)]
32#[error("Launch Error")]
33pub struct LaunchError {}
34
35impl Launcher {
36 pub(crate) fn new(context: ConfigureContext) -> Self {
37 Self { context }
38 }
39
40 pub async fn launch<F, I>(self, filter: F) -> Result<(), LaunchError>
52 where
53 F: IntoHandler<FilterContext, I, Output = ()>,
54 {
55 let configure_context = Rc::new(self.context);
56 let filter = filter.into_handler();
57 let contexts = ContextCreateStream::new(configure_context.root_reactor.clone());
58
59 contexts
60 .map(|http_reactor| (&filter, &configure_context, http_reactor))
61 .for_each_concurrent(
62 None,
63 |(filter, configure_context, http_reactor)| async move {
64 let context =
65 FilterContext::new(configure_context.clone(), http_reactor.clone());
66
67 match filter.call(context).await {
68 Ok(()) => {}
69 Err(extraction_error) => {
70 log::error!("Extraction problem in filter: {extraction_error}");
71 }
72 }
73 },
74 )
75 .await;
76
77 Ok(())
78 }
79}
80
81impl FromContextOnce<ConfigureContext> for Launcher {
82 type Error = Infallible;
83
84 type Future<'c> = Ready<Result<Self, Self::Error>>;
85
86 fn from_context_once(context: Exclusive<ConfigureContext>) -> Self::Future<'_> {
87 let context = ConfigureContext {
89 host: context.host.clone(),
90 clock: context.clock.clone(),
91 grpc_host: context.grpc_host.clone(),
92 shared_data: context.shared_data.clone(),
93 #[cfg(feature = "experimental_metrics")]
94 metrics: context.metrics.clone(),
95 root_reactor: context.root_reactor.clone(),
96 unique_extractions: Default::default(),
97 };
98 ready(Ok(Launcher::new(context)))
99 }
100}
101
102struct ContextCreateStream {
103 reactor: Rc<RootReactor>,
104 waker: Option<Waker>,
105}
106
107impl ContextCreateStream {
108 fn new(reactor: Rc<RootReactor>) -> Self {
109 Self {
110 reactor,
111 waker: None,
112 }
113 }
114}
115
116impl Unpin for ContextCreateStream {}
117
118impl Stream for ContextCreateStream {
119 type Item = Rc<HttpReactor>;
120
121 fn poll_next(
122 mut self: std::pin::Pin<&mut Self>,
123 cx: &mut std::task::Context<'_>,
124 ) -> std::task::Poll<Option<Self::Item>> {
125 if self.reactor.done() {
126 self.reactor.take_create_waker();
127 return Poll::Ready(None);
128 }
129
130 if let Some(http_reactor) = self.reactor.take_new_http_reactor() {
131 return Poll::Ready(Some(http_reactor));
132 }
133
134 match &self.waker {
135 None => {
136 self.reactor.insert_create_waker(cx.waker().clone());
138 self.waker = Some(cx.waker().clone());
139 }
140 Some(waker) if !waker.will_wake(cx.waker()) => {
141 self.reactor.insert_create_waker(waker.clone());
142 self.waker = Some(cx.waker().clone());
143 }
144 Some(_) => {}
145 }
146 Poll::Pending
147 }
148}