sea_streamer_socket/
error.rs1#[cfg(feature = "backend-file")]
2use sea_streamer_file::FileErr;
3#[cfg(feature = "backend-kafka")]
4use sea_streamer_kafka::KafkaErr;
5#[cfg(feature = "backend-redis")]
6use sea_streamer_redis::RedisErr;
7#[cfg(feature = "backend-stdio")]
8use sea_streamer_stdio::StdioErr;
9
10use crate::{Backend, SeaStreamerBackend};
11use sea_streamer_types::{StreamErr, StreamResult};
12use thiserror::Error;
13
14pub type Error = StreamErr<BackendErr>;
16
17#[derive(Error, Debug)]
18pub enum BackendErr {
20 #[cfg(feature = "backend-kafka")]
21 #[error("KafkaBackendErr: {0}")]
22 Kafka(KafkaErr),
23 #[cfg(feature = "backend-redis")]
24 #[error("RedisBackendErr: {0}")]
25 Redis(RedisErr),
26 #[cfg(feature = "backend-stdio")]
27 #[error("StdioBackendErr: {0}")]
28 Stdio(StdioErr),
29 #[cfg(feature = "backend-file")]
30 #[error("FileBackendErr: {0}")]
31 File(FileErr),
32}
33
/// Lifts a Kafka backend error into [`BackendErr::Kafka`].
#[cfg(feature = "backend-kafka")]
impl From<KafkaErr> for BackendErr {
    fn from(inner: KafkaErr) -> Self {
        BackendErr::Kafka(inner)
    }
}
40
/// Lifts a Redis backend error into [`BackendErr::Redis`].
#[cfg(feature = "backend-redis")]
impl From<RedisErr> for BackendErr {
    fn from(inner: RedisErr) -> Self {
        BackendErr::Redis(inner)
    }
}
47
/// Lifts a Stdio backend error into [`BackendErr::Stdio`].
#[cfg(feature = "backend-stdio")]
impl From<StdioErr> for BackendErr {
    fn from(inner: StdioErr) -> Self {
        BackendErr::Stdio(inner)
    }
}
54
/// Lifts a File backend error into [`BackendErr::File`].
#[cfg(feature = "backend-file")]
impl From<FileErr> for BackendErr {
    fn from(inner: FileErr) -> Self {
        BackendErr::File(inner)
    }
}
61
62impl SeaStreamerBackend for BackendErr {
63 #[cfg(feature = "backend-kafka")]
64 type Kafka = KafkaErr;
65 #[cfg(feature = "backend-redis")]
66 type Redis = RedisErr;
67 #[cfg(feature = "backend-stdio")]
68 type Stdio = StdioErr;
69 #[cfg(feature = "backend-file")]
70 type File = FileErr;
71
72 fn backend(&self) -> Backend {
73 match self {
74 #[cfg(feature = "backend-kafka")]
75 Self::Kafka(_) => Backend::Kafka,
76 #[cfg(feature = "backend-redis")]
77 Self::Redis(_) => Backend::Redis,
78 #[cfg(feature = "backend-stdio")]
79 Self::Stdio(_) => Backend::Stdio,
80 #[cfg(feature = "backend-file")]
81 Self::File(_) => Backend::File,
82 }
83 }
84
85 #[cfg(feature = "backend-kafka")]
86 fn get_kafka(&mut self) -> Option<&mut KafkaErr> {
87 match self {
88 Self::Kafka(s) => Some(s),
89 #[cfg(feature = "backend-redis")]
90 Self::Redis(_) => None,
91 #[cfg(feature = "backend-stdio")]
92 Self::Stdio(_) => None,
93 #[cfg(feature = "backend-file")]
94 Self::File(_) => None,
95 }
96 }
97
98 #[cfg(feature = "backend-redis")]
99 fn get_redis(&mut self) -> Option<&mut RedisErr> {
100 match self {
101 #[cfg(feature = "backend-kafka")]
102 Self::Kafka(_) => None,
103 Self::Redis(s) => Some(s),
104 #[cfg(feature = "backend-stdio")]
105 Self::Stdio(_) => None,
106 #[cfg(feature = "backend-file")]
107 Self::File(_) => None,
108 }
109 }
110
111 #[cfg(feature = "backend-stdio")]
112 fn get_stdio(&mut self) -> Option<&mut StdioErr> {
113 match self {
114 #[cfg(feature = "backend-kafka")]
115 Self::Kafka(_) => None,
116 #[cfg(feature = "backend-redis")]
117 Self::Redis(_) => None,
118 Self::Stdio(s) => Some(s),
119 #[cfg(feature = "backend-file")]
120 Self::File(_) => None,
121 }
122 }
123
124 #[cfg(feature = "backend-file")]
125 fn get_file(&mut self) -> Option<&mut FileErr> {
126 match self {
127 #[cfg(feature = "backend-kafka")]
128 Self::Kafka(_) => None,
129 #[cfg(feature = "backend-redis")]
130 Self::Redis(_) => None,
131 #[cfg(feature = "backend-stdio")]
132 Self::Stdio(_) => None,
133 Self::File(s) => Some(s),
134 }
135 }
136}
137
138pub(crate) type SeaResult<T> = StreamResult<T, BackendErr>;
139
140pub(crate) fn map_err<E: std::error::Error + Into<BackendErr>>(
141 err: StreamErr<E>,
142) -> StreamErr<BackendErr> {
143 match err {
144 StreamErr::Backend(err) => StreamErr::Backend(err.into()),
145 StreamErr::Connect(e) => StreamErr::Connect(e),
147 StreamErr::TimeoutNotSet => StreamErr::TimeoutNotSet,
148 StreamErr::AlreadyAnchored => StreamErr::AlreadyAnchored,
149 StreamErr::NotAnchored => StreamErr::NotAnchored,
150 StreamErr::ConsumerGroupIsSet => StreamErr::ConsumerGroupIsSet,
151 StreamErr::ConsumerGroupNotSet => StreamErr::ConsumerGroupNotSet,
152 StreamErr::StreamKeyEmpty => StreamErr::StreamKeyEmpty,
153 StreamErr::StreamKeyNotFound => StreamErr::StreamKeyNotFound,
154 StreamErr::CommitNotAllowed => StreamErr::CommitNotAllowed,
155 StreamErr::Utf8Error(e) => StreamErr::Utf8Error(e),
156 StreamErr::StreamUrlErr(e) => StreamErr::StreamUrlErr(e),
157 StreamErr::StreamKeyErr(e) => StreamErr::StreamKeyErr(e),
158 StreamErr::Unsupported(e) => StreamErr::Unsupported(e),
159 StreamErr::Runtime(e) => StreamErr::Runtime(e),
160 }
161}