Skip to main content

docspec_core/
pipeline.rs

1//! Stream-driver helper for connecting an [`EventSource`] to an [`EventSink`].
2
3use crate::{EventSink, EventSource, Result};
4
5/// Pull events from `source` and push them into `sink` until the source
6/// drains, then call [`EventSink::finish`] on the sink.
7///
8/// This is the canonical way to run a `DocSpec` conversion pipeline. The
9/// function returns on the first error from either side and never buffers
10/// events beyond the single in-flight event.
11///
12/// # Errors
13///
14/// Returns any error produced by `source.next_event()`, `sink.handle_event()`,
15/// or `sink.finish()`. Errors propagate immediately; processing stops at the
16/// first error.
17#[inline]
18pub fn pipe<S, K>(mut source: S, mut sink: K) -> Result<()>
19where
20    S: EventSource,
21    K: EventSink,
22{
23    while let Some(event) = source.next_event()? {
24        sink.handle_event(event)?;
25    }
26    sink.finish()
27}
28
29#[cfg(test)]
30mod tests {
31    use super::*;
32    use crate::{Error, Event};
33    use alloc::rc::Rc;
34    use alloc::vec;
35    use alloc::vec::Vec;
36    use core::cell::{Cell, RefCell};
37
38    /// Externally-observable probe shared between a test and its sink. The sink
39    /// holds one `Rc` clone, the test holds another; mutations to `finished`
40    /// and `events` survive the `EventSink::finish(self)` consume because the
41    /// `Rc` keeps the inner cells alive on the test side.
42    #[derive(Default, Clone)]
43    struct Probe {
44        events: Rc<RefCell<Vec<Event>>>,
45        finished: Rc<Cell<bool>>,
46    }
47
48    struct VecSource {
49        events: alloc::vec::IntoIter<Event>,
50    }
51    impl VecSource {
52        fn new(events: Vec<Event>) -> Self {
53            Self {
54                events: events.into_iter(),
55            }
56        }
57    }
58    impl EventSource for VecSource {
59        fn next_event(&mut self) -> Result<Option<Event>> {
60            Ok(self.events.next())
61        }
62    }
63
64    struct CollectingSink {
65        probe: Probe,
66    }
67    impl CollectingSink {
68        fn new(probe: Probe) -> Self {
69            Self { probe }
70        }
71    }
72    impl EventSink for CollectingSink {
73        fn finish(self) -> Result<()> {
74            self.probe.finished.set(true);
75            Ok(())
76        }
77        fn handle_event(&mut self, event: Event) -> Result<()> {
78            self.probe.events.borrow_mut().push(event);
79            Ok(())
80        }
81    }
82
83    struct ErroringSource;
84    impl EventSource for ErroringSource {
85        fn next_event(&mut self) -> Result<Option<Event>> {
86            Err(Error::Other {
87                message: "source boom".to_string(),
88            })
89        }
90    }
91
92    struct ErroringSink {
93        finished: Rc<Cell<bool>>,
94    }
95    impl EventSink for ErroringSink {
96        fn finish(self) -> Result<()> {
97            self.finished.set(true);
98            Ok(())
99        }
100        fn handle_event(&mut self, _: Event) -> Result<()> {
101            Err(Error::Other {
102                message: "sink boom".to_string(),
103            })
104        }
105    }
106
107    #[test]
108    fn pipe_drains_empty_source_and_finishes() {
109        let probe = Probe::default();
110        let result = pipe(VecSource::new(vec![]), CollectingSink::new(probe.clone()));
111        assert!(result.is_ok());
112        assert!(
113            probe.finished.get(),
114            "finish() must be called when source drains cleanly"
115        );
116        assert!(
117            probe.events.borrow().is_empty(),
118            "no events should reach the sink from an empty source"
119        );
120    }
121
122    #[test]
123    fn pipe_forwards_all_events_and_finishes() {
124        let probe = Probe::default();
125        let events = vec![
126            Event::StartDocument {
127                id: None,
128                language: None,
129                metadata: None,
130            },
131            Event::EndDocument,
132        ];
133        let result = pipe(VecSource::new(events), CollectingSink::new(probe.clone()));
134        assert!(result.is_ok());
135        assert!(
136            probe.finished.get(),
137            "finish() must be called after all events are forwarded"
138        );
139        let collected = probe.events.borrow();
140        assert!(
141            matches!(
142                collected.as_slice(),
143                [Event::StartDocument { .. }, Event::EndDocument]
144            ),
145            "expected exactly [StartDocument, EndDocument], got {collected:?}"
146        );
147    }
148
149    #[test]
150    fn pipe_propagates_source_error() {
151        let probe = Probe::default();
152        let result = pipe(ErroringSource, CollectingSink::new(probe.clone()));
153        assert!(matches!(result, Err(Error::Other { .. })));
154        assert!(
155            !probe.finished.get(),
156            "finish() must be skipped when the source errors"
157        );
158    }
159
160    #[test]
161    fn pipe_propagates_sink_error_and_skips_finish() {
162        let finished = Rc::new(Cell::new(false));
163        let events = vec![Event::StartDocument {
164            id: None,
165            language: None,
166            metadata: None,
167        }];
168        let result = pipe(
169            VecSource::new(events),
170            ErroringSink {
171                finished: Rc::clone(&finished),
172            },
173        );
174        assert!(matches!(result, Err(Error::Other { .. })));
175        assert!(
176            !finished.get(),
177            "finish() must be skipped when the sink errors during handle_event"
178        );
179    }
180}