1use crate::error::Error;
8use crate::queue::Queue;
9use crate::ringbuf::{PushOutcome, RingBuffer};
10
11pub enum ChannelBuffer {
22 Lossy(RingBuffer),
25 Reliable(Queue),
29}
30
31impl ChannelBuffer {
32 pub fn push(&self, frame: &[u8]) -> Result<usize, Error> {
40 match self {
41 Self::Lossy(rb) => Ok(rb.push(frame)),
42 Self::Reliable(q) => q.push(frame).map(|()| 0),
43 }
44 }
45
46 pub fn push_checked(&self, frame: &[u8]) -> Result<PushOutcome, Error> {
52 match self {
53 Self::Lossy(rb) => Ok(rb.push_checked(frame)),
54 Self::Reliable(q) => q.push(frame).map(|()| PushOutcome::Accepted(0)),
55 }
56 }
57
58 #[must_use]
64 pub fn drain_all(&self) -> Vec<u8> {
65 match self {
66 Self::Lossy(rb) => rb.drain_all(),
67 Self::Reliable(q) => q.drain_all(),
68 }
69 }
70
71 #[must_use]
75 pub fn try_pop(&self) -> Option<Vec<u8>> {
76 match self {
77 Self::Lossy(rb) => rb.try_pop(),
78 Self::Reliable(q) => q.try_pop(),
79 }
80 }
81
82 #[must_use]
84 pub fn frame_count(&self) -> usize {
85 match self {
86 Self::Lossy(rb) => rb.frame_count(),
87 Self::Reliable(q) => q.frame_count(),
88 }
89 }
90
91 #[must_use]
93 pub fn bytes_used(&self) -> usize {
94 match self {
95 Self::Lossy(rb) => rb.bytes_used(),
96 Self::Reliable(q) => q.bytes_used(),
97 }
98 }
99
100 pub fn clear(&self) {
102 match self {
103 Self::Lossy(rb) => rb.clear(),
104 Self::Reliable(q) => q.clear(),
105 }
106 }
107
108 #[must_use]
111 pub fn is_ordered(&self) -> bool {
112 matches!(self, Self::Reliable(_))
113 }
114}
115
116impl std::fmt::Debug for ChannelBuffer {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 match self {
119 Self::Lossy(rb) => f.debug_tuple("ChannelBuffer::Lossy").field(rb).finish(),
120 Self::Reliable(q) => f.debug_tuple("ChannelBuffer::Reliable").field(q).finish(),
121 }
122 }
123}
124
#[cfg(test)]
mod tests {
    use super::*;

    // A roomy lossy channel: nothing is dropped and frames pop in push order.
    #[test]
    fn lossy_delegates_correctly() {
        let channel = ChannelBuffer::Lossy(RingBuffer::new(1024));

        let evicted = channel.push(b"alpha").unwrap();
        assert_eq!(evicted, 0);
        channel.push(b"beta").unwrap();

        assert_eq!(channel.frame_count(), 2);
        assert_eq!(channel.try_pop(), Some(b"alpha".to_vec()));
        assert_eq!(channel.try_pop(), Some(b"beta".to_vec()));
        assert_eq!(channel.try_pop(), None);
        assert!(!channel.is_ordered());
    }

    // Same scenario through the reliable variant; only `is_ordered` differs.
    #[test]
    fn reliable_delegates_correctly() {
        let channel = ChannelBuffer::Reliable(Queue::new(1024));

        let evicted = channel.push(b"alpha").unwrap();
        assert_eq!(evicted, 0);
        channel.push(b"beta").unwrap();

        assert_eq!(channel.frame_count(), 2);
        assert_eq!(channel.try_pop(), Some(b"alpha".to_vec()));
        assert_eq!(channel.try_pop(), Some(b"beta".to_vec()));
        assert_eq!(channel.try_pop(), None);
        assert!(channel.is_ordered());
    }

