sea_streamer_socket/
error.rs

1#[cfg(feature = "backend-file")]
2use sea_streamer_file::FileErr;
3#[cfg(feature = "backend-kafka")]
4use sea_streamer_kafka::KafkaErr;
5#[cfg(feature = "backend-redis")]
6use sea_streamer_redis::RedisErr;
7#[cfg(feature = "backend-stdio")]
8use sea_streamer_stdio::StdioErr;
9
10use crate::{Backend, SeaStreamerBackend};
11use sea_streamer_types::{StreamErr, StreamResult};
12use thiserror::Error;
13
/// The concrete error type of `sea-streamer-socket`: a [`StreamErr`] whose
/// backend error is [`BackendErr`].
pub type Error = StreamErr<BackendErr>;
16
17#[derive(Error, Debug)]
18/// `sea-streamer-socket` the concrete backend error.
19pub enum BackendErr {
20    #[cfg(feature = "backend-kafka")]
21    #[error("KafkaBackendErr: {0}")]
22    Kafka(KafkaErr),
23    #[cfg(feature = "backend-redis")]
24    #[error("RedisBackendErr: {0}")]
25    Redis(RedisErr),
26    #[cfg(feature = "backend-stdio")]
27    #[error("StdioBackendErr: {0}")]
28    Stdio(StdioErr),
29    #[cfg(feature = "backend-file")]
30    #[error("FileBackendErr: {0}")]
31    File(FileErr),
32}
33
/// Wraps a Kafka backend error in [`BackendErr::Kafka`].
#[cfg(feature = "backend-kafka")]
impl From<KafkaErr> for BackendErr {
    fn from(kafka_err: KafkaErr) -> Self {
        BackendErr::Kafka(kafka_err)
    }
}
40
/// Wraps a Redis backend error in [`BackendErr::Redis`].
#[cfg(feature = "backend-redis")]
impl From<RedisErr> for BackendErr {
    fn from(redis_err: RedisErr) -> Self {
        BackendErr::Redis(redis_err)
    }
}
47
/// Wraps a Stdio backend error in [`BackendErr::Stdio`].
#[cfg(feature = "backend-stdio")]
impl From<StdioErr> for BackendErr {
    fn from(stdio_err: StdioErr) -> Self {
        BackendErr::Stdio(stdio_err)
    }
}
54
/// Wraps a File backend error in [`BackendErr::File`].
#[cfg(feature = "backend-file")]
impl From<FileErr> for BackendErr {
    fn from(file_err: FileErr) -> Self {
        BackendErr::File(file_err)
    }
}
61
62impl SeaStreamerBackend for BackendErr {
63    #[cfg(feature = "backend-kafka")]
64    type Kafka = KafkaErr;
65    #[cfg(feature = "backend-redis")]
66    type Redis = RedisErr;
67    #[cfg(feature = "backend-stdio")]
68    type Stdio = StdioErr;
69    #[cfg(feature = "backend-file")]
70    type File = FileErr;
71
72    fn backend(&self) -> Backend {
73        match self {
74            #[cfg(feature = "backend-kafka")]
75            Self::Kafka(_) => Backend::Kafka,
76            #[cfg(feature = "backend-redis")]
77            Self::Redis(_) => Backend::Redis,
78            #[cfg(feature = "backend-stdio")]
79            Self::Stdio(_) => Backend::Stdio,
80            #[cfg(feature = "backend-file")]
81            Self::File(_) => Backend::File,
82        }
83    }
84
85    #[cfg(feature = "backend-kafka")]
86    fn get_kafka(&mut self) -> Option<&mut KafkaErr> {
87        match self {
88            Self::Kafka(s) => Some(s),
89            #[cfg(feature = "backend-redis")]
90            Self::Redis(_) => None,
91            #[cfg(feature = "backend-stdio")]
92            Self::Stdio(_) => None,
93            #[cfg(feature = "backend-file")]
94            Self::File(_) => None,
95        }
96    }
97
98    #[cfg(feature = "backend-redis")]
99    fn get_redis(&mut self) -> Option<&mut RedisErr> {
100        match self {
101            #[cfg(feature = "backend-kafka")]
102            Self::Kafka(_) => None,
103            Self::Redis(s) => Some(s),
104            #[cfg(feature = "backend-stdio")]
105            Self::Stdio(_) => None,
106            #[cfg(feature = "backend-file")]
107            Self::File(_) => None,
108        }
109    }
110
111    #[cfg(feature = "backend-stdio")]
112    fn get_stdio(&mut self) -> Option<&mut StdioErr> {
113        match self {
114            #[cfg(feature = "backend-kafka")]
115            Self::Kafka(_) => None,
116            #[cfg(feature = "backend-redis")]
117            Self::Redis(_) => None,
118            Self::Stdio(s) => Some(s),
119            #[cfg(feature = "backend-file")]
120            Self::File(_) => None,
121        }
122    }
123
124    #[cfg(feature = "backend-file")]
125    fn get_file(&mut self) -> Option<&mut FileErr> {
126        match self {
127            #[cfg(feature = "backend-kafka")]
128            Self::Kafka(_) => None,
129            #[cfg(feature = "backend-redis")]
130            Self::Redis(_) => None,
131            #[cfg(feature = "backend-stdio")]
132            Self::Stdio(_) => None,
133            Self::File(s) => Some(s),
134        }
135    }
136}
137
/// Crate-internal result alias: a [`StreamResult`] whose backend error type is
/// [`BackendErr`].
pub(crate) type SeaResult<T> = StreamResult<T, BackendErr>;
139
140pub(crate) fn map_err<E: std::error::Error + Into<BackendErr>>(
141    err: StreamErr<E>,
142) -> StreamErr<BackendErr> {
143    match err {
144        StreamErr::Backend(err) => StreamErr::Backend(err.into()),
145        // sadly here is a lot of boilerplate, but at least the compiler tells us when it breaks
146        StreamErr::Connect(e) => StreamErr::Connect(e),
147        StreamErr::TimeoutNotSet => StreamErr::TimeoutNotSet,
148        StreamErr::AlreadyAnchored => StreamErr::AlreadyAnchored,
149        StreamErr::NotAnchored => StreamErr::NotAnchored,
150        StreamErr::ConsumerGroupIsSet => StreamErr::ConsumerGroupIsSet,
151        StreamErr::ConsumerGroupNotSet => StreamErr::ConsumerGroupNotSet,
152        StreamErr::StreamKeyEmpty => StreamErr::StreamKeyEmpty,
153        StreamErr::StreamKeyNotFound => StreamErr::StreamKeyNotFound,
154        StreamErr::CommitNotAllowed => StreamErr::CommitNotAllowed,
155        StreamErr::Utf8Error(e) => StreamErr::Utf8Error(e),
156        StreamErr::StreamUrlErr(e) => StreamErr::StreamUrlErr(e),
157        StreamErr::StreamKeyErr(e) => StreamErr::StreamKeyErr(e),
158        StreamErr::Unsupported(e) => StreamErr::Unsupported(e),
159        StreamErr::Runtime(e) => StreamErr::Runtime(e),
160    }
161}