Skip to main content

hyperi_rustlib/transport/
types.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/types.rs
3// Purpose:   Transport data types and configuration
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use super::error::TransportError;
10use super::traits::CommitToken;
11use serde::{Deserialize, Serialize};
12use std::sync::Arc;
13
14/// Transport type selection.
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
16#[serde(rename_all = "lowercase")]
17pub enum TransportType {
18    /// Apache Kafka (production default).
19    #[default]
20    Kafka,
21    /// DFE native gRPC (tonic, inter-service mesh).
22    Grpc,
23    /// In-memory tokio channels (unit tests).
24    Memory,
25    /// NDJSON file (debugging, audit trails, replay).
26    File,
27    /// Unix pipe (stdin/stdout, sidecar pattern).
28    Pipe,
29    /// HTTP/HTTPS (webhook delivery, REST ingest).
30    Http,
31    /// Redis/Valkey Streams (lightweight pub/sub).
32    Redis,
33}
34
35impl std::fmt::Display for TransportType {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        match self {
38            Self::Kafka => write!(f, "kafka"),
39            Self::Grpc => write!(f, "grpc"),
40            Self::Memory => write!(f, "memory"),
41            Self::File => write!(f, "file"),
42            Self::Pipe => write!(f, "pipe"),
43            Self::Http => write!(f, "http"),
44            Self::Redis => write!(f, "redis"),
45        }
46    }
47}
48
49/// Payload format (auto-detected or explicit).
50#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
51#[serde(rename_all = "lowercase")]
52pub enum PayloadFormat {
53    /// Auto-detect from payload bytes.
54    #[default]
55    Auto,
56    /// JSON format.
57    Json,
58    /// MessagePack format.
59    MsgPack,
60}
61
62impl PayloadFormat {
63    /// Detect format from payload bytes.
64    ///
65    /// MsgPack maps start with 0x80-0x8f (fixmap) or 0xde/0xdf (map16/map32).
66    /// JSON objects start with '{' (0x7b).
67    /// JSON arrays start with '[' (0x5b).
68    #[must_use]
69    pub fn detect(payload: &[u8]) -> Self {
70        if payload.is_empty() {
71            return Self::Json; // Default to JSON for empty
72        }
73
74        // MsgPack: fixmap (0x80-0x8f), map16/32 (0xde/0xdf), fixarray (0x90-0x9f), array16/32 (0xdc/0xdd)
75        if matches!(payload[0], 0x80..=0x8f | 0xde | 0xdf | 0x90..=0x9f | 0xdc | 0xdd) {
76            Self::MsgPack
77        } else {
78            // JSON object/array or whitespace-prefixed JSON - default to JSON
79            Self::Json
80        }
81    }
82}
83
84/// A received message with transport metadata.
85#[derive(Debug, Clone)]
86pub struct Message<T: CommitToken> {
87    /// Routing key (Kafka topic, gRPC metadata topic).
88    pub key: Option<Arc<str>>,
89
90    /// Raw payload bytes - JSON or MsgPack, unchanged.
91    pub payload: Vec<u8>,
92
93    /// Transport-specific commit token.
94    pub token: T,
95
96    /// Message timestamp from transport layer (milliseconds since epoch).
97    pub timestamp_ms: Option<i64>,
98
99    /// Detected payload format.
100    pub format: PayloadFormat,
101}
102
103impl<T: CommitToken> Message<T> {
104    /// Create a new message with auto-detected format.
105    #[must_use]
106    pub fn new(
107        key: Option<Arc<str>>,
108        payload: Vec<u8>,
109        token: T,
110        timestamp_ms: Option<i64>,
111    ) -> Self {
112        let format = PayloadFormat::detect(&payload);
113        Self {
114            key,
115            payload,
116            token,
117            timestamp_ms,
118            format,
119        }
120    }
121
122    /// Returns true if payload is JSON.
123    #[must_use]
124    pub fn is_json(&self) -> bool {
125        self.format == PayloadFormat::Json
126    }
127
128    /// Returns true if payload is MsgPack.
129    #[must_use]
130    pub fn is_msgpack(&self) -> bool {
131        self.format == PayloadFormat::MsgPack
132    }
133}
134
135/// Result of a send operation.
136#[derive(Debug)]
137pub enum SendResult {
138    /// Message accepted.
139    Ok,
140    /// Transport is backpressured, retry later.
141    Backpressured,
142    /// Fatal error, cannot continue.
143    Fatal(TransportError),
144    /// Message matched an outbound filter with `action: dlq`.
145    /// Caller is responsible for DLQ routing.
146    FilteredDlq,
147}
148
149impl SendResult {
150    /// Returns true if send was successful.
151    #[must_use]
152    pub fn is_ok(&self) -> bool {
153        matches!(self, Self::Ok)
154    }
155
156    /// Returns true if backpressured (should retry).
157    #[must_use]
158    pub fn is_backpressured(&self) -> bool {
159        matches!(self, Self::Backpressured)
160    }
161
162    /// Returns true if fatal error.
163    #[must_use]
164    pub fn is_fatal(&self) -> bool {
165        matches!(self, Self::Fatal(_))
166    }
167
168    /// Returns true if filtered for DLQ routing.
169    #[must_use]
170    pub fn is_filtered_dlq(&self) -> bool {
171        matches!(self, Self::FilteredDlq)
172    }
173}
174
175/// Top-level transport configuration.
176///
177/// Used by the transport factory to create the right backend from config.
178/// Each transport type has its own optional config section -- only the one
179/// matching `transport_type` is read.
180#[derive(Debug, Clone, Default, Serialize, Deserialize)]
181pub struct TransportConfig {
182    /// Transport type (kafka, grpc, memory, file, pipe, http, redis).
183    #[serde(rename = "type", default)]
184    pub transport_type: TransportType,
185
186    /// Expected payload format (auto-detect by default).
187    #[serde(default)]
188    pub payload_format: PayloadFormat,
189
190    /// Kafka-specific configuration.
191    #[cfg(feature = "transport-kafka")]
192    #[serde(default)]
193    pub kafka: Option<super::kafka::KafkaConfig>,
194
195    /// gRPC transport configuration.
196    #[cfg(feature = "transport-grpc")]
197    #[serde(default)]
198    pub grpc: Option<super::grpc::GrpcConfig>,
199
200    /// Memory transport configuration (for tests).
201    #[cfg(feature = "transport-memory")]
202    #[serde(default)]
203    pub memory: Option<super::memory::MemoryConfig>,
204
205    /// Pipe transport configuration (stdin/stdout).
206    #[cfg(feature = "transport-pipe")]
207    #[serde(default)]
208    pub pipe: Option<super::pipe::PipeTransportConfig>,
209
210    /// File transport configuration (NDJSON file I/O).
211    #[cfg(feature = "transport-file")]
212    #[serde(default)]
213    pub file: Option<super::file::FileTransportConfig>,
214
215    /// HTTP transport configuration (webhook delivery, REST ingest).
216    #[cfg(feature = "transport-http")]
217    #[serde(default)]
218    pub http: Option<super::http::HttpTransportConfig>,
219
220    // Placeholder fields when features are disabled
221    #[cfg(not(feature = "transport-kafka"))]
222    #[serde(default, skip)]
223    pub kafka: Option<()>,
224
225    #[cfg(not(feature = "transport-grpc"))]
226    #[serde(default, skip)]
227    pub grpc: Option<()>,
228
229    #[cfg(not(feature = "transport-memory"))]
230    #[serde(default, skip)]
231    pub memory: Option<()>,
232
233    #[cfg(not(feature = "transport-pipe"))]
234    #[serde(default, skip)]
235    pub pipe: Option<()>,
236
237    #[cfg(not(feature = "transport-file"))]
238    #[serde(default, skip)]
239    pub file: Option<()>,
240
241    #[cfg(not(feature = "transport-http"))]
242    #[serde(default, skip)]
243    pub http: Option<()>,
244
245    /// Redis/Valkey Streams transport configuration.
246    #[cfg(feature = "transport-redis")]
247    #[serde(default)]
248    pub redis: Option<super::redis_transport::RedisTransportConfig>,
249
250    #[cfg(not(feature = "transport-redis"))]
251    #[serde(default, skip)]
252    pub redis: Option<()>,
253}
254
255#[cfg(test)]
256mod tests {
257    use super::*;
258
259    #[test]
260    fn detect_json_object() {
261        assert_eq!(PayloadFormat::detect(b"{\"foo\":1}"), PayloadFormat::Json);
262    }
263
264    #[test]
265    fn detect_json_array() {
266        assert_eq!(PayloadFormat::detect(b"[1,2,3]"), PayloadFormat::Json);
267    }
268
269    #[test]
270    fn detect_msgpack_fixmap() {
271        // fixmap with 1 element: 0x81
272        assert_eq!(PayloadFormat::detect(&[0x81, 0xa3]), PayloadFormat::MsgPack);
273    }
274
275    #[test]
276    fn detect_msgpack_map16() {
277        assert_eq!(
278            PayloadFormat::detect(&[0xde, 0x00, 0x10]),
279            PayloadFormat::MsgPack
280        );
281    }
282
283    #[test]
284    fn detect_empty_defaults_json() {
285        assert_eq!(PayloadFormat::detect(&[]), PayloadFormat::Json);
286    }
287}