Skip to main content

marketdata_core/websocket/
config.rs

1//! WebSocket connection configuration types
2
3use crate::models::AuthRequest;
4use crate::tls::TlsConfig;
5use std::fmt;
6use std::time::Duration;
7
8/// Configuration for WebSocket connection.
9///
10/// `Debug` is implemented manually so embedded credentials cannot leak.
11/// The `auth` field is rendered as `<redacted>` and any query-string
12/// parameter on `url` whose name case-insensitively matches a known
13/// secret key (`token`, `key`, `apikey`, `api_key`, `secret`, `password`)
14/// has its value replaced with `***`.
15#[derive(Clone)]
16pub struct ConnectionConfig {
17    /// WebSocket endpoint URL
18    pub url: String,
19
20    /// Authentication credentials
21    pub auth: AuthRequest,
22
23    /// Connection timeout (default: 30 seconds)
24    pub connect_timeout: Duration,
25
26    /// Read timeout for messages (default: 30 seconds)
27    pub read_timeout: Duration,
28
29    /// Optional TLS customization (custom CA / accept invalid certs).
30    /// Default means "use the OS trust store" — identical to pre-3.0.1
31    /// behaviour.
32    pub tls: TlsConfig,
33
34    /// Capacity of the inbound message channel that backs `messages()` and
35    /// `message_stream()`. Defaults to [`DEFAULT_MESSAGE_BUFFER`]. Use
36    /// [`ConnectionConfigBuilder::message_buffer`] to override.
37    pub message_buffer: usize,
38
39    /// Capacity of the lifecycle event channel that backs `events()` and
40    /// `state_events()`. Defaults to [`DEFAULT_EVENT_BUFFER`]. Use
41    /// [`ConnectionConfigBuilder::event_buffer`] to override.
42    pub event_buffer: usize,
43
44    /// Optional caller-supplied identifier used as a metric label on the
45    /// `metrics` feature. Defaults to `None`. **Low-cardinality only** —
46    /// suitable for deployment / instance / service identifiers, never for
47    /// per-request UUIDs (Prometheus storage explodes on high-cardinality
48    /// labels). Values longer than [`CLIENT_ID_MAX_LEN`] are truncated by
49    /// the builder.
50    pub client_id: Option<String>,
51}
52
/// Default capacity of the inbound message channel (`message_buffer`).
///
/// Chosen with multi-symbol consumers in mind: roughly 50–200 symbols across
/// all channels during the TWSE 9:00 opening burst, at about 2000 msg/s.
/// With 4096 slots that is approximately 2 s of headroom at that rate before
/// drop-newest backpressure kicks in.
///
/// The sizing rationale assumes a bounded channel does not reserve its full
/// capacity up front, so a larger cap costs nothing while idle and only
/// shows up as memory once the channel actually fills.
/// NOTE(review): confirm this holds for the channel implementation in use.
///
/// History: hardcoded to 1024 before 0.4.0; raised to 4096 in 0.4.0.
pub const DEFAULT_MESSAGE_BUFFER: usize = 4 * 1024;
63
/// Default capacity of the lifecycle event channel (`event_buffer`).
///
/// Lifecycle events arrive at roughly one per heartbeat period (30 s), plus
/// bursts around reconnects and errors, so 1024 slots is deliberately
/// generous: the value is kept high for "no event left behind" safety. A
/// future release may lower it once production telemetry shows the channel
/// is consistently underused.
pub const DEFAULT_EVENT_BUFFER: usize = 1_024;
71
/// Upper bound, in bytes, on the length of [`ConnectionConfig::client_id`].
///
/// A longer value passed to [`ConnectionConfigBuilder::client_id`] is
/// shortened to at most `CLIENT_ID_MAX_LEN` bytes, cut at a UTF-8 character
/// boundary, and the truncation is reported through `tracing::warn!` (a
/// no-op without the `tracing` feature). The limit exists to bound
/// metric-label cardinality and Prometheus storage cost.
pub const CLIENT_ID_MAX_LEN: usize = 64;
79
/// Query-parameter names whose values must be masked whenever a URL is
/// rendered for diagnostics. Names are compared case-insensitively.
const SENSITIVE_QUERY_KEYS: &[&str] = &[
    "token",
    "key",
    "apikey",
    "api_key",
    "secret",
    "password",
];
84
85/// Return a copy of `url` with sensitive query-parameter values masked as
86/// `***`. Falls back to the original string if `url` is not parseable.
87fn redact_url_query(url: &str) -> String {
88    let mut parsed = match url::Url::parse(url) {
89        Ok(u) => u,
90        Err(_) => return url.to_string(),
91    };
92
93    let original_pairs: Vec<(String, String)> = parsed
94        .query_pairs()
95        .map(|(k, v)| (k.into_owned(), v.into_owned()))
96        .collect();
97
98    if original_pairs.is_empty() {
99        return parsed.to_string();
100    }
101
102    let mut serializer = parsed.query_pairs_mut();
103    serializer.clear();
104    for (k, v) in &original_pairs {
105        let redacted = SENSITIVE_QUERY_KEYS
106            .iter()
107            .any(|s| k.eq_ignore_ascii_case(s));
108        if redacted {
109            serializer.append_pair(k, "***");
110        } else {
111            serializer.append_pair(k, v);
112        }
113    }
114    drop(serializer);
115
116    parsed.to_string()
117}
118
119impl fmt::Debug for ConnectionConfig {
120    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121        f.debug_struct("ConnectionConfig")
122            .field("url", &redact_url_query(&self.url))
123            .field("auth", &"<redacted>")
124            .field("connect_timeout", &self.connect_timeout)
125            .field("read_timeout", &self.read_timeout)
126            .field("tls", &self.tls)
127            .field("message_buffer", &self.message_buffer)
128            .field("event_buffer", &self.event_buffer)
129            .field("client_id", &self.client_id)
130            .finish()
131    }
132}
133
134impl ConnectionConfig {
135    /// Create a new connection configuration
136    pub fn new(url: impl Into<String>, auth: AuthRequest) -> Self {
137        Self {
138            url: url.into(),
139            auth,
140            connect_timeout: Duration::from_secs(30),
141            read_timeout: Duration::from_secs(30),
142            tls: TlsConfig::default(),
143            message_buffer: DEFAULT_MESSAGE_BUFFER,
144            event_buffer: DEFAULT_EVENT_BUFFER,
145            client_id: None,
146        }
147    }
148
149    /// Create a builder for fluent configuration
150    pub fn builder(url: impl Into<String>, auth: AuthRequest) -> ConnectionConfigBuilder {
151        ConnectionConfigBuilder {
152            url: url.into(),
153            auth,
154            connect_timeout: Duration::from_secs(30),
155            read_timeout: Duration::from_secs(30),
156            tls: TlsConfig::default(),
157            message_buffer: DEFAULT_MESSAGE_BUFFER,
158            event_buffer: DEFAULT_EVENT_BUFFER,
159            client_id: None,
160        }
161    }
162
163    /// Caller-supplied identifier used as a metric label, or `None` if
164    /// unset. See [`ConnectionConfigBuilder::client_id`].
165    #[must_use]
166    pub fn client_id(&self) -> Option<&str> {
167        self.client_id.as_deref()
168    }
169
170    /// Create configuration for Fugle stock WebSocket endpoint
171    ///
172    /// # Example
173    ///
174    /// ```rust
175    /// use marketdata_core::websocket::ConnectionConfig;
176    /// use marketdata_core::AuthRequest;
177    ///
178    /// let config = ConnectionConfig::fugle_stock(
179    ///     AuthRequest::with_api_key("my-api-key")
180    /// );
181    /// assert_eq!(config.url, "wss://api.fugle.tw/marketdata/v1.0/stock/streaming");
182    /// ```
183    pub fn fugle_stock(auth: AuthRequest) -> Self {
184        Self::new(crate::urls::STOCK_WS, auth)
185    }
186
187    /// Create configuration for Fugle futures/options WebSocket endpoint
188    ///
189    /// # Example
190    ///
191    /// ```rust
192    /// use marketdata_core::websocket::ConnectionConfig;
193    /// use marketdata_core::AuthRequest;
194    ///
195    /// let config = ConnectionConfig::fugle_futopt(
196    ///     AuthRequest::with_api_key("my-api-key")
197    /// );
198    /// assert_eq!(config.url, "wss://api.fugle.tw/marketdata/v1.0/futopt/streaming");
199    /// ```
200    pub fn fugle_futopt(auth: AuthRequest) -> Self {
201        Self::new(crate::urls::FUTOPT_WS, auth)
202    }
203}
204
205/// Builder for ConnectionConfig with fluent API
206pub struct ConnectionConfigBuilder {
207    url: String,
208    auth: AuthRequest,
209    connect_timeout: Duration,
210    read_timeout: Duration,
211    tls: TlsConfig,
212    message_buffer: usize,
213    event_buffer: usize,
214    client_id: Option<String>,
215}
216
217impl ConnectionConfigBuilder {
218    /// Set connection timeout
219    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
220        self.connect_timeout = timeout;
221        self
222    }
223
224    /// Set read timeout
225    pub fn read_timeout(mut self, timeout: Duration) -> Self {
226        self.read_timeout = timeout;
227        self
228    }
229
230    /// Replace the TLS config wholesale
231    pub fn tls(mut self, tls: TlsConfig) -> Self {
232        self.tls = tls;
233        self
234    }
235
236    /// Override the inbound message-channel capacity.
237    ///
238    /// Defaults to [`DEFAULT_MESSAGE_BUFFER`] (4096). The channel uses
239    /// drop-newest backpressure on saturation; tune this when the consumer
240    /// can experience long pauses (e.g. trade peaks while a UI thread is
241    /// blocked) or when subscribing to many high-volume symbols at once.
242    ///
243    /// # Panics
244    ///
245    /// Panics if `cap` is zero — a zero-capacity channel cannot make
246    /// progress and is always a configuration mistake.
247    pub fn message_buffer(mut self, cap: usize) -> Self {
248        assert!(cap > 0, "message_buffer must be greater than zero");
249        self.message_buffer = cap;
250        self
251    }
252
253    /// Override the lifecycle event-channel capacity.
254    ///
255    /// Defaults to [`DEFAULT_EVENT_BUFFER`] (1024). Event volume is
256    /// orders of magnitude lower than message volume; tune only if you
257    /// retain raw events for an extended period without consuming them.
258    ///
259    /// # Panics
260    ///
261    /// Panics if `cap` is zero.
262    pub fn event_buffer(mut self, cap: usize) -> Self {
263        assert!(cap > 0, "event_buffer must be greater than zero");
264        self.event_buffer = cap;
265        self
266    }
267
268    /// Set a low-cardinality identifier used as a `client_id` metric label
269    /// when the `metrics` feature is enabled.
270    ///
271    /// **Cardinality**: pass a deployment, instance, or service identifier
272    /// (e.g. `"monitor-stock-probe"`, `"trader-prod-3"`). Per-request UUIDs
273    /// or any value derived from request data will explode Prometheus
274    /// storage and break the metrics pipeline.
275    ///
276    /// **Length**: values longer than [`CLIENT_ID_MAX_LEN`] are truncated
277    /// to that many bytes; truncation emits `tracing::warn!` so the
278    /// overflow is observable in operational logs.
279    pub fn client_id(mut self, id: impl Into<String>) -> Self {
280        let mut id = id.into();
281        if id.len() > CLIENT_ID_MAX_LEN {
282            // Truncate at a UTF-8 char boundary at or below CLIENT_ID_MAX_LEN
283            // so the resulting string is always valid UTF-8. The tracing
284            // warning fires on the original length so the operator sees the
285            // intended overflow size.
286            let _original_len = id.len();
287            let mut cut = CLIENT_ID_MAX_LEN;
288            while cut > 0 && !id.is_char_boundary(cut) {
289                cut -= 1;
290            }
291            id.truncate(cut);
292            crate::tracing_compat::warn!(
293                target: "fugle_marketdata::config",
294                original_len = _original_len,
295                truncated_len = cut,
296                max_len = CLIENT_ID_MAX_LEN,
297                "client_id exceeded CLIENT_ID_MAX_LEN; truncated"
298            );
299        }
300        self.client_id = Some(id);
301        self
302    }
303
304    /// Convenience for `Option<impl Into<String>>` callers (e.g. forwarding
305    /// from a binding layer that may have an `Option<String>` in hand).
306    /// Equivalent to `client_id(...)` when `Some`; no-op when `None`.
307    pub fn maybe_client_id(self, id: Option<impl Into<String>>) -> Self {
308        match id {
309            Some(id) => self.client_id(id),
310            None => self,
311        }
312    }
313
314    /// Build the configuration
315    pub fn build(self) -> ConnectionConfig {
316        ConnectionConfig {
317            url: self.url,
318            auth: self.auth,
319            connect_timeout: self.connect_timeout,
320            read_timeout: self.read_timeout,
321            tls: self.tls,
322            message_buffer: self.message_buffer,
323            event_buffer: self.event_buffer,
324            client_id: self.client_id,
325        }
326    }
327}
328
#[cfg(test)]
mod tests {
    use super::*;

