Skip to main content

robust_provider/robust_provider/
builder.rs

1use std::{pin::Pin, time::Duration};
2
3use alloy::{network::Network, providers::RootProvider};
4
5use crate::robust_provider::{
6    Error, IntoRootProvider, RobustProvider, subscription::DEFAULT_RECONNECT_INTERVAL,
7};
8
/// Type-erased, pinned, `Send` future that resolves to a [`RootProvider`] or an [`Error`].
///
/// The builder stores one of these per fallback so that fallbacks of different concrete
/// types can live in the same `Vec`; each future is only awaited when the builder is built.
type BoxedProviderFuture<N> = Pin<Box<dyn Future<Output = Result<RootProvider<N>, Error>> + Send>>;
10
// RPC retry and timeout settings.
//
// Each constant below seeds the matching field in `RobustProviderBuilder::new`.

/// Default timeout for RPC calls made through `RobustProvider` (60 seconds).
pub const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(60);
/// Default timeout for subscription operations (120 seconds).
pub const DEFAULT_SUBSCRIPTION_TIMEOUT: Duration = Duration::from_secs(120);
/// Default maximum number of retry attempts.
pub const DEFAULT_MAX_RETRIES: usize = 3;
/// Default base delay between retries (1 second).
pub const DEFAULT_MIN_DELAY: Duration = Duration::from_secs(1);
/// Default subscription stream buffer capacity (channel size).
pub const DEFAULT_SUBSCRIPTION_BUFFER_CAPACITY: usize = 128;
/// Default polling interval for HTTP subscriptions.
///
/// Set to 12 seconds to match approximate Ethereum mainnet block time.
/// Adjust based on the target chain's block time.
///
/// Only available with the `http-subscription` feature.
#[cfg(feature = "http-subscription")]
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(12);
28
/// Builder for constructing a [`RobustProvider`].
///
/// Use this to configure timeouts, retry/backoff, and one or more fallback providers.
/// Every setting starts at its `DEFAULT_*` value and is copied into the
/// [`RobustProvider`] when the builder is built.
pub struct RobustProviderBuilder<N: Network, P: IntoRootProvider<N>> {
    // Primary provider; converted into a `RootProvider` when the builder is built.
    primary_provider: P,
    // Pending fallback conversions, in insertion order; awaited one by one at build time.
    fallback_providers: Vec<BoxedProviderFuture<N>>,
    // Maximum timeout for RPC operations.
    call_timeout: Duration,
    // Timeout for subscription operations.
    subscription_timeout: Duration,
    // Maximum number of retry attempts.
    max_retries: usize,
    // Base delay for exponential backoff retries.
    min_delay: Duration,
    // Interval for attempting to reconnect to the primary provider after a failover.
    reconnect_interval: Duration,
    // Buffer capacity for subscription streams.
    subscription_buffer_capacity: usize,
    // Polling interval for HTTP-based subscriptions.
    #[cfg(feature = "http-subscription")]
    poll_interval: Duration,
    // Whether HTTP providers may serve subscriptions via polling; `false` by default.
    #[cfg(feature = "http-subscription")]
    allow_http_subscriptions: bool,
}
46
47impl<N: Network, P: IntoRootProvider<N>> RobustProviderBuilder<N, P> {
48    /// Create a new [`RobustProvider`] with default settings.
49    ///
50    /// The provided provider is treated as the primary provider.
51    /// Any type implementing [`IntoRootProvider`] can be used.
52    #[must_use]
53    pub fn new(provider: P) -> Self {
54        Self {
55            primary_provider: provider,
56            fallback_providers: vec![],
57            call_timeout: DEFAULT_CALL_TIMEOUT,
58            subscription_timeout: DEFAULT_SUBSCRIPTION_TIMEOUT,
59            max_retries: DEFAULT_MAX_RETRIES,
60            min_delay: DEFAULT_MIN_DELAY,
61            reconnect_interval: DEFAULT_RECONNECT_INTERVAL,
62            subscription_buffer_capacity: DEFAULT_SUBSCRIPTION_BUFFER_CAPACITY,
63            #[cfg(feature = "http-subscription")]
64            poll_interval: DEFAULT_POLL_INTERVAL,
65            #[cfg(feature = "http-subscription")]
66            allow_http_subscriptions: false,
67        }
68    }
69
70    /// Create a new [`RobustProvider`] with no retry attempts and only timeout set.
71    ///
72    /// The provided provider is treated as the primary provider.
73    #[must_use]
74    pub fn fragile(provider: P) -> Self {
75        Self::new(provider).max_retries(0).min_delay(Duration::ZERO)
76    }
77
78    /// Add a fallback provider to the list.
79    ///
80    /// Fallback providers are used when the primary provider times out or fails.
81    #[must_use]
82    pub fn fallback<F: IntoRootProvider<N> + Send + 'static>(mut self, provider: F) -> Self {
83        self.fallback_providers.push(Box::pin(provider.into_root_provider()));
84        self
85    }
86
87    /// Set the maximum timeout for RPC operations.
88    #[must_use]
89    pub fn call_timeout(mut self, timeout: Duration) -> Self {
90        self.call_timeout = timeout;
91        self
92    }
93
94    /// Set the timeout for subscription operations.
95    ///
96    /// This should be set higher than [`call_timeout`](Self::call_timeout) to accommodate chains
97    /// with slow block times. Default is [`DEFAULT_SUBSCRIPTION_TIMEOUT`].
98    #[must_use]
99    pub fn subscription_timeout(mut self, timeout: Duration) -> Self {
100        self.subscription_timeout = timeout;
101        self
102    }
103
104    /// Set the subscription stream buffer capacity.
105    ///
106    /// Controls the buffer capacity for subscription streams. If new blocks arrive
107    /// while the stream buffer is full, a lagged error will be emitted, indicating
108    /// that stream items were dropped due to the consumer not keeping pace with the stream.
109    ///
110    /// Internally calls [`alloy::providers::GetSubscription::channel_size`].
111    ///
112    /// Default is [`DEFAULT_SUBSCRIPTION_BUFFER_CAPACITY`].
113    #[must_use]
114    pub fn subscription_buffer_capacity(mut self, buffer_capacity: usize) -> Self {
115        self.subscription_buffer_capacity = buffer_capacity;
116        self
117    }
118
119    /// Set the maximum number of retry attempts.
120    #[must_use]
121    pub fn max_retries(mut self, max_retries: usize) -> Self {
122        self.max_retries = max_retries;
123        self
124    }
125
126    /// Set the base delay for exponential backoff retries.
127    #[must_use]
128    pub fn min_delay(mut self, min_delay: Duration) -> Self {
129        self.min_delay = min_delay;
130        self
131    }
132
133    /// Set the interval for attempting to reconnect to the primary provider.
134    ///
135    /// After a failover to a fallback provider, the subscription will periodically
136    /// attempt to reconnect to the primary provider at this interval.
137    /// Default is [`DEFAULT_RECONNECT_INTERVAL`].
138    #[must_use]
139    pub fn reconnect_interval(mut self, reconnect_interval: Duration) -> Self {
140        self.reconnect_interval = reconnect_interval;
141        self
142    }
143
144    /// Set the polling interval for HTTP-based subscriptions.
145    ///
146    /// This controls how frequently HTTP providers poll for new blocks
147    /// when used as subscription sources. Only relevant when
148    /// [`allow_http_subscriptions`](Self::allow_http_subscriptions) is enabled.
149    ///
150    /// Default is [`DEFAULT_POLL_INTERVAL`].
151    /// Adjust based on your target chain's block time.
152    ///
153    /// # Feature Flag
154    ///
155    /// This method requires the `http-subscription` feature.
156    ///
157    /// # Example
158    ///
159    /// ```rust,ignore
160    /// let robust = RobustProviderBuilder::new(http_provider)
161    ///     .allow_http_subscriptions(true)
162    ///     .poll_interval(Duration::from_secs(6)) // For faster chains
163    ///     .build()
164    ///     .await?;
165    /// ```
166    #[cfg(feature = "http-subscription")]
167    #[must_use]
168    pub fn poll_interval(mut self, interval: Duration) -> Self {
169        self.poll_interval = interval;
170        self
171    }
172
173    /// Enable HTTP providers for subscriptions via polling.
174    ///
175    /// When enabled, HTTP providers can participate in subscriptions
176    /// by polling for new blocks at the configured [`poll_interval`](Self::poll_interval).
177    ///
178    /// # Trade-offs
179    ///
180    /// * **Latency**: New blocks detected with up to `poll_interval` delay
181    /// * **RPC Load**: Generates one RPC call per `poll_interval`
182    /// * **Intermediate Blocks**: Depending on the node/provider semantics, you may not observe
183    ///   every intermediate block when `poll_interval` is larger than the chain's block time (e.g.
184    ///   if only the latest head is exposed).
185    ///
186    /// # Feature Flag
187    ///
188    /// This method requires the `http-subscription` feature.
189    ///
190    /// # Example
191    ///
192    /// ```rust,ignore
193    /// let robust = RobustProviderBuilder::new(http_provider)
194    ///     .allow_http_subscriptions(true)
195    ///     .build()
196    ///     .await?;
197    /// ```
198    #[cfg(feature = "http-subscription")]
199    #[must_use]
200    pub fn allow_http_subscriptions(mut self, allow: bool) -> Self {
201        self.allow_http_subscriptions = allow;
202        self
203    }
204
205    /// Build the `RobustProvider`.
206    ///
207    /// Final builder method: consumes the builder and returns the built [`RobustProvider`].
208    ///
209    /// # Errors
210    ///
211    /// Returns an error if any of the providers fail to connect.
212    pub async fn build(self) -> Result<RobustProvider<N>, Error> {
213        debug!(
214            call_timeout_ms = self.call_timeout.as_millis(),
215            subscription_timeout_ms = self.subscription_timeout.as_millis(),
216            max_retries = self.max_retries,
217            fallback_count = self.fallback_providers.len(),
218            "Building RobustProvider"
219        );
220
221        let primary_provider = self.primary_provider.into_root_provider().await?;
222
223        let mut fallback_providers = Vec::with_capacity(self.fallback_providers.len());
224        for (idx, fallback) in self.fallback_providers.into_iter().enumerate() {
225            trace!(fallback_index = idx, "Connecting fallback provider");
226            // ignore unused var warning when tracing disabled
227            _ = idx;
228
229            fallback_providers.push(fallback.await?);
230        }
231
232        info!("RobustProvider initialized");
233
234        Ok(RobustProvider {
235            primary_provider,
236            fallback_providers,
237            call_timeout: self.call_timeout,
238            subscription_timeout: self.subscription_timeout,
239            max_retries: self.max_retries,
240            min_delay: self.min_delay,
241            reconnect_interval: self.reconnect_interval,
242            subscription_buffer_capacity: self.subscription_buffer_capacity,
243            #[cfg(feature = "http-subscription")]
244            poll_interval: self.poll_interval,
245            #[cfg(feature = "http-subscription")]
246            allow_http_subscriptions: self.allow_http_subscriptions,
247        })
248    }
249}
250
#[cfg(test)]
mod tests {
    use super::*;
    use alloy::{
        node_bindings::Anvil,
        providers::{ProviderBuilder, WsConnect},
    };

