Skip to main content

opal/pipeline/
resource_group.rs

1use anyhow::{Context, Result};
2use std::fs::{self, OpenOptions};
3use std::io::Write;
4use std::path::PathBuf;
5
/// Hands out named resource-group locks, each represented by a `.lock` file
/// kept directly under a single root directory.
#[derive(Debug, Clone)]
pub struct ResourceGroupManager {
    /// Directory that holds one `<slug>.lock` file per currently held group.
    root: PathBuf,
}
10
11impl ResourceGroupManager {
12    pub fn new(root: PathBuf) -> Self {
13        Self { root }
14    }
15
16    pub fn try_acquire(&self, group: &str, owner: &str) -> Result<bool> {
17        fs::create_dir_all(&self.root)
18            .with_context(|| format!("failed to create {}", self.root.display()))?;
19        let path = self.lock_path(group);
20        match OpenOptions::new().write(true).create_new(true).open(&path) {
21            Ok(mut file) => {
22                writeln!(file, "{owner}")
23                    .with_context(|| format!("failed to write {}", path.display()))?;
24                Ok(true)
25            }
26            Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => Ok(false),
27            Err(err) => Err(err).with_context(|| format!("failed to open {}", path.display())),
28        }
29    }
30
31    pub fn release(&self, group: &str) -> Result<()> {
32        let path = self.lock_path(group);
33        if !path.exists() {
34            return Ok(());
35        }
36        fs::remove_file(&path).with_context(|| format!("failed to remove {}", path.display()))
37    }
38
39    fn lock_path(&self, group: &str) -> PathBuf {
40        self.root
41            .join(format!("{}.lock", sanitize_group_name(group)))
42    }
43}
44
/// Turns a resource group name into the slug used as its lock file stem.
///
/// ASCII letters and digits are kept, lowercased. Each `-`, `_` or `.` becomes
/// a single `-`. Every other character is dropped. If nothing survives, the
/// fixed slug `resource-group` is returned.
///
/// Distinct names can produce the same slug (for example `a.b` and `a-b`).
fn sanitize_group_name(group: &str) -> String {
    let cleaned: String = group
        .chars()
        .filter_map(|c| match c {
            c if c.is_ascii_alphanumeric() => Some(c.to_ascii_lowercase()),
            '-' | '_' | '.' => Some('-'),
            _ => None,
        })
        .collect();

    if cleaned.is_empty() {
        String::from("resource-group")
    } else {
        cleaned
    }
}
59
#[cfg(test)]
mod tests {
    use super::ResourceGroupManager;
    use tempfile::tempdir;

    /// A held group refuses further acquires until it has been released.
    #[test]
    fn resource_group_manager_blocks_second_acquire() {
        let scratch = tempdir().expect("tempdir");
        let lock_root = scratch.path().join("locks");
        let locks = ResourceGroupManager::new(lock_root);

        // The first caller gets the lock.
        let first = locks.try_acquire("prod-lock", "job-a").unwrap();
        assert!(first);

        // A second caller is turned away while the lock is held.
        let contended = locks.try_acquire("prod-lock", "job-b").unwrap();
        assert!(!contended);

        // After a release the same caller succeeds.
        locks.release("prod-lock").unwrap();
        let retried = locks.try_acquire("prod-lock", "job-b").unwrap();
        assert!(retried);
    }
}