rama_tower/service.rs
1use crate::core::Service as TowerService;
2use crate::service_ready::Ready;
3use std::{fmt, sync::Arc};
4use tokio::sync::Mutex;
5
/// Adapter to use a [`tower::Service`] as a [`rama::Service`],
/// cloning the service for each request it has to serve.
///
/// Note that:
/// - you should use [`SharedServiceAdapter`] in case you do not want it to be [`Clone`]d,
///   but instead shared across serve calls (we'll wrap your tower service with a [`Mutex`]);
/// - you are required to use the [`LayerServiceAdapter`] for tower layer services,
///   which will automatically be the case if you use [`LayerAdapter`] to wrap a [`tower::Layer`].
///
/// ## Halting
///
/// This adapter assumes that a service will always become ready eventually,
/// as it will call [`poll_ready`] until ready prior to [`calling`] the [`tower::Service`].
/// Please ensure that your [`tower::Service`] does not require a side-step to prevent such halting.
///
/// [`tower::Service`]: tower_service::Service
/// [`tower::Layer`]: tower_layer::Layer
/// [`rama::Service`]: rama_core::Service
/// [`LayerAdapter`]: super::LayerAdapter
/// [`LayerServiceAdapter`]: super::LayerServiceAdapter
/// [`poll_ready`]: tower_service::Service::poll_ready
/// [`calling`]: tower_service::Service::call
#[derive(Clone)]
pub struct ServiceAdapter<T>(T);
30
31impl<T: Clone + Send + Sync + 'static> ServiceAdapter<T> {
32 /// Adapt a [`Clone`]/call [`tower::Service`] into a [`rama::Service`].
33 ///
34 /// See [`ServiceAdapter`] for more information.
35 ///
36 /// [`tower::Service`]: tower_service::Service
37 /// [`rama::Service`]: rama_core::Service
38 pub fn new(svc: T) -> Self {
39 Self(svc)
40 }
41
42 /// Consume itself to return the inner [`tower::Service`] back.
43 ///
44 /// [`tower::Service`]: tower_service::Service
45 pub fn into_inner(self) -> T {
46 self.0
47 }
48}
49
50impl<T: fmt::Debug> fmt::Debug for ServiceAdapter<T> {
51 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52 f.debug_tuple("ServiceAdapter").field(&self.0).finish()
53 }
54}
55
56impl<T, State, Request> rama_core::Service<State, Request> for ServiceAdapter<T>
57where
58 T: TowerService<Request, Response: Send + 'static, Error: Send + 'static, Future: Send>
59 + Clone
60 + Send
61 + Sync
62 + 'static,
63 State: Clone + Send + Sync + 'static,
64 Request: Send + 'static,
65{
66 type Response = T::Response;
67 type Error = T::Error;
68
69 fn serve(
70 &self,
71 _ctx: rama_core::Context<State>,
72 req: Request,
73 ) -> impl Future<Output = Result<Self::Response, Self::Error>> + Send + '_ {
74 let svc = self.0.clone();
75 async move {
76 let mut svc = svc;
77 let ready = Ready::new(&mut svc);
78 let ready_svc = ready.await?;
79 ready_svc.call(req).await
80 }
81 }
82}
83
84/// Adapter to use a [`tower::Service`] as a [`rama::Service`],
85/// sharing the service between each request it has to serve.
86///
87/// Note that:
88/// - you should use [`ServiceAdapter`] in case you do not want it to be shared,
89/// and prefer it to be [`Clone`]d instead, which is anyway the more "normal" scenario;
90/// - you are required to use the [`LayerServiceAdapter`] for tower layer services,
91/// which will automatically be the case if you use [`LayerAdapter`] to wrap a [`tower::Layer`].
92///
93/// ## Halting
94///
95/// This adapter assumes that a service will always become ready eventually,
96/// as it will call [`poll_ready`] until ready prior to [`calling`] the [`tower::Service`].
97/// Please ensure that your [`tower::Service`] does not require a side-step to prevent such halting.
98///
99/// [`tower::Service`]: tower_service::Service
100/// [`tower::Layer`]: tower_layer::Layer
101/// [`rama::Service`]: rama_core::Service
102/// [`LayerAdapter`]: super::LayerServiceAdapter.
103/// [`LayerServiceAdapter`]: super::LayerServiceAdapter.
104/// [`poll_ready`]: tower_service::Service::poll_ready
105/// [`calling`]: tower_service::Service::call
106pub struct SharedServiceAdapter<T>(Arc<Mutex<T>>);
107
108impl<T: Send + Sync + 'static> SharedServiceAdapter<T> {
109 /// Adapt a shared [`tower::Service`] into a [`rama::Service`].
110 ///
111 /// See [`SharedServiceAdapter`] for more information.
112 ///
113 /// [`tower::Service`]: tower_service::Service
114 /// [`rama::Service`]: rama_core::Service
115 pub fn new(svc: T) -> Self {
116 Self(Arc::new(Mutex::new(svc)))
117 }
118}
119
120impl<T> Clone for SharedServiceAdapter<T> {
121 fn clone(&self) -> Self {
122 Self(self.0.clone())
123 }
124}
125
126impl<T: fmt::Debug> fmt::Debug for SharedServiceAdapter<T> {
127 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128 f.debug_tuple("SharedServiceAdapter")
129 .field(&self.0)
130 .finish()
131 }
132}
133
134impl<T, State, Request> rama_core::Service<State, Request> for SharedServiceAdapter<T>
135where
136 T: TowerService<Request, Response: Send + 'static, Error: Send + 'static, Future: Send>
137 + Send
138 + Sync
139 + 'static,
140 State: Clone + Send + Sync + 'static,
141 Request: Send + 'static,
142{
143 type Response = T::Response;
144 type Error = T::Error;
145
146 fn serve(
147 &self,
148 _ctx: rama_core::Context<State>,
149 req: Request,
150 ) -> impl Future<Output = Result<Self::Response, Self::Error>> + Send + '_ {
151 let svc = self.0.clone();
152 async move {
153 let svc = svc;
154 let mut svc_guard = svc.lock().await;
155 let ready = Ready::new(&mut *svc_guard);
156 let ready_svc = ready.await?;
157 ready_svc.call(req).await
158 }
159 }
160}