Skip to main content

hyperi_rustlib/transport/
types.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/types.rs
3// Purpose:   Transport data types and configuration
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use super::error::TransportError;
10use super::traits::CommitToken;
11use bytes::Bytes;
12use serde::{Deserialize, Serialize};
13use std::sync::Arc;
14
/// Transport type selection.
///
/// Serde reads and writes each variant under its lowercase name
/// (`rename_all = "lowercase"`), which is the same spelling the `Display`
/// impl for this type emits. `Kafka` is the `Default` variant, so a config
/// that omits the transport type resolves to Kafka.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TransportType {
    /// Apache Kafka (production default).
    #[default]
    Kafka,
    /// DFE native gRPC (tonic, inter-service mesh).
    Grpc,
    /// In-memory tokio channels (unit tests).
    Memory,
    /// NDJSON file (debugging, audit trails, replay).
    File,
    /// Unix pipe (stdin/stdout, sidecar pattern).
    Pipe,
    /// HTTP/HTTPS (webhook delivery, REST ingest).
    Http,
    /// Redis/Valkey Streams (lightweight pub/sub).
    Redis,
}
35
36impl std::fmt::Display for TransportType {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        match self {
39            Self::Kafka => write!(f, "kafka"),
40            Self::Grpc => write!(f, "grpc"),
41            Self::Memory => write!(f, "memory"),
42            Self::File => write!(f, "file"),
43            Self::Pipe => write!(f, "pipe"),
44            Self::Http => write!(f, "http"),
45            Self::Redis => write!(f, "redis"),
46        }
47    }
48}
49
/// Payload format (auto-detected or explicit).
///
/// Serde uses the lowercase variant names (`auto`, `json`, `msgpack`).
/// `Auto` is the `Default` variant; `PayloadFormat::detect` itself only ever
/// returns `Json` or `MsgPack`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum PayloadFormat {
    /// Auto-detect from payload bytes.
    #[default]
    Auto,
    /// JSON format.
    Json,
    /// MessagePack format.
    MsgPack,
}
62
63impl PayloadFormat {
64    /// Detect format from the first payload byte.
65    ///
66    /// MsgPack map/array lead bytes (0x80-0x9f, 0xdc-0xdf) -> MsgPack;
67    /// everything else (incl. empty) -> JSON.
68    #[must_use]
69    pub fn detect(payload: &[u8]) -> Self {
70        if payload.is_empty() {
71            return Self::Json;
72        }
73
74        // MsgPack: fixmap (0x80-0x8f), map16/32 (0xde/0xdf), fixarray (0x90-0x9f), array16/32 (0xdc/0xdd)
75        if matches!(payload[0], 0x80..=0x8f | 0xde | 0xdf | 0x90..=0x9f | 0xdc | 0xdd) {
76            Self::MsgPack
77        } else {
78            // JSON object/array or whitespace-prefixed JSON - default to JSON
79            Self::Json
80        }
81    }
82}
83
/// A received message with transport metadata.
///
/// Generic over the transport's commit token type `T`. The derived `Debug`
/// and `Clone` impls are only available when `T` itself implements them.
#[derive(Debug, Clone)]
pub struct Message<T: CommitToken> {
    /// Routing key (Kafka topic, gRPC metadata topic).
    pub key: Option<Arc<str>>,

    /// Raw payload bytes -- zero-copy / refcounted. JSON or MsgPack, unchanged.
    pub payload: Bytes,

    /// Transport-specific commit token.
    pub token: T,

    /// Message timestamp from transport layer (milliseconds since epoch).
    pub timestamp_ms: Option<i64>,

