stream_reconnect/
config.rs

1//! Provides options to configure the behavior of reconnect-stream items,
2//! specifically related to reconnect behavior.
3
4use std::sync::Arc;
5use std::time::Duration;
6
7pub type DurationIterator = Box<dyn Iterator<Item = Duration> + Send + Sync>;
8
9/// User specified options that control the behavior of the [ReconnectStream](crate::ReconnectStream) upon disconnect.
10#[derive(Clone)]
11pub struct ReconnectOptions(Box<Inner>);
12
13impl ReconnectOptions {
14    pub(crate) fn retries_to_attempt_fn(&self) -> &Arc<dyn Fn() -> DurationIterator + Send + Sync> {
15        &self.0.retries_to_attempt_fn
16    }
17    pub(crate) fn exit_if_first_connect_fails(&self) -> bool {
18        self.0.exit_if_first_connect_fails
19    }
20    pub(crate) fn on_connect_callback(&self) -> &Arc<dyn Fn() + Send + Sync> {
21        &self.0.on_connect_callback
22    }
23    pub(crate) fn on_disconnect_callback(&self) -> &Arc<dyn Fn() + Send + Sync> {
24        &self.0.on_disconnect_callback
25    }
26    pub(crate) fn on_connect_fail_callback(&self) -> &Arc<dyn Fn() + Send + Sync> {
27        &self.0.on_connect_fail_callback
28    }
29}
30
31#[derive(Clone)]
32struct Inner {
33    retries_to_attempt_fn: Arc<dyn Fn() -> DurationIterator + Send + Sync>,
34    exit_if_first_connect_fails: bool,
35    on_connect_callback: Arc<dyn Fn() + Send + Sync>,
36    on_disconnect_callback: Arc<dyn Fn() + Send + Sync>,
37    on_connect_fail_callback: Arc<dyn Fn() + Send + Sync>,
38}
39
40impl ReconnectOptions {
41    /// By default, the [ReconnectStream](crate::ReconnectStream) will not try to reconnect if the first connect attempt fails.
42    /// By default, the retries iterator waits longer and longer between reconnection attempts,
43    /// until it eventually perpetually tries to reconnect every 30 minutes.
44    #[allow(clippy::new_without_default)]
45    pub fn new() -> Self {
46        ReconnectOptions(Box::new(Inner {
47            retries_to_attempt_fn: Arc::new(get_standard_reconnect_strategy),
48            exit_if_first_connect_fails: true,
49            on_connect_callback: Arc::new(|| {}),
50            on_disconnect_callback: Arc::new(|| {}),
51            on_connect_fail_callback: Arc::new(|| {}),
52        }))
53    }
54
55    /// Represents a function that generates an Iterator
56    /// to schedule the wait between reconnection attempts.
57    /// This method allows the user to provide any function that returns a value
58    /// which is convertible into an iterator, such as an actual iterator or a Vec.
59    /// # Examples
60    ///
61    /// ```
62    /// use std::time::Duration;
63    /// use stream_reconnect::ReconnectOptions;
64    ///
65    /// // With the below vector, the ReconnectStream item will try to reconnect three times,
66    /// // waiting 2 seconds between each attempt. Once all three tries are exhausted,
67    /// // it will stop attempting.
68    /// let options = ReconnectOptions::new().with_retries_generator(|| {
69    ///     vec![
70    ///         Duration::from_secs(2),
71    ///         Duration::from_secs(2),
72    ///         Duration::from_secs(2),
73    ///     ]
74    /// });
75    /// ```
76    pub fn with_retries_generator<F, I, IN>(mut self, retries_generator: F) -> Self
77    where
78        F: 'static + Send + Sync + Fn() -> IN,
79        I: 'static + Send + Sync + Iterator<Item = Duration>,
80        IN: IntoIterator<IntoIter = I, Item = Duration>,
81    {
82        self.0.retries_to_attempt_fn = Arc::new(move || Box::new(retries_generator().into_iter()));
83        self
84    }
85
86    /// If this is set to true, if the initial connect method of the [ReconnectStream](crate::ReconnectStream) item fails,
87    /// then no further reconnects will be attempted
88    pub fn with_exit_if_first_connect_fails(mut self, value: bool) -> Self {
89        self.0.exit_if_first_connect_fails = value;
90        self
91    }
92
93    /// Invoked when the [ReconnectStream](crate::ReconnectStream) establishes a connection
94    pub fn with_on_connect_callback(mut self, cb: impl Fn() + 'static + Send + Sync) -> Self {
95        self.0.on_connect_callback = Arc::new(cb);
96        self
97    }
98
99    /// Invoked when the [ReconnectStream](crate::ReconnectStream) loses its active connection
100    pub fn with_on_disconnect_callback(mut self, cb: impl Fn() + 'static + Send + Sync) -> Self {
101        self.0.on_disconnect_callback = Arc::new(cb);
102        self
103    }
104
105    /// Invoked when the [ReconnectStream](crate::ReconnectStream) fails a connection attempt
106    pub fn with_on_connect_fail_callback(mut self, cb: impl Fn() + 'static + Send + Sync) -> Self {
107        self.0.on_connect_fail_callback = Arc::new(cb);
108        self
109    }
110}
111
112fn get_standard_reconnect_strategy() -> DurationIterator {
113    let initial_attempts = vec![
114        Duration::from_secs(5),
115        Duration::from_secs(10),
116        Duration::from_secs(20),
117        Duration::from_secs(30),
118        Duration::from_secs(40),
119        Duration::from_secs(50),
120        Duration::from_secs(60),
121        Duration::from_secs(60 * 2),
122        Duration::from_secs(60 * 5),
123        Duration::from_secs(60 * 10),
124        Duration::from_secs(60 * 20),
125    ];
126
127    let repeat = std::iter::repeat(Duration::from_secs(60 * 30));
128
129    let forever_iterator = initial_attempts.into_iter().chain(repeat);
130
131    Box::new(forever_iterator)
132}