telltale_runtime/effects/middleware/
retry.rs1use async_trait::async_trait;
6use cfg_if::cfg_if;
7use serde::{de::DeserializeOwned, Serialize};
8use std::time::Duration;
9use tracing::debug;
10
11use crate::effects::contract::{DocumentedHandlerContract, HandlerContractProfile, RetryPolicy};
12use crate::effects::registry::{ExtensibleHandler, ExtensionRegistry};
13use crate::effects::{ChoreoHandler, ChoreoResult, RoleId};
14
15#[derive(Clone)]
17pub struct Retry<H> {
18 inner: H,
19 max_retries: usize,
20 base_delay: Duration,
21}
22
23impl<H> Retry<H> {
24 pub fn new(inner: H) -> Self {
25 Self {
26 inner,
27 max_retries: 3,
28 base_delay: Duration::from_millis(100),
29 }
30 }
31
32 pub fn with_config(inner: H, max_retries: usize, base_delay: Duration) -> Self {
33 Self {
34 inner,
35 max_retries,
36 base_delay,
37 }
38 }
39}
40
41impl<H> DocumentedHandlerContract for Retry<H>
42where
43 H: DocumentedHandlerContract,
44{
45 fn contract_profile() -> HandlerContractProfile {
46 let mut profile = H::contract_profile();
47 profile.handler_name = std::any::type_name::<Self>();
48 profile.transport.retry_policy = RetryPolicy::ExternalMiddleware;
49 profile.notes.push(
50 "retry middleware may repeat send attempts but must not widen the handler's semantic contract",
51 );
52 profile
53 }
54}
55
56#[async_trait]
57impl<H: ChoreoHandler + Send> ChoreoHandler for Retry<H> {
58 type Role = H::Role;
59 type Endpoint = H::Endpoint;
60
61 async fn send<M: Serialize + Send + Sync>(
62 &mut self,
63 ep: &mut Self::Endpoint,
64 to: Self::Role,
65 msg: &M,
66 ) -> ChoreoResult<()> {
67 let mut retries = 0;
68 loop {
69 match self.inner.send(ep, to, msg).await {
71 Ok(()) => return Ok(()),
72 Err(_e) if retries < self.max_retries => {
73 retries += 1;
74 let delay = self.base_delay * (1 << (retries - 1));
75 debug!(?to, ?retries, ?delay, "send failed, retrying");
76 cfg_if! {
78 if #[cfg(target_arch = "wasm32")] {
79 wasm_timer::Delay::new(delay).await.ok();
80 } else {
81 tokio::time::sleep(delay).await;
82 }
83 }
84 }
85 Err(e) => return Err(e),
86 }
87 }
88 }
89
90 async fn recv<M: DeserializeOwned + Send>(
91 &mut self,
92 ep: &mut Self::Endpoint,
93 from: Self::Role,
94 ) -> ChoreoResult<M> {
95 self.inner.recv(ep, from).await
97 }
98
99 async fn choose(
100 &mut self,
101 ep: &mut Self::Endpoint,
102 who: Self::Role,
103 label: <Self::Role as RoleId>::Label,
104 ) -> ChoreoResult<()> {
105 self.inner.choose(ep, who, label).await
106 }
107
108 async fn offer(
109 &mut self,
110 ep: &mut Self::Endpoint,
111 from: Self::Role,
112 ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
113 self.inner.offer(ep, from).await
114 }
115
116 async fn with_timeout<F, T>(
117 &mut self,
118 ep: &mut Self::Endpoint,
119 at: Self::Role,
120 dur: Duration,
121 body: F,
122 ) -> ChoreoResult<T>
123 where
124 F: std::future::Future<Output = ChoreoResult<T>> + Send,
125 {
126 self.inner.with_timeout(ep, at, dur, body).await
127 }
128}
129
130impl<H> ExtensibleHandler for Retry<H>
131where
132 H: ExtensibleHandler + Send,
133{
134 fn extension_registry(&self) -> &ExtensionRegistry<Self::Endpoint, Self::Role> {
135 self.inner.extension_registry()
136 }
137}