Skip to main content

hyperi_rustlib/transport/
traits.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/traits.rs
3// Purpose:   Transport trait definitions (sender + receiver split)
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use super::error::TransportResult;
10use super::filter::FilteredDlqEntry;
11use super::types::{Message, SendResult};
12use super::work_batch::{Record, WorkBatch};
13use std::fmt::{Debug, Display};
14use std::future::Future;
15
/// Opaque, transport-specific handle used to commit/acknowledge a message.
///
/// Every transport implementation supplies its own token type, holding
/// whatever information it needs to acknowledge that a message was processed.
pub trait CommitToken: Clone + Send + Sync + Debug + Display + 'static {
    /// Render the token as an owned `String` for logging/debugging.
    ///
    /// The default implementation goes through the type's [`Display`] impl;
    /// implementors may override it.
    fn as_str(&self) -> String {
        self.to_string()
    }
}
26
27/// Result of a [`TransportReceiver::recv`] call.
28///
29/// Carries the messages that passed inbound filtering AND any entries those
30/// filters routed to DLQ, so a caller cannot accidentally lose dead-letters by
31/// forgetting a separate drain step (the previous two-call
32/// `recv()` + `take_filtered_dlq_entries()` contract). The caller routes
33/// `dlq_entries` onward via its own DLQ handle.
34#[derive(Debug)]
35pub struct RecvBatch<T: CommitToken> {
36    /// Messages that passed all inbound filters (or had no filter match).
37    pub messages: Vec<Message<T>>,
38    /// Entries matched by `action: dlq` inbound filters. Caller routes to DLQ.
39    pub dlq_entries: Vec<FilteredDlqEntry>,
40}
41
42impl<T: CommitToken> RecvBatch<T> {
43    /// An empty batch (no messages, no DLQ entries).
44    #[must_use]
45    pub fn empty() -> Self {
46        Self {
47            messages: Vec::new(),
48            dlq_entries: Vec::new(),
49        }
50    }
51
52    /// A batch of messages with no DLQ entries (e.g. filters disabled).
53    #[must_use]
54    pub fn from_messages(messages: Vec<Message<T>>) -> Self {
55        Self {
56            messages,
57            dlq_entries: Vec::new(),
58        }
59    }
60
61    /// Whether the batch has no messages (DLQ entries may still be present).
62    #[must_use]
63    pub fn is_empty(&self) -> bool {
64        self.messages.is_empty()
65    }
66
67    /// Number of passing messages.
68    #[must_use]
69    pub fn len(&self) -> usize {
70        self.messages.len()
71    }
72}
73
74/// Common transport operations shared by senders and receivers.
75///
76/// Every transport implementation provides these lifecycle and
77/// introspection methods regardless of direction.
78pub trait TransportBase: Send + Sync {
79    /// Shutdown the transport gracefully.
80    fn close(&self) -> impl Future<Output = TransportResult<()>> + Send;
81
82    /// Check if the transport is healthy and connected.
83    fn is_healthy(&self) -> bool;
84
85    /// Get transport name for logging/metrics.
86    fn name(&self) -> &'static str;
87}
88
89/// Send-side transport.
90///
91/// Extends `TransportBase` with send capability. The factory returns
92/// `AnySender` (enum dispatch) for runtime transport selection.
93///
94/// All implementations auto-emit `dfe_transport_*` Prometheus metrics
95/// when a `MetricsManager` recorder is installed.
96pub trait TransportSender: TransportBase {
97    /// Send raw bytes to a key/destination.
98    ///
99    /// The `key` semantics depend on the transport:
100    /// - Kafka: topic name
101    /// - gRPC: metadata routing key
102    /// - HTTP: URL path suffix or ignored
103    /// - File: filename suffix or ignored
104    /// - Redis: stream name
105    /// - Pipe: ignored (single stdout)
106    fn send(&self, key: &str, payload: bytes::Bytes) -> impl Future<Output = SendResult> + Send;
107
108    /// Send a whole block of [`Record`]s in one shot.
109    ///
110    /// The default sends each record individually via [`send`](Self::send),
111    /// using the record's own `key` (empty string when `None`) and its payload
112    /// `Bytes` (a refcount bump, not a copy). Transports with a native batch RPC
113    /// (e.g. gRPC's `RouteBatch`) override this to send the whole block in a
114    /// single call -- serde-less and round-trip-cheaper.
115    ///
116    /// Commit tokens and inline-DLQ entries are NOT sent: they are the SENDER's
117    /// local concern. Pass the records (e.g. `&workbatch.records`) and fire the
118    /// commit tokens locally after this returns [`SendResult::Ok`].
119    ///
120    /// ## At-least-once caveat -- per-record fallback can partially send
121    ///
122    /// The default is NOT atomic. If record `k` of `n` returns a non-`Ok`
123    /// result, records `0..k` are already on the wire and this returns that
124    /// first non-`Ok` result WITHOUT unsending them. The caller treats the whole
125    /// block as not-yet-committed and retries it; the already-sent prefix is
126    /// re-delivered (at-least-once -- duplicates, never loss). A `Backpressured`
127    /// or `Fatal` short-circuits (no further records are attempted) so the caller
128    /// retries the remainder rather than skipping past a transient failure. A
129    /// native batch override (gRPC) avoids the partial-send window entirely: the
130    /// whole block is one RPC, accepted or not.
131    fn send_batch(&self, records: &[Record]) -> impl Future<Output = SendResult> + Send {
132        async move {
133            for record in records {
134                let key = record.key.as_deref().unwrap_or("");
135                let result = self.send(key, record.payload.clone()).await;
136                if !result.is_ok() {
137                    // Backpressured / Fatal / FilteredDlq -- stop here so the
138                    // caller retries the (unconfirmed) remainder of the block.
139                    return result;
140                }
141            }
142            SendResult::Ok
143        }
144    }
145}
146
/// Bounds applied to one byte-aware [`TransportReceiver::recv_limited`] poll.
///
/// A plain [`recv`](TransportReceiver::recv) accepts only a RECORD cap, so one
/// poll can assemble a [`WorkBatch`] whose total bytes are arbitrarily larger
/// than any memory budget (and, for the Kafka recv-arena, allocate one arena
/// for the whole poll). `RecvLimits` supplies the missing BYTE bound: the
/// governed driver passes the self-regulation byte budget here so that a
/// single governed recv never retains more than `max_bytes` (plus the
/// one-oversized-record floor) of inbound payload before the sub-block split
/// runs.
///
/// `max_records` is the existing poll-safety record cap (a flood of tiny
/// records cannot blow the count even within the byte budget); `max_bytes` is
/// the new payload-bytes ceiling for the poll.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RecvLimits {
    /// Hard cap on how many records one poll may return (`>= 1`).
    // NOTE(review): the `>= 1` requirement is not enforced by this type;
    // presumably callers/transports uphold it -- confirm.
    pub max_records: usize,
    /// Soft cap on the SUM of `payload.len()` returned by one poll. Once a
    /// transport has accumulated at least one record, it stops draining when
    /// the accumulated payload bytes reach this value, so a poll never retains
    /// more than `max_bytes + one oversized record`. A transport without a
    /// byte-aware drain (the default impl) ignores this field and is bounded
    /// by `max_records` only.
    pub max_bytes: u64,
}
172
173/// Receive-side transport -- generic over commit token type.
174///
175/// Extends `TransportBase` with receive and commit capability.
176/// Input stages (receiver, fetcher) use concrete implementations
177/// directly for type-safe token handling.
178pub trait TransportReceiver: TransportBase {
179    /// The token type for this transport.
180    type Token: CommitToken;
181
182    /// Receive up to `max` records as one [`WorkBatch`].
183    ///
184    /// Returns immediately with available records (may be fewer than `max`).
185    /// Returns an empty batch if no records are available. The source acks for
186    /// the whole block live on [`WorkBatch::commit_tokens`] -- they are decoupled
187    /// from `records.len()` so a downstream fan-out cannot disturb them.
188    ///
189    /// **Filter behaviour:** if the transport has inbound filters configured,
190    /// `recv()` removes records matching `action: drop` filters and carries
191    /// records matching `action: dlq` filters in [`WorkBatch`]`.dlq_entries`
192    /// alongside the passing [`WorkBatch`]`.records`. Route the DLQ entries via
193    /// your own DLQ handle -- they cannot be silently lost.
194    ///
195    /// # Example
196    ///
197    /// ```rust,ignore
198    /// let batch = transport.recv(100).await?;
199    /// for entry in batch.dlq_entries {
200    ///     dlq.send(DlqEntry::new("filter", entry.reason, entry.payload)).await?;
201    /// }
202    /// for record in batch.records { /* process */ }
203    /// ```
204    fn recv(
205        &self,
206        max: usize,
207    ) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send;
208
209    /// Byte-aware receive: bound a single poll by BOTH a record cap and a
210    /// payload-byte cap (see [`RecvLimits`]).
211    ///
212    /// This is the receive limit the governed driver uses so the
213    /// self-regulation byte budget actually constrains RECEIVE memory -- not
214    /// just the post-recv sub-block lease. A single poll retains at most
215    /// `limits.max_bytes` of payload (plus one oversized record under the
216    /// floor), so the in-flight inbound footprint is bounded BEFORE the
217    /// sub-block split, never after.
218    ///
219    /// **Default impl:** falls back to [`recv`](Self::recv)`(limits.max_records)`
220    /// -- record-bounded only, byte cap ignored. This keeps every transport
221    /// green with no churn; only transports that buffer a whole poll's bytes in
222    /// one allocation (Kafka's recv-arena) override it to honour `max_bytes`.
223    /// Channel/stream transports (Memory, gRPC, ...) already retain only one
224    /// record's bytes at a time, so the record-bounded fallback is correct for
225    /// them.
226    ///
227    /// Filter behaviour and the `commit_tokens` contract are identical to
228    /// [`recv`](Self::recv).
229    fn recv_limited(
230        &self,
231        limits: RecvLimits,
232    ) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send {
233        self.recv(limits.max_records)
234    }
235
236    /// Commit/acknowledge processed messages.
237    ///
238    /// - Kafka: commits consumer offsets
239    /// - gRPC: no-op (no persistence)
240    /// - Redis: XACK
241    /// - File: advances read position
242    /// - Memory: advances internal sequence
243    fn commit(&self, tokens: &[Self::Token]) -> impl Future<Output = TransportResult<()>> + Send;
244}
245
246/// Combined transport -- implements both send and receive.
247///
248/// Convenience trait for transports that support bidirectional communication.
249/// Most concrete implementations (Kafka, gRPC, Memory, Redis, File, Pipe)
250/// implement this. Automatically implemented via blanket impl.
251pub trait Transport: TransportSender + TransportReceiver {}
252
253/// Blanket impl: anything that implements both traits is a Transport.
254impl<T: TransportSender + TransportReceiver> Transport for T {}
255
256/// Load a transport config from the cascade under a fixed key.
257///
258/// Consolidates the byte-identical `from_cascade()` bodies that each transport
259/// config previously repeated (try the cascade under a key, register the
260/// section, fall back to `Default`). Implementors only name their key; the
261/// loading logic lives here once. Without the `config` feature the default
262/// method just returns `Default::default()`.
263pub trait FromCascade: Default + serde::Serialize + serde::de::DeserializeOwned + 'static {
264    /// Load `Self` from the config cascade under `key`, registering the section
265    /// in the global registry; falls back to `Default` if the cascade is
266    /// unavailable or the key is absent/invalid.
267    #[must_use]
268    fn from_cascade_key(key: &str) -> Self {
269        #[cfg(feature = "config")]
270        {
271            if let Some(cfg) = crate::config::try_get()
272                && let Ok(value) = cfg.unmarshal_key_registered::<Self>(key)
273            {
274                return value;
275            }
276        }
277        // Without `config`, or on any cascade miss, use defaults.
278        #[cfg(not(feature = "config"))]
279        let _ = key;
280        Self::default()
281    }
282}