// tokio_process_tools/output_stream/event.rs

1use crate::StreamReadError;
2use bytes::{Buf, Bytes};
3
4/// A "chunk" is an arbitrarily sized byte slice read from the underlying stream.
5/// The slices' length is at max of the previously configured maximum `chunk_size`.
6///
7/// We use the word "chunk", as it is often used when processing collections in segments or when
8/// dealing with buffered I/O operations where data arrives in variable-sized pieces.
9///
10/// In contrast to this, a "frame" typically carries more specific semantics. It usually implies a
11/// complete logical unit with defined boundaries within a protocol or format. This we do not have
12/// here.
13///
14/// Note: If the underlying stream is of lower buffer size, chunks of full `chunk_size` length may
15/// never be observed.
16#[derive(Debug, Clone, PartialEq, Eq, Hash)]
17pub struct Chunk(pub(crate) Bytes);
18
19impl AsRef<[u8]> for Chunk {
20    fn as_ref(&self) -> &[u8] {
21        self.0.chunk()
22    }
23}
24
25impl From<Bytes> for Chunk {
26    fn from(bytes: Bytes) -> Self {
27        Self(bytes)
28    }
29}
30
31impl From<&'static [u8]> for Chunk {
32    fn from(bytes: &'static [u8]) -> Self {
33        Self(Bytes::from_static(bytes))
34    }
35}
36
37impl<const N: usize> From<&'static [u8; N]> for Chunk {
38    fn from(bytes: &'static [u8; N]) -> Self {
39        Self(Bytes::from_static(bytes))
40    }
41}
42
43impl PartialEq<[u8]> for Chunk {
44    fn eq(&self, other: &[u8]) -> bool {
45        self.as_ref() == other
46    }
47}
48
49impl PartialEq<&[u8]> for Chunk {
50    fn eq(&self, other: &&[u8]) -> bool {
51        self.as_ref() == *other
52    }
53}
54
55impl<const N: usize> PartialEq<&[u8; N]> for Chunk {
56    fn eq(&self, other: &&[u8; N]) -> bool {
57        self.as_ref() == other.as_slice()
58    }
59}
60
61/// Event emitted by an output stream backend.
62///
63/// Stream backends send these events to communicate raw process output, deliberate loss of output
64/// when a bounded buffer cannot keep up, and the terminal end-of-stream marker.
65#[derive(Debug, Clone, PartialEq, Eq)]
66pub enum StreamEvent {
67    /// Bytes read from the underlying stdout or stderr stream.
68    Chunk(Chunk),
69
70    /// Marker indicating that one or more chunks were skipped.
71    ///
72    /// This can be emitted when a lossy backend drops buffered output for a slow consumer. Line
73    /// parsers use it to discard any partially accumulated line so data from before and after the
74    /// gap is not joined into a line that never existed in the source stream.
75    Gap,
76
77    /// End of the underlying stdout or stderr stream.
78    ///
79    /// This is the terminal event for the stream. After it is emitted, no more events are expected.
80    Eof,
81
82    /// The underlying stdout or stderr stream failed while being read.
83    ///
84    /// This is terminal, but distinct from EOF because the stream did not end cleanly.
85    ReadError(StreamReadError),
86}
87
88impl StreamEvent {
89    /// Convenience constructor for [`StreamEvent::Chunk`].
90    pub fn chunk(chunk: impl Into<Chunk>) -> Self {
91        Self::Chunk(chunk.into())
92    }
93}
94
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use assertr::AssertThat;
    use assertr::actual::Actual;
    use assertr::mode::Panic;
    use tokio::sync::mpsc;

    /// Creates a receiver that has all given `events` queued, in order.
    ///
    /// The only sender is dropped before the receiver is returned.
    pub(crate) async fn event_receiver(events: Vec<StreamEvent>) -> mpsc::Receiver<StreamEvent> {
        // Room for every event; `max(1)` keeps the capacity non-zero for an empty event list.
        let capacity = events.len().max(1);
        let (sender, receiver) = mpsc::channel(capacity);
        for event in events {
            sender.send(event).await.unwrap();
        }
        drop(sender);
        receiver
    }

    /// Assertion extensions for [`StreamEvent`] values.
    pub(crate) trait StreamEventAssertions<'t> {
        /// Asserts that the event is a [`StreamEvent::Chunk`] and continues the assertion chain
        /// on the contained [`Chunk`].
        // The assertion consumes `self` to hand out the derived assertion, which the `is_*`
        // naming lint would otherwise flag.
        #[allow(clippy::wrong_self_convention)]
        fn is_chunk(self) -> AssertThat<'t, Chunk, Panic>;
    }

    impl<'t> StreamEventAssertions<'t> for AssertThat<'t, StreamEvent, Panic> {
        #[track_caller]
        fn is_chunk(self) -> AssertThat<'t, Chunk, Panic> {
            let actual = self.actual();
            if !matches!(actual, StreamEvent::Chunk(_)) {
                self.fail(format_args!(
                    "Actual: {actual:#?}\n\nis not of expected variant: StreamEvent::Chunk"
                ));
            }

            // Past this point the variant is known; unwrap it while preserving whether the
            // asserted value was owned or borrowed.
            self.map(|event| match event {
                Actual::Borrowed(StreamEvent::Chunk(chunk)) => Actual::Borrowed(chunk),
                Actual::Owned(StreamEvent::Chunk(chunk)) => Actual::Owned(chunk),
                _ => unreachable!("variant verified above"),
            })
        }
    }
}