laminar_core/sink/
error.rs1use crate::reactor::SinkError as ReactorSinkError;
4
5#[derive(Debug, thiserror::Error)]
7pub enum SinkError {
8 #[error("Transaction already active: {0}")]
10 TransactionAlreadyActive(String),
11
12 #[error("No active transaction")]
14 NoActiveTransaction,
15
16 #[error("Transaction ID mismatch: expected {expected}, got {actual}")]
18 TransactionIdMismatch {
19 expected: String,
21 actual: String,
23 },
24
25 #[error("Transaction commit failed: {0}")]
27 CommitFailed(String),
28
29 #[error("Transaction rollback failed: {0}")]
31 RollbackFailed(String),
32
33 #[error("Write failed: {0}")]
35 WriteFailed(String),
36
37 #[error("Flush failed: {0}")]
39 FlushFailed(String),
40
41 #[error("Checkpoint error: {0}")]
43 CheckpointError(String),
44
45 #[error("Deduplication error: {0}")]
47 DeduplicationError(String),
48
49 #[error("Sink not connected")]
51 NotConnected,
52
53 #[error("Connection error: {0}")]
55 ConnectionError(String),
56
57 #[error("Configuration error: {0}")]
59 ConfigurationError(String),
60
61 #[error("Timeout after {0}ms")]
63 Timeout(u64),
64
65 #[error("Operation not supported: {0}")]
67 NotSupported(String),
68
69 #[error("Internal error: {0}")]
71 Internal(String),
72}
73
74impl From<SinkError> for ReactorSinkError {
75 fn from(err: SinkError) -> Self {
76 match err {
77 SinkError::WriteFailed(msg) => ReactorSinkError::WriteFailed(msg),
78 SinkError::FlushFailed(msg) => ReactorSinkError::FlushFailed(msg),
79 SinkError::NotConnected => ReactorSinkError::Closed,
80 other => ReactorSinkError::WriteFailed(other.to_string()),
81 }
82 }
83}
84
85impl From<ReactorSinkError> for SinkError {
86 fn from(err: ReactorSinkError) -> Self {
87 match err {
88 ReactorSinkError::WriteFailed(msg) => SinkError::WriteFailed(msg),
89 ReactorSinkError::FlushFailed(msg) => SinkError::FlushFailed(msg),
90 ReactorSinkError::Closed => SinkError::NotConnected,
91 }
92 }
93}
94
#[cfg(test)]
mod tests {
    use super::*;

    /// `Display` output embeds the payloads of tuple and struct variants and
    /// matches the `#[error(...)]` templates.
    #[test]
    fn test_error_display() {
        let err = SinkError::TransactionAlreadyActive("tx-123".to_string());
        assert!(err.to_string().contains("tx-123"));

        let err = SinkError::TransactionIdMismatch {
            expected: "tx-1".to_string(),
            actual: "tx-2".to_string(),
        };
        let rendered = err.to_string();
        assert!(rendered.contains("tx-1"));
        assert!(rendered.contains("tx-2"));

        // Exact-text checks for a unit variant and the numeric payload.
        assert_eq!(
            SinkError::NoActiveTransaction.to_string(),
            "No active transaction"
        );
        assert_eq!(SinkError::Timeout(250).to_string(), "Timeout after 250ms");
    }

    /// Sink errors convert to reactor errors: direct mappings keep their
    /// message, everything else falls back to `WriteFailed`.
    #[test]
    fn test_error_conversion_to_reactor() {
        let err = SinkError::WriteFailed("test".to_string());
        let reactor_err: ReactorSinkError = err.into();
        assert!(matches!(
            &reactor_err,
            ReactorSinkError::WriteFailed(msg) if msg == "test"
        ));

        let err = SinkError::FlushFailed("disk full".to_string());
        let reactor_err: ReactorSinkError = err.into();
        assert!(matches!(
            &reactor_err,
            ReactorSinkError::FlushFailed(msg) if msg == "disk full"
        ));

        let err = SinkError::NotConnected;
        let reactor_err: ReactorSinkError = err.into();
        assert!(matches!(reactor_err, ReactorSinkError::Closed));

        // Variants without a reactor counterpart are folded into
        // `WriteFailed`, carrying the full `Display` text.
        let err = SinkError::CommitFailed("boom".to_string());
        let reactor_err: ReactorSinkError = err.into();
        assert!(matches!(
            &reactor_err,
            ReactorSinkError::WriteFailed(msg) if msg == "Transaction commit failed: boom"
        ));
    }

    /// Reactor errors convert back to the sink variants of the same meaning.
    #[test]
    fn test_error_conversion_from_reactor() {
        let reactor_err = ReactorSinkError::WriteFailed("test".to_string());
        let err: SinkError = reactor_err.into();
        assert!(matches!(&err, SinkError::WriteFailed(msg) if msg == "test"));

        let reactor_err = ReactorSinkError::FlushFailed("disk full".to_string());
        let err: SinkError = reactor_err.into();
        assert!(matches!(&err, SinkError::FlushFailed(msg) if msg == "disk full"));

        let reactor_err = ReactorSinkError::Closed;
        let err: SinkError = reactor_err.into();
        assert!(matches!(err, SinkError::NotConnected));
    }
}