Skip to main content

hyperi_rustlib/transport/
traits.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/traits.rs
3// Purpose:   Transport trait definitions (sender + receiver split)
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use super::error::TransportResult;
10use super::filter::FilteredDlqEntry;
11use super::types::{Message, SendResult};
12use std::fmt::{Debug, Display};
13use std::future::Future;
14
15/// Transport-specific token for commit/acknowledgment.
16///
17/// Each transport implementation provides its own token type that
18/// captures the information needed to acknowledge message processing.
19pub trait CommitToken: Clone + Send + Sync + Debug + Display + 'static {
20    /// Get a string representation for logging/debugging.
21    fn as_str(&self) -> String {
22        format!("{self}")
23    }
24}
25
26/// Result of a [`TransportReceiver::recv`] call.
27///
28/// Carries the messages that passed inbound filtering AND any entries those
29/// filters routed to DLQ, so a caller cannot accidentally lose dead-letters by
30/// forgetting a separate drain step (the previous two-call
31/// `recv()` + `take_filtered_dlq_entries()` contract). The caller routes
32/// `dlq_entries` onward via its own DLQ handle.
33#[derive(Debug)]
34pub struct RecvBatch<T: CommitToken> {
35    /// Messages that passed all inbound filters (or had no filter match).
36    pub messages: Vec<Message<T>>,
37    /// Entries matched by `action: dlq` inbound filters. Caller routes to DLQ.
38    pub dlq_entries: Vec<FilteredDlqEntry>,
39}
40
41impl<T: CommitToken> RecvBatch<T> {
42    /// An empty batch (no messages, no DLQ entries).
43    #[must_use]
44    pub fn empty() -> Self {
45        Self {
46            messages: Vec::new(),
47            dlq_entries: Vec::new(),
48        }
49    }
50
51    /// A batch of messages with no DLQ entries (e.g. filters disabled).
52    #[must_use]
53    pub fn from_messages(messages: Vec<Message<T>>) -> Self {
54        Self {
55            messages,
56            dlq_entries: Vec::new(),
57        }
58    }
59
60    /// Whether the batch has no messages (DLQ entries may still be present).
61    #[must_use]
62    pub fn is_empty(&self) -> bool {
63        self.messages.is_empty()
64    }
65
66    /// Number of passing messages.
67    #[must_use]
68    pub fn len(&self) -> usize {
69        self.messages.len()
70    }
71}
72
73/// Common transport operations shared by senders and receivers.
74///
75/// Every transport implementation provides these lifecycle and
76/// introspection methods regardless of direction.
77pub trait TransportBase: Send + Sync {
78    /// Shutdown the transport gracefully.
79    fn close(&self) -> impl Future<Output = TransportResult<()>> + Send;
80
81    /// Check if the transport is healthy and connected.
82    fn is_healthy(&self) -> bool;
83
84    /// Get transport name for logging/metrics.
85    fn name(&self) -> &'static str;
86}
87
88/// Send-side transport.
89///
90/// Extends `TransportBase` with send capability. The factory returns
91/// `AnySender` (enum dispatch) for runtime transport selection.
92///
93/// All implementations auto-emit `dfe_transport_*` Prometheus metrics
94/// when a `MetricsManager` recorder is installed.
95pub trait TransportSender: TransportBase {
96    /// Send raw bytes to a key/destination.
97    ///
98    /// The `key` semantics depend on the transport:
99    /// - Kafka: topic name
100    /// - gRPC: metadata routing key
101    /// - HTTP: URL path suffix or ignored
102    /// - File: filename suffix or ignored
103    /// - Redis: stream name
104    /// - Pipe: ignored (single stdout)
105    fn send(&self, key: &str, payload: bytes::Bytes) -> impl Future<Output = SendResult> + Send;
106}
107
108/// Receive-side transport -- generic over commit token type.
109///
110/// Extends `TransportBase` with receive and commit capability.
111/// Input stages (receiver, fetcher) use concrete implementations
112/// directly for type-safe token handling.
113pub trait TransportReceiver: TransportBase {
114    /// The token type for this transport.
115    type Token: CommitToken;
116
117    /// Receive up to `max` messages.
118    ///
119    /// Returns immediately with available messages (may be fewer than `max`).
120    /// Returns an empty batch if no messages are available.
121    ///
122    /// **Filter behaviour:** if the transport has inbound filters configured,
123    /// `recv()` removes messages matching `action: drop` filters and returns
124    /// messages matching `action: dlq` filters in [`RecvBatch`]`.dlq_entries`
125    /// alongside the passing [`RecvBatch`]`.messages`. Route the DLQ entries via
126    /// your own DLQ handle -- they cannot be silently lost.
127    ///
128    /// # Example
129    ///
130    /// ```rust,ignore
131    /// let batch = transport.recv(100).await?;
132    /// for entry in batch.dlq_entries {
133    ///     dlq.send(DlqEntry::new("filter", entry.reason, entry.payload)).await?;
134    /// }
135    /// for msg in batch.messages { /* process */ }
136    /// ```
137    fn recv(
138        &self,
139        max: usize,
140    ) -> impl Future<Output = TransportResult<RecvBatch<Self::Token>>> + Send;
141
142    /// Commit/acknowledge processed messages.
143    ///
144    /// - Kafka: commits consumer offsets
145    /// - gRPC: no-op (no persistence)
146    /// - Redis: XACK
147    /// - File: advances read position
148    /// - Memory: advances internal sequence
149    fn commit(&self, tokens: &[Self::Token]) -> impl Future<Output = TransportResult<()>> + Send;
150}
151
152/// Combined transport -- implements both send and receive.
153///
154/// Convenience trait for transports that support bidirectional communication.
155/// Most concrete implementations (Kafka, gRPC, Memory, Redis, File, Pipe)
156/// implement this. Automatically implemented via blanket impl.
157pub trait Transport: TransportSender + TransportReceiver {}
158
159/// Blanket impl: anything that implements both traits is a Transport.
160impl<T: TransportSender + TransportReceiver> Transport for T {}
161
162/// Load a transport config from the cascade under a fixed key.
163///
164/// Consolidates the byte-identical `from_cascade()` bodies that each transport
165/// config previously repeated (try the cascade under a key, register the
166/// section, fall back to `Default`). Implementors only name their key; the
167/// loading logic lives here once. Without the `config` feature the default
168/// method just returns `Default::default()`.
169pub trait FromCascade: Default + serde::Serialize + serde::de::DeserializeOwned + 'static {
170    /// Load `Self` from the config cascade under `key`, registering the section
171    /// in the global registry; falls back to `Default` if the cascade is
172    /// unavailable or the key is absent/invalid.
173    #[must_use]
174    fn from_cascade_key(key: &str) -> Self {
175        #[cfg(feature = "config")]
176        {
177            if let Some(cfg) = crate::config::try_get()
178                && let Ok(value) = cfg.unmarshal_key_registered::<Self>(key)
179            {
180                return value;
181            }
182        }
183        // Without `config`, or on any cascade miss, use defaults.
184        #[cfg(not(feature = "config"))]
185        let _ = key;
186        Self::default()
187    }
188}