Skip to main content

conduit_core/
channel.rs

1//! Unified channel buffer that wraps either a lossy or ordered strategy.
2//!
3//! [`ChannelBuffer`] is an enum over [`crate::RingBuffer`] (lossy) and
4//! [`crate::Queue`] (guaranteed delivery). It provides a single
5//! API surface so that the plugin layer can treat all channels uniformly.
6
7use crate::error::Error;
8use crate::queue::Queue;
9use crate::ringbuf::{PushOutcome, RingBuffer};
10
11// ---------------------------------------------------------------------------
12// ChannelBuffer
13// ---------------------------------------------------------------------------
14
15/// A channel buffer that is either lossy ([`RingBuffer`]) or reliable
16/// ([`Queue`]).
17///
18/// The [`push`](Self::push) method returns `Ok(usize)` — the number of
19/// older frames evicted (always `0` for reliable channels). Use
20/// [`push_checked`](Self::push_checked) for a richer [`PushOutcome`].
21pub enum ChannelBuffer {
22    /// Lossy mode: oldest frames are silently dropped when the byte budget is
23    /// exceeded.
24    Lossy(RingBuffer),
25    /// Reliable mode: pushes are rejected with
26    /// [`Error::ChannelFull`](crate::Error::ChannelFull) when the byte budget
27    /// is exceeded.
28    Reliable(Queue),
29}
30
31impl ChannelBuffer {
32    /// Push a frame into the channel.
33    ///
34    /// For lossy channels, returns `Ok(usize)` — the number of older frames
35    /// evicted to make room (or `0` if the frame was too large and silently
36    /// discarded). For reliable channels, returns `Ok(0)` on success or
37    /// [`Err(Error::ChannelFull)`](crate::Error::ChannelFull) if the byte
38    /// budget would be exceeded.
39    pub fn push(&self, frame: &[u8]) -> Result<usize, Error> {
40        match self {
41            Self::Lossy(rb) => Ok(rb.push(frame)),
42            Self::Reliable(q) => q.push(frame).map(|()| 0),
43        }
44    }
45
46    /// Push a frame with a richer outcome report.
47    ///
48    /// Like [`push`](Self::push), but returns [`PushOutcome`] for lossy
49    /// channels, distinguishing between accepted frames and frames that were
50    /// too large to ever fit.
51    pub fn push_checked(&self, frame: &[u8]) -> Result<PushOutcome, Error> {
52        match self {
53            Self::Lossy(rb) => Ok(rb.push_checked(frame)),
54            Self::Reliable(q) => q.push(frame).map(|()| PushOutcome::Accepted(0)),
55        }
56    }
57
58    /// Drain all buffered frames into a single binary blob and clear the
59    /// buffer.
60    ///
61    /// The wire format is identical for both variants — see
62    /// [`RingBuffer::drain_all`] for details.
63    #[must_use]
64    pub fn drain_all(&self) -> Vec<u8> {
65        match self {
66            Self::Lossy(rb) => rb.drain_all(),
67            Self::Reliable(q) => q.drain_all(),
68        }
69    }
70
71    /// Read one frame from the front of the buffer (FIFO).
72    ///
73    /// Returns `None` if the buffer is empty.
74    #[must_use]
75    pub fn try_pop(&self) -> Option<Vec<u8>> {
76        match self {
77            Self::Lossy(rb) => rb.try_pop(),
78            Self::Reliable(q) => q.try_pop(),
79        }
80    }
81
82    /// Number of frames currently buffered.
83    #[must_use]
84    pub fn frame_count(&self) -> usize {
85        match self {
86            Self::Lossy(rb) => rb.frame_count(),
87            Self::Reliable(q) => q.frame_count(),
88        }
89    }
90
91    /// Number of bytes currently used (including per-frame length prefixes).
92    #[must_use]
93    pub fn bytes_used(&self) -> usize {
94        match self {
95            Self::Lossy(rb) => rb.bytes_used(),
96            Self::Reliable(q) => q.bytes_used(),
97        }
98    }
99
100    /// Clear all buffered frames.
101    pub fn clear(&self) {
102        match self {
103            Self::Lossy(rb) => rb.clear(),
104            Self::Reliable(q) => q.clear(),
105        }
106    }
107
108    /// Returns `true` if this channel uses reliable (guaranteed-delivery)
109    /// buffering.
110    #[must_use]
111    pub fn is_ordered(&self) -> bool {
112        matches!(self, Self::Reliable(_))
113    }
114}
115
116impl std::fmt::Debug for ChannelBuffer {
117    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118        match self {
119            Self::Lossy(rb) => f.debug_tuple("ChannelBuffer::Lossy").field(rb).finish(),
120            Self::Reliable(q) => f.debug_tuple("ChannelBuffer::Reliable").field(q).finish(),
121        }
122    }
123}
124
125// ---------------------------------------------------------------------------
126// Tests
127// ---------------------------------------------------------------------------
128
#[cfg(test)]
mod tests {
    use super::*;

