semaphore_key/
lib.rs

1#![crate_name = "semaphore_key"]
2
3use log::{trace};
4use once_cell::sync::Lazy;
5use std::thread;
6use std::{collections::HashMap, sync::Arc};
7use tokio::sync::{RwLock, Semaphore};
8use tokio::task;
9
10static SEMAPHORE_MAP: Lazy<RwLock<HashMap<String, Arc<Semaphore>>>> =
11    Lazy::new(|| RwLock::new(HashMap::new()));
12
13/// The SemaphoreKey struct holds an implementation to be used to for the Semaphore by key functionality.
14pub struct SemaphoreKey {}
15
16impl SemaphoreKey {
17    /// Gets or creates a semaphore wrapped in an Arc by the provided key
18    ///
19    /// # Arguments
20    ///
21    /// * `key` - The key to get an existing or create a new semaphore by
22    ///
23    /// * `allowed_concurrent_threads` - Used when creating a new semaphore (if an existing one is not found by key),
24    /// to specify how many concurrent threads are allowed access.
25    pub async fn get_or_create_semaphore(key: &String, allowed_concurrent_threads: usize) -> Arc<Semaphore> {
26
27        trace!("Thread:{:?} request semaphore for key={} allowed threads={}", thread::current().id(), key, allowed_concurrent_threads);
28
29        let option_semaphore = SemaphoreKey::get_semaphore_if_exists_read_guard(key).await;
30
31        let semaphore = if let Some(semaphore) = option_semaphore {
32            trace!("Thread:{:?} semaphore exists for key={}", thread::current().id(), key);
33            semaphore
34        } else {
35            SemaphoreKey::create_new_semaphore_by_key(key, allowed_concurrent_threads).await
36        };
37
38        return semaphore;
39    }
40
41    /// Removes a semaphore from the internal map if it exists, and returns it wrapped in an Arc
42    ///
43    /// # Arguments
44    ///
45    /// * `key` - The key to get an existing semaphore by
46    ///
47    pub async fn remove_if_exists(key: &String) -> Option<Arc<Semaphore>> {
48
49        trace!("Thread:{:?} remove semaphore for key={}", thread::current().id(), key);
50
51        let option_arc_semaphore: Option<Arc<Semaphore>>;
52
53        //create new scope for the write_guard
54        {
55            let mut write_guard = SEMAPHORE_MAP.write().await;
56            option_arc_semaphore = (write_guard).remove(key);
57        }
58        //write_guard goes out of scope here
59
60        //yield control back to the tokio runtime to allow other threads/tasks,
61        //waiting for the write lock to continue
62        task::yield_now().await;
63
64        option_arc_semaphore
65    }
66
67    async fn get_semaphore_if_exists_read_guard(key: &String) -> Option<Arc<Semaphore>> {
68        let mut result: Option<Arc<Semaphore>> = None;
69
70        let read_guard = SEMAPHORE_MAP.read().await;
71        let option_arc_semaphore = (read_guard).get(key);
72
73        if let Some(arc_semaphore) = option_arc_semaphore {
74            let new_arc_semaphore = arc_semaphore.clone();
75            result = Some(new_arc_semaphore);
76        }
77
78        return result;
79    }
80
81    async fn create_new_semaphore_by_key(key: &String, allowed_concurrent_threads: usize) -> Arc<Semaphore> {
82
83        trace!("Thread:{:?} create new semaphore for key={} allowed threads={}", thread::current().id(), key, allowed_concurrent_threads);
84
85        let semaphore: Arc<Semaphore>;
86        
87        //use new scope for write_guard
88        {
89            let mut write_guard = SEMAPHORE_MAP.write().await;
90
91            //perform another check in write local before creating a new semaphore
92            let option_arc_semaphore = (write_guard).get(key);
93
94            if let Some(semaphore) = option_arc_semaphore {
95
96                //yield control back to the tokio runtime to allow other threads/tasks to continue
97                task::yield_now().await;
98
99                return semaphore.clone();
100            }
101
102            trace!("Thread:{:?} create a new semaphore for key={} with allowed threads={}", thread::current().id(), key, allowed_concurrent_threads);
103
104            semaphore = Arc::new(Semaphore::new(allowed_concurrent_threads));
105
106            write_guard.insert(key.clone(), semaphore.clone()); //insert a reference into hashmap
107        }
108
109        //The write guard goes out of scope here.
110        //Now that the new key and semaphore has been added,
111        //yield control back to the tokio runtime to allow other waiting threads/tasks,
112        //waiting on the write guard to continue.
113
114        task::yield_now().await;
115
116        return semaphore;
117    }
118}