Skip to main content

hyperi_rustlib/transport/
types.rs

// Project:   hyperi-rustlib
// File:      src/transport/types.rs
// Purpose:   Transport data types and configuration
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use super::error::TransportError;
10use super::traits::CommitToken;
11use bytes::Bytes;
12use serde::{Deserialize, Serialize};
13use std::sync::Arc;
14
/// Transport type selection.
///
/// Identifies which transport backend a configuration refers to. With
/// `rename_all = "lowercase"` each variant (de)serializes as its lowercase
/// name (`kafka`, `grpc`, `memory`, `file`, `pipe`, `http`, `redis`), which
/// are the same strings the `Display` impl writes. `Kafka` is the `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TransportType {
    /// Apache Kafka (production default).
    #[default]
    Kafka,
    /// DFE native gRPC (tonic, inter-service mesh).
    Grpc,
    /// In-memory tokio channels (unit tests).
    Memory,
    /// NDJSON file (debugging, audit trails, replay).
    File,
    /// Unix pipe (stdin/stdout, sidecar pattern).
    Pipe,
    /// HTTP/HTTPS (webhook delivery, REST ingest).
    Http,
    /// Redis/Valkey Streams (lightweight pub/sub).
    Redis,
}
35
36impl std::fmt::Display for TransportType {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        match self {
39            Self::Kafka => write!(f, "kafka"),
40            Self::Grpc => write!(f, "grpc"),
41            Self::Memory => write!(f, "memory"),
42            Self::File => write!(f, "file"),
43            Self::Pipe => write!(f, "pipe"),
44            Self::Http => write!(f, "http"),
45            Self::Redis => write!(f, "redis"),
46        }
47    }
48}
49
/// Payload format (auto-detected or explicit).
///
/// With `rename_all = "lowercase"` the variants (de)serialize as `auto`,
/// `json` and `msgpack`. `Auto` is the `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum PayloadFormat {
    /// Auto-detect from payload bytes.
    #[default]
    Auto,
    /// JSON format.
    Json,
    /// MessagePack format.
    MsgPack,
}
62
63impl PayloadFormat {
64    /// Detect format from payload bytes.
65    ///
66    /// MsgPack maps start with 0x80-0x8f (fixmap) or 0xde/0xdf (map16/map32).
67    /// JSON objects start with '{' (0x7b).
68    /// JSON arrays start with '[' (0x5b).
69    #[must_use]
70    pub fn detect(payload: &[u8]) -> Self {
71        if payload.is_empty() {
72            return Self::Json; // Default to JSON for empty
73        }
74
75        // MsgPack: fixmap (0x80-0x8f), map16/32 (0xde/0xdf), fixarray (0x90-0x9f), array16/32 (0xdc/0xdd)
76        if matches!(payload[0], 0x80..=0x8f | 0xde | 0xdf | 0x90..=0x9f | 0xdc | 0xdd) {
77            Self::MsgPack
78        } else {
79            // JSON object/array or whitespace-prefixed JSON - default to JSON
80            Self::Json
81        }
82    }
83}
84
/// A received message with transport metadata.
///
/// Generic over the transport's commit token type `T`, which travels with
/// the payload in the `token` field.
#[derive(Debug, Clone)]
pub struct Message<T: CommitToken> {
    /// Routing key (Kafka topic, gRPC metadata topic).
    pub key: Option<Arc<str>>,

    /// Raw payload bytes -- zero-copy / refcounted. JSON or MsgPack, unchanged.
    pub payload: Bytes,

    /// Transport-specific commit token.
    pub token: T,

    /// Message timestamp from transport layer (milliseconds since epoch).
    /// `None` when no timestamp was supplied.
    pub timestamp_ms: Option<i64>,

