hyperi_rustlib/transport/
types.rs1use super::error::TransportError;
10use super::traits::CommitToken;
11use bytes::Bytes;
12use serde::{Deserialize, Serialize};
13use std::sync::Arc;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
17#[serde(rename_all = "lowercase")]
18pub enum TransportType {
19 #[default]
21 Kafka,
22 Grpc,
24 Memory,
26 File,
28 Pipe,
30 Http,
32 Redis,
34}
35
36impl std::fmt::Display for TransportType {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 match self {
39 Self::Kafka => write!(f, "kafka"),
40 Self::Grpc => write!(f, "grpc"),
41 Self::Memory => write!(f, "memory"),
42 Self::File => write!(f, "file"),
43 Self::Pipe => write!(f, "pipe"),
44 Self::Http => write!(f, "http"),
45 Self::Redis => write!(f, "redis"),
46 }
47 }
48}
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
52#[serde(rename_all = "lowercase")]
53pub enum PayloadFormat {
54 #[default]
56 Auto,
57 Json,
59 MsgPack,
61}
62
63impl PayloadFormat {
64 #[must_use]
70 pub fn detect(payload: &[u8]) -> Self {
71 if payload.is_empty() {
72 return Self::Json; }
74
75 if matches!(payload[0], 0x80..=0x8f | 0xde | 0xdf | 0x90..=0x9f | 0xdc | 0xdd) {
77 Self::MsgPack
78 } else {
79 Self::Json
81 }
82 }
83}
84
85#[derive(Debug, Clone)]
87pub struct Message<T: CommitToken> {
88 pub key: Option<Arc<str>>,
90
91 pub payload: Bytes,
93
94 pub token: T,
96
97 pub timestamp_ms: Option<i64>,
99
100 pub format: PayloadFormat,
102}
103
104impl<T: CommitToken> Message<T> {
105 #[must_use]
110 pub fn new(
111 key: Option<Arc<str>>,
112 payload: impl Into<Bytes>,
113 token: T,
114 timestamp_ms: Option<i64>,
115 ) -> Self {
116 let payload = payload.into();
117 let format = PayloadFormat::detect(&payload);
118 Self {
119 key,
120 payload,
121 token,
122 timestamp_ms,
123 format,
124 }
125 }
126
127 #[must_use]
129 pub fn is_json(&self) -> bool {
130 self.format == PayloadFormat::Json
131 }
132
133 #[must_use]
135 pub fn is_msgpack(&self) -> bool {
136 self.format == PayloadFormat::MsgPack
137 }
138}
139
140#[derive(Debug)]
142pub enum SendResult {
143 Ok,
145 Backpressured,
147 Fatal(TransportError),
149 FilteredDlq,
152}
153
154impl SendResult {
155 #[must_use]
157 pub fn is_ok(&self) -> bool {
158 matches!(self, Self::Ok)
159 }
160
161 #[must_use]
163 pub fn is_backpressured(&self) -> bool {
164 matches!(self, Self::Backpressured)
165 }
166
167 #[must_use]
169 pub fn is_fatal(&self) -> bool {
170 matches!(self, Self::Fatal(_))
171 }
172
173 #[must_use]
175 pub fn is_filtered_dlq(&self) -> bool {
176 matches!(self, Self::FilteredDlq)
177 }
178}
179
180#[derive(Debug, Clone, Default, Serialize, Deserialize)]
186pub struct TransportConfig {
187 #[serde(rename = "type", default)]
189 pub transport_type: TransportType,
190
191 #[serde(default)]
193 pub payload_format: PayloadFormat,
194
195 #[cfg(feature = "transport-kafka")]
197 #[serde(default)]
198 pub kafka: Option<super::kafka::KafkaConfig>,
199
200 #[cfg(feature = "transport-grpc")]
202 #[serde(default)]
203 pub grpc: Option<super::grpc::GrpcConfig>,
204
205 #[cfg(feature = "transport-memory")]
207 #[serde(default)]
208 pub memory: Option<super::memory::MemoryConfig>,
209
210 #[cfg(feature = "transport-pipe")]
212 #[serde(default)]
213 pub pipe: Option<super::pipe::PipeTransportConfig>,
214
215 #[cfg(feature = "transport-file")]
217 #[serde(default)]
218 pub file: Option<super::file::FileTransportConfig>,
219
220 #[cfg(feature = "transport-http")]
222 #[serde(default)]
223 pub http: Option<super::http::HttpTransportConfig>,
224
225 #[cfg(not(feature = "transport-kafka"))]
227 #[serde(default, skip)]
228 pub kafka: Option<()>,
229
230 #[cfg(not(feature = "transport-grpc"))]
231 #[serde(default, skip)]
232 pub grpc: Option<()>,
233
234 #[cfg(not(feature = "transport-memory"))]
235 #[serde(default, skip)]
236 pub memory: Option<()>,
237
238 #[cfg(not(feature = "transport-pipe"))]
239 #[serde(default, skip)]
240 pub pipe: Option<()>,
241
242 #[cfg(not(feature = "transport-file"))]
243 #[serde(default, skip)]
244 pub file: Option<()>,
245
246 #[cfg(not(feature = "transport-http"))]
247 #[serde(default, skip)]
248 pub http: Option<()>,
249
250 #[cfg(feature = "transport-redis")]
252 #[serde(default)]
253 pub redis: Option<super::redis_transport::RedisTransportConfig>,
254
255 #[cfg(not(feature = "transport-redis"))]
256 #[serde(default, skip)]
257 pub redis: Option<()>,
258}
259
/// Unit tests for payload-format detection and the small helpers in this file.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn detect_json_object() {
        assert_eq!(PayloadFormat::detect(b"{\"foo\":1}"), PayloadFormat::Json);
    }

    #[test]
    fn detect_json_array() {
        assert_eq!(PayloadFormat::detect(b"[1,2,3]"), PayloadFormat::Json);
    }

    #[test]
    fn detect_msgpack_fixmap() {
        assert_eq!(PayloadFormat::detect(&[0x81, 0xa3]), PayloadFormat::MsgPack);
    }

    #[test]
    fn detect_msgpack_map16() {
        assert_eq!(
            PayloadFormat::detect(&[0xde, 0x00, 0x10]),
            PayloadFormat::MsgPack
        );
    }

    #[test]
    fn detect_empty_defaults_json() {
        assert_eq!(PayloadFormat::detect(&[]), PayloadFormat::Json);
    }

    // Both ends of the fixmap range, so an off-by-one in the range is caught.
    #[test]
    fn detect_msgpack_fixmap_bounds() {
        assert_eq!(PayloadFormat::detect(&[0x80]), PayloadFormat::MsgPack);
        assert_eq!(PayloadFormat::detect(&[0x8f]), PayloadFormat::MsgPack);
    }

    #[test]
    fn detect_msgpack_map32() {
        assert_eq!(
            PayloadFormat::detect(&[0xdf, 0x00, 0x00, 0x00, 0x01]),
            PayloadFormat::MsgPack
        );
    }

    // Both ends of the fixarray range.
    #[test]
    fn detect_msgpack_fixarray_bounds() {
        assert_eq!(PayloadFormat::detect(&[0x90]), PayloadFormat::MsgPack);
        assert_eq!(PayloadFormat::detect(&[0x9f]), PayloadFormat::MsgPack);
    }

    #[test]
    fn detect_msgpack_array16_and_array32() {
        assert_eq!(
            PayloadFormat::detect(&[0xdc, 0x00, 0x01]),
            PayloadFormat::MsgPack
        );
        assert_eq!(
            PayloadFormat::detect(&[0xdd, 0x00, 0x00, 0x00, 0x01]),
            PayloadFormat::MsgPack
        );
    }

    // Bytes immediately outside the recognised marker ranges fall back to JSON.
    #[test]
    fn detect_neighbouring_bytes_default_json() {
        for lead in [0x7f_u8, 0xa0, 0xdb, 0xe0] {
            assert_eq!(
                PayloadFormat::detect(&[lead]),
                PayloadFormat::Json,
                "lead byte {lead:#04x}"
            );
        }
    }

    #[test]
    fn defaults_are_kafka_and_auto() {
        assert_eq!(TransportType::default(), TransportType::Kafka);
        assert_eq!(PayloadFormat::default(), PayloadFormat::Auto);
    }

    #[test]
    fn transport_type_display_is_lowercase_name() {
        let expected = [
            (TransportType::Kafka, "kafka"),
            (TransportType::Grpc, "grpc"),
            (TransportType::Memory, "memory"),
            (TransportType::File, "file"),
            (TransportType::Pipe, "pipe"),
            (TransportType::Http, "http"),
            (TransportType::Redis, "redis"),
        ];
        for (transport, name) in expected {
            assert_eq!(transport.to_string(), name);
        }
    }

    // Each predicate must be true for its own variant and false for the others.
    // `Fatal` is left out because building a `TransportError` is out of scope here.
    #[test]
    fn send_result_predicates_are_exclusive() {
        let ok = SendResult::Ok;
        assert!(ok.is_ok());
        assert!(!ok.is_backpressured() && !ok.is_fatal() && !ok.is_filtered_dlq());

        let backpressured = SendResult::Backpressured;
        assert!(backpressured.is_backpressured());
        assert!(
            !backpressured.is_ok()
                && !backpressured.is_fatal()
                && !backpressured.is_filtered_dlq()
        );

        let filtered = SendResult::FilteredDlq;
        assert!(filtered.is_filtered_dlq());
        assert!(!filtered.is_ok() && !filtered.is_backpressured() && !filtered.is_fatal());
    }
}