telltale-runtime 17.0.0

Choreographic programming for Telltale - effect-based distributed protocols
Documentation
// Metrics collection middleware for effect handlers
//
// Tracks counts of sends, receives, and errors for monitoring and analysis.

use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use std::time::Duration;

use crate::effects::contract::{DocumentedHandlerContract, HandlerContractProfile};
use crate::effects::registry::{ExtensibleHandler, ExtensionRegistry};
use crate::effects::{ChoreoHandler, ChoreoResult, RoleId};

/// Metrics collection middleware
#[derive(Clone)]
pub struct Metrics<H> {
    inner: H,
    send_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
    recv_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
    error_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
}

impl<H> Metrics<H> {
    pub fn new(inner: H) -> Self {
        Self {
            inner,
            send_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
            recv_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
            error_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
        }
    }

    pub fn send_count(&self) -> u64 {
        self.send_count.load(std::sync::atomic::Ordering::Relaxed)
    }

    pub fn recv_count(&self) -> u64 {
        self.recv_count.load(std::sync::atomic::Ordering::Relaxed)
    }

    pub fn error_count(&self) -> u64 {
        self.error_count.load(std::sync::atomic::Ordering::Relaxed)
    }
}

impl<H> DocumentedHandlerContract for Metrics<H>
where
    H: DocumentedHandlerContract,
{
    fn contract_profile() -> HandlerContractProfile {
        let mut profile = H::contract_profile();
        profile.handler_name = std::any::type_name::<Self>();
        profile
            .notes
            .push("metrics middleware may count outcomes but must not alter payloads or labels");
        profile
    }
}

#[async_trait]
impl<H: ChoreoHandler + Send> ChoreoHandler for Metrics<H> {
    type Role = H::Role;
    type Endpoint = H::Endpoint;

    async fn send<M: Serialize + Send + Sync>(
        &mut self,
        ep: &mut Self::Endpoint,
        to: Self::Role,
        msg: &M,
    ) -> ChoreoResult<()> {
        let result = self.inner.send(ep, to, msg).await;
        if result.is_ok() {
            self.send_count
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        } else {
            self.error_count
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        }
        result
    }

    async fn recv<M: DeserializeOwned + Send>(
        &mut self,
        ep: &mut Self::Endpoint,
        from: Self::Role,
    ) -> ChoreoResult<M> {
        let result = self.inner.recv(ep, from).await;
        if result.is_ok() {
            self.recv_count
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        } else {
            self.error_count
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        }
        result
    }

    async fn choose(
        &mut self,
        ep: &mut Self::Endpoint,
        who: Self::Role,
        label: <Self::Role as RoleId>::Label,
    ) -> ChoreoResult<()> {
        self.inner.choose(ep, who, label).await
    }

    async fn offer(
        &mut self,
        ep: &mut Self::Endpoint,
        from: Self::Role,
    ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
        self.inner.offer(ep, from).await
    }

    async fn with_timeout<F, T>(
        &mut self,
        ep: &mut Self::Endpoint,
        at: Self::Role,
        dur: Duration,
        body: F,
    ) -> ChoreoResult<T>
    where
        F: std::future::Future<Output = ChoreoResult<T>> + Send,
    {
        self.inner.with_timeout(ep, at, dur, body).await
    }
}

impl<H> ExtensibleHandler for Metrics<H>
where
    H: ExtensibleHandler + Send,
{
    fn extension_registry(&self) -> &ExtensionRegistry<Self::Endpoint, Self::Role> {
        self.inner.extension_registry()
    }
}