    // An 8-byte ring holds only one of these 4-byte frames, so the second push
    // succeeds by displacing the first and reports one dropped frame.
    #[test]
    fn push_lossy_never_errors() {
        let channel = ChannelBuffer::Lossy(RingBuffer::new(8));

        let first = channel.push(b"aaaa").unwrap();
        assert_eq!(first, 0);

        let second = channel.push(b"bbbb").unwrap();
        assert_eq!(second, 1);

        assert_eq!(channel.frame_count(), 1);
        assert_eq!(channel.try_pop(), Some(b"bbbb".to_vec()));
    }

    // A 16-byte queue accepts two of these 4-byte frames; the third is refused
    // and the queue contents are left as they were.
    #[test]
    fn push_reliable_errors_when_full() {
        let channel = ChannelBuffer::Reliable(Queue::new(16));

        channel.push(b"aaaa").unwrap();
        channel.push(b"bbbb").unwrap();

        let refusal = channel.push(b"cccc").unwrap_err();
        assert!(matches!(refusal, Error::ChannelFull));

        assert_eq!(channel.frame_count(), 2);
    }

    // `push_checked` on a lossy channel reports drops via `Accepted(n)` and
    // rejects a frame that cannot fit in the ring at all.
    #[test]
    fn push_checked_lossy() {
        let channel = ChannelBuffer::Lossy(RingBuffer::new(8));

        let first = channel.push_checked(b"aaaa").unwrap();
        assert_eq!(first, PushOutcome::Accepted(0));

        let second = channel.push_checked(b"bbbb").unwrap();
        assert_eq!(second, PushOutcome::Accepted(1));

        let oversized = channel.push_checked(&[0u8; 100]).unwrap();
        assert_eq!(oversized, PushOutcome::TooLarge);
    }

    // `push_checked` on a reliable channel always reports zero drops and
    // surfaces the queue's error once it is full.
    #[test]
    fn push_checked_reliable() {
        let channel = ChannelBuffer::Reliable(Queue::new(16));

        let first = channel.push_checked(b"aaaa").unwrap();
        assert_eq!(first, PushOutcome::Accepted(0));

        let second = channel.push_checked(b"bbbb").unwrap();
        assert_eq!(second, PushOutcome::Accepted(0));

        let refusal = channel.push_checked(b"cccc").unwrap_err();
        assert!(matches!(refusal, Error::ChannelFull));
    }

    // Both variants must serialise the same frames to the same bytes.
    #[test]
    fn drain_format_identical() {
        let lossy = ChannelBuffer::Lossy(RingBuffer::new(1024));
        let reliable = ChannelBuffer::Reliable(Queue::new(1024));

        let payloads: &[&[u8]] = &[b"hello", b"world", b"test"];
        for &payload in payloads {
            lossy.push(payload).unwrap();
            reliable.push(payload).unwrap();
        }

        let from_lossy = lossy.drain_all();
        let from_reliable = reliable.drain_all();

        assert_eq!(from_lossy, from_reliable);
        assert!(!from_lossy.is_empty());
    }

    // Clearing resets both the frame count and the byte usage.
    #[test]
    fn clear_delegates() {
        let channel = ChannelBuffer::Reliable(Queue::new(1024));
        channel.push(b"one").unwrap();
        channel.push(b"two").unwrap();
        assert_eq!(channel.frame_count(), 2);

        channel.clear();

        assert_eq!(channel.frame_count(), 0);
        assert_eq!(channel.bytes_used(), 0);
    }

    // Byte usage starts at zero and grows with each stored frame.
    #[test]
    fn bytes_used_delegates() {
        let channel = ChannelBuffer::Lossy(RingBuffer::new(1024));
        assert_eq!(channel.bytes_used(), 0);

        channel.push(b"abc").unwrap();

        // NOTE(review): 3 payload bytes plus what looks like 4 bytes of
        // per-frame overhead — confirm against `RingBuffer`.
        assert_eq!(channel.bytes_used(), 7);
    }
}