mocra 0.3.0

A distributed, event-driven crawling and data collection framework
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

use async_trait::async_trait;
use dashmap::DashMap;
use dashmap::mapref::entry::Entry;

use super::types::{DagError, DagRunGuard, DagRunGuardAcquireOutcome};

#[derive(Default)]
pub struct InMemoryDagRunGuard {
    locks: DashMap<String, (String, u64)>,
    pub renew_count: AtomicUsize,
    next_fencing_token: AtomicU64,
}

#[async_trait]
impl DagRunGuard for InMemoryDagRunGuard {
    async fn try_acquire(
        &self,
        lock_key: &str,
        owner: &str,
        _ttl_ms: u64,
    ) -> Result<DagRunGuardAcquireOutcome, DagError> {
        match self.locks.entry(lock_key.to_string()) {
            Entry::Occupied(existing) => {
                let (current_owner, token) = existing.get();
                Ok(DagRunGuardAcquireOutcome {
                    acquired: current_owner == owner,
                    fencing_token: Some(*token),
                })
            }
            Entry::Vacant(slot) => {
                let token = self
                    .next_fencing_token
                    .fetch_add(1, Ordering::SeqCst)
                    .saturating_add(1);
                slot.insert((owner.to_string(), token));
                Ok(DagRunGuardAcquireOutcome {
                    acquired: true,
                    fencing_token: Some(token),
                })
            }
        }
    }

    async fn renew(&self, lock_key: &str, owner: &str, _ttl_ms: u64) -> Result<bool, DagError> {
        self.renew_count.fetch_add(1, Ordering::SeqCst);
        let ok = self
            .locks
            .get(lock_key)
            .map(|entry| entry.value().0 == owner)
            .unwrap_or(false);
        Ok(ok)
    }

    async fn release(&self, lock_key: &str, owner: &str) -> Result<(), DagError> {
        if let Some(current) = self.locks.get(lock_key)
            && current.value().0 != owner
        {
            return Ok(());
        }
        self.locks.remove(lock_key);
        Ok(())
    }
}