Skip to main content

tower_resilience_executor/
lib.rs

//! Executor delegation layer for Tower services.
//!
//! This crate provides a Tower layer that delegates request processing to an
//! arbitrary executor for parallel processing. Unlike Tower's `Buffer` layer
//! which processes requests serially, this layer spawns each request as a
//! separate task, enabling true parallelism.
//!
//! # Use Cases
//!
//! - **CPU-bound processing**: Parallelize CPU-intensive request handling
//! - **Runtime isolation**: Process requests on a dedicated runtime
//! - **Thread pool delegation**: Use specific thread pools for certain workloads
//! - **Blocking operations**: Offload blocking I/O to dedicated threads
//!
//! # Example
//!
//! ```rust
//! use tower_resilience_executor::{ExecutorLayer, Executor};
//! use tower::{Service, ServiceBuilder, ServiceExt};
//! use tokio::runtime::Handle;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! // Create a simple service
//! let service = tower::service_fn(|req: String| async move {
//!     Ok::<_, std::convert::Infallible>(format!("Hello, {}!", req))
//! });
//!
//! // Wrap with executor layer using current runtime
//! let mut service = ServiceBuilder::new()
//!     .layer(ExecutorLayer::current())
//!     .service(service);
//!
//! // Make a request - it will be processed on a spawned task
//! let response = service.ready().await?.call("World".to_string()).await?;
//! assert_eq!(response, "Hello, World!");
//! # Ok(())
//! # }
//! ```
//!
//! # Using a Dedicated Runtime
//!
//! ```rust,no_run
//! use tower_resilience_executor::ExecutorLayer;
//! use tower::ServiceBuilder;
//!
//! // Create a dedicated runtime for heavy computation
//! let compute_runtime = tokio::runtime::Builder::new_multi_thread()
//!     .worker_threads(8)
//!     .thread_name("compute")
//!     .build()
//!     .unwrap();
//!
//! // Use the dedicated runtime for request processing
//! let layer = ExecutorLayer::new(compute_runtime.handle().clone());
//! ```
//!
//! # Combining with Bulkhead
//!
//! For bounded parallel execution, combine with a bulkhead layer:
//!
//! ```rust,ignore
//! use tower_resilience_executor::ExecutorLayer;
//! use tower_resilience_bulkhead::BulkheadLayer;
//! use tower::ServiceBuilder;
//!
//! let service = ServiceBuilder::new()
//!     // Limit concurrent requests
//!     .layer(BulkheadLayer::builder().max_concurrent_calls(16).build())
//!     // Execute on dedicated runtime
//!     .layer(ExecutorLayer::current())
//!     .service(tower::service_fn(|_: ()| async { Ok::<_, ()>(()) }));
//! ```
//!
//! # Service Requirements
//!
//! The wrapped service must implement `Clone`. This is necessary because each
//! spawned task needs its own instance of the service. Most Tower services
//! already implement `Clone`, and for those that don't, consider wrapping
//! them with `Buffer` first.
// Crate-internal modules; their public items are re-exported below so users
// only depend on the crate root.
mod executor;
mod layer;
mod service;

// Public API surface: the executor abstraction and its implementations, the
// Tower layer with its builder, and the wrapping service plus its future and
// error types.
pub use executor::{BlockingExecutor, CurrentRuntime, Executor};
pub use layer::{ExecutorLayer, ExecutorLayerBuilder};
pub use service::{ExecutorError, ExecutorFuture, ExecutorService};
88
#[cfg(test)]
mod tests {
    use super::*;
    use tower::{Service, ServiceBuilder, ServiceExt};

    /// Smoke test: a request routed through the executor layer returns the
    /// inner service's response unchanged.
    #[tokio::test]
    async fn test_basic_usage() {
        let service = tower::service_fn(|req: i32| async move { Ok::<_, &str>(req * 2) });

        let mut service = ServiceBuilder::new()
            .layer(ExecutorLayer::current())
            .service(service);

        let response = service.ready().await.unwrap().call(21).await.unwrap();
        assert_eq!(response, 42);
    }

