hyperi_rustlib/transport/traits.rs
1// Project: hyperi-rustlib
2// File: src/transport/traits.rs
3// Purpose: Transport trait definitions (sender + receiver split)
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use super::error::TransportResult;
10use super::filter::FilteredDlqEntry;
11use super::types::{Message, SendResult};
12use super::work_batch::{Record, WorkBatch};
13use std::fmt::{Debug, Display};
14use std::future::Future;
15
/// Opaque acknowledgment handle a transport hands out for commit/ack.
///
/// Every transport defines its own token type, holding whatever state it needs
/// in order to acknowledge that a message has been processed.
pub trait CommitToken: Clone + Send + Sync + Debug + Display + 'static {
    /// Render the token as an owned string for logging/debugging.
    ///
    /// The default delegates to the type's [`Display`] implementation. Despite
    /// the `as_` prefix, this allocates a fresh `String` on every call.
    fn as_str(&self) -> String {
        self.to_string()
    }
}
26
/// Result of a [`TransportReceiver::recv`] call.
///
/// Carries passing messages AND any filter-routed DLQ entries in one struct, so
/// a caller cannot lose dead-letters by forgetting a separate drain step. The
/// caller routes `dlq_entries` onward via its own DLQ handle.
///
/// NOTE(review): `TransportReceiver::recv` as declared in this file returns
/// `WorkBatch<Self::Token>`, not `RecvBatch` -- confirm which call sites still
/// produce this type and whether the first line above is stale.
#[derive(Debug)]
pub struct RecvBatch<T: CommitToken> {
    /// Messages that passed all inbound filters (or had no filter match).
    pub messages: Vec<Message<T>>,
    /// Entries matched by `action: dlq` inbound filters. Caller routes to DLQ.
    pub dlq_entries: Vec<FilteredDlqEntry>,
    /// Commit tokens of messages removed by inbound `drop`/`dlq` filters.
    ///
    /// Handled records that produced no passing message. Carried into
    /// `WorkBatch.commit_tokens` so the block commit advances the source past
    /// them -- otherwise an all-filtered stretch stalls the Kafka offset /
    /// leaks the Redis PEL.
    ///
    /// These tokens also make [`RecvBatch::is_empty`] return `false` even when
    /// `messages` is empty.
    pub filtered_tokens: Vec<T>,
}
46
47impl<T: CommitToken> RecvBatch<T> {
48 /// An empty batch (no messages, no DLQ entries).
49 #[must_use]
50 pub fn empty() -> Self {
51 Self {
52 messages: Vec::new(),
53 dlq_entries: Vec::new(),
54 filtered_tokens: Vec::new(),
55 }
56 }
57
58 /// A batch of messages with no DLQ entries (e.g. filters disabled).
59 #[must_use]
60 pub fn from_messages(messages: Vec<Message<T>>) -> Self {
61 Self {
62 messages,
63 dlq_entries: Vec::new(),
64 filtered_tokens: Vec::new(),
65 }
66 }
67
68 /// Whether the batch has no messages AND no filtered-only acks to commit.
69 /// (DLQ entries may still be present alongside passing messages.)
70 #[must_use]
71 pub fn is_empty(&self) -> bool {
72 self.messages.is_empty() && self.filtered_tokens.is_empty()
73 }
74
75 /// Number of passing messages.
76 #[must_use]
77 pub fn len(&self) -> usize {
78 self.messages.len()
79 }
80}
81
/// Lifecycle and introspection methods shared by senders and receivers.
///
/// Supertrait of both [`TransportSender`] and [`TransportReceiver`]. Every
/// implementor must be `Send + Sync`.
pub trait TransportBase: Send + Sync {
    /// Shutdown the transport gracefully.
    ///
    /// The returned future is required to be `Send` and resolves to
    /// `TransportResult<()>`.
    fn close(&self) -> impl Future<Output = TransportResult<()>> + Send;

    /// Check if the transport is healthy and connected.
    ///
    /// Synchronous: returns a plain `bool`, not a future.
    fn is_healthy(&self) -> bool;

