actionqueue_engine/concurrency/
key_gate.rs1use std::fmt::{Display, Formatter};
24
25use actionqueue_core::ids::RunId;
26
/// Name of a concurrency key, stored as an owned string.
///
/// The constructor rejects empty strings, so a key built through
/// [`ConcurrencyKey::new`] always contains at least one byte.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct ConcurrencyKey(String);

impl ConcurrencyKey {
    /// Builds a key from anything convertible into a `String`.
    ///
    /// # Panics
    ///
    /// Panics with the message `ConcurrencyKey must not be empty` when the
    /// converted string is empty.
    pub fn new(key: impl Into<String>) -> Self {
        let raw: String = key.into();
        assert!(!raw.is_empty(), "ConcurrencyKey must not be empty");
        ConcurrencyKey(raw)
    }

    /// Borrows the key text as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}
46
47impl From<String> for ConcurrencyKey {
48 fn from(value: String) -> Self {
49 Self::new(value)
50 }
51}
52
53impl From<&str> for ConcurrencyKey {
54 fn from(value: &str) -> Self {
55 Self::new(value.to_owned())
56 }
57}
58
59impl Display for ConcurrencyKey {
60 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
61 write!(f, "{}", self.0)
62 }
63}
64
65#[derive(Debug, Clone, PartialEq, Eq)]
67#[must_use]
68pub enum AcquireResult {
69 Acquired {
71 key: ConcurrencyKey,
73 run_id: RunId,
75 },
76 Occupied {
78 key: ConcurrencyKey,
80 holder_run_id: RunId,
82 },
83}
84
85#[derive(Debug, Clone, PartialEq, Eq)]
87#[must_use]
88pub enum ReleaseResult {
89 Released {
91 key: ConcurrencyKey,
93 },
94 NotHeld {
96 key: ConcurrencyKey,
98 attempting_run_id: RunId,
100 },
101}
102
103#[derive(Debug, Clone, Default)]
105pub struct KeyGate {
106 occupied_keys: std::collections::HashMap<ConcurrencyKey, RunId>,
108}
109
110impl KeyGate {
111 pub fn new() -> Self {
121 Self { occupied_keys: std::collections::HashMap::new() }
122 }
123
124 pub fn acquire(&mut self, key: ConcurrencyKey, run_id: RunId) -> AcquireResult {
129 if let Some(holder) = self.occupied_keys.get(&key) {
130 if *holder == run_id {
131 tracing::debug!(%key, %run_id, "concurrency key re-acquired by same run");
133 AcquireResult::Acquired { key, run_id }
134 } else {
135 tracing::debug!(%key, %run_id, holder = %holder, "concurrency key occupied");
137 AcquireResult::Occupied { key, holder_run_id: *holder }
138 }
139 } else {
140 tracing::debug!(%key, %run_id, "concurrency key acquired");
142 self.occupied_keys.insert(key.clone(), run_id);
143 AcquireResult::Acquired { key, run_id }
144 }
145 }
146
147 pub fn release(&mut self, key: ConcurrencyKey, run_id: RunId) -> ReleaseResult {
153 match self.occupied_keys.get(&key) {
154 Some(holder) if *holder == run_id => {
155 self.occupied_keys.remove(&key);
156 tracing::debug!(%key, %run_id, "concurrency key released");
157 ReleaseResult::Released { key }
158 }
159 Some(_holder) => {
160 ReleaseResult::NotHeld { key, attempting_run_id: run_id }
162 }
163 None => {
164 ReleaseResult::NotHeld { key, attempting_run_id: run_id }
166 }
167 }
168 }
169
170 pub fn is_key_occupied(&self, key: &ConcurrencyKey) -> bool {
172 self.occupied_keys.contains_key(key)
173 }
174
175 pub fn key_holder(&self, key: &ConcurrencyKey) -> Option<RunId> {
177 self.occupied_keys.get(key).copied()
178 }
179}
180
#[cfg(test)]
mod tests {
    use super::*;

    /// A free key is granted to the run that asks for it.
    #[test]
    fn acquire_succeeds_when_key_is_free() {
        let mut gate = KeyGate::new();
        let key = ConcurrencyKey::new("my-key");
        let run_id = RunId::new();

        let outcome = gate.acquire(key.clone(), run_id);

        if let AcquireResult::Acquired { key: granted_key, run_id: granted_run } = outcome {
            assert_eq!(granted_key, key);
            assert_eq!(granted_run, run_id);
        } else {
            panic!("Expected acquisition to succeed");
        }

        assert!(gate.is_key_occupied(&key));
        assert_eq!(gate.key_holder(&key), Some(run_id));
    }

