tokio_process_tools/output_stream/
event.rs1use crate::StreamReadError;
2use bytes::{Buf, Bytes};
3
4#[derive(Debug, Clone, PartialEq, Eq, Hash)]
17pub struct Chunk(pub(crate) Bytes);
18
19impl AsRef<[u8]> for Chunk {
20 fn as_ref(&self) -> &[u8] {
21 self.0.chunk()
22 }
23}
24
25impl From<Bytes> for Chunk {
26 fn from(bytes: Bytes) -> Self {
27 Self(bytes)
28 }
29}
30
31impl From<&'static [u8]> for Chunk {
32 fn from(bytes: &'static [u8]) -> Self {
33 Self(Bytes::from_static(bytes))
34 }
35}
36
37impl<const N: usize> From<&'static [u8; N]> for Chunk {
38 fn from(bytes: &'static [u8; N]) -> Self {
39 Self(Bytes::from_static(bytes))
40 }
41}
42
43impl PartialEq<[u8]> for Chunk {
44 fn eq(&self, other: &[u8]) -> bool {
45 self.as_ref() == other
46 }
47}
48
49impl PartialEq<&[u8]> for Chunk {
50 fn eq(&self, other: &&[u8]) -> bool {
51 self.as_ref() == *other
52 }
53}
54
55impl<const N: usize> PartialEq<&[u8; N]> for Chunk {
56 fn eq(&self, other: &&[u8; N]) -> bool {
57 self.as_ref() == other.as_slice()
58 }
59}
60
61#[derive(Debug, Clone, PartialEq, Eq)]
66pub enum StreamEvent {
67 Chunk(Chunk),
69
70 Gap,
76
77 Eof,
81
82 ReadError(StreamReadError),
86}
87
88impl StreamEvent {
89 pub fn chunk(chunk: impl Into<Chunk>) -> Self {
91 Self::Chunk(chunk.into())
92 }
93}
94
95#[cfg(test)]
96pub(crate) mod tests {
97 use super::*;
98 use assertr::AssertThat;
99 use assertr::actual::Actual;
100 use assertr::mode::Panic;
101 use tokio::sync::mpsc;
102
103 pub(crate) async fn event_receiver(events: Vec<StreamEvent>) -> mpsc::Receiver<StreamEvent> {
104 let (tx, rx) = mpsc::channel(events.len().max(1));
105 for event in events {
106 tx.send(event).await.unwrap();
107 }
108 drop(tx);
109 rx
110 }
111
112 pub(crate) trait StreamEventAssertions<'t> {
113 #[allow(clippy::wrong_self_convention)]
116 fn is_chunk(self) -> AssertThat<'t, Chunk, Panic>;
117 }
118
119 impl<'t> StreamEventAssertions<'t> for AssertThat<'t, StreamEvent, Panic> {
120 #[track_caller]
121 fn is_chunk(self) -> AssertThat<'t, Chunk, Panic> {
122 if !matches!(self.actual(), StreamEvent::Chunk(_)) {
123 let actual = self.actual();
124 self.fail(format_args!(
125 "Actual: {actual:#?}\n\nis not of expected variant: StreamEvent::Chunk"
126 ));
127 }
128
129 self.map(|actual| match actual {
130 Actual::Owned(StreamEvent::Chunk(chunk)) => Actual::Owned(chunk),
131 Actual::Borrowed(StreamEvent::Chunk(chunk)) => Actual::Borrowed(chunk),
132 _ => unreachable!("variant verified above"),
133 })
134 }
135 }
136}