Skip to main content

hyperi_rustlib/transport/
traits.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/traits.rs
3// Purpose:   Transport trait definitions (sender + receiver split)
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use super::error::TransportResult;
10use super::filter::FilteredDlqEntry;
11use super::types::{Message, SendResult};
12use std::fmt::{Debug, Display};
13use std::future::Future;
14
15/// Transport-specific token for commit/acknowledgment.
16///
17/// Each transport implementation provides its own token type that
18/// captures the information needed to acknowledge message processing.
19pub trait CommitToken: Clone + Send + Sync + Debug + Display + 'static {
20    /// Get a string representation for logging/debugging.
21    fn as_str(&self) -> String {
22        format!("{self}")
23    }
24}
25
26/// Common transport operations shared by senders and receivers.
27///
28/// Every transport implementation provides these lifecycle and
29/// introspection methods regardless of direction.
30pub trait TransportBase: Send + Sync {
31    /// Shutdown the transport gracefully.
32    fn close(&self) -> impl Future<Output = TransportResult<()>> + Send;
33
34    /// Check if the transport is healthy and connected.
35    fn is_healthy(&self) -> bool;
36
37    /// Get transport name for logging/metrics.
38    fn name(&self) -> &'static str;
39}
40
41/// Send-side transport.
42///
43/// Extends `TransportBase` with send capability. The factory returns
44/// `AnySender` (enum dispatch) for runtime transport selection.
45///
46/// All implementations auto-emit `dfe_transport_*` Prometheus metrics
47/// when a `MetricsManager` recorder is installed.
48pub trait TransportSender: TransportBase {
49    /// Send raw bytes to a key/destination.
50    ///
51    /// The `key` semantics depend on the transport:
52    /// - Kafka: topic name
53    /// - gRPC: metadata routing key
54    /// - HTTP: URL path suffix or ignored
55    /// - File: filename suffix or ignored
56    /// - Redis: stream name
57    /// - Pipe: ignored (single stdout)
58    fn send(&self, key: &str, payload: &[u8]) -> impl Future<Output = SendResult> + Send;
59}
60
61/// Receive-side transport -- generic over commit token type.
62///
63/// Extends `TransportBase` with receive and commit capability.
64/// Input stages (receiver, fetcher) use concrete implementations
65/// directly for type-safe token handling.
66pub trait TransportReceiver: TransportBase {
67    /// The token type for this transport.
68    type Token: CommitToken;
69
70    /// Receive up to `max` messages.
71    ///
72    /// Returns immediately with available messages (may be fewer than `max`).
73    /// Returns empty vec if no messages are available.
74    ///
75    /// **Filter behaviour:** if the transport has inbound filters configured,
76    /// `recv()` removes messages that match `action: drop` filters and stages
77    /// messages matching `action: dlq` filters into an internal queue. Use
78    /// [`take_filtered_dlq_entries`](Self::take_filtered_dlq_entries) after
79    /// each `recv()` call to retrieve the staged DLQ entries and route them
80    /// via your DLQ handle.
81    fn recv(
82        &self,
83        max: usize,
84    ) -> impl Future<Output = TransportResult<Vec<Message<Self::Token>>>> + Send;
85
86    /// Commit/acknowledge processed messages.
87    ///
88    /// - Kafka: commits consumer offsets
89    /// - gRPC: no-op (no persistence)
90    /// - Redis: XACK
91    /// - File: advances read position
92    /// - Memory: advances internal sequence
93    fn commit(&self, tokens: &[Self::Token]) -> impl Future<Output = TransportResult<()>> + Send;
94
95    /// Drain DLQ entries staged by inbound filtering.
96    ///
97    /// When a transport's inbound filters classify messages as `action: dlq`,
98    /// the messages are removed from the `recv()` result and staged in an
99    /// internal queue. Call this method after each `recv()` to drain the
100    /// staged entries and route them to your DLQ.
101    ///
102    /// Default implementation returns an empty vec -- transports without
103    /// filter support don't need to override this.
104    ///
105    /// # Example
106    ///
107    /// ```rust,ignore
108    /// let messages = transport.recv(100).await?;
109    /// for entry in transport.take_filtered_dlq_entries() {
110    ///     dlq.send(DlqEntry::new("filter", entry.reason, entry.payload)).await?;
111    /// }
112    /// // Process passing messages...
113    /// ```
114    fn take_filtered_dlq_entries(&self) -> Vec<FilteredDlqEntry> {
115        Vec::new()
116    }
117}
118
119/// Combined transport -- implements both send and receive.
120///
121/// Convenience trait for transports that support bidirectional communication.
122/// Most concrete implementations (Kafka, gRPC, Memory, Redis, File, Pipe)
123/// implement this. Automatically implemented via blanket impl.
124pub trait Transport: TransportSender + TransportReceiver {}
125
126/// Blanket impl: anything that implements both traits is a Transport.
127impl<T: TransportSender + TransportReceiver> Transport for T {}