hyperi_rustlib/transport/
types.rs1use super::error::TransportError;
10use super::traits::CommitToken;
11use serde::{Deserialize, Serialize};
12use std::sync::Arc;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
16#[serde(rename_all = "lowercase")]
17pub enum TransportType {
18 #[default]
20 Kafka,
21 Grpc,
23 Memory,
25 File,
27 Pipe,
29 Http,
31 Redis,
33}
34
35impl std::fmt::Display for TransportType {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 match self {
38 Self::Kafka => write!(f, "kafka"),
39 Self::Grpc => write!(f, "grpc"),
40 Self::Memory => write!(f, "memory"),
41 Self::File => write!(f, "file"),
42 Self::Pipe => write!(f, "pipe"),
43 Self::Http => write!(f, "http"),
44 Self::Redis => write!(f, "redis"),
45 }
46 }
47}
48
49#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
51#[serde(rename_all = "lowercase")]
52pub enum PayloadFormat {
53 #[default]
55 Auto,
56 Json,
58 MsgPack,
60}
61
62impl PayloadFormat {
63 #[must_use]
69 pub fn detect(payload: &[u8]) -> Self {
70 if payload.is_empty() {
71 return Self::Json; }
73
74 if matches!(payload[0], 0x80..=0x8f | 0xde | 0xdf | 0x90..=0x9f | 0xdc | 0xdd) {
76 Self::MsgPack
77 } else {
78 Self::Json
80 }
81 }
82}
83
84#[derive(Debug, Clone)]
86pub struct Message<T: CommitToken> {
87 pub key: Option<Arc<str>>,
89
90 pub payload: Vec<u8>,
92
93 pub token: T,
95
96 pub timestamp_ms: Option<i64>,
98
99 pub format: PayloadFormat,
101}
102
103impl<T: CommitToken> Message<T> {
104 #[must_use]
106 pub fn new(
107 key: Option<Arc<str>>,
108 payload: Vec<u8>,
109 token: T,
110 timestamp_ms: Option<i64>,
111 ) -> Self {
112 let format = PayloadFormat::detect(&payload);
113 Self {
114 key,
115 payload,
116 token,
117 timestamp_ms,
118 format,
119 }
120 }
121
122 #[must_use]
124 pub fn is_json(&self) -> bool {
125 self.format == PayloadFormat::Json
126 }
127
128 #[must_use]
130 pub fn is_msgpack(&self) -> bool {
131 self.format == PayloadFormat::MsgPack
132 }
133}
134
135#[derive(Debug)]
137pub enum SendResult {
138 Ok,
140 Backpressured,
142 Fatal(TransportError),
144 FilteredDlq,
147}
148
149impl SendResult {
150 #[must_use]
152 pub fn is_ok(&self) -> bool {
153 matches!(self, Self::Ok)
154 }
155
156 #[must_use]
158 pub fn is_backpressured(&self) -> bool {
159 matches!(self, Self::Backpressured)
160 }
161
162 #[must_use]
164 pub fn is_fatal(&self) -> bool {
165 matches!(self, Self::Fatal(_))
166 }
167
168 #[must_use]
170 pub fn is_filtered_dlq(&self) -> bool {
171 matches!(self, Self::FilteredDlq)
172 }
173}
174
175#[derive(Debug, Clone, Default, Serialize, Deserialize)]
181pub struct TransportConfig {
182 #[serde(rename = "type", default)]
184 pub transport_type: TransportType,
185
186 #[serde(default)]
188 pub payload_format: PayloadFormat,
189
190 #[cfg(feature = "transport-kafka")]
192 #[serde(default)]
193 pub kafka: Option<super::kafka::KafkaConfig>,
194
195 #[cfg(feature = "transport-grpc")]
197 #[serde(default)]
198 pub grpc: Option<super::grpc::GrpcConfig>,
199
200 #[cfg(feature = "transport-memory")]
202 #[serde(default)]
203 pub memory: Option<super::memory::MemoryConfig>,
204
205 #[cfg(feature = "transport-pipe")]
207 #[serde(default)]
208 pub pipe: Option<super::pipe::PipeTransportConfig>,
209
210 #[cfg(feature = "transport-file")]
212 #[serde(default)]
213 pub file: Option<super::file::FileTransportConfig>,
214
215 #[cfg(feature = "transport-http")]
217 #[serde(default)]
218 pub http: Option<super::http::HttpTransportConfig>,
219
220 #[cfg(not(feature = "transport-kafka"))]
222 #[serde(default, skip)]
223 pub kafka: Option<()>,
224
225 #[cfg(not(feature = "transport-grpc"))]
226 #[serde(default, skip)]
227 pub grpc: Option<()>,
228
229 #[cfg(not(feature = "transport-memory"))]
230 #[serde(default, skip)]
231 pub memory: Option<()>,
232
233 #[cfg(not(feature = "transport-pipe"))]
234 #[serde(default, skip)]
235 pub pipe: Option<()>,
236
237 #[cfg(not(feature = "transport-file"))]
238 #[serde(default, skip)]
239 pub file: Option<()>,
240
241 #[cfg(not(feature = "transport-http"))]
242 #[serde(default, skip)]
243 pub http: Option<()>,
244
245 #[cfg(feature = "transport-redis")]
247 #[serde(default)]
248 pub redis: Option<super::redis_transport::RedisTransportConfig>,
249
250 #[cfg(not(feature = "transport-redis"))]
251 #[serde(default, skip)]
252 pub redis: Option<()>,
253}
254
#[cfg(test)]
mod tests {
    use super::*;

    // --- JSON / fallback inputs -------------------------------------------

    #[test]
    fn detect_json_object() {
        assert_eq!(PayloadFormat::detect(b"{\"foo\":1}"), PayloadFormat::Json);
    }

    #[test]
    fn detect_json_array() {
        assert_eq!(PayloadFormat::detect(b"[1,2,3]"), PayloadFormat::Json);
    }

    #[test]
    fn detect_empty_defaults_json() {
        assert_eq!(PayloadFormat::detect(&[]), PayloadFormat::Json);
    }

    /// Bytes adjacent to the marker ranges must not be mistaken for MsgPack.
    #[test]
    fn detect_non_marker_bytes_default_json() {
        for byte in [0x7f_u8, 0xa0, 0xdb, 0xe0].iter() {
            assert_eq!(PayloadFormat::detect(&[*byte]), PayloadFormat::Json);
        }
    }

    // --- MessagePack map markers ------------------------------------------

    #[test]
    fn detect_msgpack_fixmap() {
        assert_eq!(PayloadFormat::detect(&[0x81, 0xa3]), PayloadFormat::MsgPack);
    }

    #[test]
    fn detect_msgpack_map16() {
        assert_eq!(
            PayloadFormat::detect(&[0xde, 0x00, 0x10]),
            PayloadFormat::MsgPack
        );
    }

    #[test]
    fn detect_msgpack_map32() {
        assert_eq!(
            PayloadFormat::detect(&[0xdf, 0x00, 0x00, 0x00, 0x01]),
            PayloadFormat::MsgPack
        );
    }

    // --- MessagePack array markers ----------------------------------------

    #[test]
    fn detect_msgpack_fixarray() {
        assert_eq!(PayloadFormat::detect(&[0x93, 0x01]), PayloadFormat::MsgPack);
    }

    #[test]
    fn detect_msgpack_array16() {
        assert_eq!(
            PayloadFormat::detect(&[0xdc, 0x00, 0x10]),
            PayloadFormat::MsgPack
        );
    }
}