    /// A lossy channel forwards push/pop/count to the ring buffer.
    #[test]
    fn lossy_delegates_correctly() {
        let channel = ChannelBuffer::Lossy(RingBuffer::new(1024));

        // Plenty of room, so nothing is evicted.
        assert_eq!(channel.push(b"alpha").unwrap(), 0);
        channel.push(b"beta").unwrap();
        assert_eq!(channel.frame_count(), 2);

        // Frames come back out in insertion order.
        assert_eq!(channel.try_pop().as_deref(), Some(&b"alpha"[..]));
        assert_eq!(channel.try_pop().as_deref(), Some(&b"beta"[..]));
        assert_eq!(channel.try_pop(), None);

        assert!(!channel.is_ordered());
    }

    /// A reliable channel forwards push/pop/count to the queue.
    #[test]
    fn reliable_delegates_correctly() {
        let channel = ChannelBuffer::Reliable(Queue::new(1024));

        assert_eq!(channel.push(b"alpha").unwrap(), 0);
        channel.push(b"beta").unwrap();
        assert_eq!(channel.frame_count(), 2);

        assert_eq!(channel.try_pop().as_deref(), Some(&b"alpha"[..]));
        assert_eq!(channel.try_pop().as_deref(), Some(&b"beta"[..]));
        assert_eq!(channel.try_pop(), None);

        assert!(channel.is_ordered());
    }

    /// Overflowing a lossy channel evicts instead of failing.
    #[test]
    fn push_lossy_never_errors() {
        // Budget of 8 bytes holds exactly one 4-byte frame (cost = 8).
        let channel = ChannelBuffer::Lossy(RingBuffer::new(8));

        // The first frame fills the buffer without evicting anything.
        let first = channel.push(b"aaaa").unwrap();
        assert_eq!(first, 0);

        // The second frame pushes the first one out, still without an error.
        let second = channel.push(b"bbbb").unwrap();
        assert_eq!(second, 1);

        assert_eq!(channel.frame_count(), 1);
        assert_eq!(channel.try_pop().as_deref(), Some(&b"bbbb"[..]));
    }

    /// Overflowing a reliable channel is rejected and leaves it untouched.
    #[test]
    fn push_reliable_errors_when_full() {
        // Budget of 16 bytes holds exactly two 4-byte frames (cost 8 each).
        let channel = ChannelBuffer::Reliable(Queue::new(16));

        channel.push(b"aaaa").unwrap(); // 8 bytes used
        channel.push(b"bbbb").unwrap(); // 16 bytes used

        // No room is left for a third frame.
        let rejected = channel.push(b"cccc");
        assert!(matches!(rejected, Err(Error::ChannelFull)));

        // The two frames already buffered are still there.
        assert_eq!(channel.frame_count(), 2);
    }

    /// `push_checked` on a lossy channel reports evictions and oversize frames.
    #[test]
    fn push_checked_lossy() {
        let channel = ChannelBuffer::Lossy(RingBuffer::new(8));

        let first = channel.push_checked(b"aaaa").unwrap();
        assert_eq!(first, PushOutcome::Accepted(0));

        let second = channel.push_checked(b"bbbb").unwrap();
        assert_eq!(second, PushOutcome::Accepted(1));

        // A 100-byte frame costs 104 bytes, which can never fit in 8.
        let oversized = [0u8; 100];
        let third = channel.push_checked(&oversized).unwrap();
        assert_eq!(third, PushOutcome::TooLarge);
    }

    /// `push_checked` on a reliable channel accepts with zero evictions or errors.
    #[test]
    fn push_checked_reliable() {
        let channel = ChannelBuffer::Reliable(Queue::new(16));

        let first = channel.push_checked(b"aaaa").unwrap();
        assert_eq!(first, PushOutcome::Accepted(0));

        let second = channel.push_checked(b"bbbb").unwrap();
        assert_eq!(second, PushOutcome::Accepted(0));

        let rejected = channel.push_checked(b"cccc");
        assert!(matches!(rejected, Err(Error::ChannelFull)));
    }

    /// Both variants serialise the same frames to the same blob.
    #[test]
    fn drain_format_identical() {
        let lossy = ChannelBuffer::Lossy(RingBuffer::new(1024));
        let reliable = ChannelBuffer::Reliable(Queue::new(1024));

        // Feed the same frames, in the same order, to both channels.
        let payloads: [&[u8]; 3] = [b"hello", b"world", b"test"];
        for payload in payloads.iter().copied() {
            lossy.push(payload).unwrap();
            reliable.push(payload).unwrap();
        }

        let from_lossy = lossy.drain_all();
        let from_reliable = reliable.drain_all();

        assert!(!from_lossy.is_empty());
        assert_eq!(from_lossy, from_reliable);
    }

    /// `clear` empties the underlying buffer.
    #[test]
    fn clear_delegates() {
        let channel = ChannelBuffer::Reliable(Queue::new(1024));
        for payload in [&b"one"[..], &b"two"[..]].iter().copied() {
            channel.push(payload).unwrap();
        }
        assert_eq!(channel.frame_count(), 2);

        channel.clear();

        assert_eq!(channel.frame_count(), 0);
        assert_eq!(channel.bytes_used(), 0);
    }

    /// `bytes_used` includes the per-frame length prefix.
    #[test]
    fn bytes_used_delegates() {
        let channel = ChannelBuffer::Lossy(RingBuffer::new(1024));
        assert_eq!(channel.bytes_used(), 0);

        // 4-byte prefix + 3-byte payload = 7 bytes.
        channel.push(b"abc").unwrap();
        assert_eq!(channel.bytes_used(), 7);
    }
}