resilience_rs/
bulkhead.rs1use std::sync::Arc;
2use std::thread;
3
4pub struct Bulkhead {
5 max_concurrent_tasks: usize,
6 semaphore: Arc<tokio::sync::Semaphore>,
7}
8
9impl Bulkhead {
10 fn new(max_concurrent_tasks: usize) -> Self {
11 Self {
12 max_concurrent_tasks,
13 semaphore: Arc::new(tokio::sync::Semaphore::new(max_concurrent_tasks)),
14 }
15 }
16
17 async fn call<F, T, E>(&self, func: F) -> Result<T, E>
18 where
19 F: FnOnce() -> Result<T, E> + Send + 'static,
20 T: Send + 'static,
21 E: From<&'static str> + Send + 'static,
22 {
23 let permit = self.semaphore.acquire().await.unwrap();
24 let result = func();
25 drop(permit);
26 result
27 }
28}
29
30#[cfg(test)]
31mod tests {
32 use super::*;
33 use std::sync::atomic::{AtomicUsize, Ordering};
34 use std::sync::Arc;
35
36 #[tokio::test]
37 async fn test_bulkhead() {
38 let bulkhead = Arc::new(Bulkhead::new(3));
39
40 let handles: Vec<_> = (0..10)
41 .map(|i| {
42 let bulkhead = bulkhead.clone();
43 tokio::spawn(async move {
44 let task = move || -> Result<(), &'static str> {
45 println!("Task {} is running", i);
47 thread::sleep(Duration::from_secs(2));
48 println!("Task {} is done", i);
49 Ok(())
50 };
51
52 match bulkhead.call(task).await {
53 Ok(_) => println!("Task {} succeeded", i),
54 Err(err) => println!("Task {} failed with error: {}", i, err),
55 }
56 })
57 })
58 .collect();
59
60 futures::future::join_all(handles).await;
61}
62}
63