    /// Detected payload format. `Message::new` fills this in by running
    /// `PayloadFormat::detect` on the payload.
    pub format: PayloadFormat,
}
103
104impl<T: CommitToken> Message<T> {
105    /// Create a new message with auto-detected format.
106    ///
107    /// `payload` accepts any `impl Into<Bytes>` -- pass a `Vec<u8>` and it is
108    /// moved (zero copy); pass a `Bytes` slice and the refcount is bumped.
109    #[must_use]
110    pub fn new(
111        key: Option<Arc<str>>,
112        payload: impl Into<Bytes>,
113        token: T,
114        timestamp_ms: Option<i64>,
115    ) -> Self {
116        let payload = payload.into();
117        let format = PayloadFormat::detect(&payload);
118        Self {
119            key,
120            payload,
121            token,
122            timestamp_ms,
123            format,
124        }
125    }
126
127    /// Returns true if payload is JSON.
128    #[must_use]
129    pub fn is_json(&self) -> bool {
130        self.format == PayloadFormat::Json
131    }
132
133    /// Returns true if payload is MsgPack.
134    #[must_use]
135    pub fn is_msgpack(&self) -> bool {
136        self.format == PayloadFormat::MsgPack
137    }
138}
139
/// Result of a send operation.
///
/// Each variant has a matching `is_*` predicate on `SendResult` for callers
/// that only need a yes/no answer.
#[derive(Debug)]
pub enum SendResult {
    /// Message accepted.
    Ok,
    /// Transport is backpressured, retry later.
    Backpressured,
    /// Fatal error, cannot continue. Carries the underlying `TransportError`.
    Fatal(TransportError),
    /// Message matched an outbound filter with `action: dlq`.
    /// Caller is responsible for DLQ routing.
    FilteredDlq,
}
153
154impl SendResult {
155    /// Returns true if send was successful.
156    #[must_use]
157    pub fn is_ok(&self) -> bool {
158        matches!(self, Self::Ok)
159    }
160
161    /// Returns true if backpressured (should retry).
162    #[must_use]
163    pub fn is_backpressured(&self) -> bool {
164        matches!(self, Self::Backpressured)
165    }
166
167    /// Returns true if fatal error.
168    #[must_use]
169    pub fn is_fatal(&self) -> bool {
170        matches!(self, Self::Fatal(_))
171    }
172
173    /// Returns true if filtered for DLQ routing.
174    #[must_use]
175    pub fn is_filtered_dlq(&self) -> bool {
176        matches!(self, Self::FilteredDlq)
177    }
178}
179
/// Top-level transport configuration.
///
/// Used by the transport factory to create the right backend from config.
/// Each transport type has its own optional config section -- only the one
/// matching `transport_type` is read.
///
/// Every per-transport section is declared twice under opposite `cfg`
/// conditions: with the cargo feature enabled it has the real config type;
/// with the feature disabled it is an `Option<()>` placeholder that serde
/// skips. The struct therefore exposes the same field names regardless of
/// which transport features are compiled in.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TransportConfig {
    /// Transport type (kafka, grpc, memory, file, pipe, http, redis).
    /// Serialized under the key `type`; falls back to the enum default when
    /// the key is absent.
    #[serde(rename = "type", default)]
    pub transport_type: TransportType,

    /// Expected payload format (auto-detect by default).
    #[serde(default)]
    pub payload_format: PayloadFormat,

    /// Kafka-specific configuration.
    #[cfg(feature = "transport-kafka")]
    #[serde(default)]
    pub kafka: Option<super::kafka::KafkaConfig>,

    /// gRPC transport configuration.
    #[cfg(feature = "transport-grpc")]
    #[serde(default)]
    pub grpc: Option<super::grpc::GrpcConfig>,

    /// Memory transport configuration (for tests).
    #[cfg(feature = "transport-memory")]
    #[serde(default)]
    pub memory: Option<super::memory::MemoryConfig>,

    /// Pipe transport configuration (stdin/stdout).
    #[cfg(feature = "transport-pipe")]
    #[serde(default)]
    pub pipe: Option<super::pipe::PipeTransportConfig>,

    /// File transport configuration (NDJSON file I/O).
    #[cfg(feature = "transport-file")]
    #[serde(default)]
    pub file: Option<super::file::FileTransportConfig>,

    /// HTTP transport configuration (webhook delivery, REST ingest).
    #[cfg(feature = "transport-http")]
    #[serde(default)]
    pub http: Option<super::http::HttpTransportConfig>,

    // Placeholder fields when features are disabled.
    // `skip` keeps them out of both serialization and deserialization, and
    // `default` leaves them as `None`.
    #[cfg(not(feature = "transport-kafka"))]
    #[serde(default, skip)]
    pub kafka: Option<()>,

    #[cfg(not(feature = "transport-grpc"))]
    #[serde(default, skip)]
    pub grpc: Option<()>,

    #[cfg(not(feature = "transport-memory"))]
    #[serde(default, skip)]
    pub memory: Option<()>,

    #[cfg(not(feature = "transport-pipe"))]
    #[serde(default, skip)]
    pub pipe: Option<()>,

    #[cfg(not(feature = "transport-file"))]
    #[serde(default, skip)]
    pub file: Option<()>,

    #[cfg(not(feature = "transport-http"))]
    #[serde(default, skip)]
    pub http: Option<()>,

    /// Redis/Valkey Streams transport configuration.
    #[cfg(feature = "transport-redis")]
    #[serde(default)]
    pub redis: Option<super::redis_transport::RedisTransportConfig>,

    // Placeholder for `redis` when the `transport-redis` feature is disabled.
    #[cfg(not(feature = "transport-redis"))]
    #[serde(default, skip)]
    pub redis: Option<()>,
}
259
/// Unit tests for `PayloadFormat::detect`.
///
/// Covers every lead-byte class the detector distinguishes: JSON objects and
/// arrays, whitespace-prefixed JSON, the empty payload, and each MsgPack map
/// and array marker.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn detect_json_object() {
        assert_eq!(PayloadFormat::detect(b"{\"foo\":1}"), PayloadFormat::Json);
    }

    #[test]
    fn detect_json_array() {
        assert_eq!(PayloadFormat::detect(b"[1,2,3]"), PayloadFormat::Json);
    }

    #[test]
    fn detect_whitespace_prefixed_json() {
        // Leading whitespace is not a MsgPack marker, so JSON is assumed.
        assert_eq!(PayloadFormat::detect(b"  {\"foo\":1}"), PayloadFormat::Json);
        assert_eq!(PayloadFormat::detect(b"\n[1]"), PayloadFormat::Json);
    }

    #[test]
    fn detect_msgpack_fixmap() {
        // fixmap with 1 element: 0x81
        assert_eq!(PayloadFormat::detect(&[0x81, 0xa3]), PayloadFormat::MsgPack);
    }

    #[test]
    fn detect_msgpack_fixmap_range_bounds() {
        // Both ends of the fixmap range 0x80..=0x8f.
        assert_eq!(PayloadFormat::detect(&[0x80]), PayloadFormat::MsgPack);
        assert_eq!(PayloadFormat::detect(&[0x8f]), PayloadFormat::MsgPack);
    }

    #[test]
    fn detect_msgpack_map16() {
        assert_eq!(
            PayloadFormat::detect(&[0xde, 0x00, 0x10]),
            PayloadFormat::MsgPack
        );
    }

    #[test]
    fn detect_msgpack_map32() {
        assert_eq!(
            PayloadFormat::detect(&[0xdf, 0x00, 0x00, 0x00, 0x01]),
            PayloadFormat::MsgPack
        );
    }

    #[test]
    fn detect_msgpack_fixarray() {
        // Both ends of the fixarray range 0x90..=0x9f.
        assert_eq!(PayloadFormat::detect(&[0x90]), PayloadFormat::MsgPack);
        assert_eq!(PayloadFormat::detect(&[0x9f]), PayloadFormat::MsgPack);
    }

    #[test]
    fn detect_msgpack_array16_and_array32() {
        assert_eq!(
            PayloadFormat::detect(&[0xdc, 0x00, 0x01]),
            PayloadFormat::MsgPack
        );
        assert_eq!(
            PayloadFormat::detect(&[0xdd, 0x00, 0x00, 0x00, 0x01]),
            PayloadFormat::MsgPack
        );
    }

    #[test]
    fn detect_bytes_adjacent_to_markers_are_json() {
        // 0x7f / 0xa0 sit just outside the fixmap/fixarray block, and
        // 0xdb / 0xe0 just outside the 16/32-bit container markers.
        for lead in [0x7f_u8, 0xa0, 0xdb, 0xe0] {
            assert_eq!(PayloadFormat::detect(&[lead]), PayloadFormat::Json);
        }
    }

    #[test]
    fn detect_empty_defaults_json() {
        assert_eq!(PayloadFormat::detect(&[]), PayloadFormat::Json);
    }
}