marketdata_core/websocket/config.rs
1//! WebSocket connection configuration types
2
3use crate::models::AuthRequest;
4use crate::tls::TlsConfig;
5use std::fmt;
6use std::time::Duration;
7
8/// Configuration for WebSocket connection.
9///
10/// `Debug` is implemented manually so embedded credentials cannot leak.
11/// The `auth` field is rendered as `<redacted>` and any query-string
12/// parameter on `url` whose name case-insensitively matches a known
13/// secret key (`token`, `key`, `apikey`, `api_key`, `secret`, `password`)
14/// has its value replaced with `***`.
15#[derive(Clone)]
16pub struct ConnectionConfig {
17 /// WebSocket endpoint URL
18 pub url: String,
19
20 /// Authentication credentials
21 pub auth: AuthRequest,
22
23 /// Connection timeout (default: 30 seconds)
24 pub connect_timeout: Duration,
25
26 /// Read timeout for messages (default: 30 seconds)
27 pub read_timeout: Duration,
28
29 /// Optional TLS customization (custom CA / accept invalid certs).
30 /// Default means "use the OS trust store" — identical to pre-3.0.1
31 /// behaviour.
32 pub tls: TlsConfig,
33
34 /// Capacity of the inbound message channel that backs `messages()` and
35 /// `message_stream()`. Defaults to [`DEFAULT_MESSAGE_BUFFER`]. Use
36 /// [`ConnectionConfigBuilder::message_buffer`] to override.
37 pub message_buffer: usize,
38
39 /// Capacity of the lifecycle event channel that backs `events()` and
40 /// `state_events()`. Defaults to [`DEFAULT_EVENT_BUFFER`]. Use
41 /// [`ConnectionConfigBuilder::event_buffer`] to override.
42 pub event_buffer: usize,
43
44 /// Optional caller-supplied identifier used as a metric label on the
45 /// `metrics` feature. Defaults to `None`. **Low-cardinality only** —
46 /// suitable for deployment / instance / service identifiers, never for
47 /// per-request UUIDs (Prometheus storage explodes on high-cardinality
48 /// labels). Values longer than [`CLIENT_ID_MAX_LEN`] are truncated by
49 /// the builder.
50 pub client_id: Option<String>,
51}
52
53/// Default capacity for the inbound message channel (`message_buffer`).
54///
55/// Sized for multi-symbol consumers (50–200 symbols across all channels at
56/// the TWSE 9:00 open burst, ~2000 msg/s). Bounded mpsc channels in tokio
57/// and std do not pre-allocate, so a higher cap costs nothing at idle and
58/// only manifests memory cost on saturation. At 4096 this provides ~2 s
59/// of headroom at 2000 msg/s before drop-newest backpressure kicks in.
60///
61/// Pre-0.4.0 the cap was hardcoded to 1024. Increased to 4096 in 0.4.0.
62pub const DEFAULT_MESSAGE_BUFFER: usize = 4096;
63
64/// Default capacity for the lifecycle event channel (`event_buffer`).
65///
66/// Lifecycle events fire on the order of one per heartbeat period (30 s)
67/// plus reconnect/error bursts, so 1024 is generous; kept at 1024 for
68/// "no event left behind" safety. May be reduced in a future release once
69/// production telemetry confirms it is consistently underused.
70pub const DEFAULT_EVENT_BUFFER: usize = 1024;
71
72/// Maximum byte length accepted for [`ConnectionConfig::client_id`].
73///
74/// Values supplied to [`ConnectionConfigBuilder::client_id`] longer than this
75/// are truncated to `CLIENT_ID_MAX_LEN` bytes; truncation emits a
76/// `tracing::warn!` (no-op without the `tracing` feature). The cap exists
77/// to bound metric-label cardinality and Prometheus storage cost.
78pub const CLIENT_ID_MAX_LEN: usize = 64;
79
80/// Names whose value should be redacted when seen as a URL query parameter.
81/// Matched case-insensitively.
82const SENSITIVE_QUERY_KEYS: &[&str] =
83 &["token", "key", "apikey", "api_key", "secret", "password"];
84
85/// Return a copy of `url` with sensitive query-parameter values masked as
86/// `***`. Falls back to the original string if `url` is not parseable.
87fn redact_url_query(url: &str) -> String {
88 let mut parsed = match url::Url::parse(url) {
89 Ok(u) => u,
90 Err(_) => return url.to_string(),
91 };
92
93 let original_pairs: Vec<(String, String)> = parsed
94 .query_pairs()
95 .map(|(k, v)| (k.into_owned(), v.into_owned()))
96 .collect();
97
98 if original_pairs.is_empty() {
99 return parsed.to_string();
100 }
101
102 let mut serializer = parsed.query_pairs_mut();
103 serializer.clear();
104 for (k, v) in &original_pairs {
105 let redacted = SENSITIVE_QUERY_KEYS
106 .iter()
107 .any(|s| k.eq_ignore_ascii_case(s));
108 if redacted {
109 serializer.append_pair(k, "***");
110 } else {
111 serializer.append_pair(k, v);
112 }
113 }
114 drop(serializer);
115
116 parsed.to_string()
117}
118
119impl fmt::Debug for ConnectionConfig {
120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121 f.debug_struct("ConnectionConfig")
122 .field("url", &redact_url_query(&self.url))
123 .field("auth", &"<redacted>")
124 .field("connect_timeout", &self.connect_timeout)
125 .field("read_timeout", &self.read_timeout)
126 .field("tls", &self.tls)
127 .field("message_buffer", &self.message_buffer)
128 .field("event_buffer", &self.event_buffer)
129 .field("client_id", &self.client_id)
130 .finish()
131 }
132}
133
134impl ConnectionConfig {
135 /// Create a new connection configuration
136 pub fn new(url: impl Into<String>, auth: AuthRequest) -> Self {
137 Self {
138 url: url.into(),
139 auth,
140 connect_timeout: Duration::from_secs(30),
141 read_timeout: Duration::from_secs(30),
142 tls: TlsConfig::default(),
143 message_buffer: DEFAULT_MESSAGE_BUFFER,
144 event_buffer: DEFAULT_EVENT_BUFFER,
145 client_id: None,
146 }
147 }
148
149 /// Create a builder for fluent configuration
150 pub fn builder(url: impl Into<String>, auth: AuthRequest) -> ConnectionConfigBuilder {
151 ConnectionConfigBuilder {
152 url: url.into(),
153 auth,
154 connect_timeout: Duration::from_secs(30),
155 read_timeout: Duration::from_secs(30),
156 tls: TlsConfig::default(),
157 message_buffer: DEFAULT_MESSAGE_BUFFER,
158 event_buffer: DEFAULT_EVENT_BUFFER,
159 client_id: None,
160 }
161 }
162
163 /// Caller-supplied identifier used as a metric label, or `None` if
164 /// unset. See [`ConnectionConfigBuilder::client_id`].
165 #[must_use]
166 pub fn client_id(&self) -> Option<&str> {
167 self.client_id.as_deref()
168 }
169
170 /// Create configuration for Fugle stock WebSocket endpoint
171 ///
172 /// # Example
173 ///
174 /// ```rust
175 /// use marketdata_core::websocket::ConnectionConfig;
176 /// use marketdata_core::AuthRequest;
177 ///
178 /// let config = ConnectionConfig::fugle_stock(
179 /// AuthRequest::with_api_key("my-api-key")
180 /// );
181 /// assert_eq!(config.url, "wss://api.fugle.tw/marketdata/v1.0/stock/streaming");
182 /// ```
183 pub fn fugle_stock(auth: AuthRequest) -> Self {
184 Self::new(crate::urls::STOCK_WS, auth)
185 }
186
187 /// Create configuration for Fugle futures/options WebSocket endpoint
188 ///
189 /// # Example
190 ///
191 /// ```rust
192 /// use marketdata_core::websocket::ConnectionConfig;
193 /// use marketdata_core::AuthRequest;
194 ///
195 /// let config = ConnectionConfig::fugle_futopt(
196 /// AuthRequest::with_api_key("my-api-key")
197 /// );
198 /// assert_eq!(config.url, "wss://api.fugle.tw/marketdata/v1.0/futopt/streaming");
199 /// ```
200 pub fn fugle_futopt(auth: AuthRequest) -> Self {
201 Self::new(crate::urls::FUTOPT_WS, auth)
202 }
203}
204
205/// Builder for ConnectionConfig with fluent API
206pub struct ConnectionConfigBuilder {
207 url: String,
208 auth: AuthRequest,
209 connect_timeout: Duration,
210 read_timeout: Duration,
211 tls: TlsConfig,
212 message_buffer: usize,
213 event_buffer: usize,
214 client_id: Option<String>,
215}
216
217impl ConnectionConfigBuilder {
218 /// Set connection timeout
219 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
220 self.connect_timeout = timeout;
221 self
222 }
223
224 /// Set read timeout
225 pub fn read_timeout(mut self, timeout: Duration) -> Self {
226 self.read_timeout = timeout;
227 self
228 }
229
230 /// Replace the TLS config wholesale
231 pub fn tls(mut self, tls: TlsConfig) -> Self {
232 self.tls = tls;
233 self
234 }
235
236 /// Override the inbound message-channel capacity.
237 ///
238 /// Defaults to [`DEFAULT_MESSAGE_BUFFER`] (4096). The channel uses
239 /// drop-newest backpressure on saturation; tune this when the consumer
240 /// can experience long pauses (e.g. trade peaks while a UI thread is
241 /// blocked) or when subscribing to many high-volume symbols at once.
242 ///
243 /// # Panics
244 ///
245 /// Panics if `cap` is zero — a zero-capacity channel cannot make
246 /// progress and is always a configuration mistake.
247 pub fn message_buffer(mut self, cap: usize) -> Self {
248 assert!(cap > 0, "message_buffer must be greater than zero");
249 self.message_buffer = cap;
250 self
251 }
252
253 /// Override the lifecycle event-channel capacity.
254 ///
255 /// Defaults to [`DEFAULT_EVENT_BUFFER`] (1024). Event volume is
256 /// orders of magnitude lower than message volume; tune only if you
257 /// retain raw events for an extended period without consuming them.
258 ///
259 /// # Panics
260 ///
261 /// Panics if `cap` is zero.
262 pub fn event_buffer(mut self, cap: usize) -> Self {
263 assert!(cap > 0, "event_buffer must be greater than zero");
264 self.event_buffer = cap;
265 self
266 }
267
268 /// Set a low-cardinality identifier used as a `client_id` metric label
269 /// when the `metrics` feature is enabled.
270 ///
271 /// **Cardinality**: pass a deployment, instance, or service identifier
272 /// (e.g. `"monitor-stock-probe"`, `"trader-prod-3"`). Per-request UUIDs
273 /// or any value derived from request data will explode Prometheus
274 /// storage and break the metrics pipeline.
275 ///
276 /// **Length**: values longer than [`CLIENT_ID_MAX_LEN`] are truncated
277 /// to that many bytes; truncation emits `tracing::warn!` so the
278 /// overflow is observable in operational logs.
279 pub fn client_id(mut self, id: impl Into<String>) -> Self {
280 let mut id = id.into();
281 if id.len() > CLIENT_ID_MAX_LEN {
282 // Truncate at a UTF-8 char boundary at or below CLIENT_ID_MAX_LEN
283 // so the resulting string is always valid UTF-8. The tracing
284 // warning fires on the original length so the operator sees the
285 // intended overflow size.
286 let _original_len = id.len();
287 let mut cut = CLIENT_ID_MAX_LEN;
288 while cut > 0 && !id.is_char_boundary(cut) {
289 cut -= 1;
290 }
291 id.truncate(cut);
292 crate::tracing_compat::warn!(
293 target: "fugle_marketdata::config",
294 original_len = _original_len,
295 truncated_len = cut,
296 max_len = CLIENT_ID_MAX_LEN,
297 "client_id exceeded CLIENT_ID_MAX_LEN; truncated"
298 );
299 }
300 self.client_id = Some(id);
301 self
302 }
303
304 /// Convenience for `Option<impl Into<String>>` callers (e.g. forwarding
305 /// from a binding layer that may have an `Option<String>` in hand).
306 /// Equivalent to `client_id(...)` when `Some`; no-op when `None`.
307 pub fn maybe_client_id(self, id: Option<impl Into<String>>) -> Self {
308 match id {
309 Some(id) => self.client_id(id),
310 None => self,
311 }
312 }
313
314 /// Build the configuration
315 pub fn build(self) -> ConnectionConfig {
316 ConnectionConfig {
317 url: self.url,
318 auth: self.auth,
319 connect_timeout: self.connect_timeout,
320 read_timeout: self.read_timeout,
321 tls: self.tls,
322 message_buffer: self.message_buffer,
323 event_buffer: self.event_buffer,
324 client_id: self.client_id,
325 }
326 }
327}
328
329#[cfg(test)]
330mod tests {
331 use super::*;
332
333 #[test]
334 fn test_connection_config_new() {
335 let auth = AuthRequest::with_api_key("test-key");
336 let config = ConnectionConfig::new("wss://example.com", auth);
337
338 assert_eq!(config.url, "wss://example.com");
339 assert_eq!(config.connect_timeout, Duration::from_secs(30));
340 assert_eq!(config.read_timeout, Duration::from_secs(30));
341 }
342
343 #[test]
344 fn test_connection_config_builder() {
345 let auth = AuthRequest::with_api_key("test-key");
346 let config = ConnectionConfig::builder("wss://example.com", auth)
347 .connect_timeout(Duration::from_secs(10))
348 .read_timeout(Duration::from_secs(20))
349 .build();
350
351 assert_eq!(config.url, "wss://example.com");
352 assert_eq!(config.connect_timeout, Duration::from_secs(10));
353 assert_eq!(config.read_timeout, Duration::from_secs(20));
354 }
355
356 #[test]
357 fn test_fugle_stock_config() {
358 let auth = AuthRequest::with_api_key("test-key");
359 let config = ConnectionConfig::fugle_stock(auth);
360
361 assert_eq!(config.url, "wss://api.fugle.tw/marketdata/v1.0/stock/streaming");
362 }
363
364 #[test]
365 fn test_fugle_futopt_config() {
366 let auth = AuthRequest::with_api_key("test-key");
367 let config = ConnectionConfig::fugle_futopt(auth);
368
369 assert_eq!(config.url, "wss://api.fugle.tw/marketdata/v1.0/futopt/streaming");
370 }
371
372 #[test]
373 fn test_debug_redacts_auth() {
374 let auth = AuthRequest::with_api_key("super-secret-api-key");
375 let config = ConnectionConfig::fugle_stock(auth);
376 let rendered = format!("{:?}", config);
377
378 assert!(rendered.contains("auth: \"<redacted>\""));
379 assert!(!rendered.contains("super-secret-api-key"));
380 }
381
382 #[test]
383 fn test_debug_redacts_url_query_token() {
384 let auth = AuthRequest::with_api_key("k");
385 let config =
386 ConnectionConfig::new("wss://example.com/stream?token=secret&v=1", auth);
387 let rendered = format!("{:?}", config);
388
389 assert!(rendered.contains("token=***"));
390 assert!(rendered.contains("v=1"));
391 assert!(!rendered.contains("token=secret"));
392 }
393
394 #[test]
395 fn test_debug_redacts_multiple_sensitive_keys() {
396 let auth = AuthRequest::with_api_key("k");
397 let config = ConnectionConfig::new(
398 "wss://example.com/stream?api_key=AAA&secret=BBB&safe=CCC",
399 auth,
400 );
401 let rendered = format!("{:?}", config);
402
403 assert!(rendered.contains("api_key=***"));
404 assert!(rendered.contains("secret=***"));
405 assert!(rendered.contains("safe=CCC"));
406 assert!(!rendered.contains("AAA"));
407 assert!(!rendered.contains("BBB"));
408 }
409
410 #[test]
411 fn test_debug_handles_unparseable_url() {
412 let auth = AuthRequest::with_api_key("k");
413 let config = ConnectionConfig::new("not a real url", auth);
414 let rendered = format!("{:?}", config);
415 assert!(rendered.contains("not a real url"));
416 }
417}