Skip to main content

fin_stream/
error.rs

//! Typed error hierarchy for fin-stream.
//!
//! All fallible operations in the pipeline return `StreamError`. Variants are
//! grouped by subsystem: connection/WebSocket, tick parsing, order book,
//! backpressure, and the streaming pipeline internals (ring buffer, aggregation,
//! normalization, transforms).

8/// Unified error type for all fin-stream pipeline operations.
9///
10/// Each variant carries enough context to reconstruct the failure site without
11/// inspecting internal state. The `Display` impl is machine-parseable: field
12/// values never contain the literal substring used as a delimiter.
13#[derive(Debug, thiserror::Error)]
14pub enum StreamError {
15    /// WebSocket connection failed.
16    #[error("WebSocket connection failed to '{url}': {reason}")]
17    ConnectionFailed {
18        /// The WebSocket URL that could not be reached.
19        url: String,
20        /// Human-readable description of the failure.
21        reason: String,
22    },
23
24    /// WebSocket disconnected unexpectedly.
25    #[error("WebSocket disconnected from '{url}'")]
26    Disconnected {
27        /// The WebSocket URL that was disconnected.
28        url: String,
29    },
30
31    /// Reconnection attempts exhausted.
32    #[error("Reconnection exhausted after {attempts} attempts to '{url}'")]
33    ReconnectExhausted {
34        /// The target URL for reconnection.
35        url: String,
36        /// Total number of reconnect attempts made.
37        attempts: u32,
38    },
39
40    /// Tick deserialization failed.
41    #[error("Tick parse error from {exchange}: {reason}")]
42    ParseError {
43        /// Name of the exchange that sent the unparseable tick.
44        exchange: String,
45        /// Description of the parse failure.
46        reason: String,
47    },
48
49    /// Feed is stale -- no data received within staleness threshold.
50    #[error("Feed '{feed_id}' is stale: last tick was {elapsed_ms}ms ago (threshold: {threshold_ms}ms)")]
51    StaleFeed {
52        /// Identifier of the stale feed.
53        feed_id: String,
54        /// Milliseconds since the last tick was received.
55        elapsed_ms: u64,
56        /// Configured staleness threshold in milliseconds.
57        threshold_ms: u64,
58    },
59
60    /// Order book reconstruction failed.
61    #[error("Order book reconstruction failed for '{symbol}': {reason}")]
62    BookReconstructionFailed {
63        /// Symbol whose order book could not be reconstructed.
64        symbol: String,
65        /// Description of the reconstruction failure.
66        reason: String,
67    },
68
69    /// Order book is crossed (bid >= ask).
70    #[error("Order book crossed for '{symbol}': best bid {bid} >= best ask {ask}")]
71    BookCrossed {
72        /// Symbol with the crossed book.
73        symbol: String,
74        /// Best bid price as a string.
75        bid: String,
76        /// Best ask price as a string.
77        ask: String,
78    },
79
80    /// Backpressure: the downstream channel is full.
81    #[error("Backpressure on channel '{channel}': {depth}/{capacity} slots used")]
82    Backpressure {
83        /// Name or URL of the backpressured channel.
84        channel: String,
85        /// Current number of items queued.
86        depth: usize,
87        /// Maximum capacity of the channel.
88        capacity: usize,
89    },
90
91    /// Invalid exchange format.
92    #[error("Unknown exchange format: '{0}'")]
93    UnknownExchange(String),
94
95    /// I/O error.
96    #[error("I/O error: {0}")]
97    Io(String),
98
99    /// WebSocket protocol error.
100    #[error("WebSocket error: {0}")]
101    WebSocket(String),
102
103    // ── Pipeline-internal errors ─────────────────────────────────────────────
104
105    /// SPSC ring buffer is full; the producer must back off or drop the item.
106    ///
107    /// This variant is returned by [`crate::ring::SpscRing::push`] when the
108    /// buffer has no free slots. It never panics.
109    #[error("SPSC ring buffer is full (capacity: {capacity})")]
110    RingBufferFull {
111        /// Configured usable capacity of the ring buffer (N - 1 slots).
112        capacity: usize,
113    },
114
115    /// SPSC ring buffer is empty; no item is available for the consumer.
116    ///
117    /// This variant is returned by [`crate::ring::SpscRing::pop`] when there
118    /// are no pending items. Callers should retry or park the consumer thread.
119    #[error("SPSC ring buffer is empty")]
120    RingBufferEmpty,
121
122    /// An error occurred during OHLCV bar aggregation.
123    ///
124    /// Wraps structural errors such as receiving a tick for the wrong symbol or
125    /// a timeframe with a zero-duration period.
126    #[error("OHLCV aggregation error: {reason}")]
127    AggregationError {
128        /// Description of the aggregation failure.
129        reason: String,
130    },
131
132    /// An error occurred during coordinate normalization.
133    ///
134    /// Typically indicates that the normalizer received a value outside the
135    /// expected numeric range, or that the rolling window is not yet seeded.
136    #[error("Normalization error: {reason}")]
137    NormalizationError {
138        /// Description of the normalization failure.
139        reason: String,
140    },
141
142    /// A tick failed structural validation before entering the pipeline.
143    ///
144    /// Examples: negative price, zero quantity, timestamp in the past beyond
145    /// the configured tolerance.
146    #[error("Invalid tick: {reason}")]
147    InvalidTick {
148        /// Description of the validation failure.
149        reason: String,
150    },
151
152    /// The Lorentz transform configuration is invalid.
153    ///
154    /// The relativistic velocity parameter beta (v/c) must satisfy 0 <= beta < 1.
155    /// A beta of exactly 1 (or above) would produce a division by zero in the
156    /// Lorentz factor gamma = 1 / sqrt(1 - beta^2).
157    #[error("Lorentz config error: {reason}")]
158    LorentzConfigError {
159        /// Description of the configuration error.
160        reason: String,
161    },
162}
163
#[cfg(test)]
mod tests {
    //! Display-format tests for [`StreamError`].
    //!
    //! Each variant is rendered and compared against the complete expected
    //! message rather than a substring, so a swapped field, a dropped field or
    //! an accidental wording change fails the test.

