resilience_rs/
bulkhead.rs

1use std::sync::Arc;
2use std::thread;
3
4pub struct Bulkhead {
5    max_concurrent_tasks: usize,
6    semaphore: Arc<tokio::sync::Semaphore>,
7}
8
9impl Bulkhead {
10    fn new(max_concurrent_tasks: usize) -> Self {
11        Self {
12            max_concurrent_tasks,
13            semaphore: Arc::new(tokio::sync::Semaphore::new(max_concurrent_tasks)),
14        }
15    }
16
17    async fn call<F, T, E>(&self, func: F) -> Result<T, E>
18    where
19        F: FnOnce() -> Result<T, E> + Send + 'static,
20        T: Send + 'static,
21        E: From<&'static str> + Send + 'static,
22    {
23        let permit = self.semaphore.acquire().await.unwrap();
24        let result = func();
25        drop(permit);
26        result
27    }
28}
29
30#[cfg(test)]
31mod tests {
32    use super::*;
33    use std::sync::atomic::{AtomicUsize, Ordering};
34    use std::sync::Arc;
35
36    #[tokio::test]
37    async fn test_bulkhead() {
38    let bulkhead = Arc::new(Bulkhead::new(3));
39
40    let handles: Vec<_> = (0..10)
41        .map(|i| {
42            let bulkhead = bulkhead.clone();
43            tokio::spawn(async move {
44                let task = move || -> Result<(), &'static str> {
45                    // Simulate a task
46                    println!("Task {} is running", i);
47                    thread::sleep(Duration::from_secs(2));
48                    println!("Task {} is done", i);
49                    Ok(())
50                };
51
52                match bulkhead.call(task).await {
53                    Ok(_) => println!("Task {} succeeded", i),
54                    Err(err) => println!("Task {} failed with error: {}", i, err),
55                }
56            })
57        })
58        .collect();
59
60    futures::future::join_all(handles).await;
61}
62}
63