1pub trait Sink<T> {
32 type Error;
34
35 fn try_push(&mut self, val: T) -> Result<(), Self::Error>;
40}
41
42pub trait Source<T> {
48 fn try_pop(&mut self) -> Option<T>;
50}
51
52pub trait Link<In, Out>: Sink<In> + Source<Out> {}
57
58impl<In, Out, L> Link<In, Out> for L where L: Sink<In> + Source<Out> {}
59
60pub fn forward<T, S, K>(src: &mut S, snk: &mut K, max: usize) -> (usize, Option<K::Error>)
86where
87 S: Source<T>,
88 K: Sink<T>,
89{
90 let mut count = 0;
91 while count < max {
92 let val = match src.try_pop() {
93 Some(v) => v,
94 None => break,
95 };
96 match snk.try_push(val) {
97 Ok(()) => count += 1,
98 Err(e) => return (count, Some(e)),
99 }
100 }
101 (count, None)
102}
103
104#[cfg(test)]
105mod tests {
106 use super::*;
107 use crate::{EventBuf, RingBuf, SeqRing};
108
109 #[test]
112 fn ringbuf_as_sink() {
113 let mut ring = RingBuf::<u32, 4>::new();
114 assert!(ring.try_push(1).is_ok());
115 assert!(ring.try_push(2).is_ok());
116 assert_eq!(ring.len(), 2);
117 }
118
119 #[test]
120 fn seq_producer_as_sink() {
121 let ring = SeqRing::<u32, 4>::new();
122 let mut p = ring.producer();
123 assert!(p.try_push(10).is_ok());
124 assert!(p.try_push(20).is_ok());
125 }
126
127 #[test]
128 fn event_producer_as_sink() {
129 let buf = EventBuf::<u32, 2>::new();
130 let mut p = buf.producer();
131 assert!(p.try_push(1).is_ok());
132 assert!(p.try_push(2).is_ok());
133 assert_eq!(p.try_push(3), Err(3));
134 }
135
136 #[test]
139 fn seq_consumer_as_source() {
140 let ring = SeqRing::<u32, 4>::new();
141 let p = ring.producer();
142 let mut c = ring.consumer();
143
144 p.push(10);
145 p.push(20);
146
147 assert_eq!(c.try_pop(), Some(10));
148 assert_eq!(c.try_pop(), Some(20));
149 assert_eq!(c.try_pop(), None);
150 }
151
152 #[test]
153 fn event_consumer_as_source() {
154 let buf = EventBuf::<u32, 4>::new();
155 let p = buf.producer();
156 let mut c = buf.consumer();
157
158 p.push(10).unwrap();
159 p.push(20).unwrap();
160
161 assert_eq!(c.try_pop(), Some(10));
162 assert_eq!(c.try_pop(), Some(20));
163 assert_eq!(c.try_pop(), None);
164 }
165
166 #[test]
169 fn forward_seq_to_event() {
170 let seq = SeqRing::<u32, 8>::new();
171 let sp = seq.producer();
172 let mut sc = seq.consumer();
173
174 sp.push(1);
175 sp.push(2);
176 sp.push(3);
177
178 let eb = EventBuf::<u32, 8>::new();
179 let mut ep = eb.producer();
180
181 let (n, err) = forward(&mut sc, &mut ep, 10);
182 assert_eq!(n, 3);
183 assert!(err.is_none());
184
185 let ec = eb.consumer();
186 assert_eq!(ec.pop(), Some(1));
187 assert_eq!(ec.pop(), Some(2));
188 assert_eq!(ec.pop(), Some(3));
189 }
190
191 #[test]
192 fn forward_event_to_ringbuf() {
193 let eb = EventBuf::<u32, 8>::new();
194 let ep = eb.producer();
195 let mut ec = eb.consumer();
196
197 ep.push(10).unwrap();
198 ep.push(20).unwrap();
199
200 let mut ring = RingBuf::<u32, 4>::new();
201
202 let (n, err) = forward(&mut ec, &mut ring, 10);
203 assert_eq!(n, 2);
204 assert!(err.is_none());
205 assert_eq!(ring.get(0), Some(10));
206 assert_eq!(ring.get(1), Some(20));
207 }
208
209 #[test]
210 fn forward_stops_when_sink_full() {
211 let src_buf = EventBuf::<u32, 8>::new();
212 let sp = src_buf.producer();
213 let mut sc = src_buf.consumer();
214
215 for i in 0..5 {
216 sp.push(i).unwrap();
217 }
218
219 let dst_buf = EventBuf::<u32, 2>::new();
220 let mut dp = dst_buf.producer();
221
222 let (n, err) = forward(&mut sc, &mut dp, 10);
223 assert_eq!(n, 2);
224 assert_eq!(err, Some(2)); }
226
227 #[test]
228 fn forward_empty_source_transfers_nothing() {
229 let seq = SeqRing::<u32, 4>::new();
230 let _sp = seq.producer();
231 let mut sc = seq.consumer();
232
233 let eb = EventBuf::<u32, 4>::new();
234 let mut ep = eb.producer();
235
236 let (n, err) = forward(&mut sc, &mut ep, 10);
237 assert_eq!(n, 0);
238 assert!(err.is_none());
239 }
240
241 fn drain_all_into_vec<T: Copy, S: Source<T>>(src: &mut S) -> std::vec::Vec<T> {
244 let mut out = std::vec::Vec::new();
245 while let Some(v) = src.try_pop() {
246 out.push(v);
247 }
248 out
249 }
250
251 #[test]
252 fn generic_drain_seq() {
253 let ring = SeqRing::<u32, 4>::new();
254 let p = ring.producer();
255 let mut c = ring.consumer();
256
257 p.push(1);
258 p.push(2);
259 p.push(3);
260
261 let v = drain_all_into_vec(&mut c);
262 assert_eq!(v, [1, 2, 3]);
263 }
264
265 #[test]
266 fn generic_drain_event() {
267 let buf = EventBuf::<u32, 4>::new();
268 let p = buf.producer();
269 let mut c = buf.consumer();
270
271 p.push(1).unwrap();
272 p.push(2).unwrap();
273
274 let v = drain_all_into_vec(&mut c);
275 assert_eq!(v, [1, 2]);
276 }
277}