Skip to main content

telltale_runtime/effects/middleware/
retry.rs

1// Retry middleware for effect handlers
2//
3// Implements exponential backoff retry logic for failed send operations.
4
5use async_trait::async_trait;
6use cfg_if::cfg_if;
7use serde::{de::DeserializeOwned, Serialize};
8use std::time::Duration;
9use tracing::debug;
10
11use crate::effects::contract::{DocumentedHandlerContract, HandlerContractProfile, RetryPolicy};
12use crate::effects::registry::{ExtensibleHandler, ExtensionRegistry};
13use crate::effects::{ChoreoHandler, ChoreoResult, RoleId};
14
15/// Retry middleware with exponential backoff
16#[derive(Clone)]
17pub struct Retry<H> {
18    inner: H,
19    max_retries: usize,
20    base_delay: Duration,
21}
22
23impl<H> Retry<H> {
24    pub fn new(inner: H) -> Self {
25        Self {
26            inner,
27            max_retries: 3,
28            base_delay: Duration::from_millis(100),
29        }
30    }
31
32    pub fn with_config(inner: H, max_retries: usize, base_delay: Duration) -> Self {
33        Self {
34            inner,
35            max_retries,
36            base_delay,
37        }
38    }
39}
40
41impl<H> DocumentedHandlerContract for Retry<H>
42where
43    H: DocumentedHandlerContract,
44{
45    fn contract_profile() -> HandlerContractProfile {
46        let mut profile = H::contract_profile();
47        profile.handler_name = std::any::type_name::<Self>();
48        profile.transport.retry_policy = RetryPolicy::ExternalMiddleware;
49        profile.notes.push(
50            "retry middleware may repeat send attempts but must not widen the handler's semantic contract",
51        );
52        profile
53    }
54}
55
56#[async_trait]
57impl<H: ChoreoHandler + Send> ChoreoHandler for Retry<H> {
58    type Role = H::Role;
59    type Endpoint = H::Endpoint;
60
61    async fn send<M: Serialize + Send + Sync>(
62        &mut self,
63        ep: &mut Self::Endpoint,
64        to: Self::Role,
65        msg: &M,
66    ) -> ChoreoResult<()> {
67        let mut retries = 0;
68        loop {
69            // bounded: exits on success or when retries >= max_retries
70            match self.inner.send(ep, to, msg).await {
71                Ok(()) => return Ok(()),
72                Err(_e) if retries < self.max_retries => {
73                    retries += 1;
74                    let delay = self.base_delay * (1 << (retries - 1));
75                    debug!(?to, ?retries, ?delay, "send failed, retrying");
76                    // Platform-specific sleep
77                    cfg_if! {
78                        if #[cfg(target_arch = "wasm32")] {
79                            wasm_timer::Delay::new(delay).await.ok();
80                        } else {
81                            tokio::time::sleep(delay).await;
82                        }
83                    }
84                }
85                Err(e) => return Err(e),
86            }
87        }
88    }
89
90    async fn recv<M: DeserializeOwned + Send>(
91        &mut self,
92        ep: &mut Self::Endpoint,
93        from: Self::Role,
94    ) -> ChoreoResult<M> {
95        // Recv typically shouldn't be retried as it changes protocol state
96        self.inner.recv(ep, from).await
97    }
98
99    async fn choose(
100        &mut self,
101        ep: &mut Self::Endpoint,
102        who: Self::Role,
103        label: <Self::Role as RoleId>::Label,
104    ) -> ChoreoResult<()> {
105        self.inner.choose(ep, who, label).await
106    }
107
108    async fn offer(
109        &mut self,
110        ep: &mut Self::Endpoint,
111        from: Self::Role,
112    ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
113        self.inner.offer(ep, from).await
114    }
115
116    async fn with_timeout<F, T>(
117        &mut self,
118        ep: &mut Self::Endpoint,
119        at: Self::Role,
120        dur: Duration,
121        body: F,
122    ) -> ChoreoResult<T>
123    where
124        F: std::future::Future<Output = ChoreoResult<T>> + Send,
125    {
126        self.inner.with_timeout(ep, at, dur, body).await
127    }
128}
129
130impl<H> ExtensibleHandler for Retry<H>
131where
132    H: ExtensibleHandler + Send,
133{
134    fn extension_registry(&self) -> &ExtensionRegistry<Self::Endpoint, Self::Role> {
135        self.inner.extension_registry()
136    }
137}