Skip to main content

force_pubsub/
config.rs

1//! Configuration types for the Salesforce Pub/Sub API client.
2
3use std::time::Duration;
4
5/// Configuration for the Pub/Sub client.
6#[derive(Debug, Clone)]
7pub struct PubSubConfig {
8    /// gRPC endpoint for the Pub/Sub API.
9    pub endpoint: String,
10    /// Number of events to request per FetchRequest batch.
11    ///
12    /// Must be between 1 and 100 (inclusive). Values outside this range will
13    /// cause [`PubSubHandler::connect`] to return a [`PubSubError::Config`] error.
14    /// The Salesforce Pub/Sub API rejects requests with 0 or negative values.
15    pub batch_size: i32,
16    /// Reconnection policy for the subscribe stream.
17    pub reconnect_policy: ReconnectPolicy,
18}
19
20impl Default for PubSubConfig {
21    fn default() -> Self {
22        Self {
23            endpoint: "https://api.pubsub.salesforce.com:7443".to_string(),
24            batch_size: 100,
25            reconnect_policy: ReconnectPolicy::default(),
26        }
27    }
28}
29
30/// Controls reconnection behaviour when the subscribe stream drops.
31#[derive(Debug, Clone)]
32pub enum ReconnectPolicy {
33    /// No automatic reconnection — errors surface to the stream consumer.
34    None,
35    /// Reconnect automatically, resuming from the last seen replay ID.
36    Auto {
37        /// Maximum number of reconnection attempts before giving up.
38        max_retries: u32,
39        /// Backoff configuration between attempts.
40        backoff: BackoffConfig,
41    },
42}
43
44impl Default for ReconnectPolicy {
45    fn default() -> Self {
46        Self::Auto {
47            max_retries: 5,
48            backoff: BackoffConfig::default(),
49        }
50    }
51}
52
53/// Exponential backoff with configurable parameters.
54#[derive(Debug, Clone)]
55pub struct BackoffConfig {
56    /// Initial delay before the first retry.
57    pub initial_delay: Duration,
58    /// Maximum delay cap.
59    pub max_delay: Duration,
60    /// Multiplier applied after each attempt.
61    pub multiplier: f64,
62}
63
64impl Default for BackoffConfig {
65    fn default() -> Self {
66        Self {
67            initial_delay: Duration::from_millis(500),
68            max_delay: Duration::from_secs(30),
69            multiplier: 2.0,
70        }
71    }
72}
73
74impl BackoffConfig {
75    /// Compute the delay for a given attempt number (0-indexed).
76    #[must_use]
77    #[allow(clippy::cast_possible_truncation)]
78    #[allow(clippy::cast_possible_wrap)]
79    pub fn delay_for(&self, attempt: u32) -> Duration {
80        let exp = attempt.min(63) as i32;
81        let multiplied = self.initial_delay.as_secs_f64() * self.multiplier.powi(exp);
82        let capped = multiplied.min(self.max_delay.as_secs_f64());
83        Duration::from_secs_f64(capped)
84    }
85}
86
87/// Where to start replaying events when subscribing.
88#[derive(Debug, Clone)]
89pub enum ReplayPreset {
90    /// Only events published after subscribing.
91    Latest,
92    /// All events within the 72-hour retention window.
93    Earliest,
94    /// Resume from a specific replay ID.
95    Custom(crate::types::ReplayId),
96}
97
98#[cfg(test)]
99#[allow(clippy::no_effect_underscore_binding)]
100mod tests {
101    use super::*;
102
103    #[test]
104    fn test_default_config() {
105        let config = PubSubConfig::default();
106        assert_eq!(config.endpoint, "https://api.pubsub.salesforce.com:7443");
107        assert_eq!(config.batch_size, 100);
108        assert!(matches!(
109            config.reconnect_policy,
110            ReconnectPolicy::Auto { .. }
111        ));
112    }
113
114    #[test]
115    fn test_default_reconnect_policy() {
116        let policy = ReconnectPolicy::default();
117        match policy {
118            ReconnectPolicy::Auto { max_retries, .. } => assert_eq!(max_retries, 5),
119            ReconnectPolicy::None => panic!("expected Auto"),
120        }
121    }
122
123    #[test]
124    fn test_backoff_delay_for_attempt_0() {
125        let backoff = BackoffConfig::default();
126        assert_eq!(backoff.delay_for(0), Duration::from_millis(500));
127    }
128
129    #[test]
130    fn test_backoff_delay_for_attempt_1() {
131        let backoff = BackoffConfig::default();
132        assert_eq!(backoff.delay_for(1), Duration::from_secs(1));
133    }
134
135    #[test]
136    fn test_backoff_delay_capped_at_max() {
137        let backoff = BackoffConfig::default();
138        let delay = backoff.delay_for(20);
139        assert_eq!(delay, Duration::from_secs(30));
140    }
141
142    #[test]
143    fn test_replay_preset_variants_exist() {
144        let _latest = ReplayPreset::Latest;
145        let _earliest = ReplayPreset::Earliest;
146    }
147}