    /// Detected payload format.
    ///
    /// `Message::new` fills this from `PayloadFormat::detect`; because the
    /// field is public it can also be set directly by code that builds the
    /// struct by hand.
    pub format: PayloadFormat,
}
102
103impl<T: CommitToken> Message<T> {
104    /// Create a new message with auto-detected format.
105    ///
106    /// `payload` accepts any `impl Into<Bytes>` -- pass a `Vec<u8>` and it is
107    /// moved (zero copy); pass a `Bytes` slice and the refcount is bumped.
108    #[must_use]
109    pub fn new(
110        key: Option<Arc<str>>,
111        payload: impl Into<Bytes>,
112        token: T,
113        timestamp_ms: Option<i64>,
114    ) -> Self {
115        let payload = payload.into();
116        let format = PayloadFormat::detect(&payload);
117        Self {
118            key,
119            payload,
120            token,
121            timestamp_ms,
122            format,
123        }
124    }
125
126    /// Returns true if payload is JSON.
127    #[must_use]
128    pub fn is_json(&self) -> bool {
129        self.format == PayloadFormat::Json
130    }
131
132    /// Returns true if payload is MsgPack.
133    #[must_use]
134    pub fn is_msgpack(&self) -> bool {
135        self.format == PayloadFormat::MsgPack
136    }
137}
138
/// Result of a send operation.
///
/// Each variant has a matching `is_*` predicate on this type. Only `Fatal`
/// carries data (the underlying `TransportError`).
#[derive(Debug)]
pub enum SendResult {
    /// Message accepted.
    Ok,
    /// Transport is backpressured, retry later.
    Backpressured,
    /// Fatal error, cannot continue.
    Fatal(TransportError),
    /// Message matched an outbound filter with `action: dlq`.
    /// Caller is responsible for DLQ routing.
    FilteredDlq,
}
152
153impl SendResult {
154    /// Returns true if send was successful.
155    #[must_use]
156    pub fn is_ok(&self) -> bool {
157        matches!(self, Self::Ok)
158    }
159
160    /// Returns true if backpressured (should retry).
161    #[must_use]
162    pub fn is_backpressured(&self) -> bool {
163        matches!(self, Self::Backpressured)
164    }
165
166    /// Returns true if fatal error.
167    #[must_use]
168    pub fn is_fatal(&self) -> bool {
169        matches!(self, Self::Fatal(_))
170    }
171
172    /// Returns true if filtered for DLQ routing.
173    #[must_use]
174    pub fn is_filtered_dlq(&self) -> bool {
175        matches!(self, Self::FilteredDlq)
176    }
177}
178
/// Top-level transport configuration.
///
/// Used by the transport factory to create the right backend from config.
/// Each transport type has its own optional config section -- only the one
/// matching `transport_type` is read.
///
/// Every backend section exists in two cfg-gated forms that share one field
/// name: the real config type when the corresponding `transport-*` cargo
/// feature is enabled, and an `Option<()>` placeholder that serde skips when
/// it is not. The struct therefore has the same set of field names under any
/// feature combination. All sections default to `None`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TransportConfig {
    /// Transport type (kafka, grpc, memory, file, pipe, http, redis).
    ///
    /// Appears as the key `type` in serialized config; defaults to
    /// `TransportType::default()` when absent.
    #[serde(rename = "type", default)]
    pub transport_type: TransportType,

    /// Expected payload format (auto-detect by default).
    #[serde(default)]
    pub payload_format: PayloadFormat,

    /// Kafka-specific configuration.
    #[cfg(feature = "transport-kafka")]
    #[serde(default)]
    pub kafka: Option<super::kafka::KafkaConfig>,

    /// gRPC transport configuration.
    #[cfg(feature = "transport-grpc")]
    #[serde(default)]
    pub grpc: Option<super::grpc::GrpcConfig>,

    /// Memory transport configuration (for tests).
    #[cfg(feature = "transport-memory")]
    #[serde(default)]
    pub memory: Option<super::memory::MemoryConfig>,

    /// Pipe transport configuration (stdin/stdout).
    #[cfg(feature = "transport-pipe")]
    #[serde(default)]
    pub pipe: Option<super::pipe::PipeTransportConfig>,

    /// File transport configuration (NDJSON file I/O).
    #[cfg(feature = "transport-file")]
    #[serde(default)]
    pub file: Option<super::file::FileTransportConfig>,

    /// HTTP transport configuration (webhook delivery, REST ingest).
    #[cfg(feature = "transport-http")]
    #[serde(default)]
    pub http: Option<super::http::HttpTransportConfig>,

    // Placeholder fields when features are disabled.
    // `skip` keeps them out of both serialization and deserialization; they
    // exist only so the field names are present without the feature.
    /// Placeholder for `kafka` when `transport-kafka` is disabled (serde-skipped).
    #[cfg(not(feature = "transport-kafka"))]
    #[serde(default, skip)]
    pub kafka: Option<()>,

    /// Placeholder for `grpc` when `transport-grpc` is disabled (serde-skipped).
    #[cfg(not(feature = "transport-grpc"))]
    #[serde(default, skip)]
    pub grpc: Option<()>,

    /// Placeholder for `memory` when `transport-memory` is disabled (serde-skipped).
    #[cfg(not(feature = "transport-memory"))]
    #[serde(default, skip)]
    pub memory: Option<()>,

    /// Placeholder for `pipe` when `transport-pipe` is disabled (serde-skipped).
    #[cfg(not(feature = "transport-pipe"))]
    #[serde(default, skip)]
    pub pipe: Option<()>,

    /// Placeholder for `file` when `transport-file` is disabled (serde-skipped).
    #[cfg(not(feature = "transport-file"))]
    #[serde(default, skip)]
    pub file: Option<()>,

    /// Placeholder for `http` when `transport-http` is disabled (serde-skipped).
    #[cfg(not(feature = "transport-http"))]
    #[serde(default, skip)]
    pub http: Option<()>,

    /// Redis/Valkey Streams transport configuration.
    #[cfg(feature = "transport-redis")]
    #[serde(default)]
    pub redis: Option<super::redis_transport::RedisTransportConfig>,

    /// Placeholder for `redis` when `transport-redis` is disabled (serde-skipped).
    #[cfg(not(feature = "transport-redis"))]
    #[serde(default, skip)]
    pub redis: Option<()>,
}
258
/// Unit tests for the stdlib-constructible parts of this module: payload
/// format detection, `TransportType` display names, defaults, and the
/// `SendResult` predicates.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn detect_json_object() {
        assert_eq!(PayloadFormat::detect(b"{\"foo\":1}"), PayloadFormat::Json);
    }

    #[test]
    fn detect_json_array() {
        assert_eq!(PayloadFormat::detect(b"[1,2,3]"), PayloadFormat::Json);
    }

    #[test]
    fn detect_json_with_leading_whitespace() {
        assert_eq!(PayloadFormat::detect(b" \n{\"a\":1}"), PayloadFormat::Json);
    }

    #[test]
    fn detect_msgpack_fixmap() {
        // fixmap with 1 element: 0x81
        assert_eq!(PayloadFormat::detect(&[0x81, 0xa3]), PayloadFormat::MsgPack);
    }

    #[test]
    fn detect_msgpack_fixarray() {
        // fixarray with 3 elements: 0x93
        assert_eq!(
            PayloadFormat::detect(&[0x93, 0x01, 0x02, 0x03]),
            PayloadFormat::MsgPack
        );
    }

    #[test]
    fn detect_msgpack_map16() {
        assert_eq!(
            PayloadFormat::detect(&[0xde, 0x00, 0x10]),
            PayloadFormat::MsgPack
        );
    }

    #[test]
    fn detect_msgpack_wide_containers() {
        // array16, array32, map32 lead bytes.
        for lead in [0xdc_u8, 0xdd, 0xdf] {
            assert_eq!(
                PayloadFormat::detect(&[lead, 0x00, 0x00]),
                PayloadFormat::MsgPack,
                "lead byte {lead:#04x}"
            );
        }
    }

    #[test]
    fn detect_range_edges() {
        // First and last byte of each accepted range.
        for lead in [0x80_u8, 0x8f, 0x90, 0x9f, 0xdc, 0xdf] {
            assert_eq!(
                PayloadFormat::detect(&[lead]),
                PayloadFormat::MsgPack,
                "lead byte {lead:#04x}"
            );
        }
        // Bytes immediately outside the accepted ranges fall through to JSON.
        for lead in [0x7f_u8, 0xa0, 0xdb, 0xe0] {
            assert_eq!(
                PayloadFormat::detect(&[lead]),
                PayloadFormat::Json,
                "lead byte {lead:#04x}"
            );
        }
    }

    #[test]
    fn detect_empty_defaults_json() {
        assert_eq!(PayloadFormat::detect(&[]), PayloadFormat::Json);
    }

    #[test]
    fn defaults_are_kafka_and_auto() {
        assert_eq!(TransportType::default(), TransportType::Kafka);
        assert_eq!(PayloadFormat::default(), PayloadFormat::Auto);
    }

    #[test]
    fn display_uses_lowercase_names() {
        let expected = [
            (TransportType::Kafka, "kafka"),
            (TransportType::Grpc, "grpc"),
            (TransportType::Memory, "memory"),
            (TransportType::File, "file"),
            (TransportType::Pipe, "pipe"),
            (TransportType::Http, "http"),
            (TransportType::Redis, "redis"),
        ];
        for (transport, name) in expected {
            assert_eq!(transport.to_string(), name);
        }
    }

    #[test]
    fn send_result_predicates_are_exclusive() {
        let ok = SendResult::Ok;
        assert!(ok.is_ok());
        assert!(!ok.is_backpressured() && !ok.is_fatal() && !ok.is_filtered_dlq());

        let busy = SendResult::Backpressured;
        assert!(busy.is_backpressured());
        assert!(!busy.is_ok() && !busy.is_fatal() && !busy.is_filtered_dlq());

        let dlq = SendResult::FilteredDlq;
        assert!(dlq.is_filtered_dlq());
        assert!(!dlq.is_ok() && !dlq.is_backpressured() && !dlq.is_fatal());
    }
}
291}