1use crate::future::{Route, RouteFuture};
4use alloc::boxed::Box;
5
6use crate::job_id_router::JobIdRouter;
7use crate::util::try_downcast;
8use blueprint_core::{IntoJobResult, Job, JobCall, JobId, JobResult};
9
10use alloc::sync::Arc;
11use alloc::vec::Vec;
12use bytes::Bytes;
13use core::marker::PhantomData;
14use core::pin::Pin;
15use core::task::{Context, Poll};
16use core::{fmt, iter};
17use futures::StreamExt;
18use futures::stream::FuturesUnordered;
19use tower::{BoxError, Layer, Service};
20
21#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
22pub(crate) struct RouteId(pub u32);
23
24#[must_use]
31pub struct Router<Ctx = ()> {
32 inner: Arc<JobIdRouter<Ctx>>,
33}
34
35impl<Ctx> Clone for Router<Ctx> {
36 fn clone(&self) -> Self {
37 Self {
38 inner: Arc::clone(&self.inner),
39 }
40 }
41}
42
43impl<Ctx> Default for Router<Ctx>
44where
45 Ctx: Clone + Send + Sync + 'static,
46{
47 fn default() -> Self {
48 Self::new()
49 }
50}
51
52impl<Ctx> fmt::Debug for Router<Ctx> {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 f.debug_struct("Router")
55 .field("inner", &self.inner)
56 .finish()
57 }
58}
59
60impl<Ctx> Router<Ctx>
61where
62 Ctx: Clone + Send + Sync + 'static,
63{
64 pub fn new() -> Self {
68 Self {
69 inner: Arc::new(JobIdRouter::default()),
70 }
71 }
72
73 fn into_inner(self) -> JobIdRouter<Ctx> {
74 Arc::try_unwrap(self.inner).unwrap_or_else(|arc| arc.as_ref().clone())
75 }
76
77 #[track_caller]
81 pub fn route<I, J, T>(self, job_id: I, job: J) -> Self
82 where
83 I: Into<JobId>,
84 J: Job<T, Ctx>,
85 T: 'static,
86 {
87 let mut inner = self.into_inner();
88 inner.route(job_id, job);
89 Router {
90 inner: Arc::new(inner),
91 }
92 }
93
94 pub fn route_service<T>(self, job_id: u32, service: T) -> Self
100 where
101 T: Service<JobCall, Error = BoxError> + Clone + Send + Sync + 'static,
102 T::Response: IntoJobResult,
103 T::Future: Send + 'static,
104 {
105 let service = match try_downcast::<Router<Ctx>, _>(service) {
106 Ok(_) => {
107 panic!("Invalid route: `Router::route_service` cannot be used with `Router`s.");
108 }
109 Err(service) => service,
110 };
111
112 let mut inner = self.into_inner();
113 inner.route_service(job_id, service);
114 Router {
115 inner: Arc::new(inner),
116 }
117 }
118
119 #[track_caller]
124 pub fn always<J, T>(self, job: J) -> Self
125 where
126 J: Job<T, Ctx>,
127 T: 'static,
128 {
129 let mut inner = self.into_inner();
130 inner.always(job);
131 Router {
132 inner: Arc::new(inner),
133 }
134 }
135
136 #[track_caller]
146 pub fn fallback<J, T>(self, job: J) -> Self
147 where
148 J: Job<T, Ctx>,
149 T: 'static,
150 {
151 let mut inner = self.into_inner();
152 inner.fallback(job);
153 Router {
154 inner: Arc::new(inner),
155 }
156 }
157
158 pub fn layer<L>(self, layer: L) -> Router<Ctx>
185 where
186 L: Layer<Route> + Clone + Send + Sync + 'static,
187 L::Service: Service<JobCall> + Clone + Send + Sync + 'static,
188 <L::Service as Service<JobCall>>::Response: IntoJobResult + 'static,
189 <L::Service as Service<JobCall>>::Error: Into<BoxError> + 'static,
190 <L::Service as Service<JobCall>>::Future: Send + 'static,
191 {
192 let inner = self.into_inner().layer(layer);
193 Router {
194 inner: Arc::new(inner),
195 }
196 }
197
198 #[must_use]
200 pub fn has_routes(&self) -> bool {
201 self.inner.has_routes()
202 }
203
204 #[doc = include_str!("../docs/with_context.md")]
205 pub fn with_context<Ctx2>(self, context: Ctx) -> Router<Ctx2> {
206 let inner = self.into_inner().with_context(context);
207 Router {
208 inner: Arc::new(inner),
209 }
210 }
211
212 pub(crate) fn call_with_context(
213 &self,
214 call: JobCall,
215 context: Ctx,
216 ) -> Option<FuturesUnordered<RouteFuture<BoxError>>> {
217 blueprint_core::trace!(
218 target: "blueprint-router",
219 job_id = %call.job_id(),
220 metadata = ?call.metadata(),
221 body = ?call.body(),
222 "routing a job call to inner routers"
223 );
224 let (call, context) = match self.inner.call_with_context(call, context) {
225 Ok(matched_call_future) => {
226 blueprint_core::trace!(
227 target: "blueprint-router",
228 matched_calls = matched_call_future.len(),
229 "A route matched this job call"
230 );
231 return Some(matched_call_future);
232 }
233 Err((call, context)) => (call, context),
234 };
235
236 blueprint_core::trace!(
238 target: "blueprint-router",
239 ?call,
240 "No explicit or always route caught this job call, passing to fallback"
241 );
242
243 self.inner
244 .call_fallback(call, context)
245 .map(|future| iter::once(future).collect::<FuturesUnordered<_>>())
246 }
247
248 pub fn as_service<B>(&mut self) -> RouterAsService<'_, B, Ctx> {
297 RouterAsService {
298 router: self,
299 _marker: PhantomData,
300 }
301 }
302}
303
304impl<B> Service<JobCall<B>> for Router<()>
305where
306 B: Into<Bytes>,
307{
308 type Response = Option<Vec<JobResult>>;
309 type Error = BoxError;
310 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
311
312 #[inline]
313 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
314 Poll::Ready(Ok(()))
315 }
316
317 #[inline]
318 #[allow(clippy::needless_continue)]
319 fn call(&mut self, call: JobCall<B>) -> Self::Future {
320 let Some(mut futures) = self.call_with_context(call.map(Into::into), ()) else {
321 return Box::pin(async { Ok(None) });
322 };
323
324 Box::pin(async move {
325 let mut results = Vec::with_capacity(futures.len());
326 while let Some(item) = futures.next().await {
327 blueprint_core::trace!(target: "blueprint-router", outcome = ?item, "Job finished with outcome");
328 match item {
329 Ok(Some(job)) => results.push(job),
330 Ok(None) => continue,
332 Err(e) => {
333 blueprint_core::error!(?e, "Job failed");
334 return Err(e);
335 }
336 }
337 }
338
339 Ok(Some(results))
340 })
341 }
342}
343
344pub struct RouterAsService<'a, B, Ctx = ()> {
348 router: &'a mut Router<Ctx>,
349 _marker: PhantomData<B>,
350}
351
352impl<B> Service<JobCall<B>> for RouterAsService<'_, B, ()>
353where
354 B: Into<Bytes>,
355{
356 type Response = Option<Vec<JobResult>>;
357 type Error = BoxError;
358 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
359
360 #[inline]
361 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
362 <Router as Service<JobCall<B>>>::poll_ready(self.router, cx)
363 }
364
365 #[inline]
366 fn call(&mut self, call: JobCall<B>) -> Self::Future {
367 self.router.call(call)
368 }
369}
370
371impl<B, Ctx> fmt::Debug for RouterAsService<'_, B, Ctx>
372where
373 Ctx: fmt::Debug,
374{
375 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
376 f.debug_struct("RouterAsService")
377 .field("router", &self.router)
378 .finish()
379 }
380}
381
382#[test]
383fn traits() {
384 use crate::test_helpers::*;
385 assert_send::<Router<()>>();
386 assert_sync::<Router<()>>();
387}