hyperi_rustlib/transport/
types.rs1use super::error::TransportError;
10use super::traits::CommitToken;
11use bytes::Bytes;
12use serde::{Deserialize, Serialize};
13use std::sync::Arc;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
17#[serde(rename_all = "lowercase")]
18pub enum TransportType {
19 #[default]
21 Kafka,
22 Grpc,
24 Memory,
26 File,
28 Pipe,
30 Http,
32 Redis,
34}
35
36impl std::fmt::Display for TransportType {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 match self {
39 Self::Kafka => write!(f, "kafka"),
40 Self::Grpc => write!(f, "grpc"),
41 Self::Memory => write!(f, "memory"),
42 Self::File => write!(f, "file"),
43 Self::Pipe => write!(f, "pipe"),
44 Self::Http => write!(f, "http"),
45 Self::Redis => write!(f, "redis"),
46 }
47 }
48}
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
52#[serde(rename_all = "lowercase")]
53pub enum PayloadFormat {
54 #[default]
56 Auto,
57 Json,
59 MsgPack,
61}
62
63impl PayloadFormat {
64 #[must_use]
69 pub fn detect(payload: &[u8]) -> Self {
70 if payload.is_empty() {
71 return Self::Json;
72 }
73
74 if matches!(payload[0], 0x80..=0x8f | 0xde | 0xdf | 0x90..=0x9f | 0xdc | 0xdd) {
76 Self::MsgPack
77 } else {
78 Self::Json
80 }
81 }
82}
83
84#[derive(Debug, Clone)]
86pub struct Message<T: CommitToken> {
87 pub key: Option<Arc<str>>,
89
90 pub payload: Bytes,
92
93 pub token: T,
95
96 pub timestamp_ms: Option<i64>,
98
99 pub format: PayloadFormat,
101}
102
103impl<T: CommitToken> Message<T> {
104 #[must_use]
109 pub fn new(
110 key: Option<Arc<str>>,
111 payload: impl Into<Bytes>,
112 token: T,
113 timestamp_ms: Option<i64>,
114 ) -> Self {
115 let payload = payload.into();
116 let format = PayloadFormat::detect(&payload);
117 Self {
118 key,
119 payload,
120 token,
121 timestamp_ms,
122 format,
123 }
124 }
125
126 #[must_use]
128 pub fn is_json(&self) -> bool {
129 self.format == PayloadFormat::Json
130 }
131
132 #[must_use]
134 pub fn is_msgpack(&self) -> bool {
135 self.format == PayloadFormat::MsgPack
136 }
137}
138
139#[derive(Debug)]
141pub enum SendResult {
142 Ok,
144 Backpressured,
146 Fatal(TransportError),
148 FilteredDlq,
151}
152
153impl SendResult {
154 #[must_use]
156 pub fn is_ok(&self) -> bool {
157 matches!(self, Self::Ok)
158 }
159
160 #[must_use]
162 pub fn is_backpressured(&self) -> bool {
163 matches!(self, Self::Backpressured)
164 }
165
166 #[must_use]
168 pub fn is_fatal(&self) -> bool {
169 matches!(self, Self::Fatal(_))
170 }
171
172 #[must_use]
174 pub fn is_filtered_dlq(&self) -> bool {
175 matches!(self, Self::FilteredDlq)
176 }
177}
178
179#[derive(Debug, Clone, Default, Serialize, Deserialize)]
185pub struct TransportConfig {
186 #[serde(rename = "type", default)]
188 pub transport_type: TransportType,
189
190 #[serde(default)]
192 pub payload_format: PayloadFormat,
193
194 #[cfg(feature = "transport-kafka")]
196 #[serde(default)]
197 pub kafka: Option<super::kafka::KafkaConfig>,
198
199 #[cfg(feature = "transport-grpc")]
201 #[serde(default)]
202 pub grpc: Option<super::grpc::GrpcConfig>,
203
204 #[cfg(feature = "transport-memory")]
206 #[serde(default)]
207 pub memory: Option<super::memory::MemoryConfig>,
208
209 #[cfg(feature = "transport-pipe")]
211 #[serde(default)]
212 pub pipe: Option<super::pipe::PipeTransportConfig>,
213
214 #[cfg(feature = "transport-file")]
216 #[serde(default)]
217 pub file: Option<super::file::FileTransportConfig>,
218
219 #[cfg(feature = "transport-http")]
221 #[serde(default)]
222 pub http: Option<super::http::HttpTransportConfig>,
223
224 #[cfg(not(feature = "transport-kafka"))]
226 #[serde(default, skip)]
227 pub kafka: Option<()>,
228
229 #[cfg(not(feature = "transport-grpc"))]
230 #[serde(default, skip)]
231 pub grpc: Option<()>,
232
233 #[cfg(not(feature = "transport-memory"))]
234 #[serde(default, skip)]
235 pub memory: Option<()>,
236
237 #[cfg(not(feature = "transport-pipe"))]
238 #[serde(default, skip)]
239 pub pipe: Option<()>,
240
241 #[cfg(not(feature = "transport-file"))]
242 #[serde(default, skip)]
243 pub file: Option<()>,
244
245 #[cfg(not(feature = "transport-http"))]
246 #[serde(default, skip)]
247 pub http: Option<()>,
248
249 #[cfg(feature = "transport-redis")]
251 #[serde(default)]
252 pub redis: Option<super::redis_transport::RedisTransportConfig>,
253
254 #[cfg(not(feature = "transport-redis"))]
255 #[serde(default, skip)]
256 pub redis: Option<()>,
257}
258
259#[cfg(test)]
260mod tests {
261 use super::*;
262
263 #[test]
264 fn detect_json_object() {
265 assert_eq!(PayloadFormat::detect(b"{\"foo\":1}"), PayloadFormat::Json);
266 }
267
268 #[test]
269 fn detect_json_array() {
270 assert_eq!(PayloadFormat::detect(b"[1,2,3]"), PayloadFormat::Json);
271 }
272
273 #[test]
274 fn detect_msgpack_fixmap() {
275 assert_eq!(PayloadFormat::detect(&[0x81, 0xa3]), PayloadFormat::MsgPack);
277 }
278
279 #[test]
280 fn detect_msgpack_map16() {
281 assert_eq!(
282 PayloadFormat::detect(&[0xde, 0x00, 0x10]),
283 PayloadFormat::MsgPack
284 );
285 }
286
287 #[test]
288 fn detect_empty_defaults_json() {
289 assert_eq!(PayloadFormat::detect(&[]), PayloadFormat::Json);
290 }
291}