blueprint_router/
routing.rs

1//! Routing between [`Service`]s and jobs.
2
3use crate::future::{Route, RouteFuture};
4use alloc::boxed::Box;
5
6use crate::job_id_router::JobIdRouter;
7use crate::util::try_downcast;
8use blueprint_core::{IntoJobResult, Job, JobCall, JobId, JobResult};
9
10use alloc::sync::Arc;
11use alloc::vec::Vec;
12use bytes::Bytes;
13use core::marker::PhantomData;
14use core::pin::Pin;
15use core::task::{Context, Poll};
16use core::{fmt, iter};
17use futures::StreamExt;
18use futures::stream::FuturesUnordered;
19use tower::{BoxError, Layer, Service};
20
/// Numeric identifier for a route, wrapping a raw `u32`.
///
/// NOTE(review): nothing in this file references `RouteId`; presumably it is consumed by
/// sibling modules of this crate — confirm before changing or removing it.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct RouteId(pub u32);
23
/// The router type for composing jobs and services.
///
/// `Router<Ctx>` means a router that is missing a context of type `Ctx` to be able to handle requests.
/// Thus, only `Router<()>` (i.e. without missing context) can be passed to a [`BlueprintRunner`]. See [`Router::with_context()`] for more details.
///
/// Clones of a `Router` share the same underlying `JobIdRouter` through an `Arc`.
///
/// [`BlueprintRunner`]: https://docs.rs/blueprint-runner/latest/blueprint_runner/struct.BlueprintRunner.html
#[must_use]
pub struct Router<Ctx = ()> {
    // Shared route table. The builder methods take it back out of the `Arc` (cloning the
    // table only when other handles still exist) before modifying it.
    inner: Arc<JobIdRouter<Ctx>>,
}
34
35impl<Ctx> Clone for Router<Ctx> {
36    fn clone(&self) -> Self {
37        Self {
38            inner: Arc::clone(&self.inner),
39        }
40    }
41}
42
43impl<Ctx> Default for Router<Ctx>
44where
45    Ctx: Clone + Send + Sync + 'static,
46{
47    fn default() -> Self {
48        Self::new()
49    }
50}
51
52impl<Ctx> fmt::Debug for Router<Ctx> {
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        f.debug_struct("Router")
55            .field("inner", &self.inner)
56            .finish()
57    }
58}
59
60impl<Ctx> Router<Ctx>
61where
62    Ctx: Clone + Send + Sync + 'static,
63{
64    /// Create a new `Router`.
65    ///
66    /// Unless you add additional routes this will ignore all requests.
67    pub fn new() -> Self {
68        Self {
69            inner: Arc::new(JobIdRouter::default()),
70        }
71    }
72
73    fn into_inner(self) -> JobIdRouter<Ctx> {
74        Arc::try_unwrap(self.inner).unwrap_or_else(|arc| arc.as_ref().clone())
75    }
76
77    /// Add a [`Job`] to the router, with the given job ID.
78    ///
79    /// The job will be called when a [`JobCall`] with the given job ID is received by the router.
80    #[track_caller]
81    pub fn route<I, J, T>(self, job_id: I, job: J) -> Self
82    where
83        I: Into<JobId>,
84        J: Job<T, Ctx>,
85        T: 'static,
86    {
87        let mut inner = self.into_inner();
88        inner.route(job_id, job);
89        Router {
90            inner: Arc::new(inner),
91        }
92    }
93
94    /// Add a [`Service`] to the router, with the given job ID.
95    ///
96    /// # Panics
97    ///
98    /// Panics if `service` is a `Router`.
99    pub fn route_service<T>(self, job_id: u32, service: T) -> Self
100    where
101        T: Service<JobCall, Error = BoxError> + Clone + Send + Sync + 'static,
102        T::Response: IntoJobResult,
103        T::Future: Send + 'static,
104    {
105        let service = match try_downcast::<Router<Ctx>, _>(service) {
106            Ok(_) => {
107                panic!("Invalid route: `Router::route_service` cannot be used with `Router`s.");
108            }
109            Err(service) => service,
110        };
111
112        let mut inner = self.into_inner();
113        inner.route_service(job_id, service);
114        Router {
115            inner: Arc::new(inner),
116        }
117    }
118
119    /// Add a [`Job`] that *always* gets called, regardless of the job ID
120    ///
121    /// This is useful for jobs that want to watch for certain events. Any [`JobCall`] received by
122    /// router will be passed to the `job`, regardless if another route matches.
123    #[track_caller]
124    pub fn always<J, T>(self, job: J) -> Self
125    where
126        J: Job<T, Ctx>,
127        T: 'static,
128    {
129        let mut inner = self.into_inner();
130        inner.always(job);
131        Router {
132            inner: Arc::new(inner),
133        }
134    }
135
136    /// Add a [`Job`] that gets called if no other route matches
137    ///
138    /// NOTE: This will replace any existing fallback route.
139    ///
140    /// This will **only** be called when:
141    /// - No other route matches the job ID
142    /// - No [`always`] route is present
143    ///
144    /// [`always`]: Router::always
145    #[track_caller]
146    pub fn fallback<J, T>(self, job: J) -> Self
147    where
148        J: Job<T, Ctx>,
149        T: 'static,
150    {
151        let mut inner = self.into_inner();
152        inner.fallback(job);
153        Router {
154            inner: Arc::new(inner),
155        }
156    }
157
158    /// Apply a [`tower::Layer`] to all routes in this `Router`
159    ///
160    /// See [`Job::layer()`]
161    ///
162    /// # Examples
163    ///
164    /// ```rust
165    /// use blueprint_sdk::{Job, Router};
166    /// use tower::limit::{ConcurrencyLimit, ConcurrencyLimitLayer};
167    ///
168    /// async fn job() { /* ... */
169    /// }
170    ///
171    /// async fn another_job() { /* ... */
172    /// }
173    ///
174    /// const JOB_ID: u32 = 0;
175    /// const ANOTHER_JOB_ID: u32 = 1;
176    ///
177    /// let app = Router::new()
178    ///     .route(JOB_ID, job)
179    ///     .route(ANOTHER_JOB_ID, another_job)
180    ///     // Limit concurrent calls to both `job` and `another_job` to 64
181    ///     .layer(ConcurrencyLimitLayer::new(64));
182    /// # let _: Router = app;
183    /// ```
184    pub fn layer<L>(self, layer: L) -> Router<Ctx>
185    where
186        L: Layer<Route> + Clone + Send + Sync + 'static,
187        L::Service: Service<JobCall> + Clone + Send + Sync + 'static,
188        <L::Service as Service<JobCall>>::Response: IntoJobResult + 'static,
189        <L::Service as Service<JobCall>>::Error: Into<BoxError> + 'static,
190        <L::Service as Service<JobCall>>::Future: Send + 'static,
191    {
192        let inner = self.into_inner().layer(layer);
193        Router {
194            inner: Arc::new(inner),
195        }
196    }
197
198    /// Whether the router currently has at least one route added.
199    #[must_use]
200    pub fn has_routes(&self) -> bool {
201        self.inner.has_routes()
202    }
203
204    #[doc = include_str!("../docs/with_context.md")]
205    pub fn with_context<Ctx2>(self, context: Ctx) -> Router<Ctx2> {
206        let inner = self.into_inner().with_context(context);
207        Router {
208            inner: Arc::new(inner),
209        }
210    }
211
212    pub(crate) fn call_with_context(
213        &self,
214        call: JobCall,
215        context: Ctx,
216    ) -> Option<FuturesUnordered<RouteFuture<BoxError>>> {
217        blueprint_core::trace!(
218            target: "blueprint-router",
219            job_id = %call.job_id(),
220            metadata = ?call.metadata(),
221            body = ?call.body(),
222            "routing a job call to inner routers"
223        );
224        let (call, context) = match self.inner.call_with_context(call, context) {
225            Ok(matched_call_future) => {
226                blueprint_core::trace!(
227                    target: "blueprint-router",
228                    matched_calls = matched_call_future.len(),
229                    "A route matched this job call"
230                );
231                return Some(matched_call_future);
232            }
233            Err((call, context)) => (call, context),
234        };
235
236        // At this point, no route matched the job ID, and there are no always routes
237        blueprint_core::trace!(
238            target: "blueprint-router",
239            ?call,
240            "No explicit or always route caught this job call, passing to fallback"
241        );
242
243        self.inner
244            .call_fallback(call, context)
245            .map(|future| iter::once(future).collect::<FuturesUnordered<_>>())
246    }
247
248    /// Convert the router into a borrowed [`Service`] with a fixed request body type, to aid type
249    /// inference.
250    ///
251    /// In some cases when calling methods from [`tower::ServiceExt`] on a [`Router`] you might get
252    /// type inference errors along the lines of
253    ///
254    /// ```not_rust
255    /// let response = router.ready().await?.call(request).await?;
256    ///                       ^^^^^ cannot infer type for type parameter `B`
257    /// ```
258    ///
259    /// This happens because `Router` implements [`Service`] with `impl<B> Service<Request<B>> for Router<()>`.
260    ///
261    /// For example:
262    ///
263    /// ```compile_fail
264    /// use blueprint_sdk::{Router, JobCall, Bytes};
265    /// use tower::{Service, ServiceExt};
266    ///
267    /// const MY_JOB_ID: u8 = 0;
268    ///
269    /// # async fn async_main() -> Result<(), blueprint_sdk::core::error::BoxError> {
270    /// let mut router = Router::new().route(MY_JOB_ID, || async {});
271    /// let request = JobCall::new(MY_JOB_ID, Bytes::new());
272    /// let response = router.ready().await?.call(request).await?;
273    /// # Ok(())
274    /// # }
275    /// ```
276    ///
277    /// Calling `Router::as_service` fixes that:
278    ///
279    /// ```
280    /// use blueprint_sdk::{JobCall, Router};
281    /// use bytes::Bytes;
282    /// use tower::{Service, ServiceExt};
283    ///
284    /// const MY_JOB_ID: u32 = 0;
285    ///
286    /// # async fn async_main() -> Result<(), blueprint_sdk::core::error::BoxError> {
287    /// let mut router = Router::new().route(MY_JOB_ID, || async {});
288    /// let request = JobCall::new(MY_JOB_ID, Bytes::new());
289    /// let response = router.as_service().ready().await?.call(request).await?;
290    /// # Ok(())
291    /// # }
292    /// ```
293    ///
294    /// This is mainly used when calling `Router` in tests. It shouldn't be necessary when running
295    /// the `Router` normally via the blueprint runner.
296    pub fn as_service<B>(&mut self) -> RouterAsService<'_, B, Ctx> {
297        RouterAsService {
298            router: self,
299            _marker: PhantomData,
300        }
301    }
302}
303
304impl<B> Service<JobCall<B>> for Router<()>
305where
306    B: Into<Bytes>,
307{
308    type Response = Option<Vec<JobResult>>;
309    type Error = BoxError;
310    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
311
312    #[inline]
313    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
314        Poll::Ready(Ok(()))
315    }
316
317    #[inline]
318    #[allow(clippy::needless_continue)]
319    fn call(&mut self, call: JobCall<B>) -> Self::Future {
320        let Some(mut futures) = self.call_with_context(call.map(Into::into), ()) else {
321            return Box::pin(async { Ok(None) });
322        };
323
324        Box::pin(async move {
325            let mut results = Vec::with_capacity(futures.len());
326            while let Some(item) = futures.next().await {
327                blueprint_core::trace!(target: "blueprint-router", outcome = ?item, "Job finished with outcome");
328                match item {
329                    Ok(Some(job)) => results.push(job),
330                    // Job produced nothing, and didn't error. Don't include it.
331                    Ok(None) => continue,
332                    Err(e) => {
333                        blueprint_core::error!(?e, "Job failed");
334                        return Err(e);
335                    }
336                }
337            }
338
339            Ok(Some(results))
340        })
341    }
342}
343
/// A [`Router`] converted into a borrowed [`Service`] with a fixed body type.
///
/// See [`Router::as_service`] for more details.
pub struct RouterAsService<'a, B, Ctx = ()> {
    // The router that calls are forwarded to, borrowed mutably for the adapter's lifetime.
    router: &'a mut Router<Ctx>,
    // Records the chosen body type `B` without storing a value of it.
    _marker: PhantomData<B>,
}
351
352impl<B> Service<JobCall<B>> for RouterAsService<'_, B, ()>
353where
354    B: Into<Bytes>,
355{
356    type Response = Option<Vec<JobResult>>;
357    type Error = BoxError;
358    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
359
360    #[inline]
361    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
362        <Router as Service<JobCall<B>>>::poll_ready(self.router, cx)
363    }
364
365    #[inline]
366    fn call(&mut self, call: JobCall<B>) -> Self::Future {
367        self.router.call(call)
368    }
369}
370
371impl<B, Ctx> fmt::Debug for RouterAsService<'_, B, Ctx>
372where
373    Ctx: fmt::Debug,
374{
375    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
376        f.debug_struct("RouterAsService")
377            .field("router", &self.router)
378            .finish()
379    }
380}
381
// These calls only type-check if `Router<()>` is `Send` and `Sync`, as the helper names
// indicate; a regression shows up as a compile error in the test build.
#[test]
fn traits() {
    use crate::test_helpers::{assert_send, assert_sync};

    assert_send::<Router<()>>();
    assert_sync::<Router<()>>();
}