// noosphere_storage/retry.rs

1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use cid::Cid;
4use std::time::{Duration, Instant};
5use tokio::select;
6
7use crate::BlockStore;
8
/// Default for `BlockStoreRetry::maximum_retries`.
const DEFAULT_MAX_RETRIES: u32 = 2;
/// Default for `BlockStoreRetry::attempt_window`.
const DEFAULT_TIMEOUT: Duration = Duration::from_millis(1_000);
/// Default for `BlockStoreRetry::minimum_delay`.
const DEFAULT_MINIMUM_DELAY: Duration = Duration::from_millis(100);
12const DEFAULT_BACKOFF: Backoff = Backoff::Linear {
13    increment: Duration::from_secs(1),
14    ceiling: Duration::from_secs(3),
15};
16
/// Backoff configuration used to define how [BlockStoreRetry] grows the time
/// window of further attempts when store reads fail.
#[derive(Clone, Debug)]
pub enum Backoff {
    /// The attempt window grows linearly: each step adds `increment`, capped
    /// at `ceiling`.
    Linear {
        /// Amount added to the current window to produce the next one
        increment: Duration,
        /// The maximum time window length
        ceiling: Duration,
    },
    /// The attempt window grows exponentially: the next window, measured in
    /// seconds, is the current window raised to `exponent`, capped at
    /// `ceiling`.
    ///
    /// Note: with `exponent > 1.0` a window shorter than one second shrinks,
    /// and a window of exactly one second stays the same.
    Exponential {
        /// The power the current window (in seconds) is raised to
        exponent: f32,
        /// The maximum time window length
        ceiling: Duration,
    },
}

impl Backoff {
    /// Apply the backoff configuration to a given input [Duration] and return
    /// the next window, which never exceeds the configured `ceiling`.
    ///
    /// Arithmetic that would overflow [Duration] (or yield a non-finite
    /// float) saturates to `ceiling` instead of panicking.
    pub fn step(&self, duration: Duration) -> Duration {
        match self {
            Backoff::Linear { increment, ceiling } => {
                // Plain `Duration + Duration` panics on overflow.
                duration.saturating_add(*increment).min(*ceiling)
            }
            Backoff::Exponential { exponent, ceiling } => {
                let next_secs = duration.as_secs_f32().powf(*exponent);
                // `Duration::from_secs_f32` panics on non-finite or
                // out-of-range input, so clamp to the ceiling *before*
                // converting rather than after.
                if !next_secs.is_finite() || next_secs >= ceiling.as_secs_f32() {
                    *ceiling
                } else {
                    Duration::from_secs_f32(next_secs).min(*ceiling)
                }
            }
        }
    }
}
49
50/// Implements retry and timeout logic for accessing blocks from a [BlockStore].
51/// Any [BlockStore] can be wrapped by [BlockStoreRetry] to get retry and
52/// timeout logic for free. Each attempt to lookup a block is time limited by to
53/// a specified window with optional [Backoff], and at most `maximum_retries`
54/// will be made to load the block.
55///
56/// Local [BlockStore] implementations won't benefit a lot from this, but
57/// network implementations such as [IpfsStore] can be made more reliable with a
58/// modest retry policy (and timeouts will help make sure we don't hang
59/// indefinitely waiting for an implementation like Kubo to get its act
60/// together).
61#[derive(Clone)]
62pub struct BlockStoreRetry<S>
63where
64    S: BlockStore,
65{
66    /// A [BlockStore] implementation that the [BlockStoreRetry] proxies
67    /// reads to in order to implement retry behavior
68    pub store: S,
69    /// The maximum number of additional attempts to make if a read to the
70    /// wrapped store should fail
71    pub maximum_retries: u32,
72    /// The maximum time that a read is allowed to take before it is considered
73    /// failed
74    pub attempt_window: Duration,
75    /// The minimum time between attempts
76    pub minimum_delay: Duration,
77    /// If a [Backoff] is configured, the attempt window will grow with each
78    /// attempt based on the configuration
79    pub backoff: Option<Backoff>,
80}
81
82impl<S> From<S> for BlockStoreRetry<S>
83where
84    S: BlockStore,
85{
86    fn from(store: S) -> Self {
87        Self {
88            store,
89            maximum_retries: DEFAULT_MAX_RETRIES,
90            attempt_window: DEFAULT_TIMEOUT,
91            minimum_delay: DEFAULT_MINIMUM_DELAY,
92            backoff: Some(DEFAULT_BACKOFF),
93        }
94    }
95}
96
97#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
98#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
99impl<S> BlockStore for BlockStoreRetry<S>
100where
101    S: BlockStore,
102{
103    async fn put_block(&mut self, cid: &Cid, block: &[u8]) -> Result<()> {
104        self.store.put_block(cid, block).await
105    }
106
107    async fn get_block(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
108        let mut retry_count = 0;
109        let mut next_timeout = self.attempt_window;
110
111        loop {
112            if retry_count > self.maximum_retries {
113                break;
114            }
115
116            let window_start = Instant::now();
117
118            select! {
119                result = self.store.get_block(cid) => {
120                    match result {
121                        Ok(maybe_block) => {
122                            return Ok(maybe_block);
123                        },
124                        Err(error) => {
125                          warn!("Error while getting {}: {}", cid, error);
126                        },
127                    };
128                },
129                _ = tokio::time::sleep(next_timeout) => {
130                    warn!("Timed out trying to get {} after {} seconds...", cid, next_timeout.as_secs_f32());
131                }
132            }
133
134            let spent_window_time = Instant::now() - window_start;
135
136            // NOTE: Be careful here; `Duration` will overflow when dealing with
137            // negative values so these operations are effectively fallible.
138            // https://doc.rust-lang.org/std/time/struct.Duration.html#panics-7
139            let remaining_window_time = self.attempt_window
140                - spent_window_time
141                    .max(self.minimum_delay)
142                    .min(self.attempt_window);
143
144            retry_count += 1;
145
146            if let Some(backoff) = &self.backoff {
147                next_timeout = backoff.step(next_timeout);
148                trace!(
149                    "Next timeout will be {} seconds",
150                    next_timeout.as_secs_f32()
151                );
152            }
153
154            tokio::time::sleep(remaining_window_time).await;
155        }
156
157        Err(anyhow!(
158            "Failed to get {} after {} tries...",
159            cid,
160            retry_count
161        ))
162    }
163}