deribit_websocket/config.rs
1//! Configuration for WebSocket client
2
3use std::env;
4use std::time::Duration;
5use url::Url;
6
7use crate::constants;
8
9/// WebSocket client configuration
10#[derive(Debug, Clone)]
11pub struct WebSocketConfig {
12 /// WebSocket URL
13 pub ws_url: Url,
14 /// Heartbeat interval
15 pub heartbeat_interval: Duration,
16 /// Maximum reconnection attempts
17 pub max_reconnect_attempts: u32,
18 /// Reconnection delay
19 pub reconnect_delay: Duration,
20 /// Connection timeout
21 pub connection_timeout: Duration,
22 /// Enable logging
23 pub enable_logging: bool,
24 /// Log level
25 pub log_level: String,
26 /// Test mode
27 pub test_mode: bool,
28 /// Client ID for authentication
29 pub client_id: Option<String>,
30 /// Client secret for authentication
31 pub client_secret: Option<String>,
32 /// Per-request timeout for [`crate::client::DeribitWebSocketClient::send_request`].
33 ///
34 /// Every call that awaits a matching JSON-RPC response is bounded by
35 /// this duration. If the response does not arrive in time, the call
36 /// returns [`crate::error::WebSocketError::Timeout`].
37 pub request_timeout: Duration,
38 /// Notification channel capacity (frames buffered for the consumer).
39 ///
40 /// Depth of the bounded `tokio::sync::mpsc` that carries server-pushed
41 /// notifications (and any unmatched frames) from the dispatcher task
42 /// to [`crate::client::DeribitWebSocketClient::receive_message`] /
43 /// `start_message_processing_loop`.
44 ///
45 /// # Backpressure — Strategy A (await-send)
46 ///
47 /// When the channel is full the dispatcher task blocks on
48 /// `send().await`; it therefore stops polling the WebSocket stream,
49 /// the TCP recv buffer fills, and the Deribit server applies flow
50 /// control. **No frames are dropped due to backpressure; if the
51 /// notification receiver has been closed (for example during
52 /// shutdown or disconnect), the affected frames are discarded.**
53 /// Every full-channel event emits a `tracing::warn!` with the
54 /// channel capacity so slow consumers are visible in logs.
55 ///
56 /// Sizing: the default of `1024` is sufficient for normal liquid
57 /// instruments. Raise it when the consumer performs heavy synchronous
58 /// work between `next_notification` calls; lower it to tighten
59 /// end-to-end memory bounds at the cost of more frequent
60 /// backpressure warnings.
61 pub notification_channel_capacity: usize,
62 /// Dispatcher command channel capacity (in-flight outbound commands).
63 ///
64 /// Depth of the bounded `tokio::sync::mpsc` that carries outbound
65 /// commands (request sends, cancel-request on timeout, shutdown)
66 /// from callers to the dispatcher task.
67 ///
68 /// # Backpressure — Strategy A (await-send)
69 ///
70 /// When the channel is full, callers of
71 /// [`crate::client::DeribitWebSocketClient::send_request`] /
72 /// [`crate::client::DeribitWebSocketClient::disconnect`] block on `send().await`
73 /// until the dispatcher drains a slot. Blocking here means the
74 /// application is issuing requests faster than the dispatcher can
75 /// write them to the socket; the `request_timeout` bound on
76 /// `send_request` still applies, so the caller sees a
77 /// [`crate::error::WebSocketError::Timeout`] if the deadline elapses while
78 /// waiting on the command channel.
79 pub dispatcher_command_capacity: usize,
80}
81
82impl Default for WebSocketConfig {
83 fn default() -> Self {
84 Self::try_new().unwrap_or_else(|_| {
85 // Reached only when a user-supplied `DERIBIT_WS_URL` is invalid.
86 // Fall back to the compile-time constant
87 // [`constants::PRODUCTION_WS_URL`], whose parse-ability is locked
88 // by the `test_production_ws_url_parses` unit test — making this
89 // branch unreachable in practice.
90 #[allow(
91 clippy::expect_used,
92 reason = "PRODUCTION_WS_URL is a compile-time constant validated by test_production_ws_url_parses"
93 )]
94 let ws_url = Url::parse(constants::PRODUCTION_WS_URL)
95 .expect("PRODUCTION_WS_URL is a compile-time constant validated by tests");
96 Self::from_parts(ws_url)
97 })
98 }
99}
100
101impl WebSocketConfig {
102 /// Construct a configuration from environment variables, propagating
103 /// parse errors for any user-supplied URL.
104 ///
105 /// Loads `.env` once via `Self::load_env`, reads `DERIBIT_WS_URL`
106 /// (falling back to [`constants::PRODUCTION_WS_URL`] when unset), and
107 /// parses it. All other fields follow the same env-or-default strategy as
108 /// [`Default`] but never fail.
109 ///
110 /// Prefer this over [`Default::default`] when the caller needs to surface
111 /// an invalid `DERIBIT_WS_URL` as an error instead of silently falling
112 /// back to the production URL.
113 ///
114 /// # Errors
115 ///
116 /// Returns [`url::ParseError`] when `DERIBIT_WS_URL` is set to a value
117 /// that cannot be parsed as a URL.
118 pub fn try_new() -> Result<Self, url::ParseError> {
119 Self::load_env();
120 let ws_url_str =
121 env::var("DERIBIT_WS_URL").unwrap_or_else(|_| constants::PRODUCTION_WS_URL.to_string());
122 let ws_url = Url::parse(&ws_url_str)?;
123 Ok(Self::from_parts(ws_url))
124 }
125
126 /// Create a new configuration with a custom URL.
127 ///
128 /// Non-URL fields are populated from environment variables using the
129 /// same rules as [`Default`]; only the URL is overridden. `.env` is
130 /// loaded once via `Self::load_env` before any env var is read.
131 ///
132 /// # Errors
133 ///
134 /// Returns [`url::ParseError`] when `url` cannot be parsed.
135 pub fn with_url(url: &str) -> Result<Self, url::ParseError> {
136 Self::load_env();
137 let ws_url = Url::parse(url)?;
138 Ok(Self::from_parts(ws_url))
139 }
140
141 /// Centralised `.env` loader for every public constructor.
142 ///
143 /// Idempotent and harmless when called multiple times per process. Every
144 /// public entry point ([`Self::try_new`], [`Self::with_url`], and
145 /// [`Default::default`] via `try_new`) calls this exactly once before
146 /// reading any env var, so [`Self::from_parts`] can assume the environment
147 /// is already loaded.
148 fn load_env() {
149 let _ = dotenv::dotenv();
150 }
151
152 /// Private helper: populate every field except `ws_url` from environment
153 /// variables (with sensible defaults) and combine them with the given URL.
154 ///
155 /// The caller is responsible for calling `Self::load_env` beforehand so
156 /// that `.env` overrides are visible to `std::env::var`. Every public
157 /// constructor satisfies this invariant.
158 fn from_parts(ws_url: Url) -> Self {
159 let heartbeat_interval = env::var("DERIBIT_HEARTBEAT_INTERVAL")
160 .ok()
161 .and_then(|s| s.parse().ok())
162 .map(Duration::from_secs)
163 .unwrap_or_else(|| Duration::from_secs(30));
164
165 let max_reconnect_attempts = env::var("DERIBIT_RECONNECT_ATTEMPTS")
166 .ok()
167 .and_then(|s| s.parse().ok())
168 .unwrap_or(3);
169
170 let reconnect_delay = env::var("DERIBIT_RECONNECT_DELAY")
171 .ok()
172 .and_then(|s| s.parse().ok())
173 .map(Duration::from_secs)
174 .unwrap_or_else(|| Duration::from_secs(5));
175
176 let connection_timeout = env::var("DERIBIT_CONNECTION_TIMEOUT")
177 .ok()
178 .and_then(|s| s.parse().ok())
179 .map(Duration::from_secs)
180 .unwrap_or_else(|| Duration::from_secs(10));
181
182 let enable_logging = env::var("DERIBIT_ENABLE_LOGGING")
183 .ok()
184 .and_then(|s| s.parse().ok())
185 .unwrap_or(true);
186
187 let log_level = env::var("DERIBIT_LOG_LEVEL").unwrap_or_else(|_| "info".to_string());
188
189 let test_mode = env::var("DERIBIT_TEST_MODE")
190 .ok()
191 .and_then(|s| s.parse().ok())
192 .unwrap_or(false);
193
194 let client_id = env::var("DERIBIT_CLIENT_ID").ok();
195 let client_secret = env::var("DERIBIT_CLIENT_SECRET").ok();
196
197 let request_timeout = env::var("DERIBIT_REQUEST_TIMEOUT")
198 .ok()
199 .and_then(|s| s.parse().ok())
200 .map(Duration::from_secs)
201 .unwrap_or_else(|| Duration::from_secs(30));
202
203 let notification_channel_capacity = env::var("DERIBIT_NOTIFICATION_CAPACITY")
204 .ok()
205 .and_then(|s| s.parse().ok())
206 .unwrap_or(1024);
207
208 let dispatcher_command_capacity = env::var("DERIBIT_DISPATCHER_CAPACITY")
209 .ok()
210 .and_then(|s| s.parse().ok())
211 .unwrap_or(64);
212
213 Self {
214 ws_url,
215 heartbeat_interval,
216 max_reconnect_attempts,
217 reconnect_delay,
218 connection_timeout,
219 enable_logging,
220 log_level,
221 test_mode,
222 client_id,
223 client_secret,
224 request_timeout,
225 notification_channel_capacity,
226 dispatcher_command_capacity,
227 }
228 }
229
230 /// Set heartbeat interval
231 pub fn with_heartbeat_interval(mut self, interval: Duration) -> Self {
232 self.heartbeat_interval = interval;
233 self
234 }
235
236 /// Set maximum reconnection attempts
237 pub fn with_max_reconnect_attempts(mut self, attempts: u32) -> Self {
238 self.max_reconnect_attempts = attempts;
239 self
240 }
241
242 /// Set reconnection delay
243 pub fn with_reconnect_delay(mut self, delay: Duration) -> Self {
244 self.reconnect_delay = delay;
245 self
246 }
247
248 /// Set connection timeout
249 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
250 self.connection_timeout = timeout;
251 self
252 }
253
254 /// Set client credentials
255 pub fn with_credentials(mut self, client_id: String, client_secret: String) -> Self {
256 self.client_id = Some(client_id);
257 self.client_secret = Some(client_secret);
258 self
259 }
260
261 /// Set client ID
262 pub fn with_client_id(mut self, client_id: String) -> Self {
263 self.client_id = Some(client_id);
264 self
265 }
266
267 /// Set client secret
268 pub fn with_client_secret(mut self, client_secret: String) -> Self {
269 self.client_secret = Some(client_secret);
270 self
271 }
272
273 /// Enable or disable logging
274 pub fn with_logging(mut self, enable: bool) -> Self {
275 self.enable_logging = enable;
276 self
277 }
278
279 /// Set log level
280 pub fn with_log_level(mut self, level: String) -> Self {
281 self.log_level = level;
282 self
283 }
284
285 /// Set test mode
286 pub fn with_test_mode(mut self, test_mode: bool) -> Self {
287 self.test_mode = test_mode;
288 self
289 }
290
291 /// Check if credentials are available
292 pub fn has_credentials(&self) -> bool {
293 self.client_id.is_some() && self.client_secret.is_some()
294 }
295
296 /// Get client credentials as tuple
297 pub fn get_credentials(&self) -> Option<(&str, &str)> {
298 match (&self.client_id, &self.client_secret) {
299 (Some(id), Some(secret)) => Some((id, secret)),
300 _ => None,
301 }
302 }
303
304 /// Set the per-request timeout awaiting a matching JSON-RPC response.
305 #[must_use]
306 pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
307 self.request_timeout = timeout;
308 self
309 }
310
311 /// Set the notification channel capacity.
312 ///
313 /// This bounds the number of server-pushed frames buffered between the
314 /// dispatcher task and the consumer.
315 #[must_use]
316 pub fn with_notification_channel_capacity(mut self, capacity: usize) -> Self {
317 self.notification_channel_capacity = capacity;
318 self
319 }
320
321 /// Set the dispatcher command channel capacity.
322 ///
323 /// Caps the number of outbound commands queued waiting for the
324 /// dispatcher task to process them.
325 #[must_use]
326 pub fn with_dispatcher_command_capacity(mut self, capacity: usize) -> Self {
327 self.dispatcher_command_capacity = capacity;
328 self
329 }
330}