athena-gateway 3.18.0

Portable gateway request contracts and normalization primitives for Athena
Documentation
//! Fetch singleflight coordination for cache-equivalent gateway reads.
//!
//! The coordinator collapses concurrent fetches that share the same cache key
//! so only one caller performs the backend read while followers wait for the
//! shared result or fall back after a bounded timeout.

use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, Notify};

/// Shared fetch result shape propagated between singleflight leader/follower tasks.
///
/// `Ok` carries the fetched values as a `Vec` of JSON [`Value`]s; `Err` carries a
/// `String` (presumably a human-readable error message — confirm against callers).
/// Both sides are cloneable, which `wait_for_result` relies on to hand each
/// follower its own copy of the stored outcome.
pub type GatewayFetchRowsResult = Result<Vec<Value>, String>;

/// Tracks a single in-flight fetch result and wakes followers when it completes.
///
/// Instances are shared behind an `Arc` between the leader and its followers;
/// the fields are private, so all access goes through `GatewayFetchSingleflight`.
pub struct GatewayInFlightFetch {
    // `None` until `publish_result` stores the leader's outcome; followers
    // clone the stored value out rather than taking it.
    result: Mutex<Option<GatewayFetchRowsResult>>,
    // Signalled with `notify_waiters()` by `publish_result` after `result`
    // has been stored.
    notify: Notify,
}

impl GatewayInFlightFetch {
    /// Creates an empty in-flight slot: no result stored yet and no pending
    /// notification.
    fn new() -> Self {
        let result = Mutex::new(None);
        let notify = Notify::new();
        Self { result, notify }
    }
}

/// Role assigned when a caller acquires a singleflight slot for a cache key.
///
/// Both variants carry a handle to the same shared [`GatewayInFlightFetch`]
/// for the key, so leader and followers operate on one slot.
pub enum GatewayFetchSingleflightRole {
    /// First caller for a cache key; expected to execute the backend fetch.
    ///
    /// Returned by `acquire` when no entry was registered for the key; the
    /// carried flight is the one to pass to `publish_result`.
    Leader(Arc<GatewayInFlightFetch>),
    /// Subsequent caller waiting on the leader's shared result.
    ///
    /// Returned by `acquire` when an entry already exists for the key; the
    /// carried flight is the one to pass to `wait_for_result`.
    Follower(Arc<GatewayInFlightFetch>),
}

/// Stateful coordinator that deduplicates concurrent gateway fetches per cache key.
pub struct GatewayFetchSingleflight {
    // Currently registered flights, keyed by cache key. Entries are inserted
    // by `acquire` and removed by `publish_result`.
    in_flight: Mutex<HashMap<String, Arc<GatewayInFlightFetch>>>,
    // Upper bound on how long `wait_for_result` waits for the leader's
    // notification before giving up.
    wait_timeout: Duration,
}

impl GatewayFetchSingleflight {
    /// Builds a new fetch singleflight coordinator.
    ///
    /// `wait_timeout` bounds how long a follower waits in
    /// [`Self::wait_for_result`] before giving up on the leader.
    pub fn new(wait_timeout: Duration) -> Self {
        Self {
            in_flight: Mutex::new(HashMap::new()),
            wait_timeout,
        }
    }

    /// Returns the configured follower wait timeout.
    pub fn wait_timeout(&self) -> Duration {
        self.wait_timeout
    }

    /// Acquires the singleflight slot for `cache_key`.
    ///
    /// Returns [`GatewayFetchSingleflightRole::Leader`] with a freshly
    /// registered flight when no fetch is registered for the key, otherwise
    /// [`GatewayFetchSingleflightRole::Follower`] with a handle to the
    /// existing flight.
    ///
    /// The entry registered for a leader is only removed by
    /// [`Self::publish_result`]. A leader that never publishes leaves the key
    /// registered, so later callers keep being assigned the follower role and
    /// time out in [`Self::wait_for_result`].
    pub async fn acquire(&self, cache_key: &str) -> GatewayFetchSingleflightRole {
        let mut in_flight = self.in_flight.lock().await;

        if let Some(existing) = in_flight.get(cache_key) {
            return GatewayFetchSingleflightRole::Follower(Arc::clone(existing));
        }

        // The lookup and the insert happen under the same map lock, so at most
        // one caller can become the leader for a given key.
        let flight = Arc::new(GatewayInFlightFetch::new());
        in_flight.insert(cache_key.to_owned(), Arc::clone(&flight));
        GatewayFetchSingleflightRole::Leader(flight)
    }

