Skip to main content

deribit_websocket/
config.rs

1//! Configuration for WebSocket client
2
3use std::env;
4use std::time::Duration;
5use url::Url;
6
7use crate::constants;
8
9/// WebSocket client configuration
10#[derive(Debug, Clone)]
11pub struct WebSocketConfig {
12    /// WebSocket URL
13    pub ws_url: Url,
14    /// Heartbeat interval
15    pub heartbeat_interval: Duration,
16    /// Maximum reconnection attempts
17    pub max_reconnect_attempts: u32,
18    /// Reconnection delay
19    pub reconnect_delay: Duration,
20    /// Connection timeout
21    pub connection_timeout: Duration,
22    /// Enable logging
23    pub enable_logging: bool,
24    /// Log level
25    pub log_level: String,
26    /// Test mode
27    pub test_mode: bool,
28    /// Client ID for authentication
29    pub client_id: Option<String>,
30    /// Client secret for authentication
31    pub client_secret: Option<String>,
32    /// Per-request timeout for [`crate::client::DeribitWebSocketClient::send_request`].
33    ///
34    /// Every call that awaits a matching JSON-RPC response is bounded by
35    /// this duration. If the response does not arrive in time, the call
36    /// returns [`crate::error::WebSocketError::Timeout`].
37    pub request_timeout: Duration,
38    /// Notification channel capacity (frames buffered for the consumer).
39    ///
40    /// Depth of the bounded `tokio::sync::mpsc` that carries server-pushed
41    /// notifications (and any unmatched frames) from the dispatcher task
42    /// to [`crate::client::DeribitWebSocketClient::receive_message`] /
43    /// `start_message_processing_loop`.
44    ///
45    /// # Backpressure — Strategy A (await-send)
46    ///
47    /// When the channel is full the dispatcher task blocks on
48    /// `send().await`; it therefore stops polling the WebSocket stream,
49    /// the TCP recv buffer fills, and the Deribit server applies flow
50    /// control. **No frames are dropped due to backpressure; if the
51    /// notification receiver has been closed (for example during
52    /// shutdown or disconnect), the affected frames are discarded.**
53    /// Every full-channel event emits a `tracing::warn!` with the
54    /// channel capacity so slow consumers are visible in logs.
55    ///
56    /// Sizing: the default of `1024` is sufficient for normal liquid
57    /// instruments. Raise it when the consumer performs heavy synchronous
58    /// work between `next_notification` calls; lower it to tighten
59    /// end-to-end memory bounds at the cost of more frequent
60    /// backpressure warnings.
61    pub notification_channel_capacity: usize,
62    /// Dispatcher command channel capacity (in-flight outbound commands).
63    ///
64    /// Depth of the bounded `tokio::sync::mpsc` that carries outbound
65    /// commands (request sends, cancel-request on timeout, shutdown)
66    /// from callers to the dispatcher task.
67    ///
68    /// # Backpressure — Strategy A (await-send)
69    ///
70    /// When the channel is full, callers of
71    /// [`crate::client::DeribitWebSocketClient::send_request`] /
72    /// [`crate::client::DeribitWebSocketClient::disconnect`] block on `send().await`
73    /// until the dispatcher drains a slot. Blocking here means the
74    /// application is issuing requests faster than the dispatcher can
75    /// write them to the socket; the `request_timeout` bound on
76    /// `send_request` still applies, so the caller sees a
77    /// [`crate::error::WebSocketError::Timeout`] if the deadline elapses while
78    /// waiting on the command channel.
79    pub dispatcher_command_capacity: usize,
80}
81
82impl Default for WebSocketConfig {
83    fn default() -> Self {
84        Self::try_new().unwrap_or_else(|_| {
85            // Reached only when a user-supplied `DERIBIT_WS_URL` is invalid.
86            // Fall back to the compile-time constant
87            // [`constants::PRODUCTION_WS_URL`], whose parse-ability is locked
88            // by the `test_production_ws_url_parses` unit test — making this
89            // branch unreachable in practice.
90            #[allow(
91                clippy::expect_used,
92                reason = "PRODUCTION_WS_URL is a compile-time constant validated by test_production_ws_url_parses"
93            )]
94            let ws_url = Url::parse(constants::PRODUCTION_WS_URL)
95                .expect("PRODUCTION_WS_URL is a compile-time constant validated by tests");
96            Self::from_parts(ws_url)
97        })
98    }
99}
100
101impl WebSocketConfig {
102    /// Construct a configuration from environment variables, propagating
103    /// parse errors for any user-supplied URL.
104    ///
105    /// Loads `.env` once via `Self::load_env`, reads `DERIBIT_WS_URL`
106    /// (falling back to [`constants::PRODUCTION_WS_URL`] when unset), and
107    /// parses it. All other fields follow the same env-or-default strategy as
108    /// [`Default`] but never fail.
109    ///
110    /// Prefer this over [`Default::default`] when the caller needs to surface
111    /// an invalid `DERIBIT_WS_URL` as an error instead of silently falling
112    /// back to the production URL.
113    ///
114    /// # Errors
115    ///
116    /// Returns [`url::ParseError`] when `DERIBIT_WS_URL` is set to a value
117    /// that cannot be parsed as a URL.
118    pub fn try_new() -> Result<Self, url::ParseError> {
119        Self::load_env();
120        let ws_url_str =
121            env::var("DERIBIT_WS_URL").unwrap_or_else(|_| constants::PRODUCTION_WS_URL.to_string());
122        let ws_url = Url::parse(&ws_url_str)?;
123        Ok(Self::from_parts(ws_url))
124    }
125
126    /// Create a new configuration with a custom URL.
127    ///
128    /// Non-URL fields are populated from environment variables using the
129    /// same rules as [`Default`]; only the URL is overridden. `.env` is
130    /// loaded once via `Self::load_env` before any env var is read.
131    ///
132    /// # Errors
133    ///
134    /// Returns [`url::ParseError`] when `url` cannot be parsed.
135    pub fn with_url(url: &str) -> Result<Self, url::ParseError> {
136        Self::load_env();
137        let ws_url = Url::parse(url)?;
138        Ok(Self::from_parts(ws_url))
139    }
140
141    /// Centralised `.env` loader for every public constructor.
142    ///
143    /// Idempotent and harmless when called multiple times per process. Every
144    /// public entry point ([`Self::try_new`], [`Self::with_url`], and
145    /// [`Default::default`] via `try_new`) calls this exactly once before
146    /// reading any env var, so [`Self::from_parts`] can assume the environment
147    /// is already loaded.
148    fn load_env() {
149        let _ = dotenv::dotenv();
150    }
151
152    /// Private helper: populate every field except `ws_url` from environment
153    /// variables (with sensible defaults) and combine them with the given URL.
154    ///
155    /// The caller is responsible for calling `Self::load_env` beforehand so
156    /// that `.env` overrides are visible to `std::env::var`. Every public
157    /// constructor satisfies this invariant.
158    fn from_parts(ws_url: Url) -> Self {
159        let heartbeat_interval = env::var("DERIBIT_HEARTBEAT_INTERVAL")
160            .ok()
161            .and_then(|s| s.parse().ok())
162            .map(Duration::from_secs)
163            .unwrap_or_else(|| Duration::from_secs(30));
164
165        let max_reconnect_attempts = env::var("DERIBIT_RECONNECT_ATTEMPTS")
166            .ok()
167            .and_then(|s| s.parse().ok())
168            .unwrap_or(3);
169
170        let reconnect_delay = env::var("DERIBIT_RECONNECT_DELAY")
171            .ok()
172            .and_then(|s| s.parse().ok())
173            .map(Duration::from_secs)
174            .unwrap_or_else(|| Duration::from_secs(5));
175
176        let connection_timeout = env::var("DERIBIT_CONNECTION_TIMEOUT")
177            .ok()
178            .and_then(|s| s.parse().ok())
179            .map(Duration::from_secs)
180            .unwrap_or_else(|| Duration::from_secs(10));
181
182        let enable_logging = env::var("DERIBIT_ENABLE_LOGGING")
183            .ok()
184            .and_then(|s| s.parse().ok())
185            .unwrap_or(true);
186
187        let log_level = env::var("DERIBIT_LOG_LEVEL").unwrap_or_else(|_| "info".to_string());
188
189        let test_mode = env::var("DERIBIT_TEST_MODE")
190            .ok()
191            .and_then(|s| s.parse().ok())
192            .unwrap_or(false);
193
194        let client_id = env::var("DERIBIT_CLIENT_ID").ok();
195        let client_secret = env::var("DERIBIT_CLIENT_SECRET").ok();
196
197        let request_timeout = env::var("DERIBIT_REQUEST_TIMEOUT")
198            .ok()
199            .and_then(|s| s.parse().ok())
200            .map(Duration::from_secs)
201            .unwrap_or_else(|| Duration::from_secs(30));
202
203        let notification_channel_capacity = env::var("DERIBIT_NOTIFICATION_CAPACITY")
204            .ok()
205            .and_then(|s| s.parse().ok())
206            .unwrap_or(1024);
207
208        let dispatcher_command_capacity = env::var("DERIBIT_DISPATCHER_CAPACITY")
209            .ok()
210            .and_then(|s| s.parse().ok())
211            .unwrap_or(64);
212
213        Self {
214            ws_url,
215            heartbeat_interval,
216            max_reconnect_attempts,
217            reconnect_delay,
218            connection_timeout,
219            enable_logging,
220            log_level,
221            test_mode,
222            client_id,
223            client_secret,
224            request_timeout,
225            notification_channel_capacity,
226            dispatcher_command_capacity,
227        }
228    }
229
230    /// Set heartbeat interval
231    pub fn with_heartbeat_interval(mut self, interval: Duration) -> Self {
232        self.heartbeat_interval = interval;
233        self
234    }
235
236    /// Set maximum reconnection attempts
237    pub fn with_max_reconnect_attempts(mut self, attempts: u32) -> Self {
238        self.max_reconnect_attempts = attempts;
239        self
240    }
241
242    /// Set reconnection delay
243    pub fn with_reconnect_delay(mut self, delay: Duration) -> Self {
244        self.reconnect_delay = delay;
245        self
246    }
247
248    /// Set connection timeout
249    pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
250        self.connection_timeout = timeout;
251        self
252    }
253
254    /// Set client credentials
255    pub fn with_credentials(mut self, client_id: String, client_secret: String) -> Self {
256        self.client_id = Some(client_id);
257        self.client_secret = Some(client_secret);
258        self
259    }
260
261    /// Set client ID
262    pub fn with_client_id(mut self, client_id: String) -> Self {
263        self.client_id = Some(client_id);
264        self
265    }
266
267    /// Set client secret
268    pub fn with_client_secret(mut self, client_secret: String) -> Self {
269        self.client_secret = Some(client_secret);
270        self
271    }
272
273    /// Enable or disable logging
274    pub fn with_logging(mut self, enable: bool) -> Self {
275        self.enable_logging = enable;
276        self
277    }
278
279    /// Set log level
280    pub fn with_log_level(mut self, level: String) -> Self {
281        self.log_level = level;
282        self
283    }
284
285    /// Set test mode
286    pub fn with_test_mode(mut self, test_mode: bool) -> Self {
287        self.test_mode = test_mode;
288        self
289    }
290
291    /// Check if credentials are available
292    pub fn has_credentials(&self) -> bool {
293        self.client_id.is_some() && self.client_secret.is_some()
294    }
295
296    /// Get client credentials as tuple
297    pub fn get_credentials(&self) -> Option<(&str, &str)> {
298        match (&self.client_id, &self.client_secret) {
299            (Some(id), Some(secret)) => Some((id, secret)),
300            _ => None,
301        }
302    }
303
304    /// Set the per-request timeout awaiting a matching JSON-RPC response.
305    #[must_use]
306    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
307        self.request_timeout = timeout;
308        self
309    }
310
311    /// Set the notification channel capacity.
312    ///
313    /// This bounds the number of server-pushed frames buffered between the
314    /// dispatcher task and the consumer.
315    #[must_use]
316    pub fn with_notification_channel_capacity(mut self, capacity: usize) -> Self {
317        self.notification_channel_capacity = capacity;
318        self
319    }
320
321    /// Set the dispatcher command channel capacity.
322    ///
323    /// Caps the number of outbound commands queued waiting for the
324    /// dispatcher task to process them.
325    #[must_use]
326    pub fn with_dispatcher_command_capacity(mut self, capacity: usize) -> Self {
327        self.dispatcher_command_capacity = capacity;
328        self
329    }
330}