    /// A WebSocket provider built via `ProviderBuilder` as primary and a bare HTTP
    /// `RootProvider` as fallback must both be accepted by the same builder.
    #[tokio::test]
    async fn test_builder_primary_type_different_to_fallback() -> anyhow::Result<()> {
        let node = Anvil::new().try_spawn()?;

        let ws = WsConnect::new(node.ws_endpoint_url().as_str());
        let primary = ProviderBuilder::new().connect_ws(ws).await?;

        let http_fallback = RootProvider::new_http(node.endpoint_url());

        let robust = RobustProviderBuilder::new(primary)
            .fallback(http_fallback)
            .call_timeout(Duration::from_secs(5))
            .build()
            .await?;

        assert_eq!(robust.fallback_providers.len(), 1);

        Ok(())
    }

    /// Fallbacks of several different types (a `RootProvider` and two endpoint URLs)
    /// can be chained, and all of them end up in the built provider.
    #[tokio::test]
    async fn test_builder_with_multiple_fallback_types() -> anyhow::Result<()> {
        let node = Anvil::new().try_spawn()?;

        let ws = WsConnect::new(node.ws_endpoint_url().as_str());
        let primary = ProviderBuilder::new().connect_ws(ws).await?;

        let http_fallback = RootProvider::new_http(node.endpoint_url());

        let endpoint = node.endpoint_url();
        let endpoint_copy = endpoint.clone();

        let robust = RobustProviderBuilder::new(primary)
            .fallback(http_fallback)
            .fallback(endpoint_copy)
            .fallback(endpoint)
            .build()
            .await?;

        assert_eq!(robust.fallback_providers.len(), 3);

        Ok(())
    }
}