1use crate::{EventSink, EventSource, Result};
4
5#[inline]
18pub fn pipe<S, K>(mut source: S, mut sink: K) -> Result<()>
19where
20 S: EventSource,
21 K: EventSink,
22{
23 while let Some(event) = source.next_event()? {
24 sink.handle_event(event)?;
25 }
26 sink.finish()
27}
28
29#[cfg(test)]
30mod tests {
31 use super::*;
32 use crate::{Error, Event};
33 use alloc::rc::Rc;
34 use alloc::vec;
35 use alloc::vec::Vec;
36 use core::cell::{Cell, RefCell};
37
38 #[derive(Default, Clone)]
43 struct Probe {
44 events: Rc<RefCell<Vec<Event>>>,
45 finished: Rc<Cell<bool>>,
46 }
47
48 struct VecSource {
49 events: alloc::vec::IntoIter<Event>,
50 }
51 impl VecSource {
52 fn new(events: Vec<Event>) -> Self {
53 Self {
54 events: events.into_iter(),
55 }
56 }
57 }
58 impl EventSource for VecSource {
59 fn next_event(&mut self) -> Result<Option<Event>> {
60 Ok(self.events.next())
61 }
62 }
63
64 struct CollectingSink {
65 probe: Probe,
66 }
67 impl CollectingSink {
68 fn new(probe: Probe) -> Self {
69 Self { probe }
70 }
71 }
72 impl EventSink for CollectingSink {
73 fn finish(self) -> Result<()> {
74 self.probe.finished.set(true);
75 Ok(())
76 }
77 fn handle_event(&mut self, event: Event) -> Result<()> {
78 self.probe.events.borrow_mut().push(event);
79 Ok(())
80 }
81 }
82
83 struct ErroringSource;
84 impl EventSource for ErroringSource {
85 fn next_event(&mut self) -> Result<Option<Event>> {
86 Err(Error::Other {
87 message: "source boom".to_string(),
88 })
89 }
90 }
91
92 struct ErroringSink {
93 finished: Rc<Cell<bool>>,
94 }
95 impl EventSink for ErroringSink {
96 fn finish(self) -> Result<()> {
97 self.finished.set(true);
98 Ok(())
99 }
100 fn handle_event(&mut self, _: Event) -> Result<()> {
101 Err(Error::Other {
102 message: "sink boom".to_string(),
103 })
104 }
105 }
106
107 #[test]
108 fn pipe_drains_empty_source_and_finishes() {
109 let probe = Probe::default();
110 let result = pipe(VecSource::new(vec![]), CollectingSink::new(probe.clone()));
111 assert!(result.is_ok());
112 assert!(
113 probe.finished.get(),
114 "finish() must be called when source drains cleanly"
115 );
116 assert!(
117 probe.events.borrow().is_empty(),
118 "no events should reach the sink from an empty source"
119 );
120 }
121
122 #[test]
123 fn pipe_forwards_all_events_and_finishes() {
124 let probe = Probe::default();
125 let events = vec![
126 Event::StartDocument {
127 id: None,
128 language: None,
129 metadata: None,
130 },
131 Event::EndDocument,
132 ];
133 let result = pipe(VecSource::new(events), CollectingSink::new(probe.clone()));
134 assert!(result.is_ok());
135 assert!(
136 probe.finished.get(),
137 "finish() must be called after all events are forwarded"
138 );
139 let collected = probe.events.borrow();
140 assert!(
141 matches!(
142 collected.as_slice(),
143 [Event::StartDocument { .. }, Event::EndDocument]
144 ),
145 "expected exactly [StartDocument, EndDocument], got {collected:?}"
146 );
147 }
148
149 #[test]
150 fn pipe_propagates_source_error() {
151 let probe = Probe::default();
152 let result = pipe(ErroringSource, CollectingSink::new(probe.clone()));
153 assert!(matches!(result, Err(Error::Other { .. })));
154 assert!(
155 !probe.finished.get(),
156 "finish() must be skipped when the source errors"
157 );
158 }
159
160 #[test]
161 fn pipe_propagates_sink_error_and_skips_finish() {
162 let finished = Rc::new(Cell::new(false));
163 let events = vec![Event::StartDocument {
164 id: None,
165 language: None,
166 metadata: None,
167 }];
168 let result = pipe(
169 VecSource::new(events),
170 ErroringSink {
171 finished: Rc::clone(&finished),
172 },
173 );
174 assert!(matches!(result, Err(Error::Other { .. })));
175 assert!(
176 !finished.get(),
177 "finish() must be skipped when the sink errors during handle_event"
178 );
179 }
180}