telltale_runtime/effects/middleware/
retry.rs1use async_trait::async_trait;
6use cfg_if::cfg_if;
7use serde::{de::DeserializeOwned, Serialize};
8use std::time::Duration;
9use tracing::debug;
10
11use crate::effects::registry::{ExtensibleHandler, ExtensionRegistry};
12use crate::effects::{ChoreoHandler, ChoreoResult, RoleId};
13
14#[derive(Clone)]
16pub struct Retry<H> {
17 inner: H,
18 max_retries: usize,
19 base_delay: Duration,
20}
21
22impl<H> Retry<H> {
23 pub fn new(inner: H) -> Self {
24 Self {
25 inner,
26 max_retries: 3,
27 base_delay: Duration::from_millis(100),
28 }
29 }
30
31 pub fn with_config(inner: H, max_retries: usize, base_delay: Duration) -> Self {
32 Self {
33 inner,
34 max_retries,
35 base_delay,
36 }
37 }
38}
39
40#[async_trait]
41impl<H: ChoreoHandler + Send> ChoreoHandler for Retry<H> {
42 type Role = H::Role;
43 type Endpoint = H::Endpoint;
44
45 async fn send<M: Serialize + Send + Sync>(
46 &mut self,
47 ep: &mut Self::Endpoint,
48 to: Self::Role,
49 msg: &M,
50 ) -> ChoreoResult<()> {
51 let mut retries = 0;
52 loop {
53 match self.inner.send(ep, to, msg).await {
55 Ok(()) => return Ok(()),
56 Err(_e) if retries < self.max_retries => {
57 retries += 1;
58 let delay = self.base_delay * (1 << (retries - 1));
59 debug!(?to, ?retries, ?delay, "send failed, retrying");
60 cfg_if! {
62 if #[cfg(target_arch = "wasm32")] {
63 wasm_timer::Delay::new(delay).await.ok();
64 } else {
65 tokio::time::sleep(delay).await;
66 }
67 }
68 }
69 Err(e) => return Err(e),
70 }
71 }
72 }
73
74 async fn recv<M: DeserializeOwned + Send>(
75 &mut self,
76 ep: &mut Self::Endpoint,
77 from: Self::Role,
78 ) -> ChoreoResult<M> {
79 self.inner.recv(ep, from).await
81 }
82
83 async fn choose(
84 &mut self,
85 ep: &mut Self::Endpoint,
86 who: Self::Role,
87 label: <Self::Role as RoleId>::Label,
88 ) -> ChoreoResult<()> {
89 self.inner.choose(ep, who, label).await
90 }
91
92 async fn offer(
93 &mut self,
94 ep: &mut Self::Endpoint,
95 from: Self::Role,
96 ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
97 self.inner.offer(ep, from).await
98 }
99
100 async fn with_timeout<F, T>(
101 &mut self,
102 ep: &mut Self::Endpoint,
103 at: Self::Role,
104 dur: Duration,
105 body: F,
106 ) -> ChoreoResult<T>
107 where
108 F: std::future::Future<Output = ChoreoResult<T>> + Send,
109 {
110 self.inner.with_timeout(ep, at, dur, body).await
111 }
112}
113
114impl<H> ExtensibleHandler for Retry<H>
115where
116 H: ExtensibleHandler + Send,
117{
118 fn extension_registry(&self) -> &ExtensionRegistry<Self::Endpoint, Self::Role> {
119 self.inner.extension_registry()
120 }
121}