Skip to main content

actionqueue_engine/concurrency/
key_gate.rs

//! Concurrency key gate for single-flight execution control.
//!
//! This module defines typed concurrency key primitives and the gate logic
//! that prevents runs with the same concurrency key from executing in parallel.
//!
//! # Overview
//!
//! A concurrency key is an optional string attached to a task that ensures
//! only one run with that key can be in the Running state at a time.
//!
//! ## Key States
//!
//! - **Free**: No run currently holds the key.
//! - **Occupied**: A run is currently holding the key and executing.
//!
//! ## Gate Operations
//!
//! - **Acquire**: Attempts to acquire a key for a run. Returns `AcquireResult::Acquired`
//!   if the key is free or already held by the same run, or `AcquireResult::Occupied`
//!   if another run holds it.
//! - **Release**: Releases a key, making it available for other runs.
//! - **Query**: Checks whether a key is currently occupied.
22
23use std::fmt::{Display, Formatter};
24
25use actionqueue_core::ids::RunId;
26
/// A typed concurrency key that ensures runs with the same key don't run in parallel.
///
/// The wrapped string is never empty: every constructor goes through
/// [`ConcurrencyKey::new`], which rejects empty values.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ConcurrencyKey(String);

impl ConcurrencyKey {
    /// Creates a new concurrency key from a string.
    ///
    /// # Panics
    ///
    /// Panics if the value is empty. The check is a plain `assert!`, so it is
    /// enforced in every build profile, not only in debug builds.
    pub fn new(key: impl Into<String>) -> Self {
        let value = key.into();
        assert!(!value.is_empty(), "ConcurrencyKey must not be empty");
        Self(value)
    }

    /// Returns the key as a string slice.
    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl From<String> for ConcurrencyKey {
    /// Converts an owned string into a key.
    ///
    /// # Panics
    ///
    /// Panics if `value` is empty, because this delegates to [`ConcurrencyKey::new`].
    fn from(value: String) -> Self {
        Self::new(value)
    }
}

impl From<&str> for ConcurrencyKey {
    /// Converts a string slice into a key, copying it into an owned string.
    ///
    /// # Panics
    ///
    /// Panics if `value` is empty, because this delegates to [`ConcurrencyKey::new`].
    fn from(value: &str) -> Self {
        // `new` accepts `impl Into<String>`, so the slice can be passed directly.
        Self::new(value)
    }
}

impl Display for ConcurrencyKey {
    /// Writes the raw key text with no decoration.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}
64
65/// Result of a concurrency key acquisition attempt.
66#[derive(Debug, Clone, PartialEq, Eq)]
67#[must_use]
68pub enum AcquireResult {
69    /// The key was successfully acquired for the run.
70    Acquired {
71        /// The concurrency key that was acquired.
72        key: ConcurrencyKey,
73        /// The run that now holds the key.
74        run_id: RunId,
75    },
76    /// The key is already occupied by another run.
77    Occupied {
78        /// The concurrency key that is occupied.
79        key: ConcurrencyKey,
80        /// The run that currently holds the key.
81        holder_run_id: RunId,
82    },
83}
84
85/// Result of a concurrency key release attempt.
86#[derive(Debug, Clone, PartialEq, Eq)]
87#[must_use]
88pub enum ReleaseResult {
89    /// The key was successfully released.
90    Released {
91        /// The concurrency key that was released.
92        key: ConcurrencyKey,
93    },
94    /// The key was not held by the specified run, or was already free.
95    NotHeld {
96        /// The concurrency key that was attempted to release.
97        key: ConcurrencyKey,
98        /// The run that attempted to release the key.
99        attempting_run_id: RunId,
100    },
101}
102
103/// A key gate state that tracks which run (if any) holds each concurrency key.
104#[derive(Debug, Clone, Default)]
105pub struct KeyGate {
106    /// Maps each concurrency key to the run that currently holds it.
107    occupied_keys: std::collections::HashMap<ConcurrencyKey, RunId>,
108}
109
110impl KeyGate {
111    /// Creates a new empty key gate.
112    ///
113    /// Empty initialization is safe post-recovery because the dispatch loop is
114    /// single-threaded and acquires concurrency keys at dispatch time (step 6 of
115    /// the tick cycle), not at recovery time. Runs that were in-flight when a
116    /// crash occurred recover to the Ready state via the uncertainty clause and
117    /// must re-acquire their concurrency keys before being dispatched again.
118    /// This means there is no window where a recovered run could bypass the
119    /// key gate.
120    pub fn new() -> Self {
121        Self { occupied_keys: std::collections::HashMap::new() }
122    }
123
124    /// Attempts to acquire a concurrency key for a run.
125    ///
126    /// Returns `AcquireResult::Acquired` if the key is free or already held by
127    /// the same run. Returns `AcquireResult::Occupied` if another run holds the key.
128    pub fn acquire(&mut self, key: ConcurrencyKey, run_id: RunId) -> AcquireResult {
129        if let Some(holder) = self.occupied_keys.get(&key) {
130            if *holder == run_id {
131                // Already held by this run
132                tracing::debug!(%key, %run_id, "concurrency key re-acquired by same run");
133                AcquireResult::Acquired { key, run_id }
134            } else {
135                // Key is held by another run
136                tracing::debug!(%key, %run_id, holder = %holder, "concurrency key occupied");
137                AcquireResult::Occupied { key, holder_run_id: *holder }
138            }
139        } else {
140            // Key is free, acquire it
141            tracing::debug!(%key, %run_id, "concurrency key acquired");
142            self.occupied_keys.insert(key.clone(), run_id);
143            AcquireResult::Acquired { key, run_id }
144        }
145    }
146
147    /// Attempts to release a concurrency key from a run.
148    ///
149    /// Returns `ReleaseResult::Released` if the key was held by the specified run.
150    /// Returns `ReleaseResult::NotHeld` if the key was not held by the run
151    /// or was already free.
152    pub fn release(&mut self, key: ConcurrencyKey, run_id: RunId) -> ReleaseResult {
153        match self.occupied_keys.get(&key) {
154            Some(holder) if *holder == run_id => {
155                self.occupied_keys.remove(&key);
156                tracing::debug!(%key, %run_id, "concurrency key released");
157                ReleaseResult::Released { key }
158            }
159            Some(_holder) => {
160                // Key is held by a different run
161                ReleaseResult::NotHeld { key, attempting_run_id: run_id }
162            }
163            None => {
164                // Key is already free
165                ReleaseResult::NotHeld { key, attempting_run_id: run_id }
166            }
167        }
168    }
169
170    /// Queries whether a concurrency key is currently occupied.
171    pub fn is_key_occupied(&self, key: &ConcurrencyKey) -> bool {
172        self.occupied_keys.contains_key(key)
173    }
174
175    /// Returns the run that currently holds the key, if any.
176    pub fn key_holder(&self, key: &ConcurrencyKey) -> Option<RunId> {
177        self.occupied_keys.get(key).copied()
178    }
179}
180
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an empty gate plus the key most tests operate on.
    fn gate_and_key() -> (KeyGate, ConcurrencyKey) {
        (KeyGate::new(), ConcurrencyKey::new("my-key"))
    }

