telltale_runtime/effects/middleware/
retry.rs1use async_trait::async_trait;
6use cfg_if::cfg_if;
7use serde::{de::DeserializeOwned, Serialize};
8use std::time::Duration;
9use tracing::debug;
10
11use crate::effects::{ChoreoHandler, ChoreoResult, RoleId};
12
/// Middleware that wraps another handler and retries its failed `send` calls.
///
/// The retry policy is described by two values: how many times a failed send
/// may be re-attempted, and the base delay from which the wait before each
/// re-attempt is derived.
#[derive(Clone, Debug)]
pub struct Retry<H> {
    /// The wrapped handler that every operation is forwarded to.
    inner: H,
    /// Maximum number of re-attempts after the first failed send.
    max_retries: usize,
    /// Delay before the first re-attempt; later re-attempts wait longer.
    base_delay: Duration,
}
20
21impl<H> Retry<H> {
22 pub fn new(inner: H) -> Self {
23 Self {
24 inner,
25 max_retries: 3,
26 base_delay: Duration::from_millis(100),
27 }
28 }
29
30 pub fn with_config(inner: H, max_retries: usize, base_delay: Duration) -> Self {
31 Self {
32 inner,
33 max_retries,
34 base_delay,
35 }
36 }
37}
38
39#[async_trait]
40impl<H: ChoreoHandler + Send> ChoreoHandler for Retry<H> {
41 type Role = H::Role;
42 type Endpoint = H::Endpoint;
43
44 async fn send<M: Serialize + Send + Sync>(
45 &mut self,
46 ep: &mut Self::Endpoint,
47 to: Self::Role,
48 msg: &M,
49 ) -> ChoreoResult<()> {
50 let mut retries = 0;
51 loop {
52 match self.inner.send(ep, to, msg).await {
54 Ok(()) => return Ok(()),
55 Err(_e) if retries < self.max_retries => {
56 retries += 1;
57 let delay = self.base_delay * (1 << (retries - 1));
58 debug!(?to, ?retries, ?delay, "send failed, retrying");
59 cfg_if! {
61 if #[cfg(target_arch = "wasm32")] {
62 wasm_timer::Delay::new(delay).await.ok();
63 } else {
64 tokio::time::sleep(delay).await;
65 }
66 }
67 }
68 Err(e) => return Err(e),
69 }
70 }
71 }
72
73 async fn recv<M: DeserializeOwned + Send>(
74 &mut self,
75 ep: &mut Self::Endpoint,
76 from: Self::Role,
77 ) -> ChoreoResult<M> {
78 self.inner.recv(ep, from).await
80 }
81
82 async fn choose(
83 &mut self,
84 ep: &mut Self::Endpoint,
85 who: Self::Role,
86 label: <Self::Role as RoleId>::Label,
87 ) -> ChoreoResult<()> {
88 self.inner.choose(ep, who, label).await
89 }
90
91 async fn offer(
92 &mut self,
93 ep: &mut Self::Endpoint,
94 from: Self::Role,
95 ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
96 self.inner.offer(ep, from).await
97 }
98
99 async fn with_timeout<F, T>(
100 &mut self,
101 ep: &mut Self::Endpoint,
102 at: Self::Role,
103 dur: Duration,
104 body: F,
105 ) -> ChoreoResult<T>
106 where
107 F: std::future::Future<Output = ChoreoResult<T>> + Send,
108 {
109 self.inner.with_timeout(ep, at, dur, body).await
110 }
111}