hyperi_rustlib/transport/traits.rs
1// Project: hyperi-rustlib
2// File: src/transport/traits.rs
3// Purpose: Transport trait definitions (sender + receiver split)
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use super::error::TransportResult;
10use super::filter::FilteredDlqEntry;
11use super::types::{Message, SendResult};
12use super::work_batch::{Record, WorkBatch};
13use std::fmt::{Debug, Display};
14use std::future::Future;
15
/// Acknowledgment handle issued by a transport.
///
/// Every transport implementation supplies its own token type, holding
/// whatever information it needs to acknowledge that a message has been
/// processed. The supertrait bounds require a token to be cloneable, shareable
/// across threads, and printable through both [`Debug`] and [`Display`].
pub trait CommitToken: Clone + Send + Sync + Debug + Display + 'static {
    /// Render the token as an owned `String` for logging/debugging.
    ///
    /// The default implementation delegates to the token's [`Display`]
    /// output. Despite the `as_` prefix, it builds a new `String` on every
    /// call rather than borrowing.
    fn as_str(&self) -> String {
        self.to_string()
    }
}
26
27/// Result of a [`TransportReceiver::recv`] call.
28///
29/// Carries the messages that passed inbound filtering AND any entries those
30/// filters routed to DLQ, so a caller cannot accidentally lose dead-letters by
31/// forgetting a separate drain step (the previous two-call
32/// `recv()` + `take_filtered_dlq_entries()` contract). The caller routes
33/// `dlq_entries` onward via its own DLQ handle.
34#[derive(Debug)]
35pub struct RecvBatch<T: CommitToken> {
36 /// Messages that passed all inbound filters (or had no filter match).
37 pub messages: Vec<Message<T>>,
38 /// Entries matched by `action: dlq` inbound filters. Caller routes to DLQ.
39 pub dlq_entries: Vec<FilteredDlqEntry>,
40}
41
42impl<T: CommitToken> RecvBatch<T> {
43 /// An empty batch (no messages, no DLQ entries).
44 #[must_use]
45 pub fn empty() -> Self {
46 Self {
47 messages: Vec::new(),
48 dlq_entries: Vec::new(),
49 }
50 }
51
52 /// A batch of messages with no DLQ entries (e.g. filters disabled).
53 #[must_use]
54 pub fn from_messages(messages: Vec<Message<T>>) -> Self {
55 Self {
56 messages,
57 dlq_entries: Vec::new(),
58 }
59 }
60
61 /// Whether the batch has no messages (DLQ entries may still be present).
62 #[must_use]
63 pub fn is_empty(&self) -> bool {
64 self.messages.is_empty()
65 }
66
67 /// Number of passing messages.
68 #[must_use]
69 pub fn len(&self) -> usize {
70 self.messages.len()
71 }
72}
73
74/// Common transport operations shared by senders and receivers.
75///
76/// Every transport implementation provides these lifecycle and
77/// introspection methods regardless of direction.
78pub trait TransportBase: Send + Sync {
79 /// Shutdown the transport gracefully.
80 fn close(&self) -> impl Future<Output = TransportResult<()>> + Send;
81
82 /// Check if the transport is healthy and connected.
83 fn is_healthy(&self) -> bool;
84
85 /// Get transport name for logging/metrics.
86 fn name(&self) -> &'static str;
87}
88
89/// Send-side transport.
90///
91/// Extends `TransportBase` with send capability. The factory returns
92/// `AnySender` (enum dispatch) for runtime transport selection.
93///
94/// All implementations auto-emit `dfe_transport_*` Prometheus metrics
95/// when a `MetricsManager` recorder is installed.
96pub trait TransportSender: TransportBase {
97 /// Send raw bytes to a key/destination.
98 ///
99 /// The `key` semantics depend on the transport:
100 /// - Kafka: topic name
101 /// - gRPC: metadata routing key
102 /// - HTTP: URL path suffix or ignored
103 /// - File: filename suffix or ignored
104 /// - Redis: stream name
105 /// - Pipe: ignored (single stdout)
106 fn send(&self, key: &str, payload: bytes::Bytes) -> impl Future<Output = SendResult> + Send;
107
108 /// Send a whole block of [`Record`]s in one shot.
109 ///
110 /// The default sends each record individually via [`send`](Self::send),
111 /// using the record's own `key` (empty string when `None`) and its payload
112 /// `Bytes` (a refcount bump, not a copy). Transports with a native batch RPC
113 /// (e.g. gRPC's `RouteBatch`) override this to send the whole block in a
114 /// single call -- serde-less and round-trip-cheaper.
115 ///
116 /// Commit tokens and inline-DLQ entries are NOT sent: they are the SENDER's
117 /// local concern. Pass the records (e.g. `&workbatch.records`) and fire the
118 /// commit tokens locally after this returns [`SendResult::Ok`].
119 ///
120 /// ## At-least-once caveat -- per-record fallback can partially send
121 ///
122 /// The default is NOT atomic. If record `k` of `n` returns a non-`Ok`
123 /// result, records `0..k` are already on the wire and this returns that
124 /// first non-`Ok` result WITHOUT unsending them. The caller treats the whole
125 /// block as not-yet-committed and retries it; the already-sent prefix is
126 /// re-delivered (at-least-once -- duplicates, never loss). A `Backpressured`
127 /// or `Fatal` short-circuits (no further records are attempted) so the caller
128 /// retries the remainder rather than skipping past a transient failure. A
129 /// native batch override (gRPC) avoids the partial-send window entirely: the
130 /// whole block is one RPC, accepted or not.
131 fn send_batch(&self, records: &[Record]) -> impl Future<Output = SendResult> + Send {
132 async move {
133 for record in records {
134 let key = record.key.as_deref().unwrap_or("");
135 let result = self.send(key, record.payload.clone()).await;
136 if !result.is_ok() {
137 // Backpressured / Fatal / FilteredDlq -- stop here so the
138 // caller retries the (unconfirmed) remainder of the block.
139 return result;
140 }
141 }
142 SendResult::Ok
143 }
144 }
145}
146
/// Bounds applied to one byte-aware [`TransportReceiver::recv_limited`] poll.
///
/// A plain [`recv`](TransportReceiver::recv) accepts only a RECORD cap, so one
/// poll can build a [`WorkBatch`] whose total bytes are arbitrarily larger than
/// any memory budget (and, for the Kafka recv-arena, allocate one arena for the
/// whole poll). `RecvLimits` supplies the missing BYTE bound: the governed
/// driver passes the self-regulation byte budget here so a single governed
/// recv never retains more than `max_bytes` (plus the one-oversized-record
/// floor) of inbound payload before the sub-block split runs.
///
/// `max_records` is the existing poll-safety record cap (a tiny-record flood
/// cannot blow the count even within the byte budget); `max_bytes` is the
/// payload-bytes ceiling for the poll.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RecvLimits {
    /// Hard cap on how many records one poll may return (`>= 1`).
    pub max_records: usize,
    /// Soft cap on the SUM of `payload.len()` returned by one poll.
    ///
    /// A transport that has accumulated at least one record stops draining
    /// once the accumulated payload bytes reach this value, so the poll never
    /// retains more than `max_bytes + one oversized record`. A transport with
    /// no byte-aware drain (the default impl) ignores this field and bounds by
    /// `max_records` only.
    pub max_bytes: u64,
}
172
173/// Receive-side transport -- generic over commit token type.
174///
175/// Extends `TransportBase` with receive and commit capability.
176/// Input stages (receiver, fetcher) use concrete implementations
177/// directly for type-safe token handling.
178pub trait TransportReceiver: TransportBase {
179 /// The token type for this transport.
180 type Token: CommitToken;
181
182 /// Receive up to `max` records as one [`WorkBatch`].
183 ///
184 /// Returns immediately with available records (may be fewer than `max`).
185 /// Returns an empty batch if no records are available. The source acks for
186 /// the whole block live on [`WorkBatch::commit_tokens`] -- they are decoupled
187 /// from `records.len()` so a downstream fan-out cannot disturb them.
188 ///
189 /// **Filter behaviour:** if the transport has inbound filters configured,
190 /// `recv()` removes records matching `action: drop` filters and carries
191 /// records matching `action: dlq` filters in [`WorkBatch`]`.dlq_entries`
192 /// alongside the passing [`WorkBatch`]`.records`. Route the DLQ entries via
193 /// your own DLQ handle -- they cannot be silently lost.
194 ///
195 /// # Example
196 ///
197 /// ```rust,ignore
198 /// let batch = transport.recv(100).await?;
199 /// for entry in batch.dlq_entries {
200 /// dlq.send(DlqEntry::new("filter", entry.reason, entry.payload)).await?;
201 /// }
202 /// for record in batch.records { /* process */ }
203 /// ```
204 fn recv(
205 &self,
206 max: usize,
207 ) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send;
208
209 /// Byte-aware receive: bound a single poll by BOTH a record cap and a
210 /// payload-byte cap (see [`RecvLimits`]).
211 ///
212 /// This is the receive limit the governed driver uses so the
213 /// self-regulation byte budget actually constrains RECEIVE memory -- not
214 /// just the post-recv sub-block lease. A single poll retains at most
215 /// `limits.max_bytes` of payload (plus one oversized record under the
216 /// floor), so the in-flight inbound footprint is bounded BEFORE the
217 /// sub-block split, never after.
218 ///
219 /// **Default impl:** falls back to [`recv`](Self::recv)`(limits.max_records)`
220 /// -- record-bounded only, byte cap ignored. This keeps every transport
221 /// green with no churn; only transports that buffer a whole poll's bytes in
222 /// one allocation (Kafka's recv-arena) override it to honour `max_bytes`.
223 /// Channel/stream transports (Memory, gRPC, ...) already retain only one
224 /// record's bytes at a time, so the record-bounded fallback is correct for
225 /// them.
226 ///
227 /// Filter behaviour and the `commit_tokens` contract are identical to
228 /// [`recv`](Self::recv).
229 fn recv_limited(
230 &self,
231 limits: RecvLimits,
232 ) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send {
233 self.recv(limits.max_records)
234 }
235
236 /// Commit/acknowledge processed messages.
237 ///
238 /// - Kafka: commits consumer offsets
239 /// - gRPC: no-op (no persistence)
240 /// - Redis: XACK
241 /// - File: advances read position
242 /// - Memory: advances internal sequence
243 fn commit(&self, tokens: &[Self::Token]) -> impl Future<Output = TransportResult<()>> + Send;
244}
245
246/// Combined transport -- implements both send and receive.
247///
248/// Convenience trait for transports that support bidirectional communication.
249/// Most concrete implementations (Kafka, gRPC, Memory, Redis, File, Pipe)
250/// implement this. Automatically implemented via blanket impl.
251pub trait Transport: TransportSender + TransportReceiver {}
252
253/// Blanket impl: anything that implements both traits is a Transport.
254impl<T: TransportSender + TransportReceiver> Transport for T {}
255
256/// Load a transport config from the cascade under a fixed key.
257///
258/// Consolidates the byte-identical `from_cascade()` bodies that each transport
259/// config previously repeated (try the cascade under a key, register the
260/// section, fall back to `Default`). Implementors only name their key; the
261/// loading logic lives here once. Without the `config` feature the default
262/// method just returns `Default::default()`.
263pub trait FromCascade: Default + serde::Serialize + serde::de::DeserializeOwned + 'static {
264 /// Load `Self` from the config cascade under `key`, registering the section
265 /// in the global registry; falls back to `Default` if the cascade is
266 /// unavailable or the key is absent/invalid.
267 #[must_use]
268 fn from_cascade_key(key: &str) -> Self {
269 #[cfg(feature = "config")]
270 {
271 if let Some(cfg) = crate::config::try_get()
272 && let Ok(value) = cfg.unmarshal_key_registered::<Self>(key)
273 {
274 return value;
275 }
276 }
277 // Without `config`, or on any cascade miss, use defaults.
278 #[cfg(not(feature = "config"))]
279 let _ = key;
280 Self::default()
281 }
282}