1use bytes::{Buf, BytesMut};
2use std::io::BufRead;
3
4pub mod broadcast;
5pub(crate) mod impls;
6pub mod single_subscriber;
7
/// Marker trait without any methods.
///
/// NOTE(review): presumably implemented by the stream types in the
/// `broadcast` and `single_subscriber` modules — confirm there.
pub trait OutputStream {}
13
/// Tuning parameters used when building an output stream from a source stream.
///
/// NOTE(review): the code consuming these values lives outside this file; the
/// field descriptions below follow the field names — confirm against callers.
pub struct FromStreamOptions {
    /// Size of each chunk read from the source (presumably in bytes).
    pub chunk_size: usize,

    /// Capacity of the channel buffering read chunks (presumably counted in chunks).
    pub channel_capacity: usize,
}
33
34impl Default for FromStreamOptions {
35 fn default() -> Self {
36 Self {
37 chunk_size: 16 * 1024, channel_capacity: 128, }
40 }
41}
42
43#[derive(Debug, Clone, PartialEq, Eq, Hash)]
56pub struct Chunk(bytes::Bytes);
57
58impl AsRef<[u8]> for Chunk {
59 fn as_ref(&self) -> &[u8] {
60 self.0.chunk()
61 }
62}
63
/// Strategy for handling new data when a consumer's buffer is full.
///
/// NOTE(review): the handling of these variants lives outside this file; the
/// descriptions below follow the variant names — confirm against the consumers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureControl {
    /// Discard the newest incoming item when there is no room for it.
    DropLatestIncomingIfBufferFull,

    /// Wait until the buffer has room for the item.
    BlockUntilBufferHasSpace,
}
76
/// Control-flow signal with two outcomes: keep going or stop.
///
/// NOTE(review): presumably returned by per-item callbacks defined outside this
/// file — confirm against its users.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Next {
    /// Continue processing.
    Continue,
    /// Stop processing.
    Break,
}
85
/// Decides what happens to the bytes of a line that exceed
/// `LineParsingOptions::max_line_length`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LineOverflowBehavior {
    /// Emit the first `max_line_length` bytes as the line and discard the bytes
    /// that follow, up to and including the next `\n` in the current chunk.
    /// This is the default.
    #[default]
    DropAdditionalData,

    /// Emit the overlong line in consecutive pieces of at most
    /// `max_line_length` bytes each.
    EmitAdditionalAsNewLines,
}
103
104#[derive(Debug, Clone, Copy, PartialEq, Eq)]
106pub struct LineParsingOptions {
107 pub max_line_length: NumBytes,
120
121 pub overflow_behavior: LineOverflowBehavior,
123}
124
125impl Default for LineParsingOptions {
126 fn default() -> Self {
127 Self {
128 max_line_length: 16.kilobytes(),
129 overflow_behavior: LineOverflowBehavior::default(),
130 }
131 }
132}
133
/// A number of bytes.
///
/// Build one directly (`NumBytes(42)`) or through `NumBytesExt`
/// (e.g. `16.kilobytes()`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumBytes(pub usize);
136
137impl NumBytes {
138 pub fn zero() -> Self {
139 Self(0)
140 }
141}
142
143pub trait NumBytesExt {
144 fn bytes(self) -> NumBytes;
145
146 fn kilobytes(self) -> NumBytes;
147
148 fn megabytes(self) -> NumBytes;
149}
150
151impl NumBytesExt for usize {
152 fn bytes(self) -> NumBytes {
153 NumBytes(self)
154 }
155
156 fn kilobytes(self) -> NumBytes {
157 NumBytes(self * 1024)
158 }
159
160 fn megabytes(self) -> NumBytes {
161 NumBytes(self * 1024 * 1024)
162 }
163}
164
165pub(crate) struct LineReader<'c, 'b> {
183 chunk: &'c [u8],
184 line_buffer: &'b mut BytesMut,
185 last_line_length: Option<usize>,
186 options: LineParsingOptions,
187}
188
189impl<'c, 'b> LineReader<'c, 'b> {
190 pub fn new(
191 chunk: &'c [u8],
192 line_buffer: &'b mut BytesMut,
193 options: LineParsingOptions,
194 ) -> Self {
195 Self {
196 chunk,
197 line_buffer,
198 last_line_length: None,
199 options,
200 }
201 }
202
203 fn append_to_line_buffer(&mut self, chunk: &[u8]) {
204 self.line_buffer.extend_from_slice(chunk)
205 }
206
207 fn _take_line(&mut self) -> bytes::Bytes {
208 self.last_line_length = Some(self.line_buffer.len());
209 self.line_buffer.split().freeze()
210 }
211
212 fn take_line(&mut self, full_line_buffer: bool) -> bytes::Bytes {
213 if full_line_buffer {
214 match self.options.overflow_behavior {
215 LineOverflowBehavior::DropAdditionalData => {
216 let _ = self.chunk.skip_until(b'\n');
219 self._take_line()
220 }
221 LineOverflowBehavior::EmitAdditionalAsNewLines => {
222 self._take_line()
226 }
227 }
228 } else {
229 self._take_line()
230 }
231 }
232}
233
234impl Iterator for LineReader<'_, '_> {
235 type Item = bytes::Bytes;
236
237 fn next(&mut self) -> Option<Self::Item> {
238 if self.options.max_line_length.0 != 0 {
242 assert!(self.line_buffer.len() <= self.options.max_line_length.0);
243 }
244
245 if let Some(last_line_length) = self.last_line_length.take() {
249 let reclaimed = self.line_buffer.try_reclaim(last_line_length);
251 if !reclaimed {
252 tracing::warn!(
253 "Could not reclaim {last_line_length} bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."
254 );
255 }
256 }
257
258 if self.chunk.is_empty() {
261 return None;
262 }
263
264 let remaining_line_length = if self.options.max_line_length.0 == 0 {
266 usize::MAX
267 } else {
268 self.options.max_line_length.0 - self.line_buffer.len()
269 };
270
271 if remaining_line_length == 0 {
274 return Some(self.take_line(true));
275 }
276
277 let (usable, rest) = self
281 .chunk
282 .split_at(usize::min(self.chunk.len(), remaining_line_length));
283
284 match usable.iter().position(|b| *b == b'\n') {
286 None => {
287 self.append_to_line_buffer(usable);
289 self.chunk = rest;
290
291 if rest.is_empty() {
292 None
295 } else {
296 assert_eq!(self.line_buffer.len(), self.options.max_line_length.0);
298 Some(self.take_line(true))
299 }
300 }
301 Some(pos) => {
302 let (usable_until_line_break, _usable_rest) = usable.split_at(pos);
304 self.append_to_line_buffer(usable_until_line_break);
305
306 let rest = &self.chunk[usable_until_line_break.len()..];
312
313 self.chunk = if rest.len() > 1 { &rest[1..] } else { &[] };
315
316 Some(self.take_line(false))
318 }
319 }
320 }
321}
322
#[cfg(test)]
mod tests {
    use crate::output_stream::LineReader;
    use crate::{LineOverflowBehavior, LineParsingOptions, NumBytes};
    use assertr::prelude::*;
    use bytes::{Bytes, BytesMut};
    use std::time::Duration;
    use tokio::io::{AsyncWrite, AsyncWriteExt};
    use tracing_test::traced_test;

