coil-runtime 0.1.1

HTTP runtime and request handling for the Coil framework.
Documentation
use super::*;

#[cfg(test)]
pub(crate) fn shared_cache_runtime_for_test(
    backend: coil_cache::CacheBackendKind,
    namespace: String,
) -> std::sync::Arc<dyn coil_cache::DistributedCacheRuntime> {
    use std::collections::BTreeMap;
    use std::sync::{Arc, Mutex, OnceLock};

    static REGISTRY: OnceLock<
        Mutex<BTreeMap<String, Arc<dyn coil_cache::DistributedCacheRuntime>>>,
    > = OnceLock::new();

    let key = format!("{backend:?}:{namespace}");
    let registry = REGISTRY.get_or_init(|| Mutex::new(BTreeMap::new()));
    let mut guard = registry.lock().expect("test cache registry mutex poisoned");
    guard
        .entry(key)
        .or_insert_with(|| {
            Arc::new(SharedCacheRuntimeHarness::new(
                coil_cache::DistributedCacheClient::emulated_shared_runtime(backend),
            ))
        })
        .clone()
}

#[cfg(test)]
pub(crate) fn shared_jobs_runtime_for_test(
    runtime: &JobsRuntimeServices,
    namespace: String,
) -> std::sync::Arc<dyn coil_jobs::JobsCoordinationRuntime> {
    use std::collections::BTreeMap;
    use std::sync::{Arc, Mutex, OnceLock};

    static REGISTRY: OnceLock<
        Mutex<BTreeMap<String, Arc<dyn coil_jobs::JobsCoordinationRuntime>>>,
    > = OnceLock::new();

    let key = format!(
        "{:?}:{}:{}:{}:{}:{}",
        runtime.backend,
        runtime.topology.work_queue.as_str(),
        runtime.topology.scheduled_queue.as_str(),
        runtime.topology.domain_events_queue.as_str(),
        runtime.topology.dead_letter_queue.as_str(),
        namespace
    );
    let registry = REGISTRY.get_or_init(|| Mutex::new(BTreeMap::new()));
    let mut guard = registry.lock().expect("test jobs registry mutex poisoned");
    guard
        .entry(key)
        .or_insert_with(|| {
            Arc::new(SharedJobsRuntimeHarness::new(
                coil_jobs::JobsBackendAdapter::emulated_shared_runtime(runtime),
            ))
        })
        .clone()
}

#[cfg(test)]
#[derive(Clone)]
struct SharedCacheRuntimeHarness {
    runtime: std::sync::Arc<dyn coil_cache::DistributedCacheRuntime>,
}

#[cfg(test)]
impl SharedCacheRuntimeHarness {
    fn new(runtime: std::sync::Arc<dyn coil_cache::DistributedCacheRuntime>) -> Self {
        Self { runtime }
    }
}

#[cfg(test)]
impl coil_cache::DistributedCacheRuntime for SharedCacheRuntimeHarness {
    fn insert(&self, entry: coil_cache::CacheEntry) {
        self.runtime.insert(entry);
    }

    fn lookup(
        &self,
        key: &coil_cache::CacheKey,
        now: coil_cache::CacheInstant,
    ) -> coil_cache::CacheLookup {
        self.runtime.lookup(key, now)
    }

    fn invalidate(&self, tags: &coil_cache::InvalidationSet) -> Vec<coil_cache::CacheKey> {
        self.runtime.invalidate(tags)
    }

    fn begin_fill(
        &self,
        key: &coil_cache::CacheKey,
        mode: coil_cache::RequestCoalescingMode,
        holder: String,
    ) -> coil_cache::FillDecision {
        self.runtime.begin_fill(key, mode, holder)
    }

    fn complete_fill(
        &self,
        lease: &coil_cache::FillLease,
    ) -> Result<(), coil_cache::CacheModelError> {
        self.runtime.complete_fill(lease)
    }

    fn metrics(&self) -> coil_cache::CacheMetrics {
        self.runtime.metrics()
    }

    fn is_shared_backend(&self) -> bool {
        true
    }
}

#[cfg(test)]
#[derive(Clone)]
struct SharedJobsRuntimeHarness {
    runtime: std::sync::Arc<dyn coil_jobs::JobsCoordinationRuntime>,
}

#[cfg(test)]
impl SharedJobsRuntimeHarness {
    fn new(runtime: std::sync::Arc<dyn coil_jobs::JobsCoordinationRuntime>) -> Self {
        Self { runtime }
    }
}

#[cfg(test)]
impl coil_jobs::JobsCoordinationRuntime for SharedJobsRuntimeHarness {
    fn snapshot(&self) -> coil_jobs::JobsCoordinatorSnapshot {
        self.runtime.snapshot()
    }

    fn enqueue(
        &self,
        spec: coil_jobs::JobSpec,
        now: coil_jobs::JobInstant,
    ) -> Result<(), coil_jobs::JobsModelError> {
        self.runtime.enqueue(spec, now)
    }

    fn retry_dead_letter(
        &self,
        dead_letter_id: &coil_jobs::DeadLetterId,
        now: coil_jobs::JobInstant,
    ) -> Result<coil_jobs::QueuedJobRecord, coil_jobs::JobsModelError> {
        self.runtime.retry_dead_letter(dead_letter_id, now)
    }

    fn acquire_scheduler_leadership(
        &self,
        node_id: String,
        now: coil_jobs::JobInstant,
        lease_ttl: std::time::Duration,
    ) -> Result<coil_jobs::SchedulerLeadership, coil_jobs::JobsModelError> {
        self.runtime
            .acquire_scheduler_leadership(node_id, now, lease_ttl)
    }

    fn promote_due_jobs(
        &self,
        node_id: &str,
        now: coil_jobs::JobInstant,
    ) -> Result<Vec<coil_jobs::JobId>, coil_jobs::JobsModelError> {
        self.runtime.promote_due_jobs(node_id, now)
    }

    fn lease_ready_jobs(
        &self,
        queue: &coil_jobs::JobQueueName,
        worker_id: String,
        now: coil_jobs::JobInstant,
        lease_ttl: std::time::Duration,
        max_jobs: usize,
    ) -> Result<Vec<coil_jobs::JobLease>, coil_jobs::JobsModelError> {
        self.runtime
            .lease_ready_jobs(queue, worker_id, now, lease_ttl, max_jobs)
    }

    fn acknowledge_completed(
        &self,
        lease: &coil_jobs::JobLease,
        now: coil_jobs::JobInstant,
    ) -> Result<(), coil_jobs::JobsModelError> {
        self.runtime.acknowledge_completed(lease, now)
    }

    fn acknowledge_failed(
        &self,
        lease: &coil_jobs::JobLease,
        now: coil_jobs::JobInstant,
        reason: coil_jobs::DeadLetterReason,
        error_message: String,
    ) -> Result<coil_jobs::JobFailureDisposition, coil_jobs::JobsModelError> {
        self.runtime
            .acknowledge_failed(lease, now, reason, error_message)
    }

    fn is_shared_backend(&self) -> bool {
        true
    }
}