    #[test]
    fn acquire_succeeds_when_key_is_free() {
        let (mut gate, key) = gate_and_key();
        let run_id = RunId::new();

        let outcome = gate.acquire(key.clone(), run_id);

        assert_eq!(
            outcome,
            AcquireResult::Acquired { key: key.clone(), run_id },
            "Expected acquisition to succeed"
        );
        assert!(gate.is_key_occupied(&key));
        assert_eq!(gate.key_holder(&key), Some(run_id));
    }

    #[test]
    fn acquire_fails_when_key_is_occupied_by_different_run() {
        let (mut gate, key) = gate_and_key();
        let holder_run_id = RunId::new();
        let requesting_run_id = RunId::new();

        // The holder takes the key first, then a second run asks for it.
        let _ = gate.acquire(key.clone(), holder_run_id);
        let outcome = gate.acquire(key.clone(), requesting_run_id);

        assert_eq!(
            outcome,
            AcquireResult::Occupied { key: key.clone(), holder_run_id },
            "Expected acquisition to fail"
        );

        // A refused request must not displace the holder.
        assert_eq!(gate.key_holder(&key), Some(holder_run_id));
    }

    #[test]
    fn acquire_succeeds_when_same_run_reacquires_key() {
        let (mut gate, key) = gate_and_key();
        let run_id = RunId::new();

        // Acquiring twice with the same run is re-entrant.
        let _ = gate.acquire(key.clone(), run_id);
        let outcome = gate.acquire(key.clone(), run_id);

        assert_eq!(
            outcome,
            AcquireResult::Acquired { key: key.clone(), run_id },
            "Expected re-acquisition to succeed"
        );
        assert!(gate.is_key_occupied(&key));
        assert_eq!(gate.key_holder(&key), Some(run_id));
    }

    #[test]
    fn release_releases_key_held_by_same_run() {
        let (mut gate, key) = gate_and_key();
        let run_id = RunId::new();

        let _ = gate.acquire(key.clone(), run_id);
        let outcome = gate.release(key.clone(), run_id);

        assert_eq!(
            outcome,
            ReleaseResult::Released { key: key.clone() },
            "Expected release to succeed"
        );
        assert!(!gate.is_key_occupied(&key));
        assert_eq!(gate.key_holder(&key), None);
    }

    #[test]
    fn release_fails_when_key_held_by_different_run() {
        let (mut gate, key) = gate_and_key();
        let holder_run_id = RunId::new();
        let attempting_run_id = RunId::new();

        // One run holds the key; a different run tries to release it.
        let _ = gate.acquire(key.clone(), holder_run_id);
        let outcome = gate.release(key.clone(), attempting_run_id);

        assert_eq!(
            outcome,
            ReleaseResult::NotHeld { key: key.clone(), attempting_run_id },
            "Expected release to fail"
        );

        // The original holder keeps the key.
        assert!(gate.is_key_occupied(&key));
        assert_eq!(gate.key_holder(&key), Some(holder_run_id));
    }

    #[test]
    fn release_has_no_effect_when_key_is_free() {
        let (mut gate, key) = gate_and_key();
        let run_id = RunId::new();

        // Nobody acquired the key, so there is nothing to release.
        let outcome = gate.release(key.clone(), run_id);

        assert!(
            matches!(outcome, ReleaseResult::NotHeld { .. }),
            "Expected release to have no effect"
        );
        assert!(!gate.is_key_occupied(&key));
    }

    #[test]
    fn different_keys_can_be_occupied_simultaneously() {
        let mut gate = KeyGate::new();
        let key1 = ConcurrencyKey::new("key-1");
        let key2 = ConcurrencyKey::new("key-2");
        let run1_id = RunId::new();
        let run2_id = RunId::new();

        // Distinct keys never contend with each other.
        let _ = gate.acquire(key1.clone(), run1_id);
        let _ = gate.acquire(key2.clone(), run2_id);

        for (key, holder) in [(&key1, run1_id), (&key2, run2_id)] {
            assert!(gate.is_key_occupied(key));
            assert_eq!(gate.key_holder(key), Some(holder));
        }
    }
}