hyperi_rustlib/transport/traits.rs
1// Project: hyperi-rustlib
2// File: src/transport/traits.rs
3// Purpose: Transport trait definitions (sender + receiver split)
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use super::error::TransportResult;
10use super::filter::FilteredDlqEntry;
11use super::types::{Message, SendResult};
12use std::fmt::{Debug, Display};
13use std::future::Future;
14
/// Acknowledgment handle attached to messages received from a transport.
///
/// Every transport implementation supplies its own token type, carrying
/// whatever information it needs to commit/acknowledge that a message
/// has been processed.
pub trait CommitToken: Clone + Send + Sync + Debug + Display + 'static {
    /// Returns an owned string form of the token for logging/debugging.
    ///
    /// The default implementation renders the token through its
    /// [`Display`] impl, so a new `String` is allocated on every call.
    fn as_str(&self) -> String {
        self.to_string()
    }
}
25
26/// Common transport operations shared by senders and receivers.
27///
28/// Every transport implementation provides these lifecycle and
29/// introspection methods regardless of direction.
30pub trait TransportBase: Send + Sync {
31 /// Shutdown the transport gracefully.
32 fn close(&self) -> impl Future<Output = TransportResult<()>> + Send;
33
34 /// Check if the transport is healthy and connected.
35 fn is_healthy(&self) -> bool;
36
37 /// Get transport name for logging/metrics.
38 fn name(&self) -> &'static str;
39}
40
41/// Send-side transport.
42///
43/// Extends `TransportBase` with send capability. The factory returns
44/// `AnySender` (enum dispatch) for runtime transport selection.
45///
46/// All implementations auto-emit `dfe_transport_*` Prometheus metrics
47/// when a `MetricsManager` recorder is installed.
48pub trait TransportSender: TransportBase {
49 /// Send raw bytes to a key/destination.
50 ///
51 /// The `key` semantics depend on the transport:
52 /// - Kafka: topic name
53 /// - gRPC: metadata routing key
54 /// - HTTP: URL path suffix or ignored
55 /// - File: filename suffix or ignored
56 /// - Redis: stream name
57 /// - Pipe: ignored (single stdout)
58 fn send(&self, key: &str, payload: &[u8]) -> impl Future<Output = SendResult> + Send;
59}
60
61/// Receive-side transport -- generic over commit token type.
62///
63/// Extends `TransportBase` with receive and commit capability.
64/// Input stages (receiver, fetcher) use concrete implementations
65/// directly for type-safe token handling.
66pub trait TransportReceiver: TransportBase {
67 /// The token type for this transport.
68 type Token: CommitToken;
69
70 /// Receive up to `max` messages.
71 ///
72 /// Returns immediately with available messages (may be fewer than `max`).
73 /// Returns empty vec if no messages are available.
74 ///
75 /// **Filter behaviour:** if the transport has inbound filters configured,
76 /// `recv()` removes messages that match `action: drop` filters and stages
77 /// messages matching `action: dlq` filters into an internal queue. Use
78 /// [`take_filtered_dlq_entries`](Self::take_filtered_dlq_entries) after
79 /// each `recv()` call to retrieve the staged DLQ entries and route them
80 /// via your DLQ handle.
81 fn recv(
82 &self,
83 max: usize,
84 ) -> impl Future<Output = TransportResult<Vec<Message<Self::Token>>>> + Send;
85
86 /// Commit/acknowledge processed messages.
87 ///
88 /// - Kafka: commits consumer offsets
89 /// - gRPC: no-op (no persistence)
90 /// - Redis: XACK
91 /// - File: advances read position
92 /// - Memory: advances internal sequence
93 fn commit(&self, tokens: &[Self::Token]) -> impl Future<Output = TransportResult<()>> + Send;
94
95 /// Drain DLQ entries staged by inbound filtering.
96 ///
97 /// When a transport's inbound filters classify messages as `action: dlq`,
98 /// the messages are removed from the `recv()` result and staged in an
99 /// internal queue. Call this method after each `recv()` to drain the
100 /// staged entries and route them to your DLQ.
101 ///
102 /// Default implementation returns an empty vec -- transports without
103 /// filter support don't need to override this.
104 ///
105 /// # Example
106 ///
107 /// ```rust,ignore
108 /// let messages = transport.recv(100).await?;
109 /// for entry in transport.take_filtered_dlq_entries() {
110 /// dlq.send(DlqEntry::new("filter", entry.reason, entry.payload)).await?;
111 /// }
112 /// // Process passing messages...
113 /// ```
114 fn take_filtered_dlq_entries(&self) -> Vec<FilteredDlqEntry> {
115 Vec::new()
116 }
117}
118
119/// Combined transport -- implements both send and receive.
120///
121/// Convenience trait for transports that support bidirectional communication.
122/// Most concrete implementations (Kafka, gRPC, Memory, Redis, File, Pipe)
123/// implement this. Automatically implemented via blanket impl.
124pub trait Transport: TransportSender + TransportReceiver {}
125
126/// Blanket impl: anything that implements both traits is a Transport.
127impl<T: TransportSender + TransportReceiver> Transport for T {}