    /// Endpoint used by the constructor/builder tests.
    const EXAMPLE_URL: &str = "wss://example.com";

    /// Throwaway credentials for tests that do not inspect `auth`.
    fn test_auth() -> AuthRequest {
        AuthRequest::with_api_key("test-key")
    }

    /// `Debug` rendering of a default config pointing at `url`.
    fn debug_output_for(url: &str) -> String {
        let config = ConnectionConfig::new(url, AuthRequest::with_api_key("k"));
        format!("{:?}", config)
    }

    #[test]
    fn test_connection_config_new() {
        let config = ConnectionConfig::new(EXAMPLE_URL, test_auth());
        let thirty_seconds = Duration::from_secs(30);

        assert_eq!(config.url, "wss://example.com");
        assert_eq!(config.connect_timeout, thirty_seconds);
        assert_eq!(config.read_timeout, thirty_seconds);
    }

    #[test]
    fn test_connection_config_builder() {
        let connect = Duration::from_secs(10);
        let read = Duration::from_secs(20);

        let config = ConnectionConfig::builder(EXAMPLE_URL, test_auth())
            .connect_timeout(connect)
            .read_timeout(read)
            .build();

        assert_eq!(config.url, "wss://example.com");
        assert_eq!(config.connect_timeout, connect);
        assert_eq!(config.read_timeout, read);
    }

