rama_tower/
service.rs

1use crate::core::Service as TowerService;
2use crate::service_ready::Ready;
3use std::{fmt, sync::Arc};
4use tokio::sync::Mutex;
5
/// Adapter to use a [`tower::Service`] as a [`rama::Service`],
/// cloning the service for each request it has to serve.
///
/// Note that:
/// - you should use [`SharedServiceAdapter`] in case you do not want it to be [`Clone`]d,
///   but instead shared across serve calls (we'll wrap your tower service with a [`Mutex`]);
/// - you are required to use the [`LayerServiceAdapter`] for tower layer services,
///   which will automatically be the case if you use [`LayerAdapter`] to wrap a [`tower::Layer`].
///
/// ## Halting
///
/// This adapter assumes that a service will always become ready eventually,
/// as it will call [`poll_ready`] until ready prior to [`calling`] the [`tower::Service`].
/// Please ensure that your [`tower::Service`] does not require a side-step to prevent such halting.
///
/// [`tower::Service`]: tower_service::Service
/// [`tower::Layer`]: tower_layer::Layer
/// [`rama::Service`]: rama_core::Service
/// [`LayerAdapter`]: super::LayerAdapter
/// [`LayerServiceAdapter`]: super::LayerServiceAdapter
/// [`poll_ready`]: tower_service::Service::poll_ready
/// [`calling`]: tower_service::Service::call
#[derive(Clone)]
pub struct ServiceAdapter<T>(T);
30
31impl<T: Clone + Send + Sync + 'static> ServiceAdapter<T> {
32    /// Adapt a [`Clone`]/call [`tower::Service`] into a [`rama::Service`].
33    ///
34    /// See [`ServiceAdapter`] for more information.
35    ///
36    /// [`tower::Service`]: tower_service::Service
37    /// [`rama::Service`]: rama_core::Service
38    pub fn new(svc: T) -> Self {
39        Self(svc)
40    }
41
42    /// Consume itself to return the inner [`tower::Service`] back.
43    ///
44    /// [`tower::Service`]: tower_service::Service
45    pub fn into_inner(self) -> T {
46        self.0
47    }
48}
49
50impl<T: fmt::Debug> fmt::Debug for ServiceAdapter<T> {
51    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52        f.debug_tuple("ServiceAdapter").field(&self.0).finish()
53    }
54}
55
56impl<T, State, Request> rama_core::Service<State, Request> for ServiceAdapter<T>
57where
58    T: TowerService<Request, Response: Send + 'static, Error: Send + 'static, Future: Send>
59        + Clone
60        + Send
61        + Sync
62        + 'static,
63    State: Clone + Send + Sync + 'static,
64    Request: Send + 'static,
65{
66    type Response = T::Response;
67    type Error = T::Error;
68
69    fn serve(
70        &self,
71        _ctx: rama_core::Context<State>,
72        req: Request,
73    ) -> impl Future<Output = Result<Self::Response, Self::Error>> + Send + '_ {
74        let svc = self.0.clone();
75        async move {
76            let mut svc = svc;
77            let ready = Ready::new(&mut svc);
78            let ready_svc = ready.await?;
79            ready_svc.call(req).await
80        }
81    }
82}
83
84/// Adapter to use a [`tower::Service`] as a [`rama::Service`],
85/// sharing the service between each request it has to serve.
86///
87/// Note that:
88/// - you should use [`ServiceAdapter`] in case you do not want it to be shared,
89///   and prefer it to be [`Clone`]d instead, which is anyway the more "normal" scenario;
90/// - you are required to use the [`LayerServiceAdapter`] for tower layer services,
91///   which will automatically be the case if you use [`LayerAdapter`] to wrap a [`tower::Layer`].
92///
93/// ## Halting
94///
95/// This adapter assumes that a service will always become ready eventually,
96/// as it will call [`poll_ready`] until ready prior to [`calling`] the [`tower::Service`].
97/// Please ensure that your [`tower::Service`] does not require a side-step to prevent such halting.
98///
99/// [`tower::Service`]: tower_service::Service
100/// [`tower::Layer`]: tower_layer::Layer
101/// [`rama::Service`]: rama_core::Service
102/// [`LayerAdapter`]: super::LayerServiceAdapter.
103/// [`LayerServiceAdapter`]: super::LayerServiceAdapter.
104/// [`poll_ready`]: tower_service::Service::poll_ready
105/// [`calling`]: tower_service::Service::call
106pub struct SharedServiceAdapter<T>(Arc<Mutex<T>>);
107
108impl<T: Send + Sync + 'static> SharedServiceAdapter<T> {
109    /// Adapt a shared [`tower::Service`] into a [`rama::Service`].
110    ///
111    /// See [`SharedServiceAdapter`] for more information.
112    ///
113    /// [`tower::Service`]: tower_service::Service
114    /// [`rama::Service`]: rama_core::Service
115    pub fn new(svc: T) -> Self {
116        Self(Arc::new(Mutex::new(svc)))
117    }
118}
119
120impl<T> Clone for SharedServiceAdapter<T> {
121    fn clone(&self) -> Self {
122        Self(self.0.clone())
123    }
124}
125
126impl<T: fmt::Debug> fmt::Debug for SharedServiceAdapter<T> {
127    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128        f.debug_tuple("SharedServiceAdapter")
129            .field(&self.0)
130            .finish()
131    }
132}
133
134impl<T, State, Request> rama_core::Service<State, Request> for SharedServiceAdapter<T>
135where
136    T: TowerService<Request, Response: Send + 'static, Error: Send + 'static, Future: Send>
137        + Send
138        + Sync
139        + 'static,
140    State: Clone + Send + Sync + 'static,
141    Request: Send + 'static,
142{
143    type Response = T::Response;
144    type Error = T::Error;
145
146    fn serve(
147        &self,
148        _ctx: rama_core::Context<State>,
149        req: Request,
150    ) -> impl Future<Output = Result<Self::Response, Self::Error>> + Send + '_ {
151        let svc = self.0.clone();
152        async move {
153            let svc = svc;
154            let mut svc_guard = svc.lock().await;
155            let ready = Ready::new(&mut *svc_guard);
156            let ready_svc = ready.await?;
157            ready_svc.call(req).await
158        }
159    }
160}