Skip to main content

fin_stream/
error.rs

1//! Typed error hierarchy for fin-stream.
2//!
3//! All fallible operations in the pipeline return `StreamError`. Variants are
4//! grouped by subsystem: connection/WebSocket, tick parsing, order book,
5//! backpressure, and the streaming pipeline internals (ring buffer, aggregation,
6//! normalization, transforms).
7
8/// Unified error type for all fin-stream pipeline operations.
9///
10/// Each variant carries enough context to reconstruct the failure site without
11/// inspecting internal state. The `Display` impl is machine-parseable: field
12/// values never contain the literal substring used as a delimiter.
13#[derive(Debug, thiserror::Error)]
14pub enum StreamError {
15    /// WebSocket connection failed.
16    #[error("WebSocket connection failed to '{url}': {reason}")]
17    ConnectionFailed {
18        /// The WebSocket URL that could not be reached.
19        url: String,
20        /// Human-readable description of the failure.
21        reason: String,
22    },
23
24    /// WebSocket disconnected unexpectedly.
25    #[error("WebSocket disconnected from '{url}'")]
26    Disconnected {
27        /// The WebSocket URL that was disconnected.
28        url: String,
29    },
30
31    /// Reconnection attempts exhausted.
32    #[error("Reconnection exhausted after {attempts} attempts to '{url}'")]
33    ReconnectExhausted {
34        /// The target URL for reconnection.
35        url: String,
36        /// Total number of reconnect attempts made.
37        attempts: u32,
38    },
39
40    /// Tick deserialization failed.
41    #[error("Tick parse error from {exchange}: {reason}")]
42    ParseError {
43        /// Name of the exchange that sent the unparseable tick.
44        exchange: String,
45        /// Description of the parse failure.
46        reason: String,
47    },
48
49    /// Feed is stale -- no data received within staleness threshold.
50    #[error(
51        "Feed '{feed_id}' is stale: last tick was {elapsed_ms}ms ago (threshold: {threshold_ms}ms)"
52    )]
53    StaleFeed {
54        /// Identifier of the stale feed.
55        feed_id: String,
56        /// Milliseconds since the last tick was received.
57        elapsed_ms: u64,
58        /// Configured staleness threshold in milliseconds.
59        threshold_ms: u64,
60    },
61
62    /// Order book reconstruction failed.
63    #[error("Order book reconstruction failed for '{symbol}': {reason}")]
64    BookReconstructionFailed {
65        /// Symbol whose order book could not be reconstructed.
66        symbol: String,
67        /// Description of the reconstruction failure.
68        reason: String,
69    },
70
71    /// Order book is crossed (bid >= ask).
72    #[error("Order book crossed for '{symbol}': best bid {bid} >= best ask {ask}")]
73    BookCrossed {
74        /// Symbol with the crossed book.
75        symbol: String,
76        /// Best bid price as a string.
77        bid: String,
78        /// Best ask price as a string.
79        ask: String,
80    },
81
82    /// Backpressure: the downstream channel is full.
83    #[error("Backpressure on channel '{channel}': {depth}/{capacity} slots used")]
84    Backpressure {
85        /// Name or URL of the backpressured channel.
86        channel: String,
87        /// Current number of items queued.
88        depth: usize,
89        /// Maximum capacity of the channel.
90        capacity: usize,
91    },
92
93    /// Invalid exchange format.
94    #[error("Unknown exchange format: '{0}'")]
95    UnknownExchange(String),
96
97    /// I/O error.
98    #[error("I/O error: {0}")]
99    Io(String),
100
101    /// WebSocket protocol error.
102    #[error("WebSocket error: {0}")]
103    WebSocket(String),
104
105    // ── Pipeline-internal errors ─────────────────────────────────────────────
106    /// SPSC ring buffer is full; the producer must back off or drop the item.
107    ///
108    /// This variant is returned by [`crate::ring::SpscRing::push`] when the
109    /// buffer has no free slots. It never panics.
110    #[error("SPSC ring buffer is full (capacity: {capacity})")]
111    RingBufferFull {
112        /// Configured usable capacity of the ring buffer (N - 1 slots).
113        capacity: usize,
114    },
115
116    /// SPSC ring buffer is empty; no item is available for the consumer.
117    ///
118    /// This variant is returned by [`crate::ring::SpscRing::pop`] when there
119    /// are no pending items. Callers should retry or park the consumer thread.
120    #[error("SPSC ring buffer is empty")]
121    RingBufferEmpty,
122
123    /// An error occurred during OHLCV bar aggregation.
124    ///
125    /// Wraps structural errors such as receiving a tick for the wrong symbol or
126    /// a timeframe with a zero-duration period.
127    #[error("OHLCV aggregation error: {reason}")]
128    AggregationError {
129        /// Description of the aggregation failure.
130        reason: String,
131    },
132
133    /// An error occurred during coordinate normalization.
134    ///
135    /// Typically indicates that the normalizer received a value outside the
136    /// expected numeric range, or that the rolling window is not yet seeded.
137    #[error("Normalization error: {reason}")]
138    NormalizationError {
139        /// Description of the normalization failure.
140        reason: String,
141    },
142
143    /// A tick failed structural validation before entering the pipeline.
144    ///
145    /// Examples: negative price, zero quantity, timestamp in the past beyond
146    /// the configured tolerance.
147    #[error("Invalid tick: {reason}")]
148    InvalidTick {
149        /// Description of the validation failure.
150        reason: String,
151    },
152
153    /// The Lorentz transform configuration is invalid.
154    ///
155    /// The relativistic velocity parameter beta (v/c) must satisfy 0 <= beta < 1.
156    /// A beta of exactly 1 (or above) would produce a division by zero in the
157    /// Lorentz factor gamma = 1 / sqrt(1 - beta^2).
158    #[error("Lorentz config error: {reason}")]
159    LorentzConfigError {
160        /// Description of the configuration error.
161        reason: String,
162    },
163}
164
165#[cfg(test)]
166mod tests {
167    use super::*;
168
169    #[test]
170    fn test_connection_failed_display() {
171        let e = StreamError::ConnectionFailed {
172            url: "wss://example.com".into(),
173            reason: "timeout".into(),
174        };
175        assert!(e.to_string().contains("example.com"));
176        assert!(e.to_string().contains("timeout"));
177    }
178
179    #[test]
180    fn test_disconnected_display() {
181        let e = StreamError::Disconnected {
182            url: "wss://feed.io".into(),
183        };
184        assert!(e.to_string().contains("feed.io"));
185    }
186
187    #[test]
188    fn test_reconnect_exhausted_display() {
189        let e = StreamError::ReconnectExhausted {
190            url: "wss://x.io".into(),
191            attempts: 5,
192        };
193        assert!(e.to_string().contains("5"));
194    }
195
196    #[test]
197    fn test_parse_error_display() {
198        let e = StreamError::ParseError {
199            exchange: "Binance".into(),
200            reason: "missing field".into(),
201        };
202        assert!(e.to_string().contains("Binance"));
203    }
204
205    #[test]
206    fn test_stale_feed_display() {
207        let e = StreamError::StaleFeed {
208            feed_id: "BTC-USD".into(),
209            elapsed_ms: 5000,
210            threshold_ms: 2000,
211        };
212        assert!(e.to_string().contains("BTC-USD"));
213        assert!(e.to_string().contains("5000"));
214    }
215
216    #[test]
217    fn test_book_reconstruction_failed_display() {
218        let e = StreamError::BookReconstructionFailed {
219            symbol: "ETH-USD".into(),
220            reason: "gap in sequence".into(),
221        };
222        assert!(e.to_string().contains("ETH-USD"));
223    }
224
225    #[test]
226    fn test_book_crossed_display() {
227        let e = StreamError::BookCrossed {
228            symbol: "BTC-USD".into(),
229            bid: "50001".into(),
230            ask: "50000".into(),
231        };
232        assert!(e.to_string().contains("crossed"));
233    }
234
235    #[test]
236    fn test_backpressure_display() {
237        let e = StreamError::Backpressure {
238            channel: "ticks".into(),
239            depth: 1000,
240            capacity: 1000,
241        };
242        assert!(e.to_string().contains("1000"));
243    }
244
245    #[test]
246    fn test_unknown_exchange_display() {
247        let e = StreamError::UnknownExchange("Kraken".into());
248        assert!(e.to_string().contains("Kraken"));
249    }
250
251    #[test]
252    fn test_ring_buffer_full_display() {
253        let e = StreamError::RingBufferFull { capacity: 1024 };
254        assert!(e.to_string().contains("1024"));
255        assert!(e.to_string().contains("full"));
256    }
257
258    #[test]
259    fn test_ring_buffer_empty_display() {
260        let e = StreamError::RingBufferEmpty;
261        assert!(e.to_string().contains("empty"));
262    }
263
264    #[test]
265    fn test_aggregation_error_display() {
266        let e = StreamError::AggregationError {
267            reason: "wrong symbol".into(),
268        };
269        assert!(e.to_string().contains("wrong symbol"));
270    }
271
272    #[test]
273    fn test_normalization_error_display() {
274        let e = StreamError::NormalizationError {
275            reason: "window not seeded".into(),
276        };
277        assert!(e.to_string().contains("window not seeded"));
278    }
279
280    #[test]
281    fn test_invalid_tick_display() {
282        let e = StreamError::InvalidTick {
283            reason: "negative price".into(),
284        };
285        assert!(e.to_string().contains("negative price"));
286    }
287
288    #[test]
289    fn test_lorentz_config_error_display() {
290        let e = StreamError::LorentzConfigError {
291            reason: "beta >= 1".into(),
292        };
293        assert!(e.to_string().contains("beta >= 1"));
294    }
295}