Skip to main content

laminar_core/sink/
error.rs

1//! Error types for exactly-once sinks
2
3use crate::reactor::SinkError as ReactorSinkError;
4
5/// Errors that can occur in exactly-once sinks
6#[derive(Debug, thiserror::Error)]
7pub enum SinkError {
8    /// Transaction has already started
9    #[error("Transaction already active: {0}")]
10    TransactionAlreadyActive(String),
11
12    /// No active transaction
13    #[error("No active transaction")]
14    NoActiveTransaction,
15
16    /// Transaction ID mismatch
17    #[error("Transaction ID mismatch: expected {expected}, got {actual}")]
18    TransactionIdMismatch {
19        /// Expected transaction ID
20        expected: String,
21        /// Actual transaction ID
22        actual: String,
23    },
24
25    /// Transaction commit failed
26    #[error("Transaction commit failed: {0}")]
27    CommitFailed(String),
28
29    /// Transaction rollback failed
30    #[error("Transaction rollback failed: {0}")]
31    RollbackFailed(String),
32
33    /// Write operation failed
34    #[error("Write failed: {0}")]
35    WriteFailed(String),
36
37    /// Flush operation failed
38    #[error("Flush failed: {0}")]
39    FlushFailed(String),
40
41    /// Checkpoint serialization/deserialization failed
42    #[error("Checkpoint error: {0}")]
43    CheckpointError(String),
44
45    /// Deduplication store error
46    #[error("Deduplication error: {0}")]
47    DeduplicationError(String),
48
49    /// Sink is not connected
50    #[error("Sink not connected")]
51    NotConnected,
52
53    /// Sink connection error
54    #[error("Connection error: {0}")]
55    ConnectionError(String),
56
57    /// Configuration error
58    #[error("Configuration error: {0}")]
59    ConfigurationError(String),
60
61    /// Timeout occurred
62    #[error("Timeout after {0}ms")]
63    Timeout(u64),
64
65    /// Sink does not support the requested operation
66    #[error("Operation not supported: {0}")]
67    NotSupported(String),
68
69    /// Internal error
70    #[error("Internal error: {0}")]
71    Internal(String),
72}
73
74impl From<SinkError> for ReactorSinkError {
75    fn from(err: SinkError) -> Self {
76        match err {
77            SinkError::WriteFailed(msg) => ReactorSinkError::WriteFailed(msg),
78            SinkError::FlushFailed(msg) => ReactorSinkError::FlushFailed(msg),
79            SinkError::NotConnected => ReactorSinkError::Closed,
80            other => ReactorSinkError::WriteFailed(other.to_string()),
81        }
82    }
83}
84
85impl From<ReactorSinkError> for SinkError {
86    fn from(err: ReactorSinkError) -> Self {
87        match err {
88            ReactorSinkError::WriteFailed(msg) => SinkError::WriteFailed(msg),
89            ReactorSinkError::FlushFailed(msg) => SinkError::FlushFailed(msg),
90            ReactorSinkError::Closed => SinkError::NotConnected,
91        }
92    }
93}
94
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the exact `Display` output for a tuple variant, a struct variant,
    /// a unit variant, and the numeric `Timeout` payload.
    #[test]
    fn test_error_display() {
        let err = SinkError::TransactionAlreadyActive("tx-123".to_string());
        assert_eq!(err.to_string(), "Transaction already active: tx-123");

        let err = SinkError::TransactionIdMismatch {
            expected: "tx-1".to_string(),
            actual: "tx-2".to_string(),
        };
        assert_eq!(
            err.to_string(),
            "Transaction ID mismatch: expected tx-1, got tx-2"
        );

        assert_eq!(
            SinkError::NoActiveTransaction.to_string(),
            "No active transaction"
        );
        assert_eq!(SinkError::Timeout(250).to_string(), "Timeout after 250ms");
    }

    /// Checks the direct mappings (including their payloads) and the fallback
    /// arm that folds every other variant into `WriteFailed`.
    #[test]
    fn test_error_conversion_to_reactor() {
        let reactor_err: ReactorSinkError = SinkError::WriteFailed("test".to_string()).into();
        assert!(matches!(
            reactor_err,
            ReactorSinkError::WriteFailed(msg) if msg == "test"
        ));

        let reactor_err: ReactorSinkError = SinkError::FlushFailed("disk".to_string()).into();
        assert!(matches!(
            reactor_err,
            ReactorSinkError::FlushFailed(msg) if msg == "disk"
        ));

        let reactor_err: ReactorSinkError = SinkError::NotConnected.into();
        assert!(matches!(reactor_err, ReactorSinkError::Closed));

        // Variants without a counterpart become `WriteFailed`, carrying the
        // full formatted message of the source error.
        let reactor_err: ReactorSinkError = SinkError::CommitFailed("boom".to_string()).into();
        assert!(matches!(
            reactor_err,
            ReactorSinkError::WriteFailed(msg) if msg == "Transaction commit failed: boom"
        ));
    }

    /// Checks every reactor variant maps to its counterpart with the payload
    /// preserved.
    #[test]
    fn test_error_conversion_from_reactor() {
        let err: SinkError = ReactorSinkError::WriteFailed("test".to_string()).into();
        assert!(matches!(err, SinkError::WriteFailed(msg) if msg == "test"));

        let err: SinkError = ReactorSinkError::FlushFailed("disk".to_string()).into();
        assert!(matches!(err, SinkError::FlushFailed(msg) if msg == "disk"));

        let err: SinkError = ReactorSinkError::Closed.into();
        assert!(matches!(err, SinkError::NotConnected));
    }
}