    use super::*;

    /// Compile-time probe: instantiating this proves that `E` is a standard
    /// error that can be moved across threads.
    fn assert_std_error<E: std::error::Error + Send + Sync + 'static>() {}

    #[test]
    fn test_implements_std_error() {
        assert_std_error::<StreamError>();
    }

    #[test]
    fn test_connection_failed_display() {
        let e = StreamError::ConnectionFailed {
            url: "wss://example.com".into(),
            reason: "timeout".into(),
        };
        assert_eq!(
            e.to_string(),
            "WebSocket connection failed to 'wss://example.com': timeout"
        );
    }

    #[test]
    fn test_disconnected_display() {
        let e = StreamError::Disconnected { url: "wss://feed.io".into() };
        assert_eq!(e.to_string(), "WebSocket disconnected from 'wss://feed.io'");
    }

    #[test]
    fn test_reconnect_exhausted_display() {
        let e = StreamError::ReconnectExhausted { url: "wss://x.io".into(), attempts: 5 };
        assert_eq!(
            e.to_string(),
            "Reconnection exhausted after 5 attempts to 'wss://x.io'"
        );
    }

    #[test]
    fn test_parse_error_display() {
        let e = StreamError::ParseError {
            exchange: "Binance".into(),
            reason: "missing field".into(),
        };
        assert_eq!(e.to_string(), "Tick parse error from Binance: missing field");
    }

    #[test]
    fn test_stale_feed_display() {
        let e = StreamError::StaleFeed {
            feed_id: "BTC-USD".into(),
            elapsed_ms: 5000,
            threshold_ms: 2000,
        };
        assert_eq!(
            e.to_string(),
            "Feed 'BTC-USD' is stale: last tick was 5000ms ago (threshold: 2000ms)"
        );
    }

    #[test]
    fn test_book_reconstruction_failed_display() {
        let e = StreamError::BookReconstructionFailed {
            symbol: "ETH-USD".into(),
            reason: "gap in sequence".into(),
        };
        assert_eq!(
            e.to_string(),
            "Order book reconstruction failed for 'ETH-USD': gap in sequence"
        );
    }

    #[test]
    fn test_book_crossed_display() {
        let e = StreamError::BookCrossed {
            symbol: "BTC-USD".into(),
            bid: "50001".into(),
            ask: "50000".into(),
        };
        assert_eq!(
            e.to_string(),
            "Order book crossed for 'BTC-USD': best bid 50001 >= best ask 50000"
        );
    }

    #[test]
    fn test_backpressure_display() {
        // Depth and capacity differ on purpose: with equal values a swap of the
        // two fields in the format string would go unnoticed.
        let e = StreamError::Backpressure { channel: "ticks".into(), depth: 750, capacity: 1000 };
        assert_eq!(
            e.to_string(),
            "Backpressure on channel 'ticks': 750/1000 slots used"
        );
    }

    #[test]
    fn test_unknown_exchange_display() {
        let e = StreamError::UnknownExchange("Kraken".into());
        assert_eq!(e.to_string(), "Unknown exchange format: 'Kraken'");
    }

    #[test]
    fn test_io_display() {
        let e = StreamError::Io("broken pipe".into());
        assert_eq!(e.to_string(), "I/O error: broken pipe");
    }

    #[test]
    fn test_websocket_display() {
        let e = StreamError::WebSocket("invalid frame header".into());
        assert_eq!(e.to_string(), "WebSocket error: invalid frame header");
    }

    #[test]
    fn test_ring_buffer_full_display() {
        let e = StreamError::RingBufferFull { capacity: 1024 };
        assert_eq!(e.to_string(), "SPSC ring buffer is full (capacity: 1024)");
    }

    #[test]
    fn test_ring_buffer_empty_display() {
        let e = StreamError::RingBufferEmpty;
        assert_eq!(e.to_string(), "SPSC ring buffer is empty");
    }

    #[test]
    fn test_aggregation_error_display() {
        let e = StreamError::AggregationError { reason: "wrong symbol".into() };
        assert_eq!(e.to_string(), "OHLCV aggregation error: wrong symbol");
    }

    #[test]
    fn test_normalization_error_display() {
        let e = StreamError::NormalizationError { reason: "window not seeded".into() };
        assert_eq!(e.to_string(), "Normalization error: window not seeded");
    }

    #[test]
    fn test_invalid_tick_display() {
        let e = StreamError::InvalidTick { reason: "negative price".into() };
        assert_eq!(e.to_string(), "Invalid tick: negative price");
    }

    #[test]
    fn test_lorentz_config_error_display() {
        let e = StreamError::LorentzConfigError { reason: "beta >= 1".into() };
        assert_eq!(e.to_string(), "Lorentz config error: beta >= 1");
    }
}