    #[test]
    fn test_fugle_stock_config() {
        let config = ConnectionConfig::fugle_stock(test_auth());

        assert_eq!(config.url, "wss://api.fugle.tw/marketdata/v1.0/stock/streaming");
    }

    #[test]
    fn test_fugle_futopt_config() {
        let config = ConnectionConfig::fugle_futopt(test_auth());

        assert_eq!(config.url, "wss://api.fugle.tw/marketdata/v1.0/futopt/streaming");
    }

    #[test]
    fn test_debug_redacts_auth() {
        let secret = "super-secret-api-key";
        let config = ConnectionConfig::fugle_stock(AuthRequest::with_api_key(secret));
        let rendered = format!("{:?}", config);

        assert!(rendered.contains("auth: \"<redacted>\""));
        assert!(!rendered.contains(secret));
    }

    #[test]
    fn test_debug_redacts_url_query_token() {
        let rendered = debug_output_for("wss://example.com/stream?token=secret&v=1");

        assert!(rendered.contains("token=***"));
        assert!(rendered.contains("v=1"));
        assert!(!rendered.contains("token=secret"));
    }

    #[test]
    fn test_debug_redacts_multiple_sensitive_keys() {
        let rendered =
            debug_output_for("wss://example.com/stream?api_key=AAA&secret=BBB&safe=CCC");

        // Masked names and the untouched parameter must all be present.
        for expected in ["api_key=***", "secret=***", "safe=CCC"] {
            assert!(rendered.contains(expected));
        }
        // Neither secret value may survive.
        for leaked in ["AAA", "BBB"] {
            assert!(!rendered.contains(leaked));
        }
    }

    #[test]
    fn test_debug_handles_unparseable_url() {
        let rendered = debug_output_for("not a real url");

        assert!(rendered.contains("not a real url"));
    }
}