// sea_streamer_socket/message.rs
#[cfg(feature = "backend-file")]
use sea_streamer_file::FileMessage;
#[cfg(feature = "backend-kafka")]
use sea_streamer_kafka::KafkaMessage;
#[cfg(feature = "backend-redis")]
use sea_streamer_redis::RedisMessage;
#[cfg(feature = "backend-stdio")]
use sea_streamer_stdio::StdioMessage;

use crate::{Backend, SeaStreamerBackend};
use sea_streamer_types::{Message, Payload, SeqNo, ShardId, StreamKey, Timestamp};

/// A message from any one of the streaming backends enabled at compile time.
///
/// Every variant wraps the concrete message type of a single backend and is only
/// compiled in when the matching `backend-*` cargo feature is enabled.
#[derive(Debug)]
pub enum SeaMessage<'a> {
    /// Wraps a Kafka message; this is the only variant that makes use of `'a`.
    #[cfg(feature = "backend-kafka")]
    Kafka(KafkaMessage<'a>),
    /// Wraps a Redis message.
    #[cfg(feature = "backend-redis")]
    Redis(RedisMessage),
    /// Wraps a Stdio message.
    #[cfg(feature = "backend-stdio")]
    Stdio(StdioMessage),
    /// Wraps a File message.
    #[cfg(feature = "backend-file")]
    File(FileMessage),
    /// Placeholder that exists only when `backend-kafka` is disabled, so that the
    /// lifetime parameter `'a` is still referenced by some variant.
    #[cfg(not(feature = "backend-kafka"))]
    None(std::marker::PhantomData<&'a ()>),
}

28impl<'a> SeaStreamerBackend for SeaMessage<'a> {
29 #[cfg(feature = "backend-kafka")]
30 type Kafka = KafkaMessage<'a>;
31 #[cfg(feature = "backend-redis")]
32 type Redis = RedisMessage;
33 #[cfg(feature = "backend-stdio")]
34 type Stdio = StdioMessage;
35 #[cfg(feature = "backend-file")]
36 type File = FileMessage;
37
38 fn backend(&self) -> Backend {
39 match self {
40 #[cfg(feature = "backend-kafka")]
41 Self::Kafka(_) => Backend::Kafka,
42 #[cfg(feature = "backend-redis")]
43 Self::Redis(_) => Backend::Redis,
44 #[cfg(feature = "backend-stdio")]
45 Self::Stdio(_) => Backend::Stdio,
46 #[cfg(feature = "backend-file")]
47 Self::File(_) => Backend::File,
48 #[cfg(not(feature = "backend-kafka"))]
49 Self::None(_) => unreachable!(),
50 }
51 }
52
53 #[cfg(feature = "backend-kafka")]
54 fn get_kafka(&mut self) -> Option<&mut KafkaMessage<'a>> {
55 match self {
56 Self::Kafka(s) => Some(s),
57 #[cfg(feature = "backend-redis")]
58 Self::Redis(_) => None,
59 #[cfg(feature = "backend-stdio")]
60 Self::Stdio(_) => None,
61 #[cfg(feature = "backend-file")]
62 Self::File(_) => None,
63 }
64 }
65
66 #[cfg(feature = "backend-redis")]
67 fn get_redis(&mut self) -> Option<&mut RedisMessage> {
68 match self {
69 #[cfg(feature = "backend-kafka")]
70 Self::Kafka(_) => None,
71 Self::Redis(s) => Some(s),
72 #[cfg(feature = "backend-stdio")]
73 Self::Stdio(_) => None,
74 #[cfg(feature = "backend-file")]
75 Self::File(_) => None,
76 #[cfg(not(feature = "backend-kafka"))]
77 Self::None(_) => None,
78 }
79 }
80
81 #[cfg(feature = "backend-stdio")]
82 fn get_stdio(&mut self) -> Option<&mut StdioMessage> {
83 match self {
84 #[cfg(feature = "backend-kafka")]
85 Self::Kafka(_) => None,
86 #[cfg(feature = "backend-redis")]
87 Self::Redis(_) => None,
88 Self::Stdio(s) => Some(s),
89 #[cfg(feature = "backend-file")]
90 Self::File(_) => None,
91 #[cfg(not(feature = "backend-kafka"))]
92 Self::None(_) => None,
93 }
94 }
95
96 #[cfg(feature = "backend-file")]
97 fn get_file(&mut self) -> Option<&mut FileMessage> {
98 match self {
99 #[cfg(feature = "backend-kafka")]
100 Self::Kafka(_) => None,
101 #[cfg(feature = "backend-redis")]
102 Self::Redis(_) => None,
103 #[cfg(feature = "backend-stdio")]
104 Self::Stdio(_) => None,
105 Self::File(s) => Some(s),
106 #[cfg(not(feature = "backend-kafka"))]
107 Self::None(_) => None,
108 }
109 }
110}
111
112impl<'a> Message for SeaMessage<'a> {
113 fn stream_key(&self) -> StreamKey {
114 match self {
115 #[cfg(feature = "backend-kafka")]
116 Self::Kafka(i) => i.stream_key(),
117 #[cfg(feature = "backend-redis")]
118 Self::Redis(i) => i.stream_key(),
119 #[cfg(feature = "backend-stdio")]
120 Self::Stdio(i) => i.stream_key(),
121 #[cfg(feature = "backend-file")]
122 Self::File(i) => i.stream_key(),
123 #[cfg(not(feature = "backend-kafka"))]
124 Self::None(_) => unreachable!(),
125 }
126 }
127
128 fn shard_id(&self) -> ShardId {
129 match self {
130 #[cfg(feature = "backend-kafka")]
131 Self::Kafka(i) => i.shard_id(),
132 #[cfg(feature = "backend-redis")]
133 Self::Redis(i) => i.shard_id(),
134 #[cfg(feature = "backend-stdio")]
135 Self::Stdio(i) => i.shard_id(),
136 #[cfg(feature = "backend-file")]
137 Self::File(i) => i.shard_id(),
138 #[cfg(not(feature = "backend-kafka"))]
139 Self::None(_) => unreachable!(),
140 }
141 }
142
143 fn sequence(&self) -> SeqNo {
144 match self {
145 #[cfg(feature = "backend-kafka")]
146 Self::Kafka(i) => i.sequence(),
147 #[cfg(feature = "backend-redis")]
148 Self::Redis(i) => i.sequence(),
149 #[cfg(feature = "backend-stdio")]
150 Self::Stdio(i) => i.sequence(),
151 #[cfg(feature = "backend-file")]
152 Self::File(i) => i.sequence(),
153 #[cfg(not(feature = "backend-kafka"))]
154 Self::None(_) => unreachable!(),
155 }
156 }
157
158 fn timestamp(&self) -> Timestamp {
159 match self {
160 #[cfg(feature = "backend-kafka")]
161 Self::Kafka(i) => i.timestamp(),
162 #[cfg(feature = "backend-redis")]
163 Self::Redis(i) => i.timestamp(),
164 #[cfg(feature = "backend-stdio")]
165 Self::Stdio(i) => i.timestamp(),
166 #[cfg(feature = "backend-file")]
167 Self::File(i) => i.timestamp(),
168 #[cfg(not(feature = "backend-kafka"))]
169 Self::None(_) => unreachable!(),
170 }
171 }
172
173 fn message(&self) -> Payload {
174 match self {
175 #[cfg(feature = "backend-kafka")]
176 Self::Kafka(i) => i.message(),
177 #[cfg(feature = "backend-redis")]
178 Self::Redis(i) => i.message(),
179 #[cfg(feature = "backend-stdio")]
180 Self::Stdio(i) => i.message(),
181 #[cfg(feature = "backend-file")]
182 Self::File(i) => i.message(),
183 #[cfg(not(feature = "backend-kafka"))]
184 Self::None(_) => unreachable!(),
185 }
186 }
187}