/// A destination that values of type `T` can be pushed into, where a push
/// may be refused.
pub trait Sink<T> {
/// Error value returned by [`Sink::try_push`] when a value is not accepted.
type Error;
/// Attempts to push `val` into the sink.
///
/// `val` is taken by value, so after an `Err` the caller can only get it
/// back if `Self::Error` carries it.
///
/// # Errors
///
/// Returns `Err(Self::Error)` when the implementation does not accept the
/// value; the exact conditions are up to the implementor.
fn try_push(&mut self, val: T) -> Result<(), Self::Error>;
}
/// An origin that values of type `T` can be taken from, one at a time.
pub trait Source<T> {
/// Attempts to take the next value.
///
/// Returns `Some(value)` when one was obtained and `None` otherwise.
/// [`forward`] treats `None` as the signal to stop transferring.
fn try_pop(&mut self) -> Option<T>;
}
/// Marker trait for types that are both a [`Sink<In>`] and a [`Source<Out>`].
///
/// It declares no methods of its own; everything comes from the two
/// supertraits.
pub trait Link<In, Out>: Sink<In> + Source<Out> {}
/// Blanket implementation: any type implementing both `Sink<In>` and
/// `Source<Out>` is a `Link<In, Out>`, so `Link` never has to be implemented
/// by hand.
impl<In, Out, L> Link<In, Out> for L where L: Sink<In> + Source<Out> {}
/// Moves up to `max` values from `src` into `snk`, one value at a time.
///
/// Every step pops a single value from `src` and hands it straight to
/// `snk`. Forwarding ends as soon as one of the following happens:
///
/// * `max` values have been accepted by `snk`,
/// * `src.try_pop()` returns `None`,
/// * `snk.try_push(..)` returns an error.
///
/// # Returns
///
/// A pair `(count, error)`: `count` is the number of values `snk` accepted,
/// and `error` is `Some(e)` when forwarding stopped because a push failed
/// with `e`, `None` otherwise.
///
/// Note that a value rejected by `snk` has already been removed from `src`
/// and is not counted; it can only be recovered if `K::Error` carries it.
pub fn forward<T, S, K>(src: &mut S, snk: &mut K, max: usize) -> (usize, Option<K::Error>)
where
    S: Source<T>,
    K: Sink<T>,
{
    let mut moved = 0;
    // One iteration per potential transfer; every iteration either moves a
    // value, stops the loop, or returns, so `moved` never exceeds `max`.
    for _ in 0..max {
        match src.try_pop().map(|item| snk.try_push(item)) {
            // The source had nothing to hand over.
            None => break,
            Some(Ok(())) => moved += 1,
            // The sink refused the value: report what was moved before it.
            Some(Err(err)) => return (moved, Some(err)),
        }
    }
    (moved, None)
}
// Tests for the `Sink` / `Source` traits and `forward`, driven through the
// crate's `RingBuf`, `SeqRing` and `EventBuf` types. Those types (and their
// trait impls) are defined elsewhere in the crate and are not visible here.
#[cfg(test)]
mod tests {
use super::*;
use crate::{EventBuf, RingBuf, SeqRing};
// Two `try_push` calls on a `RingBuf` succeed and `len()` then reports 2.
// NOTE(review): presumably `try_push` resolves to the `Sink` impl rather than
// an inherent method - confirm against `RingBuf`.
#[test]
fn ringbuf_as_sink() {
let mut ring = RingBuf::<u32, 4>::new();
assert!(ring.try_push(1).is_ok());
assert!(ring.try_push(2).is_ok());
assert_eq!(ring.len(), 2);
}
// `try_push` on a `SeqRing` producer handle returns `Ok` for two values.
#[test]
fn seq_producer_as_sink() {
let ring = SeqRing::<u32, 4>::new();
let mut p = ring.producer();
assert!(p.try_push(10).is_ok());
assert!(p.try_push(20).is_ok());
}
// With `EventBuf<u32, 2>` the first two pushes succeed and the third comes
// back as `Err(3)`, i.e. the rejected value is returned as the error.
// (Presumably the const parameter `2` is the capacity - confirm.)
#[test]
fn event_producer_as_sink() {
let buf = EventBuf::<u32, 2>::new();
let mut p = buf.producer();
assert!(p.try_push(1).is_ok());
assert!(p.try_push(2).is_ok());
assert_eq!(p.try_push(3), Err(3));
}
// Values pushed through the `SeqRing` producer come out of the consumer's
// `try_pop` in push order, followed by `None` once nothing is left.
#[test]
fn seq_consumer_as_source() {
let ring = SeqRing::<u32, 4>::new();
let p = ring.producer();
let mut c = ring.consumer();
p.push(10);
p.push(20);
assert_eq!(c.try_pop(), Some(10));
assert_eq!(c.try_pop(), Some(20));
assert_eq!(c.try_pop(), None);
}
// Same check for `EventBuf`; here the producer's `push` returns a value that
// is unwrapped, so a failed push would panic the test.
#[test]
fn event_consumer_as_source() {
let buf = EventBuf::<u32, 4>::new();
let p = buf.producer();
let mut c = buf.consumer();
p.push(10).unwrap();
p.push(20).unwrap();
assert_eq!(c.try_pop(), Some(10));
assert_eq!(c.try_pop(), Some(20));
assert_eq!(c.try_pop(), None);
}
// `forward` from a `SeqRing` consumer into an `EventBuf` producer. `max` (10)
// exceeds the 3 queued values, so the transfer ends when the source runs dry.
// Order is then verified by popping from the `EventBuf` consumer.
#[test]
fn forward_seq_to_event() {
let seq = SeqRing::<u32, 8>::new();
let sp = seq.producer();
let mut sc = seq.consumer();
sp.push(1);
sp.push(2);
sp.push(3);
let eb = EventBuf::<u32, 8>::new();
let mut ep = eb.producer();
let (n, err) = forward(&mut sc, &mut ep, 10);
assert_eq!(n, 3);
assert!(err.is_none());
let ec = eb.consumer();
assert_eq!(ec.pop(), Some(1));
assert_eq!(ec.pop(), Some(2));
assert_eq!(ec.pop(), Some(3));
}
// `forward` from an `EventBuf` consumer into a `RingBuf`; both queued values
// must arrive and be readable via `get(0)` / `get(1)` in push order.
#[test]
fn forward_event_to_ringbuf() {
let eb = EventBuf::<u32, 8>::new();
let ep = eb.producer();
let mut ec = eb.consumer();
ep.push(10).unwrap();
ep.push(20).unwrap();
let mut ring = RingBuf::<u32, 4>::new();
let (n, err) = forward(&mut ec, &mut ring, 10);
assert_eq!(n, 2);
assert!(err.is_none());
assert_eq!(ring.get(0), Some(10));
assert_eq!(ring.get(1), Some(20));
}
// The source holds 0..5 while the destination is an `EventBuf<u32, 2>`.
// `forward` is expected to move 0 and 1, then stop when the push of 2 is
// refused, reporting 2 transferred values and the rejected value 2 as the
// error.
#[test]
fn forward_stops_when_sink_full() {
let src_buf = EventBuf::<u32, 8>::new();
let sp = src_buf.producer();
let mut sc = src_buf.consumer();
for i in 0..5 {
sp.push(i).unwrap();
}
let dst_buf = EventBuf::<u32, 2>::new();
let mut dp = dst_buf.producer();
let (n, err) = forward(&mut sc, &mut dp, 10);
assert_eq!(n, 2);
assert_eq!(err, Some(2)); }
// Nothing is pushed into the source (`_sp` is created but never used), so
// `forward` must report zero transfers and no error.
#[test]
fn forward_empty_source_transfers_nothing() {
let seq = SeqRing::<u32, 4>::new();
let _sp = seq.producer();
let mut sc = seq.consumer();
let eb = EventBuf::<u32, 4>::new();
let mut ep = eb.producer();
let (n, err) = forward(&mut sc, &mut ep, 10);
assert_eq!(n, 0);
assert!(err.is_none());
}
// Test helper written purely against the `Source` trait: pops until `try_pop`
// returns `None` and collects the values, in pop order, into a `Vec`.
fn drain_all_into_vec<T: Copy, S: Source<T>>(src: &mut S) -> std::vec::Vec<T> {
let mut out = std::vec::Vec::new();
while let Some(v) = src.try_pop() {
out.push(v);
}
out
}
// The generic helper works with a `SeqRing` consumer as the `Source`.
#[test]
fn generic_drain_seq() {
let ring = SeqRing::<u32, 4>::new();
let p = ring.producer();
let mut c = ring.consumer();
p.push(1);
p.push(2);
p.push(3);
let v = drain_all_into_vec(&mut c);
assert_eq!(v, [1, 2, 3]);
}
// The same helper works with an `EventBuf` consumer as the `Source`.
#[test]
fn generic_drain_event() {
let buf = EventBuf::<u32, 4>::new();
let p = buf.producer();
let mut c = buf.consumer();
p.push(1).unwrap();
p.push(2).unwrap();
let v = drain_all_into_vec(&mut c);
assert_eq!(v, [1, 2]);
}
}