    /// A second run is refused and told who holds the key.
    #[test]
    fn acquire_fails_when_key_is_occupied_by_different_run() {
        let mut gate = KeyGate::new();
        let key = ConcurrencyKey::new("my-key");
        let holder_run_id = RunId::new();
        let requesting_run_id = RunId::new();

        // Result intentionally discarded: this call only sets up the holder.
        let _ = gate.acquire(key.clone(), holder_run_id);

        let outcome = gate.acquire(key.clone(), requesting_run_id);

        if let AcquireResult::Occupied { key: refused_key, holder_run_id: reported_holder } =
            outcome
        {
            assert_eq!(refused_key, key);
            assert_eq!(reported_holder, holder_run_id);
        } else {
            panic!("Expected acquisition to fail");
        }

        // The original holder must be untouched by the failed attempt.
        assert_eq!(gate.key_holder(&key), Some(holder_run_id));
    }

    /// Asking again with the same run succeeds and keeps the holder.
    #[test]
    fn acquire_succeeds_when_same_run_reacquires_key() {
        let mut gate = KeyGate::new();
        let key = ConcurrencyKey::new("my-key");
        let run_id = RunId::new();

        let _ = gate.acquire(key.clone(), run_id);

        let outcome = gate.acquire(key.clone(), run_id);

        if let AcquireResult::Acquired { key: granted_key, run_id: granted_run } = outcome {
            assert_eq!(granted_key, key);
            assert_eq!(granted_run, run_id);
        } else {
            panic!("Expected re-acquisition to succeed");
        }

        assert!(gate.is_key_occupied(&key));
        assert_eq!(gate.key_holder(&key), Some(run_id));
    }

    /// The holder can release its key, which frees it.
    #[test]
    fn release_releases_key_held_by_same_run() {
        let mut gate = KeyGate::new();
        let key = ConcurrencyKey::new("my-key");
        let run_id = RunId::new();

        let _ = gate.acquire(key.clone(), run_id);

        let outcome = gate.release(key.clone(), run_id);

        if let ReleaseResult::Released { key: freed_key } = outcome {
            assert_eq!(freed_key, key);
        } else {
            panic!("Expected release to succeed");
        }

        assert!(!gate.is_key_occupied(&key));
        assert_eq!(gate.key_holder(&key), None);
    }

    /// A run that does not hold the key cannot release it.
    #[test]
    fn release_fails_when_key_held_by_different_run() {
        let mut gate = KeyGate::new();
        let key = ConcurrencyKey::new("my-key");
        let holder_run_id = RunId::new();
        let attempting_run_id = RunId::new();

        let _ = gate.acquire(key.clone(), holder_run_id);

        let outcome = gate.release(key.clone(), attempting_run_id);

        if let ReleaseResult::NotHeld { key: refused_key, attempting_run_id: reported_run } =
            outcome
        {
            assert_eq!(refused_key, key);
            assert_eq!(reported_run, attempting_run_id);
        } else {
            panic!("Expected release to fail");
        }

        // The key must still belong to the original holder.
        assert!(gate.is_key_occupied(&key));
        assert_eq!(gate.key_holder(&key), Some(holder_run_id));
    }

    /// Releasing a key nobody holds reports `NotHeld` and changes nothing.
    #[test]
    fn release_has_no_effect_when_key_is_free() {
        let mut gate = KeyGate::new();
        let key = ConcurrencyKey::new("my-key");
        let run_id = RunId::new();

        let outcome = gate.release(key.clone(), run_id);

        if let ReleaseResult::Released { .. } = outcome {
            panic!("Expected release to have no effect");
        }

        assert!(!gate.is_key_occupied(&key));
    }

    /// Holding one key does not block a different key.
    #[test]
    fn different_keys_can_be_occupied_simultaneously() {
        let mut gate = KeyGate::new();
        let key1 = ConcurrencyKey::new("key-1");
        let key2 = ConcurrencyKey::new("key-2");
        let run1_id = RunId::new();
        let run2_id = RunId::new();

        let _ = gate.acquire(key1.clone(), run1_id);
        let _ = gate.acquire(key2.clone(), run2_id);

        for (key, expected_holder) in [(&key1, run1_id), (&key2, run2_id)] {
            assert!(gate.is_key_occupied(key));
            assert_eq!(gate.key_holder(key), Some(expected_holder));
        }
    }
}