1#![crate_name = "semaphore_key"]
2
3use log::{trace};
4use once_cell::sync::Lazy;
5use std::thread;
6use std::{collections::HashMap, sync::Arc};
7use tokio::sync::{RwLock, Semaphore};
8use tokio::task;
9
10static SEMAPHORE_MAP: Lazy<RwLock<HashMap<String, Arc<Semaphore>>>> =
11 Lazy::new(|| RwLock::new(HashMap::new()));
12
13pub struct SemaphoreKey {}
15
16impl SemaphoreKey {
17 pub async fn get_or_create_semaphore(key: &String, allowed_concurrent_threads: usize) -> Arc<Semaphore> {
26
27 trace!("Thread:{:?} request semaphore for key={} allowed threads={}", thread::current().id(), key, allowed_concurrent_threads);
28
29 let option_semaphore = SemaphoreKey::get_semaphore_if_exists_read_guard(key).await;
30
31 let semaphore = if let Some(semaphore) = option_semaphore {
32 trace!("Thread:{:?} semaphore exists for key={}", thread::current().id(), key);
33 semaphore
34 } else {
35 SemaphoreKey::create_new_semaphore_by_key(key, allowed_concurrent_threads).await
36 };
37
38 return semaphore;
39 }
40
41 pub async fn remove_if_exists(key: &String) -> Option<Arc<Semaphore>> {
48
49 trace!("Thread:{:?} remove semaphore for key={}", thread::current().id(), key);
50
51 let option_arc_semaphore: Option<Arc<Semaphore>>;
52
53 {
55 let mut write_guard = SEMAPHORE_MAP.write().await;
56 option_arc_semaphore = (write_guard).remove(key);
57 }
58 task::yield_now().await;
63
64 option_arc_semaphore
65 }
66
67 async fn get_semaphore_if_exists_read_guard(key: &String) -> Option<Arc<Semaphore>> {
68 let mut result: Option<Arc<Semaphore>> = None;
69
70 let read_guard = SEMAPHORE_MAP.read().await;
71 let option_arc_semaphore = (read_guard).get(key);
72
73 if let Some(arc_semaphore) = option_arc_semaphore {
74 let new_arc_semaphore = arc_semaphore.clone();
75 result = Some(new_arc_semaphore);
76 }
77
78 return result;
79 }
80
81 async fn create_new_semaphore_by_key(key: &String, allowed_concurrent_threads: usize) -> Arc<Semaphore> {
82
83 trace!("Thread:{:?} create new semaphore for key={} allowed threads={}", thread::current().id(), key, allowed_concurrent_threads);
84
85 let semaphore: Arc<Semaphore>;
86
87 {
89 let mut write_guard = SEMAPHORE_MAP.write().await;
90
91 let option_arc_semaphore = (write_guard).get(key);
93
94 if let Some(semaphore) = option_arc_semaphore {
95
96 task::yield_now().await;
98
99 return semaphore.clone();
100 }
101
102 trace!("Thread:{:?} create a new semaphore for key={} with allowed threads={}", thread::current().id(), key, allowed_concurrent_threads);
103
104 semaphore = Arc::new(Semaphore::new(allowed_concurrent_threads));
105
106 write_guard.insert(key.clone(), semaphore.clone()); }
108
109 task::yield_now().await;
115
116 return semaphore;
117 }
118}