Skip to main content

telltale_runtime/effects/middleware/
retry.rs

1// Retry middleware for effect handlers
2//
3// Implements exponential backoff retry logic for failed send operations.
4
5use async_trait::async_trait;
6use cfg_if::cfg_if;
7use serde::{de::DeserializeOwned, Serialize};
8use std::time::Duration;
9use tracing::debug;
10
11use crate::effects::registry::{ExtensibleHandler, ExtensionRegistry};
12use crate::effects::{ChoreoHandler, ChoreoResult, RoleId};
13
/// Retry middleware with exponential backoff.
///
/// Wraps an inner handler and re-issues failed `send` operations, sleeping
/// between attempts. All other handler operations are forwarded to the inner
/// handler unchanged.
#[derive(Clone, Debug)]
pub struct Retry<H> {
    /// The wrapped handler that performs the actual operations.
    inner: H,
    /// Maximum number of retries after the initial failed send.
    max_retries: usize,
    /// Delay before the first retry; doubled for every subsequent retry.
    base_delay: Duration,
}
21
22impl<H> Retry<H> {
23    pub fn new(inner: H) -> Self {
24        Self {
25            inner,
26            max_retries: 3,
27            base_delay: Duration::from_millis(100),
28        }
29    }
30
31    pub fn with_config(inner: H, max_retries: usize, base_delay: Duration) -> Self {
32        Self {
33            inner,
34            max_retries,
35            base_delay,
36        }
37    }
38}
39
40#[async_trait]
41impl<H: ChoreoHandler + Send> ChoreoHandler for Retry<H> {
42    type Role = H::Role;
43    type Endpoint = H::Endpoint;
44
45    async fn send<M: Serialize + Send + Sync>(
46        &mut self,
47        ep: &mut Self::Endpoint,
48        to: Self::Role,
49        msg: &M,
50    ) -> ChoreoResult<()> {
51        let mut retries = 0;
52        loop {
53            // bounded: exits on success or when retries >= max_retries
54            match self.inner.send(ep, to, msg).await {
55                Ok(()) => return Ok(()),
56                Err(_e) if retries < self.max_retries => {
57                    retries += 1;
58                    let delay = self.base_delay * (1 << (retries - 1));
59                    debug!(?to, ?retries, ?delay, "send failed, retrying");
60                    // Platform-specific sleep
61                    cfg_if! {
62                        if #[cfg(target_arch = "wasm32")] {
63                            wasm_timer::Delay::new(delay).await.ok();
64                        } else {
65                            tokio::time::sleep(delay).await;
66                        }
67                    }
68                }
69                Err(e) => return Err(e),
70            }
71        }
72    }
73
74    async fn recv<M: DeserializeOwned + Send>(
75        &mut self,
76        ep: &mut Self::Endpoint,
77        from: Self::Role,
78    ) -> ChoreoResult<M> {
79        // Recv typically shouldn't be retried as it changes protocol state
80        self.inner.recv(ep, from).await
81    }
82
83    async fn choose(
84        &mut self,
85        ep: &mut Self::Endpoint,
86        who: Self::Role,
87        label: <Self::Role as RoleId>::Label,
88    ) -> ChoreoResult<()> {
89        self.inner.choose(ep, who, label).await
90    }
91
92    async fn offer(
93        &mut self,
94        ep: &mut Self::Endpoint,
95        from: Self::Role,
96    ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
97        self.inner.offer(ep, from).await
98    }
99
100    async fn with_timeout<F, T>(
101        &mut self,
102        ep: &mut Self::Endpoint,
103        at: Self::Role,
104        dur: Duration,
105        body: F,
106    ) -> ChoreoResult<T>
107    where
108        F: std::future::Future<Output = ChoreoResult<T>> + Send,
109    {
110        self.inner.with_timeout(ep, at, dur, body).await
111    }
112}
113
114impl<H> ExtensibleHandler for Retry<H>
115where
116    H: ExtensibleHandler + Send,
117{
118    fn extension_registry(&self) -> &ExtensionRegistry<Self::Endpoint, Self::Role> {
119        self.inner.extension_registry()
120    }
121}