tower_resilience_executor/lib.rs
1//! Executor delegation layer for Tower services.
2//!
3//! This crate provides a Tower layer that delegates request processing to an
4//! arbitrary executor for parallel processing. Unlike Tower's `Buffer` layer
5//! which processes requests serially, this layer spawns each request as a
6//! separate task, enabling true parallelism.
7//!
8//! # Use Cases
9//!
10//! - **CPU-bound processing**: Parallelize CPU-intensive request handling
11//! - **Runtime isolation**: Process requests on a dedicated runtime
12//! - **Thread pool delegation**: Use specific thread pools for certain workloads
13//! - **Blocking operations**: Offload blocking I/O to dedicated threads
14//!
15//! # Example
16//!
17//! ```rust
18//! use tower_resilience_executor::{ExecutorLayer, Executor};
19//! use tower::{Service, ServiceBuilder, ServiceExt};
20//! use tokio::runtime::Handle;
21//!
22//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
23//! // Create a simple service
24//! let service = tower::service_fn(|req: String| async move {
25//! Ok::<_, std::convert::Infallible>(format!("Hello, {}!", req))
26//! });
27//!
28//! // Wrap with executor layer using current runtime
29//! let mut service = ServiceBuilder::new()
30//! .layer(ExecutorLayer::current())
31//! .service(service);
32//!
33//! // Make a request - it will be processed on a spawned task
34//! let response = service.ready().await?.call("World".to_string()).await?;
35//! assert_eq!(response, "Hello, World!");
36//! # Ok(())
37//! # }
38//! ```
39//!
40//! # Using a Dedicated Runtime
41//!
42//! ```rust,no_run
43//! use tower_resilience_executor::ExecutorLayer;
44//! use tower::ServiceBuilder;
45//!
46//! // Create a dedicated runtime for heavy computation
47//! let compute_runtime = tokio::runtime::Builder::new_multi_thread()
48//! .worker_threads(8)
49//! .thread_name("compute")
50//! .build()
51//! .unwrap();
52//!
53//! // Use the dedicated runtime for request processing
54//! let layer = ExecutorLayer::new(compute_runtime.handle().clone());
55//! ```
56//!
57//! # Combining with Bulkhead
58//!
59//! For bounded parallel execution, combine with a bulkhead layer:
60//!
61//! ```rust,ignore
62//! use tower_resilience_executor::ExecutorLayer;
63//! use tower_resilience_bulkhead::BulkheadLayer;
64//! use tower::ServiceBuilder;
65//!
66//! let service = ServiceBuilder::new()
67//! // Limit concurrent requests
68//! .layer(BulkheadLayer::builder().max_concurrent_calls(16).build())
69//! // Execute on dedicated runtime
70//! .layer(ExecutorLayer::current())
71//! .service(tower::service_fn(|_: ()| async { Ok::<_, ()>(()) }));
72//! ```
73//!
74//! # Service Requirements
75//!
76//! The wrapped service must implement `Clone`. This is necessary because each
77//! spawned task needs its own instance of the service. Most Tower services
78//! already implement `Clone`, and for those that don't, consider wrapping
79//! them with `Buffer` first.
80
81mod executor;
82mod layer;
83mod service;
84
85pub use executor::{BlockingExecutor, CurrentRuntime, Executor};
86pub use layer::{ExecutorLayer, ExecutorLayerBuilder};
87pub use service::{ExecutorError, ExecutorFuture, ExecutorService};
88
89#[cfg(test)]
90mod tests {
91 use super::*;
92 use tower::{Service, ServiceBuilder, ServiceExt};
93
94 #[tokio::test]
95 async fn test_basic_usage() {
96 let service = tower::service_fn(|req: i32| async move { Ok::<_, &str>(req * 2) });
97
98 let mut service = ServiceBuilder::new()
99 .layer(ExecutorLayer::current())
100 .service(service);
101
102 let response = service.ready().await.unwrap().call(21).await.unwrap();
103 assert_eq!(response, 42);
104 }
105
106 #[tokio::test]
107 async fn test_parallel_execution() {
108 use std::sync::atomic::{AtomicUsize, Ordering};
109 use std::sync::Arc;
110 use std::time::Duration;
111
112 let counter = Arc::new(AtomicUsize::new(0));
113 let max_concurrent = Arc::new(AtomicUsize::new(0));
114
115 let counter_clone = Arc::clone(&counter);
116 let max_clone = Arc::clone(&max_concurrent);
117
118 let service = tower::service_fn(move |_req: ()| {
119 let counter = Arc::clone(&counter_clone);
120 let max_concurrent = Arc::clone(&max_clone);
121 async move {
122 let current = counter.fetch_add(1, Ordering::SeqCst) + 1;
123
124 // Update max concurrent if this is higher
125 let mut max = max_concurrent.load(Ordering::SeqCst);
126 while current > max {
127 match max_concurrent.compare_exchange_weak(
128 max,
129 current,
130 Ordering::SeqCst,
131 Ordering::SeqCst,
132 ) {
133 Ok(_) => break,
134 Err(m) => max = m,
135 }
136 }
137
138 // Simulate some work
139 tokio::time::sleep(Duration::from_millis(50)).await;
140
141 counter.fetch_sub(1, Ordering::SeqCst);
142 Ok::<_, &str>(())
143 }
144 });
145
146 let service = ServiceBuilder::new()
147 .layer(ExecutorLayer::current())
148 .service(service);
149
150 // Spawn multiple concurrent requests
151 let mut handles = Vec::new();
152 for _ in 0..10 {
153 let mut svc = service.clone();
154 handles.push(tokio::spawn(async move {
155 svc.ready().await.unwrap().call(()).await.unwrap();
156 }));
157 }
158
159 // Wait for all to complete
160 for handle in handles {
161 handle.await.unwrap();
162 }
163
164 // Verify we had parallel execution
165 assert!(
166 max_concurrent.load(Ordering::SeqCst) > 1,
167 "Expected parallel execution, but max concurrent was {}",
168 max_concurrent.load(Ordering::SeqCst)
169 );
170 }
171
172 #[tokio::test]
173 async fn test_with_custom_executor() {
174 let handle = tokio::runtime::Handle::current();
175
176 let service =
177 tower::service_fn(|req: String| async move { Ok::<_, &str>(req.to_uppercase()) });
178
179 let mut service = ServiceBuilder::new()
180 .layer(ExecutorLayer::new(handle))
181 .service(service);
182
183 let response = service
184 .ready()
185 .await
186 .unwrap()
187 .call("hello".to_string())
188 .await
189 .unwrap();
190 assert_eq!(response, "HELLO");
191 }
192
193 #[tokio::test]
194 async fn test_error_propagation() {
195 let service = tower::service_fn(|_req: ()| async move { Err::<(), _>("service error") });
196
197 let mut service = ServiceBuilder::new()
198 .layer(ExecutorLayer::current())
199 .service(service);
200
201 let result = service.ready().await.unwrap().call(()).await;
202 assert!(matches!(
203 result,
204 Err(ExecutorError::Service("service error"))
205 ));
206 }
207
208 #[tokio::test]
209 async fn test_builder_pattern() {
210 let service = tower::service_fn(|req: i32| async move { Ok::<_, &str>(req) });
211
212 let mut service = ServiceBuilder::new()
213 .layer(
214 ExecutorLayer::<tokio::runtime::Handle>::builder()
215 .current()
216 .build(),
217 )
218 .service(service);
219
220 let response = service.ready().await.unwrap().call(42).await.unwrap();
221 assert_eq!(response, 42);
222 }
223}