1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
//! Synchronous I/O module
use std::{collections::VecDeque, io::Read};
use crate::{deserialize::Deserializable, types::stream::ScannedItem, Error, GenResult};
/// Serialization hook: a value that knows how to emit its own byte
/// representation into an arbitrary `std::io::Write` sink.
pub trait WriteInto {
    /// Emit `self` into `writer`, propagating any I/O failure from the sink.
    fn write_into(&self, writer: &mut dyn std::io::Write) -> Result<(), std::io::Error>;
}
/// Look for subunit events in an input stream.
///
/// Bytes pulled from `reader` accumulate in `buffer` until enough are
/// available to decode the next item (see the `Iterator` impl below).
// NOTE: field order is significant for the derived `Debug` output.
#[derive(Debug)]
pub struct Scanner<R> {
    // Bytes already read from `reader` but not yet consumed by a yielded item.
    buffer: VecDeque<u8>,
    // Source of stream bytes.
    reader: R,
    // Scratch space for each `read` call; boxed and reused so no allocation
    // happens per read.
    read_buf: Box<[u8; 4096]>,
}
/// Wrap `reader` in an iterator that scans it for subunit content.
///
/// Each yielded item is a parsed event, a run of non-packet bytes, or an
/// unparseable chunk, as a `ScannedItem`. Read failures surface as `Err` items.
pub fn iter_stream<R: Read>(reader: R) -> impl Iterator<Item = GenResult<ScannedItem>> {
    // Preallocate for the largest buffer needed to process a subunit packet.
    const MAX_PACKET_BYTES: usize = 4 * 1024 * 1024;
    let read_buf = Box::new([0u8; 4096]);
    Scanner {
        buffer: VecDeque::with_capacity(MAX_PACKET_BYTES),
        reader,
        read_buf,
    }
}
impl<R> Iterator for Scanner<R>
where
    R: Read,
{
    type Item = GenResult<ScannedItem>;

    /// Yield the next item found in the stream.
    ///
    /// Reads from the underlying reader until the buffered bytes satisfy
    /// `ScannedItem::required_bytes`. It then hands the buffer to
    /// `ScannedItem::deserialize` and consumes the bytes that item accounts for.
    ///
    /// - Returns `None` once the reader reports EOF with nothing buffered.
    /// - A partial packet left over at EOF is returned as
    ///   `ScannedItem::Unknown(.., Error::NotEnoughBytes)`.
    /// - Read errors of kind `Interrupted` are retried.
    /// - Any other read error is returned as `Err`.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            let buf = self.buffer.make_contiguous();
            // NOTE(review): this error path leaves the buffer untouched, so a
            // caller that keeps iterating after an `Err` will receive the same
            // error again. Confirm whether `required_bytes` can fail on content.
            let required_bytes = match ScannedItem::required_bytes(buf) {
                Ok(v) => v,
                Err(e) => return Some(Err(e)),
            };
            if buf.len() >= required_bytes {
                // We have enough data - parse it.
                break;
            }
            // Need more data; the reusable read buffer avoids per-read allocation.
            match self.reader.read(&mut self.read_buf[..]) {
                Ok(0) => {
                    if self.buffer.is_empty() {
                        return None;
                    }
                    // EOF. The buffer has not changed since `required_bytes` was
                    // computed above, so whatever remains is an incomplete packet.
                    return Some(Ok(ScannedItem::Unknown(
                        self.buffer.drain(..).collect(),
                        Error::NotEnoughBytes.into(),
                    )));
                }
                Ok(n) => self.buffer.extend(&self.read_buf[..n]),
                // `Interrupted` is transient per the `Read` contract: retry.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => return Some(Err(e.into())),
            }
        }

        // Now we have enough data to do something with it.
        let buf = self.buffer.make_contiguous();
        let item = match ScannedItem::deserialize(buf) {
            Ok((ScannedItem::Event(event), used)) => {
                self.buffer.drain(..used);
                ScannedItem::Event(event)
            }
            Ok((ScannedItem::Bytes(_), _)) => {
                // Coalesce every buffered byte up to the next packet signature
                // into a single item.
                let len = self.buffer.len();
                let end = self
                    .buffer
                    .iter()
                    .position(|&byte| byte == crate::constants::V2_SIGNATURE)
                    .unwrap_or(len);
                // Always consume at least one byte. Suppose the buffer starts with
                // the signature but `deserialize` still classified it as plain
                // bytes. Taking nothing would then yield the same empty item forever.
                let end = end.max(1).min(len);
                ScannedItem::Bytes(self.buffer.drain(..end).collect())
            }
            Ok((ScannedItem::Unknown(data, e), used)) => {
                self.buffer.drain(..used);
                ScannedItem::Unknown(data, e)
            }
            Err(e) => {
                // The loop above guaranteed enough bytes and this is not I/O, so
                // this is junk: an invalid char, a failed crc32, or similar.
                // Discard one packet's worth, or one byte if even that cannot be
                // computed. The length is clamped to guarantee progress and to
                // never drain past the end of the buffer.
                let buf = self.buffer.make_contiguous();
                let skip = ScannedItem::required_bytes(buf)
                    .unwrap_or(1)
                    .max(1)
                    .min(self.buffer.len());
                ScannedItem::Unknown(self.buffer.drain(..skip).collect(), e)
            }
        };
        Some(Ok(item))
    }
}
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use chrono::NaiveDate;

    use crate::{
        io::sync,
        serialize::Serializable,
        types::{event::Event, stream::ScannedItem, teststatus::TestStatus},
    };

    /// Two back-to-back events (the first carrying file content) round-trip
    /// through `iter_stream` unchanged, and nothing else is yielded.
    #[test]
    fn test_write_full_test_event_with_file_content() {
        let event = Event::new(TestStatus::InProgress)
            .test_id("A_test_id")
            .datetime(
                NaiveDate::from_ymd_opt(2014, 7, 8)
                    .unwrap()
                    .and_hms_opt(9, 10, 11)
                    .unwrap()
                    .and_utc(),
            )
            .unwrap()
            .tag("tag_a")
            .tag("tag_b")
            .mime_type("text/plain;charset=utf8")
            .file_content("stdout:''", b"stdout content")
            .build();
        let event_a = Event::new(TestStatus::Failed)
            .test_id("A_test_id")
            .datetime(
                NaiveDate::from_ymd_opt(2014, 7, 8)
                    .unwrap()
                    .and_hms_opt(9, 12, 1)
                    .unwrap()
                    .and_utc(),
            )
            .unwrap()
            .tag("tag_a")
            .tag("tag_b")
            .build();
        let mut buffer = event.to_vec().unwrap();
        event_a.serialize(&mut buffer).unwrap();

        // Collect everything rather than zipping against the expected events:
        // `zip` would silently ignore any unexpected trailing item.
        let items: Vec<_> = sync::iter_stream(Cursor::new(&buffer))
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(
            items.len(),
            2,
            "Expected to read 2 events, got {}",
            items.len()
        );
        for (item, expected) in items.into_iter().zip([event, event_a]) {
            let ScannedItem::Event(parsed_event) = item else {
                panic!("Expected event, got {:?}", item);
            };
            assert_eq!(expected, parsed_event);
        }
    }

    /// `iter_stream` works when it owns its reader (an owned `Cursor<Vec<u8>>`),
    /// not only when the cursor borrows its data. This guards against a
    /// previously reported Scanner bug with owned readers.
    #[test]
    fn test_scanner_reads_owned_cursor() {
        let event = Event::new(TestStatus::Success).test_id("test").build();
        let buffer = event.to_vec().unwrap();
        // Hand ownership of the cursor (and its buffer) to the scanner.
        let mut count = 0;
        for item in sync::iter_stream(Cursor::new(buffer)) {
            let item = item.unwrap();
            if matches!(item, ScannedItem::Event(_)) {
                count += 1;
            }
        }
        assert_eq!(count, 1, "Expected 1 event, got {}", count);
    }

    /// Non-packet bytes (here invalid UTF-8) around an event come back as
    /// separate `Bytes` items on either side of the parsed event.
    #[test]
    fn test_stream_with_invalid_utf8() {
        let event = Event::new(TestStatus::Success).test_id("test").build();
        let mut buffer = Vec::new();
        // 0xFF is not a valid UTF-8 start byte.
        buffer.extend_from_slice(&[0xFF, 0xFE, 0xFD]);
        event.serialize(&mut buffer).unwrap();
        // Trailing continuation bytes with no start byte: also invalid UTF-8.
        buffer.extend_from_slice(&[0x80, 0x81]);
        let items: Vec<_> = sync::iter_stream(Cursor::new(&buffer))
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        // Expect: Bytes (3 bytes), Event, Bytes (2 bytes).
        assert_eq!(items.len(), 3);
        match &items[0] {
            ScannedItem::Bytes(bytes) => assert_eq!(bytes, &[0xFF, 0xFE, 0xFD]),
            _ => panic!("Expected Bytes, got {:?}", items[0]),
        }
        assert!(matches!(items[1], ScannedItem::Event(_)));
        match &items[2] {
            ScannedItem::Bytes(bytes) => assert_eq!(bytes, &[0x80, 0x81]),
            _ => panic!("Expected Bytes, got {:?}", items[2]),
        }
    }

    /// Many consecutive events (spanning many internal reads) are all parsed,
    /// with no event lost or misreported as `Unknown`.
    #[test]
    fn test_many_events() {
        const NUM_EVENTS: usize = 3461;
        let mut buffer = Vec::new();
        for i in 0..NUM_EVENTS {
            let event = Event::new(TestStatus::Success)
                .test_id(&format!("test_{}", i))
                .build();
            event.serialize(&mut buffer).unwrap();
        }
        let mut count = 0;
        for item in sync::iter_stream(Cursor::new(&buffer)) {
            match item {
                Ok(ScannedItem::Event(_)) => count += 1,
                Ok(ScannedItem::Unknown(data, e)) => {
                    panic!(
                        "Unexpected Unknown item at event {}: {} bytes, error: {:?}",
                        count,
                        data.len(),
                        e
                    );
                }
                Ok(ScannedItem::Bytes(_)) => {}
                Err(e) => panic!("Error reading event: {:?}", e),
            }
        }
        assert_eq!(count, NUM_EVENTS);
    }

    /// Regression test for an infinite-loop bug (originally tracked in
    /// TODO.infinite-bug.md): a malformed raw subunit v2 packet from a simple
    /// test command must terminate after a handful of items.
    #[test]
    fn test_no_infinite_loop_on_malformed_stream() {
        let data: &[u8] =
            b"\xb3\x29\x00\x16test1\x20\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xb3";
        let mut count = 0;
        for item in sync::iter_stream(Cursor::new(data)) {
            count += 1;
            if count > 100 {
                panic!("Infinite loop detected after {} iterations!", count);
            }
            // Only termination matters here; the item itself is discarded.
            let _ = item;
        }
        // Should finish in a reasonable number of iterations (likely 1-3).
        assert!(count <= 10, "Expected few iterations, got {}", count);
    }
}