Skip to main content

ph_eventing/
traits.rs

1//! Common traits for event producers and consumers.
2//!
3//! These traits abstract over the different ring buffer types so that
4//! generic code can work with any combination of producer and consumer.
5//!
6//! | Trait | Role | Implementors |
7//! |-------|------|-------------|
8//! | [`Sink`] | Accept events | [`RingBuf`], [`seq_ring::Producer`], [`event_buf::Producer`] |
9//! | [`Source`] | Yield events | [`seq_ring::Consumer`], [`event_buf::Consumer`] |
10//! | [`Link`] | Both — accept *and* yield | Blanket impl for any `Sink<In> + Source<Out>` |
11//!
12//! The free function [`forward`] transfers items from any [`Source`] to any
13//! [`Sink`], stopping when the source is empty or the sink rejects a value.
14//!
15//! [`RingBuf`]: crate::RingBuf
16//! [`seq_ring::Producer`]: crate::seq_ring::Producer
17//! [`seq_ring::Consumer`]: crate::seq_ring::Consumer
18//! [`event_buf::Producer`]: crate::event_buf::Producer
19//! [`event_buf::Consumer`]: crate::event_buf::Consumer
20
21/// Accept events.
22///
23/// `Error` is the type returned when the sink cannot accept the value.
24/// Sinks that never reject (e.g. overwrite rings) use
25/// [`core::convert::Infallible`].
26///
27/// # Implementors
28/// - [`crate::RingBuf`] — always succeeds (`Error = Infallible`).
29/// - [`crate::seq_ring::Producer`] — always succeeds (`Error = Infallible`).
30/// - [`crate::event_buf::Producer`] — returns `Err(val)` when full (`Error = T`).
31pub trait Sink<T> {
32    /// The error returned when the sink cannot accept the value.
33    type Error;
34
35    /// Push a value into the sink.
36    ///
37    /// Returns `Ok(())` on success or `Err(Self::Error)` when the sink is
38    /// unable to accept the value.
39    fn try_push(&mut self, val: T) -> Result<(), Self::Error>;
40}
41
42/// Yield events.
43///
44/// # Implementors
45/// - [`crate::seq_ring::Consumer`] — drains the next in-order item.
46/// - [`crate::event_buf::Consumer`] — pops the oldest buffered item.
47pub trait Source<T> {
48    /// Pull the next event, or `None` if nothing is available.
49    fn try_pop(&mut self) -> Option<T>;
50}
51
52/// A bidirectional pass-through: accepts `In` and yields `Out`.
53///
54/// This is automatically implemented for any type that is both
55/// [`Sink<In>`] and [`Source<Out>`].
56pub trait Link<In, Out>: Sink<In> + Source<Out> {}
57
58impl<In, Out, L> Link<In, Out> for L where L: Sink<In> + Source<Out> {}
59
60/// Transfer up to `max` items from a [`Source`] into a [`Sink`].
61///
62/// Stops early if the source is empty or the sink returns an error.
63/// Returns `(transferred, last_error)` — `last_error` is `Some` only if
64/// the sink rejected a value.
65///
66/// # Example
67/// ```
68/// use ph_eventing::{EventBuf, SeqRing};
69/// use ph_eventing::traits::{Sink, Source, forward};
70///
71/// let seq = SeqRing::<u32, 8>::new();
72/// let sp = seq.producer();
73/// let mut sc = seq.consumer();
74///
75/// sp.push(1);
76/// sp.push(2);
77///
78/// let eb = EventBuf::<u32, 8>::new();
79/// let mut ep = eb.producer();
80///
81/// let (n, err) = forward(&mut sc, &mut ep, 10);
82/// assert_eq!(n, 2);
83/// assert!(err.is_none());
84/// ```
85pub fn forward<T, S, K>(src: &mut S, snk: &mut K, max: usize) -> (usize, Option<K::Error>)
86where
87    S: Source<T>,
88    K: Sink<T>,
89{
90    let mut count = 0;
91    while count < max {
92        let val = match src.try_pop() {
93            Some(v) => v,
94            None => break,
95        };
96        match snk.try_push(val) {
97            Ok(()) => count += 1,
98            Err(e) => return (count, Some(e)),
99        }
100    }
101    (count, None)
102}
103
104#[cfg(test)]
105mod tests {
106    use super::*;
107    use crate::{EventBuf, RingBuf, SeqRing};
108
109    // ── Sink tests ─────────────────────────────────────────────────
110
111    #[test]
112    fn ringbuf_as_sink() {
113        let mut ring = RingBuf::<u32, 4>::new();
114        assert!(ring.try_push(1).is_ok());
115        assert!(ring.try_push(2).is_ok());
116        assert_eq!(ring.len(), 2);
117    }
118
119    #[test]
120    fn seq_producer_as_sink() {
121        let ring = SeqRing::<u32, 4>::new();
122        let mut p = ring.producer();
123        assert!(p.try_push(10).is_ok());
124        assert!(p.try_push(20).is_ok());
125    }
126
127    #[test]
128    fn event_producer_as_sink() {
129        let buf = EventBuf::<u32, 2>::new();
130        let mut p = buf.producer();
131        assert!(p.try_push(1).is_ok());
132        assert!(p.try_push(2).is_ok());
133        assert_eq!(p.try_push(3), Err(3));
134    }
135
136    // ── Source tests ───────────────────────────────────────────────
137
138    #[test]
139    fn seq_consumer_as_source() {
140        let ring = SeqRing::<u32, 4>::new();
141        let p = ring.producer();
142        let mut c = ring.consumer();
143
144        p.push(10);
145        p.push(20);
146
147        assert_eq!(c.try_pop(), Some(10));
148        assert_eq!(c.try_pop(), Some(20));
149        assert_eq!(c.try_pop(), None);
150    }
151
152    #[test]
153    fn event_consumer_as_source() {
154        let buf = EventBuf::<u32, 4>::new();
155        let p = buf.producer();
156        let mut c = buf.consumer();
157
158        p.push(10).unwrap();
159        p.push(20).unwrap();
160
161        assert_eq!(c.try_pop(), Some(10));
162        assert_eq!(c.try_pop(), Some(20));
163        assert_eq!(c.try_pop(), None);
164    }
165
166    // ── forward tests ──────────────────────────────────────────────
167
168    #[test]
169    fn forward_seq_to_event() {
170        let seq = SeqRing::<u32, 8>::new();
171        let sp = seq.producer();
172        let mut sc = seq.consumer();
173
174        sp.push(1);
175        sp.push(2);
176        sp.push(3);
177
178        let eb = EventBuf::<u32, 8>::new();
179        let mut ep = eb.producer();
180
181        let (n, err) = forward(&mut sc, &mut ep, 10);
182        assert_eq!(n, 3);
183        assert!(err.is_none());
184
185        let ec = eb.consumer();
186        assert_eq!(ec.pop(), Some(1));
187        assert_eq!(ec.pop(), Some(2));
188        assert_eq!(ec.pop(), Some(3));
189    }
190
191    #[test]
192    fn forward_event_to_ringbuf() {
193        let eb = EventBuf::<u32, 8>::new();
194        let ep = eb.producer();
195        let mut ec = eb.consumer();
196
197        ep.push(10).unwrap();
198        ep.push(20).unwrap();
199
200        let mut ring = RingBuf::<u32, 4>::new();
201
202        let (n, err) = forward(&mut ec, &mut ring, 10);
203        assert_eq!(n, 2);
204        assert!(err.is_none());
205        assert_eq!(ring.get(0), Some(10));
206        assert_eq!(ring.get(1), Some(20));
207    }
208
209    #[test]
210    fn forward_stops_when_sink_full() {
211        let src_buf = EventBuf::<u32, 8>::new();
212        let sp = src_buf.producer();
213        let mut sc = src_buf.consumer();
214
215        for i in 0..5 {
216            sp.push(i).unwrap();
217        }
218
219        let dst_buf = EventBuf::<u32, 2>::new();
220        let mut dp = dst_buf.producer();
221
222        let (n, err) = forward(&mut sc, &mut dp, 10);
223        assert_eq!(n, 2);
224        assert_eq!(err, Some(2)); // third item rejected
225    }
226
227    #[test]
228    fn forward_empty_source_transfers_nothing() {
229        let seq = SeqRing::<u32, 4>::new();
230        let _sp = seq.producer();
231        let mut sc = seq.consumer();
232
233        let eb = EventBuf::<u32, 4>::new();
234        let mut ep = eb.producer();
235
236        let (n, err) = forward(&mut sc, &mut ep, 10);
237        assert_eq!(n, 0);
238        assert!(err.is_none());
239    }
240
241    // ── generic helper to prove trait-generic code compiles ────────
242
243    fn drain_all_into_vec<T: Copy, S: Source<T>>(src: &mut S) -> std::vec::Vec<T> {
244        let mut out = std::vec::Vec::new();
245        while let Some(v) = src.try_pop() {
246            out.push(v);
247        }
248        out
249    }
250
251    #[test]
252    fn generic_drain_seq() {
253        let ring = SeqRing::<u32, 4>::new();
254        let p = ring.producer();
255        let mut c = ring.consumer();
256
257        p.push(1);
258        p.push(2);
259        p.push(3);
260
261        let v = drain_all_into_vec(&mut c);
262        assert_eq!(v, [1, 2, 3]);
263    }
264
265    #[test]
266    fn generic_drain_event() {
267        let buf = EventBuf::<u32, 4>::new();
268        let p = buf.producer();
269        let mut c = buf.consumer();
270
271        p.push(1).unwrap();
272        p.push(2).unwrap();
273
274        let v = drain_all_into_vec(&mut c);
275        assert_eq!(v, [1, 2]);
276    }
277}