    /// Writes five `\n`-terminated lines to `write`, sleeping 50ms after each write.
    pub(crate) async fn write_test_data(mut write: impl AsyncWrite + Unpin) {
        write.write_all("Cargo.lock\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("Cargo.toml\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("README.md\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("src\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        write.write_all("target\n".as_bytes()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    #[test]
    #[traced_test]
    fn multi_byte_utf_8_characters_are_preserved_even_when_parsing_multiple_one_byte_chunks() {
        let mut line_buffer = BytesMut::new();
        let mut collected_lines: Vec<String> = Vec::new();

        let data = "❤️❤️❤️\n👍\n";
        for byte in data.as_bytes() {
            let lr = LineReader {
                chunk: &[*byte],
                line_buffer: &mut line_buffer,
                last_line_length: None,
                options: LineParsingOptions::default(),
            };
            for line in lr {
                collected_lines.push(String::from_utf8_lossy(&line).to_string());
            }
        }

        assert_that(collected_lines).contains_exactly(&["❤️❤️❤️", "👍"]);
    }

    #[test]
    #[traced_test]
    fn reclaims_line_buffer_space_before_collecting_new_line() {
        let mut line_buffer = BytesMut::new();
        let mut collected_lines: Vec<String> = Vec::new();
        let mut bytes: Vec<Bytes> = Vec::new();

        let data = "❤️❤️❤️\n❤️❤️❤️\n";
        for byte in data.as_bytes() {
            let lr = LineReader {
                chunk: &[*byte],
                line_buffer: &mut line_buffer,
                last_line_length: None,
                options: LineParsingOptions::default(),
            };
            for line in lr {
                collected_lines.push(String::from_utf8_lossy(&line).to_string());
                bytes.push(line);
            }
        }

        let data = "❤️❤️❤️\n";
        let lr = LineReader {
            chunk: data.as_bytes(),
            line_buffer: &mut line_buffer,
            last_line_length: None,
            options: LineParsingOptions::default(),
        };
        for line in lr {
            collected_lines.push(String::from_utf8_lossy(&line).to_string());
            bytes.push(line);
        }

        assert_that(collected_lines).contains_exactly(&["❤️❤️❤️", "❤️❤️❤️", "❤️❤️❤️"]);

        // Every yielded line is kept alive in `bytes`, so every reclaim attempt
        // fails. There is one attempt for each of the two lines fed byte by byte,
        // plus one for the line parsed from the final chunk.
        logs_assert(|lines: &[&str]| {
            match lines
                .iter()
                .filter(|line| line.contains("Could not reclaim 18 bytes of line_buffer space. DO NOT store a yielded line (of type `bytes::Bytes`) long term. If you need to, clone it instead, to prevent the `line_buffer` from growing indefinitely (for any additional line processed). Also, make sure to set an appropriate `options.max_line_length`."))
                .count()
            {
                3 => {}
                n => return Err(format!("Expected exactly three logs, but found {n}")),
            };
            Ok(())
        });
    }

    #[test]
    fn line_reader() {
        fn run_test_case(
            test_name: &str,
            chunk: &[u8],
            line_buffer_before: &str,
            line_buffer_after: &str,
            expected_lines: &[&str],
            options: LineParsingOptions,
        ) {
            let mut line_buffer = BytesMut::from(line_buffer_before);
            let mut collected_lines: Vec<String> = Vec::new();

            let lr = LineReader {
                chunk,
                line_buffer: &mut line_buffer,
                last_line_length: None,
                options,
            };
            for line in lr {
                collected_lines.push(String::from_utf8_lossy(&line).to_string());
            }

            assert_that(line_buffer)
                .with_detail_message(format!("Test case: {test_name}"))
                .is_equal_to(line_buffer_after);

            let expected_lines: Vec<String> =
                expected_lines.iter().map(|s| s.to_string()).collect();

            assert_that(collected_lines)
                .with_detail_message(format!("Test case: {test_name}"))
                .is_equal_to(expected_lines);
        }

        run_test_case(
            "Test 1: Empty chunk",
            b"",
            "previous: ",
            "previous: ",
            &[],
            LineParsingOptions::default(),
        );

        run_test_case(
            "Test 2: Chunk with no newlines",
            b"no newlines here",
            "previous: ",
            "previous: no newlines here",
            &[],
            LineParsingOptions::default(),
        );

        run_test_case(
            "Test 3: Single complete line",
            b"one line\n",
            "",
            "",
            &["one line"],
            LineParsingOptions::default(),
        );

        run_test_case(
            "Test 4: Multiple complete lines",
            b"first line\nsecond line\nthird line\n",
            "",
            "",
            &["first line", "second line", "third line"],
            LineParsingOptions::default(),
        );

        run_test_case(
            "Test 5: Partial line at the end",
            b"complete line\npartial",
            "",
            "partial",
            &["complete line"],
            LineParsingOptions::default(),
        );

        run_test_case(
            "Test 6: Initial line with multiple newlines",
            b"continuation\nmore lines\n",
            "previous: ",
            "",
            &["previous: continuation", "more lines"],
            LineParsingOptions::default(),
        );

        run_test_case(
            "Test 7: Invalid UTF8 data",
            b"valid utf8\xF0\x28\x8C\xBC invalid utf8\n",
            "",
            "",
            &["valid utf8�(�� invalid utf8"],
            LineParsingOptions::default(),
        );

        run_test_case(
            "Test 8 - Rest of too long line is dropped",
            b"123456789\nabcdefghi\n",
            "",
            "",
            &["1234", "abcd"],
            LineParsingOptions {
                max_line_length: NumBytes(4),
                overflow_behavior: LineOverflowBehavior::DropAdditionalData,
            },
        );

        run_test_case(
            "Test 9 - Rest of too long line is returned as additional lines",
            b"123456789\nabcdefghi\n",
            "",
            "",
            &["1234", "5678", "9", "abcd", "efgh", "i"],
            LineParsingOptions {
                max_line_length: NumBytes(4),
                overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
            },
        );

        run_test_case(
            "Test 10 - max line length of 0 disables line length checks #1",
            b"123456789\nabcdefghi\n",
            "",
            "",
            &["123456789", "abcdefghi"],
            LineParsingOptions {
                max_line_length: NumBytes(0),
                overflow_behavior: LineOverflowBehavior::DropAdditionalData,
            },
        );

        run_test_case(
            "Test 11 - max line length of 0 disables line length checks #2",
            b"123456789\nabcdefghi\n",
            "",
            "",
            &["123456789", "abcdefghi"],
            LineParsingOptions {
                max_line_length: NumBytes(0),
                overflow_behavior: LineOverflowBehavior::EmitAdditionalAsNewLines,
            },
        );
    }
}