hyperi_rustlib/transport/traits.rs
1// Project: hyperi-rustlib
2// File: src/transport/traits.rs
3// Purpose: Transport trait definitions (sender + receiver split)
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use super::error::TransportResult;
10use super::filter::FilteredDlqEntry;
11use super::types::{Message, SendResult};
12use std::fmt::{Debug, Display};
13use std::future::Future;
14
15/// Transport-specific token for commit/acknowledgment.
16///
17/// Each transport implementation provides its own token type that
18/// captures the information needed to acknowledge message processing.
19pub trait CommitToken: Clone + Send + Sync + Debug + Display + 'static {
20 /// Get a string representation for logging/debugging.
21 fn as_str(&self) -> String {
22 format!("{self}")
23 }
24}
25
26/// Result of a [`TransportReceiver::recv`] call.
27///
28/// Carries the messages that passed inbound filtering AND any entries those
29/// filters routed to DLQ, so a caller cannot accidentally lose dead-letters by
30/// forgetting a separate drain step (the previous two-call
31/// `recv()` + `take_filtered_dlq_entries()` contract). The caller routes
32/// `dlq_entries` onward via its own DLQ handle.
33#[derive(Debug)]
34pub struct RecvBatch<T: CommitToken> {
35 /// Messages that passed all inbound filters (or had no filter match).
36 pub messages: Vec<Message<T>>,
37 /// Entries matched by `action: dlq` inbound filters. Caller routes to DLQ.
38 pub dlq_entries: Vec<FilteredDlqEntry>,
39}
40
41impl<T: CommitToken> RecvBatch<T> {
42 /// An empty batch (no messages, no DLQ entries).
43 #[must_use]
44 pub fn empty() -> Self {
45 Self {
46 messages: Vec::new(),
47 dlq_entries: Vec::new(),
48 }
49 }
50
51 /// A batch of messages with no DLQ entries (e.g. filters disabled).
52 #[must_use]
53 pub fn from_messages(messages: Vec<Message<T>>) -> Self {
54 Self {
55 messages,
56 dlq_entries: Vec::new(),
57 }
58 }
59
60 /// Whether the batch has no messages (DLQ entries may still be present).
61 #[must_use]
62 pub fn is_empty(&self) -> bool {
63 self.messages.is_empty()
64 }
65
66 /// Number of passing messages.
67 #[must_use]
68 pub fn len(&self) -> usize {
69 self.messages.len()
70 }
71}
72
73/// Common transport operations shared by senders and receivers.
74///
75/// Every transport implementation provides these lifecycle and
76/// introspection methods regardless of direction.
77pub trait TransportBase: Send + Sync {
78 /// Shutdown the transport gracefully.
79 fn close(&self) -> impl Future<Output = TransportResult<()>> + Send;
80
81 /// Check if the transport is healthy and connected.
82 fn is_healthy(&self) -> bool;
83
84 /// Get transport name for logging/metrics.
85 fn name(&self) -> &'static str;
86}
87
88/// Send-side transport.
89///
90/// Extends `TransportBase` with send capability. The factory returns
91/// `AnySender` (enum dispatch) for runtime transport selection.
92///
93/// All implementations auto-emit `dfe_transport_*` Prometheus metrics
94/// when a `MetricsManager` recorder is installed.
95pub trait TransportSender: TransportBase {
96 /// Send raw bytes to a key/destination.
97 ///
98 /// The `key` semantics depend on the transport:
99 /// - Kafka: topic name
100 /// - gRPC: metadata routing key
101 /// - HTTP: URL path suffix or ignored
102 /// - File: filename suffix or ignored
103 /// - Redis: stream name
104 /// - Pipe: ignored (single stdout)
105 fn send(&self, key: &str, payload: bytes::Bytes) -> impl Future<Output = SendResult> + Send;
106}
107
108/// Receive-side transport -- generic over commit token type.
109///
110/// Extends `TransportBase` with receive and commit capability.
111/// Input stages (receiver, fetcher) use concrete implementations
112/// directly for type-safe token handling.
113pub trait TransportReceiver: TransportBase {
114 /// The token type for this transport.
115 type Token: CommitToken;
116
117 /// Receive up to `max` messages.
118 ///
119 /// Returns immediately with available messages (may be fewer than `max`).
120 /// Returns an empty batch if no messages are available.
121 ///
122 /// **Filter behaviour:** if the transport has inbound filters configured,
123 /// `recv()` removes messages matching `action: drop` filters and returns
124 /// messages matching `action: dlq` filters in [`RecvBatch`]`.dlq_entries`
125 /// alongside the passing [`RecvBatch`]`.messages`. Route the DLQ entries via
126 /// your own DLQ handle -- they cannot be silently lost.
127 ///
128 /// # Example
129 ///
130 /// ```rust,ignore
131 /// let batch = transport.recv(100).await?;
132 /// for entry in batch.dlq_entries {
133 /// dlq.send(DlqEntry::new("filter", entry.reason, entry.payload)).await?;
134 /// }
135 /// for msg in batch.messages { /* process */ }
136 /// ```
137 fn recv(
138 &self,
139 max: usize,
140 ) -> impl Future<Output = TransportResult<RecvBatch<Self::Token>>> + Send;
141
142 /// Commit/acknowledge processed messages.
143 ///
144 /// - Kafka: commits consumer offsets
145 /// - gRPC: no-op (no persistence)
146 /// - Redis: XACK
147 /// - File: advances read position
148 /// - Memory: advances internal sequence
149 fn commit(&self, tokens: &[Self::Token]) -> impl Future<Output = TransportResult<()>> + Send;
150}
151
152/// Combined transport -- implements both send and receive.
153///
154/// Convenience trait for transports that support bidirectional communication.
155/// Most concrete implementations (Kafka, gRPC, Memory, Redis, File, Pipe)
156/// implement this. Automatically implemented via blanket impl.
157pub trait Transport: TransportSender + TransportReceiver {}
158
159/// Blanket impl: anything that implements both traits is a Transport.
160impl<T: TransportSender + TransportReceiver> Transport for T {}
161
162/// Load a transport config from the cascade under a fixed key.
163///
164/// Consolidates the byte-identical `from_cascade()` bodies that each transport
165/// config previously repeated (try the cascade under a key, register the
166/// section, fall back to `Default`). Implementors only name their key; the
167/// loading logic lives here once. Without the `config` feature the default
168/// method just returns `Default::default()`.
169pub trait FromCascade: Default + serde::Serialize + serde::de::DeserializeOwned + 'static {
170 /// Load `Self` from the config cascade under `key`, registering the section
171 /// in the global registry; falls back to `Default` if the cascade is
172 /// unavailable or the key is absent/invalid.
173 #[must_use]
174 fn from_cascade_key(key: &str) -> Self {
175 #[cfg(feature = "config")]
176 {
177 if let Some(cfg) = crate::config::try_get()
178 && let Ok(value) = cfg.unmarshal_key_registered::<Self>(key)
179 {
180 return value;
181 }
182 }
183 // Without `config`, or on any cascade miss, use defaults.
184 #[cfg(not(feature = "config"))]
185 let _ = key;
186 Self::default()
187 }
188}