Skip to main content

laminar_core/streaming/
error.rs

1//! Streaming API error types.
2//!
3//! This module defines error types for the streaming API including
4//! source, sink, and channel operations.
5
6use std::fmt;
7
8/// Error type for streaming operations.
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub enum StreamingError {
11    /// Channel is full and backpressure strategy is Error.
12    ChannelFull,
13
14    /// Channel is closed (all receivers dropped).
15    ChannelClosed,
16
17    /// Channel is disconnected (all senders dropped).
18    Disconnected,
19
20    /// Invalid configuration provided.
21    InvalidConfig(String),
22
23    /// Schema mismatch during `push_arrow` operation.
24    SchemaMismatch {
25        /// Expected schema field names.
26        expected: Vec<String>,
27        /// Actual schema field names.
28        actual: Vec<String>,
29    },
30
31    /// Operation timed out.
32    Timeout,
33
34    /// Internal error.
35    Internal(String),
36}
37
38impl fmt::Display for StreamingError {
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        match self {
41            Self::ChannelFull => write!(f, "channel is full"),
42            Self::ChannelClosed => write!(f, "channel is closed"),
43            Self::Disconnected => write!(f, "channel is disconnected"),
44            Self::InvalidConfig(msg) => write!(f, "invalid configuration: {msg}"),
45            Self::SchemaMismatch { expected, actual } => {
46                write!(f, "schema mismatch: expected {expected:?}, got {actual:?}")
47            }
48            Self::Timeout => write!(f, "operation timed out"),
49            Self::Internal(msg) => write!(f, "internal error: {msg}"),
50        }
51    }
52}
53
54impl std::error::Error for StreamingError {}
55
56/// Error returned from `try_push` operations.
57#[derive(Debug)]
58pub struct TryPushError<T> {
59    /// The value that could not be pushed.
60    pub value: T,
61    /// The error that occurred.
62    pub error: StreamingError,
63}
64
65impl<T> TryPushError<T> {
66    /// Creates a new error indicating the channel is full.
67    #[must_use]
68    pub fn full(value: T) -> Self {
69        Self {
70            value,
71            error: StreamingError::ChannelFull,
72        }
73    }
74
75    /// Creates a new error indicating the channel is closed.
76    #[must_use]
77    pub fn closed(value: T) -> Self {
78        Self {
79            value,
80            error: StreamingError::ChannelClosed,
81        }
82    }
83
84    /// Returns true if the error is due to a full channel.
85    #[must_use]
86    pub fn is_full(&self) -> bool {
87        matches!(self.error, StreamingError::ChannelFull)
88    }
89
90    /// Returns true if the error is due to a closed channel.
91    #[must_use]
92    pub fn is_closed(&self) -> bool {
93        matches!(self.error, StreamingError::ChannelClosed)
94    }
95
96    /// Consumes the error and returns the value that could not be pushed.
97    #[must_use]
98    pub fn into_inner(self) -> T {
99        self.value
100    }
101}
102
103impl<T: fmt::Debug> fmt::Display for TryPushError<T> {
104    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105        write!(f, "try_push failed: {}", self.error)
106    }
107}
108
109impl<T: fmt::Debug> std::error::Error for TryPushError<T> {}
110
111/// Error returned from `recv` operations.
112#[derive(Debug, Clone, PartialEq, Eq)]
113pub enum RecvError {
114    /// Channel is disconnected (all senders dropped).
115    Disconnected,
116
117    /// Operation timed out.
118    Timeout,
119}
120
121impl fmt::Display for RecvError {
122    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123        match self {
124            Self::Disconnected => write!(f, "channel disconnected"),
125            Self::Timeout => write!(f, "recv timed out"),
126        }
127    }
128}
129
130impl std::error::Error for RecvError {}
131
132#[cfg(test)]
133mod tests {
134    use super::*;
135
136    #[test]
137    fn test_streaming_error_display() {
138        assert_eq!(StreamingError::ChannelFull.to_string(), "channel is full");
139        assert_eq!(
140            StreamingError::ChannelClosed.to_string(),
141            "channel is closed"
142        );
143        assert_eq!(
144            StreamingError::Disconnected.to_string(),
145            "channel is disconnected"
146        );
147        assert_eq!(
148            StreamingError::InvalidConfig("bad".to_string()).to_string(),
149            "invalid configuration: bad"
150        );
151        assert_eq!(StreamingError::Timeout.to_string(), "operation timed out");
152    }
153
154    #[test]
155    fn test_try_push_error() {
156        let err = TryPushError::full(42);
157        assert!(err.is_full());
158        assert!(!err.is_closed());
159        assert_eq!(err.into_inner(), 42);
160
161        let err = TryPushError::closed("test");
162        assert!(!err.is_full());
163        assert!(err.is_closed());
164        assert_eq!(err.into_inner(), "test");
165    }
166
167    #[test]
168    fn test_recv_error_display() {
169        assert_eq!(RecvError::Disconnected.to_string(), "channel disconnected");
170        assert_eq!(RecvError::Timeout.to_string(), "recv timed out");
171    }
172
173    #[test]
174    fn test_schema_mismatch_display() {
175        let err = StreamingError::SchemaMismatch {
176            expected: vec!["a".to_string(), "b".to_string()],
177            actual: vec!["x".to_string(), "y".to_string()],
178        };
179        assert!(err.to_string().contains("schema mismatch"));
180    }
181}