// noosphere_storage/retry.rs
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use cid::Cid;
use std::time::{Duration, Instant};
use tokio::select;

use crate::BlockStore;

9const DEFAULT_MAX_RETRIES: u32 = 2u32;
10const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
11const DEFAULT_MINIMUM_DELAY: Duration = Duration::from_millis(100);
12const DEFAULT_BACKOFF: Backoff = Backoff::Linear {
13 increment: Duration::from_secs(1),
14 ceiling: Duration::from_secs(3),
15};
16
/// Strategy for growing a timeout from one attempt to the next.
#[derive(Clone)]
pub enum Backoff {
    /// Each step adds a fixed `increment` to the timeout.
    Linear {
        /// Amount added to the timeout on every step
        increment: Duration,
        /// Upper bound that a stepped timeout never exceeds
        ceiling: Duration,
    },
    /// Each step raises the timeout, measured in seconds, to the power
    /// `exponent`. Note that for an `exponent` above one, a timeout shorter
    /// than one second gets shorter, not longer.
    Exponential {
        /// Power that the timeout (in seconds) is raised to on every step
        exponent: f32,
        /// Upper bound that a stepped timeout never exceeds
        ceiling: Duration,
    },
}

impl Backoff {
    /// Computes the timeout that follows `duration` under this strategy.
    ///
    /// The result is always capped at the variant's `ceiling`. This method
    /// does not panic: if the stepped value cannot be represented as a
    /// [`Duration`] (it overflows, is infinite, or is not a number), the
    /// `ceiling` is returned instead.
    pub fn step(&self, duration: Duration) -> Duration {
        match self {
            // `Duration + Duration` panics on overflow; saturate so the cap
            // below still applies.
            Backoff::Linear { increment, ceiling } => {
                duration.saturating_add(*increment).min(*ceiling)
            }
            Backoff::Exponential { exponent, ceiling } => {
                let stepped_seconds = duration.as_secs_f32().powf(*exponent);

                // `Duration::from_secs_f32` panics on infinite, NaN or
                // too-large input, all of which `powf` can produce; use the
                // fallible conversion and fall back to the ceiling.
                Duration::try_from_secs_f32(stepped_seconds)
                    .map_or(*ceiling, |stepped| stepped.min(*ceiling))
            }
        }
    }
}

50#[derive(Clone)]
62pub struct BlockStoreRetry<S>
63where
64 S: BlockStore,
65{
66 pub store: S,
69 pub maximum_retries: u32,
72 pub attempt_window: Duration,
75 pub minimum_delay: Duration,
77 pub backoff: Option<Backoff>,
80}
81
82impl<S> From<S> for BlockStoreRetry<S>
83where
84 S: BlockStore,
85{
86 fn from(store: S) -> Self {
87 Self {
88 store,
89 maximum_retries: DEFAULT_MAX_RETRIES,
90 attempt_window: DEFAULT_TIMEOUT,
91 minimum_delay: DEFAULT_MINIMUM_DELAY,
92 backoff: Some(DEFAULT_BACKOFF),
93 }
94 }
95}
96
97#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
98#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
99impl<S> BlockStore for BlockStoreRetry<S>
100where
101 S: BlockStore,
102{
103 async fn put_block(&mut self, cid: &Cid, block: &[u8]) -> Result<()> {
104 self.store.put_block(cid, block).await
105 }
106
107 async fn get_block(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
108 let mut retry_count = 0;
109 let mut next_timeout = self.attempt_window;
110
111 loop {
112 if retry_count > self.maximum_retries {
113 break;
114 }
115
116 let window_start = Instant::now();
117
118 select! {
119 result = self.store.get_block(cid) => {
120 match result {
121 Ok(maybe_block) => {
122 return Ok(maybe_block);
123 },
124 Err(error) => {
125 warn!("Error while getting {}: {}", cid, error);
126 },
127 };
128 },
129 _ = tokio::time::sleep(next_timeout) => {
130 warn!("Timed out trying to get {} after {} seconds...", cid, next_timeout.as_secs_f32());
131 }
132 }
133
134 let spent_window_time = Instant::now() - window_start;
135
136 let remaining_window_time = self.attempt_window
140 - spent_window_time
141 .max(self.minimum_delay)
142 .min(self.attempt_window);
143
144 retry_count += 1;
145
146 if let Some(backoff) = &self.backoff {
147 next_timeout = backoff.step(next_timeout);
148 trace!(
149 "Next timeout will be {} seconds",
150 next_timeout.as_secs_f32()
151 );
152 }
153
154 tokio::time::sleep(remaining_window_time).await;
155 }
156
157 Err(anyhow!(
158 "Failed to get {} after {} tries...",
159 cid,
160 retry_count
161 ))
162 }
163}