Skip to main content

telltale_runtime/effects/middleware/
retry.rs

1// Retry middleware for effect handlers
2//
3// Implements exponential backoff retry logic for failed send operations.
4
5use async_trait::async_trait;
6use cfg_if::cfg_if;
7use serde::{de::DeserializeOwned, Serialize};
8use std::time::Duration;
9use tracing::debug;
10
11use crate::effects::{ChoreoHandler, ChoreoResult, RoleId};
12
/// Retry middleware with exponential backoff.
///
/// Wraps an inner handler and re-attempts failed `send` operations, waiting
/// longer between each attempt. All other operations are forwarded to the
/// inner handler unchanged.
#[derive(Debug, Clone)]
pub struct Retry<H> {
    /// The wrapped handler that performs the actual operations.
    inner: H,
    /// Maximum number of retries performed after the first failed attempt.
    max_retries: usize,
    /// Delay before the first retry; doubled for each subsequent retry.
    base_delay: Duration,
}
20
21impl<H> Retry<H> {
22    pub fn new(inner: H) -> Self {
23        Self {
24            inner,
25            max_retries: 3,
26            base_delay: Duration::from_millis(100),
27        }
28    }
29
30    pub fn with_config(inner: H, max_retries: usize, base_delay: Duration) -> Self {
31        Self {
32            inner,
33            max_retries,
34            base_delay,
35        }
36    }
37}
38
39#[async_trait]
40impl<H: ChoreoHandler + Send> ChoreoHandler for Retry<H> {
41    type Role = H::Role;
42    type Endpoint = H::Endpoint;
43
44    async fn send<M: Serialize + Send + Sync>(
45        &mut self,
46        ep: &mut Self::Endpoint,
47        to: Self::Role,
48        msg: &M,
49    ) -> ChoreoResult<()> {
50        let mut retries = 0;
51        loop {
52            // bounded: exits on success or when retries >= max_retries
53            match self.inner.send(ep, to, msg).await {
54                Ok(()) => return Ok(()),
55                Err(_e) if retries < self.max_retries => {
56                    retries += 1;
57                    let delay = self.base_delay * (1 << (retries - 1));
58                    debug!(?to, ?retries, ?delay, "send failed, retrying");
59                    // Platform-specific sleep
60                    cfg_if! {
61                        if #[cfg(target_arch = "wasm32")] {
62                            wasm_timer::Delay::new(delay).await.ok();
63                        } else {
64                            tokio::time::sleep(delay).await;
65                        }
66                    }
67                }
68                Err(e) => return Err(e),
69            }
70        }
71    }
72
73    async fn recv<M: DeserializeOwned + Send>(
74        &mut self,
75        ep: &mut Self::Endpoint,
76        from: Self::Role,
77    ) -> ChoreoResult<M> {
78        // Recv typically shouldn't be retried as it changes protocol state
79        self.inner.recv(ep, from).await
80    }
81
82    async fn choose(
83        &mut self,
84        ep: &mut Self::Endpoint,
85        who: Self::Role,
86        label: <Self::Role as RoleId>::Label,
87    ) -> ChoreoResult<()> {
88        self.inner.choose(ep, who, label).await
89    }
90
91    async fn offer(
92        &mut self,
93        ep: &mut Self::Endpoint,
94        from: Self::Role,
95    ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
96        self.inner.offer(ep, from).await
97    }
98
99    async fn with_timeout<F, T>(
100        &mut self,
101        ep: &mut Self::Endpoint,
102        at: Self::Role,
103        dur: Duration,
104        body: F,
105    ) -> ChoreoResult<T>
106    where
107        F: std::future::Future<Output = ChoreoResult<T>> + Send,
108    {
109        self.inner.with_timeout(ep, at, dur, body).await
110    }
111}