    /// Get transport name for logging/metrics.
    ///
    /// Returns a `'static` string, so the name is fixed per implementation
    /// rather than built at runtime.
    fn name(&self) -> &'static str;
}
93
94/// Send-side transport.
95///
96/// The factory returns `AnySender` (enum dispatch) for runtime selection. All
97/// implementations auto-emit `dfe_transport_*` metrics when a `MetricsManager`
98/// recorder is installed.
99pub trait TransportSender: TransportBase {
100 /// Send raw bytes to a key/destination.
101 ///
102 /// The `key` semantics depend on the transport:
103 /// - Kafka: topic name
104 /// - gRPC: metadata routing key
105 /// - HTTP: URL path suffix or ignored
106 /// - File: filename suffix or ignored
107 /// - Redis: stream name
108 /// - Pipe: ignored (single stdout)
109 fn send(&self, key: &str, payload: bytes::Bytes) -> impl Future<Output = SendResult> + Send;
110
111 /// Send a whole block of [`Record`]s in one shot.
112 ///
113 /// The default sends each record individually via [`send`](Self::send),
114 /// using the record's own `key` (empty when `None`) and payload (a refcount
115 /// bump, not a copy). Transports with a native batch RPC (e.g. gRPC's
116 /// `RouteBatch`) override this. Commit tokens and inline-DLQ entries are NOT
117 /// sent -- they are the SENDER's local concern; fire the commit tokens
118 /// locally after this returns [`SendResult::Ok`].
119 ///
120 /// ## At-least-once caveat -- per-record fallback can partially send
121 ///
122 /// Not atomic. If record `k` of `n` returns a transient non-`Ok`
123 /// (`Backpressured`/`Fatal`), records `0..k` are already on the wire and
124 /// this returns without unsending them. The caller retries the whole block,
125 /// re-delivering the sent prefix (at-least-once -- duplicates, never loss).
126 /// A native batch override (gRPC) sends the whole block as one RPC, avoiding
127 /// the partial-send window.
128 ///
129 /// ## Outbound-filter dispositions do NOT abort the batch
130 ///
131 /// A per-record `FilteredDlq` is the record being HANDLED, not a send
132 /// failure: skip it and continue. Returning it would make the caller retry
133 /// the whole block forever -- the deterministic filter re-matches the same
134 /// record every time (a livelock that stalls the source). `Drop` records
135 /// likewise never reach the wire. Only `Backpressured`/`Fatal`
136 /// short-circuit.
137 fn send_batch(&self, records: &[Record]) -> impl Future<Output = SendResult> + Send {
138 async move {
139 for record in records {
140 let key = record.key.as_deref().unwrap_or("");
141 match self.send(key, record.payload.clone()).await {
142 // Sent, dropped (Ok), or suppressed by an outbound dlq
143 // filter -- all handled; keep going, do NOT abort the block.
144 SendResult::Ok | SendResult::FilteredDlq => {}
145 // Transient/fatal transport failure: stop so the caller
146 // retries the unconfirmed remainder of the block.
147 other @ (SendResult::Backpressured | SendResult::Fatal(_)) => return other,
148 }
149 }
150 SendResult::Ok
151 }
152 }
153}
154
/// Limits for a single byte-aware [`TransportReceiver::recv_limited`] poll.
///
/// A bare [`recv`](TransportReceiver::recv) takes only a RECORD cap, so one poll
/// can build a [`WorkBatch`] arbitrarily larger than any memory budget.
/// `RecvLimits` adds the BYTE bound: the governed driver passes its
/// self-regulation byte budget here so a single recv never retains more than
/// `max_bytes` (plus the one-oversized-record floor) before the sub-block split.
///
/// Plain `Copy` value type: it is passed to `recv_limited` by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecvLimits {
    /// Hard cap on records per poll (`>= 1`). Bounds a tiny-record flood that
    /// stays within the byte budget.
    ///
    /// NOTE(review): the `>= 1` requirement is not enforced by this type (the
    /// field is a plain public `usize`); presumably callers guarantee it --
    /// confirm.
    pub max_records: usize,
    /// Soft cap on the SUM of `payload.len()` per poll. A transport that has
    /// accumulated at least one record stops draining once payload bytes reach
    /// this value (so it retains at most `max_bytes + one oversized record`).
    /// The default impl has no byte-aware drain and bounds by `max_records`
    /// only.
    pub max_bytes: u64,
}
174
175/// Receive-side transport -- generic over commit token type.
176///
177/// Input stages (receiver, fetcher) use concrete implementations directly for
178/// type-safe token handling.
179pub trait TransportReceiver: TransportBase {
180 /// The token type for this transport.
181 type Token: CommitToken;
182
183 /// Receive up to `max` records as one [`WorkBatch`].
184 ///
185 /// Returns immediately with available records (may be fewer than `max`).
186 /// Returns an empty batch if no records are available. The source acks for
187 /// the whole block live on [`WorkBatch::commit_tokens`] -- they are decoupled
188 /// from `records.len()` so a downstream fan-out cannot disturb them.
189 ///
190 /// **Filter behaviour:** if the transport has inbound filters configured,
191 /// `recv()` removes records matching `action: drop` filters and carries
192 /// records matching `action: dlq` filters in [`WorkBatch`]`.dlq_entries`
193 /// alongside the passing [`WorkBatch`]`.records`. Route the DLQ entries via
194 /// your own DLQ handle -- they cannot be silently lost.
195 ///
196 /// # Example
197 ///
198 /// ```rust,ignore
199 /// let batch = transport.recv(100).await?;
200 /// for entry in batch.dlq_entries {
201 /// dlq.send(DlqEntry::new("filter", entry.reason, entry.payload)).await?;
202 /// }
203 /// for record in batch.records { /* process */ }
204 /// ```
205 ///
206 /// # Cancel-safety (REQUIRED of implementors)
207 ///
208 /// The governed run loop polls `recv()` inside a `tokio::select!` and DROPS
209 /// the future when shutdown or a ticker wins, so it must not leave records
210 /// half-consumed at an `.await` -- either gather records synchronously (no
211 /// `.await` between taking a record off the wire and returning it) or buffer
212 /// internally. The in-tree Kafka (synchronous poll) and memory (awaits only
213 /// on an empty buffer) impls satisfy this; a custom impl that holds records
214 /// across an `.await` will drop data on cancellation.
215 fn recv(
216 &self,
217 max: usize,
218 ) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send;
219
220 /// Byte-aware receive: bound a single poll by BOTH a record cap and a
221 /// payload-byte cap (see [`RecvLimits`]).
222 ///
223 /// The governed driver uses this so the self-regulation byte budget bounds
224 /// RECEIVE memory, not just the post-recv sub-block lease: a poll retains at
225 /// most `limits.max_bytes` of payload (plus one oversized record), so the
226 /// inbound footprint is bounded BEFORE the sub-block split, never after.
227 ///
228 /// **Default impl:** falls back to [`recv`](Self::recv)`(limits.max_records)`
229 /// -- record-bounded only, byte cap ignored. Only transports that buffer a
230 /// whole poll's bytes in one allocation (Kafka's recv-arena) override it.
231 /// Channel/stream transports (Memory, gRPC, ...) already retain only one
232 /// record's bytes at a time, so the fallback is correct for them.
233 ///
234 /// Filter behaviour and the `commit_tokens` contract match
235 /// [`recv`](Self::recv).
236 fn recv_limited(
237 &self,
238 limits: RecvLimits,
239 ) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send {
240 self.recv(limits.max_records)
241 }
242
243 /// Commit/acknowledge processed messages.
244 ///
245 /// - Kafka: commits consumer offsets
246 /// - gRPC: no-op (no persistence)
247 /// - Redis: XACK
248 /// - File: advances read position
249 /// - Memory: advances internal sequence
250 fn commit(&self, tokens: &[Self::Token]) -> impl Future<Output = TransportResult<()>> + Send;
251}
252
253/// Combined transport -- implements both send and receive.
254///
255/// Most concrete impls (Kafka, gRPC, Memory, Redis, File, Pipe) qualify;
256/// auto-implemented via blanket impl.
257pub trait Transport: TransportSender + TransportReceiver {}
258
259/// Blanket impl: anything that implements both traits is a Transport.
260impl<T: TransportSender + TransportReceiver> Transport for T {}
261
262/// Load a transport config from the cascade under a fixed key.
263///
264/// Consolidates the byte-identical `from_cascade()` bodies each transport config
265/// used to repeat. Implementors only name their key. Without the `config`
266/// feature the default method returns `Default::default()`.
267pub trait FromCascade: Default + serde::Serialize + serde::de::DeserializeOwned + 'static {
268 /// Load `Self` from the config cascade under `key`, registering the section
269 /// in the global registry; falls back to `Default` if the cascade is
270 /// unavailable or the key is absent/invalid.
271 #[must_use]
272 fn from_cascade_key(key: &str) -> Self {
273 #[cfg(feature = "config")]
274 {
275 if let Some(cfg) = crate::config::try_get()
276 && let Ok(value) = cfg.unmarshal_key_registered::<Self>(key)
277 {
278 return value;
279 }
280 }
281 // Without `config`, or on any cascade miss, use defaults.
282 #[cfg(not(feature = "config"))]
283 let _ = key;
284 Self::default()
285 }
286}