    /// Publishes the completed result for `cache_key` and wakes all followers.
    ///
    /// The result is stored on `flight` first, then the map entry for
    /// `cache_key` is removed, and finally waiting followers are notified.
    /// The entry is removed only if it still refers to this exact `flight`
    /// (pointer equality), so publishing a stale flight does not evict a newer
    /// one registered under the same key.
    pub async fn publish_result(
        &self,
        cache_key: &str,
        flight: Arc<GatewayInFlightFetch>,
        result: GatewayFetchRowsResult,
    ) {
        {
            // Store before notifying so that awakened followers find the value.
            let mut shared_result = flight.result.lock().await;
            *shared_result = Some(result);
        }

        {
            let mut in_flight = self.in_flight.lock().await;
            if let Some(existing) = in_flight.get(cache_key)
                && Arc::ptr_eq(existing, &flight)
            {
                in_flight.remove(cache_key);
            }
        }

        flight.notify.notify_waiters();
    }

    /// Waits for the leader's shared result until the configured timeout elapses.
    ///
    /// Returns `Some(result)` (a clone of the published value) when the result
    /// is already stored or the leader publishes it within
    /// [`Self::wait_timeout`], and `None` when the timeout elapses first.
    pub async fn wait_for_result(
        &self,
        flight: Arc<GatewayInFlightFetch>,
    ) -> Option<GatewayFetchRowsResult> {
        // Create the `Notified` future *before* inspecting the stored result.
        // A `Notified` future receives `notify_waiters()` wakeups from the
        // moment it is created, even if it has not been polled yet. Creating it
        // only after the check would leave a window in which the leader could
        // publish and notify unobserved, making this follower wait out the
        // whole timeout and return `None` despite a result being available.
        let notified = flight.notify.notified();

        {
            let shared_result = flight.result.lock().await;
            if let Some(result) = shared_result.as_ref() {
                return Some(result.clone());
            }
        }

        if tokio::time::timeout(self.wait_timeout, notified)
            .await
            .is_err()
        {
            return None;
        }

        let shared_result = flight.result.lock().await;
        shared_result.clone()
    }
}

#[cfg(test)]
mod tests {
    use super::{GatewayFetchSingleflight, GatewayFetchSingleflightRole, GatewayInFlightFetch};
    use serde_json::json;
    use std::sync::Arc;
    use std::time::Duration;

    /// Builds a coordinator whose follower wait timeout is `timeout_ms` milliseconds.
    fn coordinator(timeout_ms: u64) -> GatewayFetchSingleflight {
        let wait_timeout = Duration::from_millis(timeout_ms);
        GatewayFetchSingleflight::new(wait_timeout)
    }

    /// Unwraps the flight from a leader role, panicking on a follower role.
    #[track_caller]
    fn expect_leader(role: GatewayFetchSingleflightRole) -> Arc<GatewayInFlightFetch> {
        let GatewayFetchSingleflightRole::Leader(flight) = role else {
            panic!("expected leader role");
        };
        flight
    }

    /// Unwraps the flight from a follower role, panicking on a leader role.
    #[track_caller]
    fn expect_follower(role: GatewayFetchSingleflightRole) -> Arc<GatewayInFlightFetch> {
        let GatewayFetchSingleflightRole::Follower(flight) = role else {
            panic!("expected follower role");
        };
        flight
    }

    /// A follower that waits after the leader has published sees the leader's rows.
    #[tokio::test(flavor = "current_thread")]
    async fn follower_receives_leader_result() {
        let key = "singleflight-test-key";
        let singleflight = coordinator(100);

        let leader = expect_leader(singleflight.acquire(key).await);
        let follower = expect_follower(singleflight.acquire(key).await);

        let rows = vec![json!({ "from": "leader" })];
        singleflight
            .publish_result(key, leader, Ok(rows.clone()))
            .await;

        let waited = singleflight.wait_for_result(follower).await;

        assert_eq!(waited, Some(Ok(rows)));
    }

    /// Publishing frees the key, so the next caller becomes a leader again.
    #[tokio::test(flavor = "current_thread")]
    async fn entry_is_removed_after_publish() {
        let key = "singleflight-test-key-cleanup";
        let singleflight = coordinator(100);

        let leader = expect_leader(singleflight.acquire(key).await);
        singleflight
            .publish_result(key, leader, Ok(vec![json!({ "ok": true })]))
            .await;

        let GatewayFetchSingleflightRole::Leader(_) = singleflight.acquire(key).await else {
            panic!("expected a new leader after cleanup");
        };
    }

    /// A follower gives up with `None` when the leader never publishes.
    #[tokio::test(flavor = "current_thread")]
    async fn follower_times_out_without_result() {
        let key = "singleflight-test-key-timeout";
        let singleflight = coordinator(1);

        let _leader_flight = expect_leader(singleflight.acquire(key).await);
        let follower = expect_follower(singleflight.acquire(key).await);

        let waited = singleflight.wait_for_result(follower).await;

        assert!(waited.is_none());
    }
}