layered/
tower.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Tower interoperability for layered services.
5//!
6//! Use Tower services and middleware with layered services through the [`Adapter`] type.
7//! Provides bidirectional conversion between Tower and layered service traits.
8//!
9//! # Requirements
10//!
11//! - Services must be `Clone` for concurrent usage
12//! - All types must be `Send` for async compatibility
13//! - Layered services must return `Result<T, E>` when used with Tower
14//! - Tower's back-pressure (`poll_ready`) is handled automatically
15//!
16//! # Examples
17//!
18//! Use a layered service with Tower middleware:
19//!
20//! ```
21//! use std::time::Duration;
22//!
23//! use layered::tower::Adapter;
24//! use layered::{Execute, Service};
25//! use tower::ServiceBuilder;
26//!
27//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
28//! let service = Execute::new(|req: String| async move {
29//!     Ok::<String, std::io::Error>(format!("Processed: {}", req))
30//! });
31//!
32//! let tower_service = ServiceBuilder::new().service(Adapter(service));
33//! # Ok(())
34//! # }
35//! ```
36//!
37//! Use Tower layers in a layered stack:
38//!
39//! ```
40//! use layered::tower::tower_layer;
41//! use layered::{Execute, Service, Stack};
42//! use tower_layer::Identity;
43//!
44//! async fn example() -> Result<(), Box<dyn std::error::Error>> {
45//! let service = (
46//!     tower_layer(Identity::default()),
47//!     Execute::new(|req: String| async move {
48//!         Ok::<String, std::io::Error>(format!("Processed: {}", req))
49//!     }),
50//! ).build();
51//!
52//! let result = service.execute("request".to_string()).await;
53//! # Ok(())
54//! # }
55//! ```
56
57use std::future::poll_fn;
58use std::pin::Pin;
59use std::task::{Context, Poll};
60
61use tower_layer::Layer;
62
63use crate::Service;
64
65/// Bidirectional adapter between layered and Tower service traits.
66///
67/// Wraps a service to convert between `layered`'s [`Service`] and Tower's
68/// [`tower_service::Service`]. Handles back-pressure
69/// and async execution differences automatically.
70#[derive(Debug, Clone)]
71pub struct Adapter<S>(pub S);
72
73impl<S, Req, T, E> tower_service::Service<Req> for Adapter<S>
74where
75    S: Service<Req, Out = Result<T, E>> + Clone + 'static,
76    T: Send + 'static,
77    E: Send + 'static,
78    Req: Send + 'static,
79{
80    type Response = T;
81    type Error = E;
82    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
83
84    #[cfg_attr(test, mutants::skip)] // causes unkillable mutants
85    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
86        // Layered services don't have back-pressure - they're always ready to accept requests
87        Poll::Ready(Ok(()))
88    }
89
90    fn call(&mut self, req: Req) -> Self::Future {
91        let service = self.0.clone();
92        Box::pin(async move { service.execute(req).await })
93    }
94}
95
96impl<S, In: Send> Service<In> for Adapter<S>
97where
98    S: tower_service::Service<In> + Send + Sync + Clone,
99    S::Future: Send,
100{
101    type Out = Result<S::Response, S::Error>;
102
103    fn execute(&self, input: In) -> impl Future<Output = Self::Out> + Send {
104        let mut clone = self.0.clone();
105
106        async move {
107            // Wait for the Tower service to be ready (handle back-pressure)
108            poll_fn(|cx| clone.poll_ready(cx)).await?;
109            // Execute the request
110            clone.call(input).await
111        }
112    }
113}
114
115/// Wraps a Tower layer for use with layered services.
116///
117/// # Examples
118///
119/// ```
120/// use layered::tower::tower_layer;
121/// use tower::layer::util::Identity;
122///
123/// let identity_layer = tower_layer(Identity::new());
124/// ```
125pub fn tower_layer<L>(tower_layer: L) -> AdapterLayer<L> {
126    AdapterLayer(tower_layer)
127}
128
129/// Layer adapter that applies Tower layers to layered services.
130///
131/// Wraps a Tower layer to handle service trait conversions automatically.
132#[derive(Debug, Clone)]
133pub struct AdapterLayer<L>(L);
134
135impl<L, S> Layer<S> for AdapterLayer<L>
136where
137    L: Layer<Adapter<S>> + Clone,
138{
139    type Service = Adapter<L::Service>;
140
141    fn layer(&self, inner: S) -> Self::Service {
142        // 1. Wrap the layered service to make it Tower-compatible
143        let tower_adapted = Adapter(inner);
144        // 2. Apply the Tower layer
145        let tower_layered = self.0.layer(tower_adapted);
146        // 3. Wrap the result back for compatibility with layered services
147        Adapter(tower_layered)
148    }
149}
150
151#[cfg_attr(coverage_nightly, coverage(off))]
152#[cfg(test)]
153mod tests {
154    use futures::executor::block_on;
155    use tower::service_fn;
156    use tower_service::Service as TowerService;
157
158    use super::*;
159    use crate::testing::MockService;
160
161    #[test]
162    fn adapt_tower_ok() {
163        let service = service_fn(|req: u32| async move { Ok::<_, ()>(req + 1) });
164        let service = Adapter(service);
165
166        let result = block_on(service.execute(0));
167
168        assert_eq!(result, Ok(1));
169    }
170
171    #[test]
172    fn adapt_tower_ensure_poll_error_respected() {
173        let service = MockService::new(Poll::Ready(Err("error".to_string())), Err("call error".to_string()));
174        let service = Adapter(service);
175
176        let result = block_on(service.execute("request".to_string()));
177
178        assert_eq!(result, Err("error".to_string()));
179    }
180
181    #[test]
182    fn adapt_oxidizer_ok() {
183        let mock_service = MockService::new(Poll::Ready(Ok(())), Ok("success".to_string()));
184        let mut service = Adapter(mock_service);
185
186        let result = block_on(async move { service.call("request".to_string()).await });
187
188        assert_eq!(result, Ok("success".to_string()));
189    }
190
191    #[test]
192    fn poll_ready_always_returns_ready_ok() {
193        let mock_service = MockService::new(Poll::Ready(Ok(())), Ok("success".to_string()));
194        let mut adapter = Adapter(mock_service);
195
196        let waker = futures::task::noop_waker();
197        let mut cx = Context::from_waker(&waker);
198
199        let result = adapter.poll_ready(&mut cx);
200        assert_eq!(result, Poll::Ready(Ok(())));
201    }
202
203    #[test]
204    fn poll_ready_consistent_behavior() {
205        let mock_service = MockService::new(Poll::Ready(Ok(())), Ok("success".to_string()));
206        let mut adapter = Adapter(mock_service);
207
208        let waker = futures::task::noop_waker();
209        let mut cx = Context::from_waker(&waker);
210
211        // Multiple calls should return the same result
212        for _ in 0..3 {
213            let result = adapter.poll_ready(&mut cx);
214            assert_eq!(result, Poll::Ready(Ok(())));
215        }
216    }
217
218    #[test]
219    fn poll_ready_with_mock_service() {
220        let mock_service = MockService::new(Poll::Ready(Ok(())), Ok("success".to_string()));
221        let mut mock_adapter = Adapter(mock_service);
222
223        let waker = futures::task::noop_waker();
224        let mut cx = Context::from_waker(&waker);
225
226        assert_eq!(mock_adapter.poll_ready(&mut cx), Poll::Ready(Ok(())));
227    }
228
229    #[test]
230    fn poll_ready_mutation_equivalence() {
231        // This test specifically addresses the mutation testing result
232        // Both Poll::Ready(Ok(())) and Poll::from(Ok(())) should be functionally equivalent
233        let mock_service = MockService::new(Poll::Ready(Ok(())), Ok("success".to_string()));
234        let mut adapter = Adapter(mock_service);
235
236        let waker = futures::task::noop_waker();
237        let mut cx = Context::from_waker(&waker);
238
239        let result1 = adapter.poll_ready(&mut cx);
240        let result2 = Poll::from(Ok::<(), String>(()));
241
242        // Both should be Poll::Ready(Ok(()))
243        assert_eq!(result1, Poll::Ready(Ok(())));
244        assert_eq!(result2, Poll::Ready(Ok(())));
245        assert_eq!(result1, result2);
246    }
247
248    #[test]
249    fn adapter_execute_fails_when_tower_service_poll_ready_errors() {
250        let mock_service = MockService::new(Poll::Ready(Err("service unavailable".to_string())), Ok("success".to_string()));
251        let service = Adapter(mock_service);
252
253        let result = block_on(service.execute("request".to_string()));
254
255        assert_eq!(result, Err("service unavailable".to_string()));
256    }
257
258    #[test]
259    fn tower_layer_adapter() {
260        use crate::{Execute, Stack};
261        use tower_layer::Identity;
262
263        let stack = (tower_layer(Identity::new()), Execute::new(|x: i32| async move { Ok::<_, ()>(x) }));
264        let svc = stack.build();
265        assert_eq!(block_on(svc.execute(42)), Ok(42));
266    }
267}