    /// Verifies true parallelism: with 10 in-flight requests each sleeping
    /// 50ms, at least two must be observed inside the service body at the
    /// same time (a serial layer like `Buffer` would never exceed 1).
    #[tokio::test]
    async fn test_parallel_execution() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;
        use std::time::Duration;

        // Number of requests currently executing inside the service body.
        let counter = Arc::new(AtomicUsize::new(0));
        // High-water mark of `counter` across the whole test.
        let max_concurrent = Arc::new(AtomicUsize::new(0));

        let counter_clone = Arc::clone(&counter);
        let max_clone = Arc::clone(&max_concurrent);

        let service = tower::service_fn(move |_req: ()| {
            let counter = Arc::clone(&counter_clone);
            let max_concurrent = Arc::clone(&max_clone);
            async move {
                let current = counter.fetch_add(1, Ordering::SeqCst) + 1;

                // Record the high-water mark. `AtomicUsize::fetch_max` is a
                // single atomic RMW that replaces the hand-rolled
                // `compare_exchange_weak` retry loop with identical semantics.
                max_concurrent.fetch_max(current, Ordering::SeqCst);

                // Simulate some work so in-flight requests overlap.
                tokio::time::sleep(Duration::from_millis(50)).await;

                counter.fetch_sub(1, Ordering::SeqCst);
                Ok::<_, &str>(())
            }
        });

        let service = ServiceBuilder::new()
            .layer(ExecutorLayer::current())
            .service(service);

        // Spawn multiple concurrent requests, each on its own service clone.
        let mut handles = Vec::new();
        for _ in 0..10 {
            let mut svc = service.clone();
            handles.push(tokio::spawn(async move {
                svc.ready().await.unwrap().call(()).await.unwrap();
            }));
        }

        // Wait for all to complete
        for handle in handles {
            handle.await.unwrap();
        }

        // Verify we had parallel execution
        assert!(
            max_concurrent.load(Ordering::SeqCst) > 1,
            "Expected parallel execution, but max concurrent was {}",
            max_concurrent.load(Ordering::SeqCst)
        );
    }

    /// The layer also accepts an explicit executor (here, the current
    /// runtime's `Handle`) via `ExecutorLayer::new`.
    #[tokio::test]
    async fn test_with_custom_executor() {
        let handle = tokio::runtime::Handle::current();

        let service =
            tower::service_fn(|req: String| async move { Ok::<_, &str>(req.to_uppercase()) });

        let mut service = ServiceBuilder::new()
            .layer(ExecutorLayer::new(handle))
            .service(service);

        let response = service
            .ready()
            .await
            .unwrap()
            .call("hello".to_string())
            .await
            .unwrap();
        assert_eq!(response, "HELLO");
    }

    /// An error returned by the inner service must surface to the caller
    /// wrapped in `ExecutorError::Service`, not be swallowed by the spawn.
    #[tokio::test]
    async fn test_error_propagation() {
        let service = tower::service_fn(|_req: ()| async move { Err::<(), _>("service error") });

        let mut service = ServiceBuilder::new()
            .layer(ExecutorLayer::current())
            .service(service);

        let result = service.ready().await.unwrap().call(()).await;
        assert!(matches!(
            result,
            Err(ExecutorError::Service("service error"))
        ));
    }

    /// The builder API (`ExecutorLayer::builder().current().build()`) must
    /// produce a layer equivalent to `ExecutorLayer::current()`.
    #[tokio::test]
    async fn test_builder_pattern() {
        let service = tower::service_fn(|req: i32| async move { Ok::<_, &str>(req) });

        let mut service = ServiceBuilder::new()
            .layer(
                ExecutorLayer::<tokio::runtime::Handle>::builder()
                    .current()
                    .build(),
            )
            .service(service);

        let response = service.ready().await.unwrap().call(42).await.unwrap();
        assert_eq!(response, 42);
    }
}