1use crate::output_stream::Next;
2use crate::output_stream::consumer::Sink;
3use crate::output_stream::event::Chunk;
4use crate::output_stream::line::adapter::AsyncLineVisitor;
5use crate::output_stream::visitor::AsyncStreamVisitor;
6use std::borrow::Cow;
7use std::io;
8use tokio::io::AsyncWriteExt;
9use typed_builder::TypedBuilder;
10
/// Selects whether a line delimiter is written after each line.
///
/// Derives `Hash` in addition to the comparison traits so the mode can be used
/// as a key in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LineWriteMode {
    /// Write the line bytes only; no delimiter is added.
    AsIs,

    /// Write the line bytes followed by a single `\n` byte.
    AppendLf,
}
26
/// Decision returned by a sink write error handler after a failed write.
///
/// Derives `Hash` in addition to the comparison traits so the action can be
/// used as a key in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SinkWriteErrorAction {
    /// Abort: the failed write is reported as an error to the caller.
    Stop,

    /// Swallow the failure and keep processing subsequent output.
    Continue,
}
37
/// Identifies which kind of write was being attempted when a sink write failed.
///
/// Derives `Hash` in addition to the comparison traits so the operation can be
/// used as a key in hashed collections (for example, to count failures per kind).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SinkWriteOperation {
    /// Writing the bytes of a (possibly mapped) chunk.
    Chunk,

    /// Writing the bytes of a (possibly mapped) line.
    Line,

    /// Writing the `\n` delimiter that follows a line.
    LineDelimiter,
}
50
/// Error describing a failed write of consumed stream output to a sink.
///
/// Carries the stream name, the kind of write that failed, the number of bytes
/// that were being written, and the underlying I/O error.
#[derive(Debug)]
pub struct SinkWriteError {
    /// Name of the stream whose output was being written.
    stream_name: &'static str,
    /// Kind of write (chunk, line, or line delimiter) that failed.
    operation: SinkWriteOperation,
    /// Length in bytes of the buffer passed to the failed write.
    attempted_len: usize,
    /// The I/O error returned by the writer.
    source: io::Error,
}
59
60impl SinkWriteError {
61 pub(crate) fn new(
62 stream_name: &'static str,
63 operation: SinkWriteOperation,
64 attempted_len: usize,
65 source: io::Error,
66 ) -> Self {
67 Self {
68 stream_name,
69 operation,
70 attempted_len,
71 source,
72 }
73 }
74
75 #[must_use]
77 pub fn stream_name(&self) -> &'static str {
78 self.stream_name
79 }
80
81 #[must_use]
83 pub fn operation(&self) -> SinkWriteOperation {
84 self.operation
85 }
86
87 #[must_use]
89 pub fn attempted_len(&self) -> usize {
90 self.attempted_len
91 }
92
93 #[must_use]
95 pub fn source(&self) -> &io::Error {
96 &self.source
97 }
98}
99
100impl std::fmt::Display for SinkWriteError {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 write!(
103 f,
104 "Failed to write consumed output from stream '{}' to sink: {}",
105 self.stream_name, self.source
106 )
107 }
108}
109
110impl std::error::Error for SinkWriteError {
111 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
112 Some(&self.source)
113 }
114}
115
/// Policy hook invoked whenever a write to a sink fails.
///
/// Implementors must be `Send + 'static`. Any matching `FnMut` closure or
/// function pointer implements this trait through the blanket impl in this
/// module.
pub trait SinkWriteErrorHandler: Send + 'static {
    /// Inspects `error` and decides whether processing stops or continues.
    fn handle(&mut self, error: &SinkWriteError) -> SinkWriteErrorAction;
}
121
122impl<F> SinkWriteErrorHandler for F
123where
124 F: FnMut(&SinkWriteError) -> SinkWriteErrorAction + Send + 'static,
125{
126 fn handle(&mut self, error: &SinkWriteError) -> SinkWriteErrorAction {
127 self(error)
128 }
129}
130
/// Options for writing collected output to a sink; currently wraps the error
/// handler that decides what happens when a write fails.
///
/// The handler type defaults to a plain function pointer, which is what the
/// `fail_fast` and `log_and_continue` presets produce.
#[derive(Debug, Clone, Copy)]
pub struct WriteCollectionOptions<H = fn(&SinkWriteError) -> SinkWriteErrorAction> {
    /// Handler consulted on every failed sink write.
    error_handler: H,
}
140
141impl WriteCollectionOptions {
142 #[must_use]
144 pub fn fail_fast() -> Self {
145 Self {
146 error_handler: |_| SinkWriteErrorAction::Stop,
147 }
148 }
149
150 #[must_use]
152 pub fn log_and_continue() -> Self {
153 Self {
154 error_handler: |error| {
155 tracing::warn!(
156 stream = error.stream_name(),
157 operation = ?error.operation(),
158 attempted_len = error.attempted_len(),
159 source = %error.source(),
160 "Could not write collected output to write sink; continuing"
161 );
162 SinkWriteErrorAction::Continue
163 },
164 }
165 }
166
167 #[must_use]
169 pub fn with_error_handler<H>(handler: H) -> WriteCollectionOptions<H>
170 where
171 H: FnMut(&SinkWriteError) -> SinkWriteErrorAction + Send + 'static,
172 {
173 WriteCollectionOptions {
174 error_handler: handler,
175 }
176 }
177}
178
179impl<H> WriteCollectionOptions<H> {
180 pub fn into_error_handler(self) -> H {
184 self.error_handler
185 }
186}
187
/// Stream visitor that maps every chunk through `mapper` and writes the
/// resulting bytes to `writer`.
///
/// Write failures are passed to `error_handler`; if it asks to stop, the error
/// is kept in `error` and returned when the visitor is turned into its output.
/// A builder is generated by `TypedBuilder`.
#[derive(TypedBuilder)]
pub struct WriteChunks<W, H, F, B>
where
    W: Sink + AsyncWriteExt + Unpin,
    H: SinkWriteErrorHandler,
    B: AsRef<[u8]> + Send + 'static,
    F: Fn(Chunk) -> B + Send + Sync + 'static,
{
    /// Stream name attached to any reported write error.
    pub stream_name: &'static str,
    /// Destination that receives the mapped bytes.
    pub writer: W,
    /// Decides whether to stop or continue after a failed write.
    pub error_handler: H,
    /// Converts each incoming chunk into the bytes to write.
    pub mapper: F,
    /// Error that stopped processing, if any; the builder defaults it to `None`.
    #[builder(default)]
    pub error: Option<SinkWriteError>,
}
226
227impl<W, H> WriteChunks<W, H, fn(Chunk) -> Chunk, Chunk>
228where
229 W: Sink + AsyncWriteExt + Unpin,
230 H: SinkWriteErrorHandler,
231{
232 pub fn passthrough(
235 stream_name: &'static str,
236 writer: W,
237 options: WriteCollectionOptions<H>,
238 ) -> Self {
239 Self {
240 stream_name,
241 writer,
242 error_handler: options.into_error_handler(),
243 mapper: identity_chunk,
244 error: None,
245 }
246 }
247}
248
249impl<W, H, F, B> WriteChunks<W, H, F, B>
250where
251 W: Sink + AsyncWriteExt + Unpin,
252 H: SinkWriteErrorHandler,
253 B: AsRef<[u8]> + Send + 'static,
254 F: Fn(Chunk) -> B + Send + Sync + 'static,
255{
256 pub fn mapped(
260 stream_name: &'static str,
261 writer: W,
262 options: WriteCollectionOptions<H>,
263 mapper: F,
264 ) -> Self {
265 Self {
266 stream_name,
267 writer,
268 error_handler: options.into_error_handler(),
269 mapper,
270 error: None,
271 }
272 }
273}
274
/// Returns the chunk unchanged; used as the mapper for passthrough chunk writing.
fn identity_chunk(chunk: Chunk) -> Chunk {
    chunk
}
278
279impl<W, H, F, B> AsyncStreamVisitor for WriteChunks<W, H, F, B>
280where
281 W: Sink + AsyncWriteExt + Unpin,
282 H: SinkWriteErrorHandler,
283 B: AsRef<[u8]> + Send + 'static,
284 F: Fn(Chunk) -> B + Send + Sync + 'static,
285{
286 type Output = Result<W, SinkWriteError>;
287
288 async fn on_chunk(&mut self, chunk: Chunk) -> Next {
289 let mapped_output = (self.mapper)(chunk);
290 let bytes = mapped_output.as_ref();
291 let attempted_len = bytes.len();
292 let result = self.writer.write_all(bytes).await;
293 match handle_write_result(
294 self.stream_name,
295 &mut self.error_handler,
296 SinkWriteOperation::Chunk,
297 attempted_len,
298 result,
299 ) {
300 Ok(_) => Next::Continue,
301 Err(err) => {
302 self.error = Some(err);
303 Next::Break
304 }
305 }
306 }
307
308 fn into_output(self) -> Self::Output {
309 match self.error {
310 Some(err) => Err(err),
311 None => Ok(self.writer),
312 }
313 }
314}
315
/// Line visitor that maps every line through `mapper` and writes the resulting
/// bytes to `writer`, optionally followed by a `\n` depending on `mode`.
///
/// Write failures are passed to `error_handler`; if it asks to stop, the error
/// is kept in `error` and returned when the visitor is turned into its output.
pub struct WriteLines<W, H, F, B>
where
    W: Sink + AsyncWriteExt + Unpin,
    H: SinkWriteErrorHandler,
    B: AsRef<[u8]> + Send + 'static,
    F: Fn(Cow<'_, str>) -> B + Send + Sync + 'static,
{
    /// Stream name attached to any reported write error.
    stream_name: &'static str,
    /// Destination that receives the mapped line bytes.
    writer: W,
    /// Decides whether to stop or continue after a failed write.
    error_handler: H,
    /// Converts each incoming line into the bytes to write.
    mapper: F,
    /// Whether a `\n` is appended after each successfully written line.
    mode: LineWriteMode,
    /// Error that stopped processing, if any.
    error: Option<SinkWriteError>,
}
340
341impl<W, H, F, B> WriteLines<W, H, F, B>
342where
343 W: Sink + AsyncWriteExt + Unpin,
344 H: SinkWriteErrorHandler,
345 B: AsRef<[u8]> + Send + 'static,
346 F: Fn(Cow<'_, str>) -> B + Send + Sync + 'static,
347{
348 pub fn new(
352 stream_name: &'static str,
353 writer: W,
354 error_handler: H,
355 mapper: F,
356 mode: LineWriteMode,
357 ) -> Self {
358 Self {
359 stream_name,
360 writer,
361 error_handler,
362 mapper,
363 mode,
364 error: None,
365 }
366 }
367}
368
369impl<W, H> WriteLines<W, H, fn(Cow<'_, str>) -> String, String>
370where
371 W: Sink + AsyncWriteExt + Unpin,
372 H: SinkWriteErrorHandler,
373{
374 pub fn passthrough(
377 stream_name: &'static str,
378 writer: W,
379 options: WriteCollectionOptions<H>,
380 mode: LineWriteMode,
381 ) -> Self {
382 Self::new(
383 stream_name,
384 writer,
385 options.into_error_handler(),
386 identity_line,
387 mode,
388 )
389 }
390}
391
/// Converts a line into an owned `String` without altering its contents; used
/// as the mapper for passthrough line writing.
fn identity_line(line: Cow<'_, str>) -> String {
    String::from(line)
}
395
396impl<W, H, F, B> AsyncLineVisitor for WriteLines<W, H, F, B>
397where
398 W: Sink + AsyncWriteExt + Unpin,
399 H: SinkWriteErrorHandler,
400 B: AsRef<[u8]> + Send + 'static,
401 F: Fn(Cow<'_, str>) -> B + Send + Sync + 'static,
402{
403 type Output = Result<W, SinkWriteError>;
404
405 async fn on_line<'a>(&'a mut self, line: Cow<'a, str>) -> Next {
406 let mapped_output = (self.mapper)(line);
407 let bytes = mapped_output.as_ref();
408 match write_line(
409 self.stream_name,
410 &mut self.writer,
411 &mut self.error_handler,
412 bytes,
413 self.mode,
414 )
415 .await
416 {
417 Ok(()) => Next::Continue,
418 Err(err) => {
419 self.error = Some(err);
420 Next::Break
421 }
422 }
423 }
424
425 fn into_output(self) -> Self::Output {
426 match self.error {
427 Some(err) => Err(err),
428 None => Ok(self.writer),
429 }
430 }
431}
432
433async fn write_line<W, H>(
434 stream_name: &'static str,
435 write: &mut W,
436 error_handler: &mut H,
437 line: &[u8],
438 mode: LineWriteMode,
439) -> Result<(), SinkWriteError>
440where
441 W: AsyncWriteExt + Unpin,
442 H: SinkWriteErrorHandler,
443{
444 let line_write = write.write_all(line).await;
445 let line_written = handle_write_result(
446 stream_name,
447 error_handler,
448 SinkWriteOperation::Line,
449 line.len(),
450 line_write,
451 )?;
452 if !line_written || !matches!(mode, LineWriteMode::AppendLf) {
453 return Ok(());
454 }
455
456 handle_write_result(
457 stream_name,
458 error_handler,
459 SinkWriteOperation::LineDelimiter,
460 1,
461 write.write_all(b"\n").await,
462 )?;
463
464 Ok(())
465}
466
467fn handle_write_result<H>(
468 stream_name: &'static str,
469 error_handler: &mut H,
470 operation: SinkWriteOperation,
471 attempted_len: usize,
472 result: io::Result<()>,
473) -> Result<bool, SinkWriteError>
474where
475 H: SinkWriteErrorHandler,
476{
477 match result {
478 Ok(()) => Ok(true),
479 Err(source) => {
480 let error = SinkWriteError::new(stream_name, operation, attempted_len, source);
481 match error_handler.handle(&error) {
482 SinkWriteErrorAction::Stop => Err(error),
483 SinkWriteErrorAction::Continue => Ok(false),
484 }
485 }
486 }
487}
488
489#[cfg(test)]
490mod tests {
491 use super::*;
492 use crate::output_stream::Subscription;
493 use crate::output_stream::consumer::Consumer;
494 use crate::output_stream::consumer::driver::spawn_consumer_async;
495 use crate::output_stream::event::StreamEvent;
496 use crate::output_stream::event::tests::event_receiver;
497 use crate::output_stream::line::adapter::ParseLines;
498 use crate::output_stream::line::options::LineParsingOptions;
499 use assertr::prelude::*;
500 use bytes::Bytes;
501 use std::cell::Cell;
502 use std::io;
503 use std::pin::Pin;
504 use std::sync::{Arc, Mutex};
505 use std::task::{Context, Poll};
506 use tokio::io::AsyncWrite;
507
508 fn collect_chunks_into_write<S, W, H>(
512 stream_name: &'static str,
513 subscription: S,
514 write: W,
515 write_options: WriteCollectionOptions<H>,
516 ) -> Consumer<Result<W, SinkWriteError>>
517 where
518 S: Subscription,
519 W: Sink + AsyncWriteExt + Unpin,
520 H: SinkWriteErrorHandler,
521 {
522 spawn_consumer_async(
523 stream_name,
524 subscription,
525 WriteChunks::builder()
526 .stream_name(stream_name)
527 .writer(write)
528 .error_handler(write_options.into_error_handler())
529 .mapper((|chunk: Chunk| chunk) as fn(Chunk) -> Chunk)
530 .error(None)
531 .build(),
532 )
533 }
534
535 fn collect_chunks_into_write_mapped<S, W, B, F, H>(
536 stream_name: &'static str,
537 subscription: S,
538 write: W,
539 mapper: F,
540 write_options: WriteCollectionOptions<H>,
541 ) -> Consumer<Result<W, SinkWriteError>>
542 where
543 S: Subscription,
544 W: Sink + AsyncWriteExt + Unpin,
545 B: AsRef<[u8]> + Send + 'static,
546 F: Fn(Chunk) -> B + Send + Sync + 'static,
547 H: SinkWriteErrorHandler,
548 {
549 spawn_consumer_async(
550 stream_name,
551 subscription,
552 WriteChunks::builder()
553 .stream_name(stream_name)
554 .writer(write)
555 .error_handler(write_options.into_error_handler())
556 .mapper(mapper)
557 .error(None)
558 .build(),
559 )
560 }
561
562 fn collect_lines_into_write<S, W, H>(
563 stream_name: &'static str,
564 subscription: S,
565 write: W,
566 options: LineParsingOptions,
567 mode: LineWriteMode,
568 write_options: WriteCollectionOptions<H>,
569 ) -> Consumer<Result<W, SinkWriteError>>
570 where
571 S: Subscription,
572 W: Sink + AsyncWriteExt + Unpin,
573 H: SinkWriteErrorHandler,
574 {
575 spawn_consumer_async(
576 stream_name,
577 subscription,
578 ParseLines::new(
579 options,
580 WriteLines::new(
581 stream_name,
582 write,
583 write_options.into_error_handler(),
584 (|line: Cow<'_, str>| line.into_owned()) as fn(Cow<'_, str>) -> String,
585 mode,
586 ),
587 ),
588 )
589 }
590
591 fn collect_lines_into_write_mapped<S, W, B, F, H>(
592 stream_name: &'static str,
593 subscription: S,
594 write: W,
595 mapper: F,
596 options: LineParsingOptions,
597 mode: LineWriteMode,
598 write_options: WriteCollectionOptions<H>,
599 ) -> Consumer<Result<W, SinkWriteError>>
600 where
601 S: Subscription,
602 W: Sink + AsyncWriteExt + Unpin,
603 B: AsRef<[u8]> + Send + 'static,
604 F: Fn(Cow<'_, str>) -> B + Send + Sync + 'static,
605 H: SinkWriteErrorHandler,
606 {
607 spawn_consumer_async(
608 stream_name,
609 subscription,
610 ParseLines::new(
611 options,
612 WriteLines::new(
613 stream_name,
614 write,
615 write_options.into_error_handler(),
616 mapper,
617 mode,
618 ),
619 ),
620 )
621 }
622
    /// Test writer that accepts a fixed number of `poll_write` calls and then
    /// fails every subsequent one with a configured error kind.
    #[derive(Debug)]
    struct FailingWrite {
        /// Number of `poll_write` calls that succeed before failures start.
        fail_after_successful_writes: usize,
        /// Kind of the injected I/O error.
        error_kind: io::ErrorKind,
        /// Total number of `poll_write` calls seen so far, including failed ones.
        write_calls: usize,
        /// Total number of bytes accepted by successful writes.
        bytes_written: usize,
    }
630
631 impl FailingWrite {
632 fn new(fail_after_successful_writes: usize, error_kind: io::ErrorKind) -> Self {
633 Self {
634 fail_after_successful_writes,
635 error_kind,
636 write_calls: 0,
637 bytes_written: 0,
638 }
639 }
640 }
641
642 impl AsyncWrite for FailingWrite {
643 fn poll_write(
644 mut self: Pin<&mut Self>,
645 _cx: &mut Context<'_>,
646 buf: &[u8],
647 ) -> Poll<io::Result<usize>> {
648 self.write_calls += 1;
649 if self.write_calls > self.fail_after_successful_writes {
650 return Poll::Ready(Err(io::Error::new(
651 self.error_kind,
652 "injected write failure",
653 )));
654 }
655
656 self.bytes_written += buf.len();
657 Poll::Ready(Ok(buf.len()))
658 }
659
660 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
661 Poll::Ready(Ok(()))
662 }
663
664 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
665 Poll::Ready(Ok(()))
666 }
667 }
668
    /// Test writer that records everything written to it.
    ///
    /// The `Cell` field makes the type `Send` but not `Sync`, so it checks that
    /// the writers do not require a `Sync` sink.
    #[derive(Default)]
    struct SendOnlyWrite {
        /// All bytes received so far, in order.
        bytes: Vec<u8>,
        /// Number of `poll_write` calls seen so far.
        write_calls: Cell<usize>,
    }
674
675 impl AsyncWrite for SendOnlyWrite {
676 fn poll_write(
677 mut self: Pin<&mut Self>,
678 _cx: &mut Context<'_>,
679 buf: &[u8],
680 ) -> Poll<io::Result<usize>> {
681 self.write_calls.set(self.write_calls.get() + 1);
682 self.bytes.extend_from_slice(buf);
683 Poll::Ready(Ok(buf.len()))
684 }
685
686 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
687 Poll::Ready(Ok(()))
688 }
689
690 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
691 Poll::Ready(Ok(()))
692 }
693 }
694
695 #[tokio::test]
696 async fn chunk_writer_reports_and_can_handle_sink_write_errors() {
697 let collector = collect_chunks_into_write(
698 "custom",
699 event_receiver(vec![
700 StreamEvent::Chunk(Chunk(Bytes::from_static(b"abc"))),
701 StreamEvent::Eof,
702 ])
703 .await,
704 FailingWrite::new(0, io::ErrorKind::BrokenPipe),
705 WriteCollectionOptions::fail_fast(),
706 );
707
708 match collector.wait().await {
709 Ok(Err(err)) => {
710 assert_that!(err.stream_name()).is_equal_to("custom");
711 assert_that!(err.source().kind()).is_equal_to(io::ErrorKind::BrokenPipe);
712 }
713 other => {
714 assert_that!(&other).fail(format_args!("expected sink write error, got {other:?}"));
715 }
716 }
717
718 let handled_count = Arc::new(Mutex::new(0_usize));
719 let count_for_handler = Arc::clone(&handled_count);
720 let collector = collect_chunks_into_write(
721 "custom",
722 event_receiver(vec![
723 StreamEvent::Chunk(Chunk(Bytes::from_static(b"abc"))),
724 StreamEvent::Eof,
725 ])
726 .await,
727 FailingWrite::new(0, io::ErrorKind::BrokenPipe),
728 WriteCollectionOptions::with_error_handler(move |err| {
729 assert_that!(err.stream_name()).is_equal_to("custom");
730 assert_that!(err.source().kind()).is_equal_to(io::ErrorKind::BrokenPipe);
731 *count_for_handler.lock().unwrap() += 1;
732 SinkWriteErrorAction::Continue
733 }),
734 );
735
736 let write = collector.wait().await.unwrap().unwrap();
737 assert_that!(write.bytes_written).is_equal_to(0);
738 assert_that!(*handled_count.lock().unwrap()).is_equal_to(1);
739 }
740
741 #[tokio::test]
742 async fn line_writer_reports_line_and_delimiter_write_errors() {
743 let line_error = collect_lines_into_write(
744 "custom",
745 event_receiver(vec![
746 StreamEvent::Chunk(Chunk(Bytes::from_static(b"line\n"))),
747 StreamEvent::Eof,
748 ])
749 .await,
750 FailingWrite::new(0, io::ErrorKind::BrokenPipe),
751 LineParsingOptions::default(),
752 LineWriteMode::AppendLf,
753 WriteCollectionOptions::fail_fast(),
754 )
755 .wait()
756 .await;
757 match line_error {
758 Ok(Err(err)) => {
759 assert_that!(err.source().kind()).is_equal_to(io::ErrorKind::BrokenPipe);
760 }
761 other => {
762 assert_that!(&other).fail(format_args!("expected line write error, got {other:?}"));
763 }
764 }
765
766 let delimiter_error = collect_lines_into_write(
767 "custom",
768 event_receiver(vec![
769 StreamEvent::Chunk(Chunk(Bytes::from_static(b"line\n"))),
770 StreamEvent::Eof,
771 ])
772 .await,
773 FailingWrite::new(1, io::ErrorKind::WriteZero),
774 LineParsingOptions::default(),
775 LineWriteMode::AppendLf,
776 WriteCollectionOptions::fail_fast(),
777 )
778 .wait()
779 .await;
780 match delimiter_error {
781 Ok(Err(err)) => {
782 assert_that!(err.source().kind()).is_equal_to(io::ErrorKind::WriteZero);
783 }
784 other => {
785 assert_that!(&other).fail(format_args!(
786 "expected delimiter write error, got {other:?}"
787 ));
788 }
789 }
790 }
791
792 #[tokio::test]
793 async fn line_writer_respects_requested_delimiter_mode() {
794 let collector = collect_lines_into_write(
795 "custom",
796 event_receiver(vec![
797 StreamEvent::Chunk(Chunk(Bytes::from_static(
798 b"Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n",
799 ))),
800 StreamEvent::Eof,
801 ])
802 .await,
803 SendOnlyWrite::default(),
804 LineParsingOptions::default(),
805 LineWriteMode::AsIs,
806 WriteCollectionOptions::fail_fast(),
807 );
808
809 let writer = collector.wait().await.unwrap().unwrap();
810 assert_that!(writer.bytes).is_equal_to(b"Cargo.lockCargo.tomlREADME.mdsrctarget".to_vec());
811 }
812
813 #[tokio::test]
814 async fn chunk_writer_accepts_send_only_writer() {
815 let collector = collect_chunks_into_write(
816 "custom",
817 event_receiver(vec![
818 StreamEvent::Chunk(Chunk(Bytes::from_static(b"abc"))),
819 StreamEvent::Chunk(Chunk(Bytes::from_static(b"def"))),
820 StreamEvent::Eof,
821 ])
822 .await,
823 SendOnlyWrite::default(),
824 WriteCollectionOptions::fail_fast(),
825 );
826
827 let writer = collector.wait().await.unwrap().unwrap();
828 assert_that!(writer.bytes).is_equal_to(b"abcdef".to_vec());
829 assert_that!(writer.write_calls.get()).is_greater_than(0);
830 }
831
832 #[tokio::test]
833 async fn chunk_writer_mapped_writes_mapped_output() {
834 let collector = collect_chunks_into_write_mapped(
835 "custom",
836 event_receiver(vec![
837 StreamEvent::Chunk(Chunk(Bytes::from_static(b"Cargo.lock\n"))),
838 StreamEvent::Chunk(Chunk(Bytes::from_static(b"Cargo.toml\n"))),
839 StreamEvent::Eof,
840 ])
841 .await,
842 SendOnlyWrite::default(),
843 |chunk| String::from_utf8_lossy(chunk.as_ref()).to_string(),
844 WriteCollectionOptions::fail_fast(),
845 );
846
847 let writer = collector.wait().await.unwrap().unwrap();
848 assert_that!(writer.bytes).is_equal_to(b"Cargo.lock\nCargo.toml\n".to_vec());
849 }
850
851 #[tokio::test]
852 async fn mapped_writers_return_sink_write_errors() {
853 let chunk_error = collect_chunks_into_write_mapped(
854 "custom",
855 event_receiver(vec![
856 StreamEvent::Chunk(Chunk(Bytes::from_static(b"abc"))),
857 StreamEvent::Eof,
858 ])
859 .await,
860 FailingWrite::new(0, io::ErrorKind::ConnectionReset),
861 |chunk| chunk,
862 WriteCollectionOptions::fail_fast(),
863 )
864 .wait()
865 .await;
866 match chunk_error {
867 Ok(Err(err)) => {
868 assert_that!(err.source().kind()).is_equal_to(io::ErrorKind::ConnectionReset);
869 }
870 other => {
871 assert_that!(&other).fail(format_args!("expected sink write error, got {other:?}"));
872 }
873 }
874
875 let line_error = collect_lines_into_write_mapped(
876 "custom",
877 event_receiver(vec![
878 StreamEvent::Chunk(Chunk(Bytes::from_static(b"one\n"))),
879 StreamEvent::Eof,
880 ])
881 .await,
882 FailingWrite::new(0, io::ErrorKind::BrokenPipe),
883 |line| line.into_owned().into_bytes(),
884 LineParsingOptions::default(),
885 LineWriteMode::AsIs,
886 WriteCollectionOptions::fail_fast(),
887 )
888 .wait()
889 .await;
890 match line_error {
891 Ok(Err(err)) => {
892 assert_that!(err.source().kind()).is_equal_to(io::ErrorKind::BrokenPipe);
893 }
894 other => {
895 assert_that!(&other).fail(format_args!("expected sink write error, got {other:?}"));
896 }
897 }
898 }
899
900 #[tokio::test]
901 async fn line_write_error_handler_can_continue_after_sink_write_errors() {
902 let events = Arc::new(Mutex::new(Vec::new()));
903 let handled_events = Arc::clone(&events);
904 let collector = collect_lines_into_write(
905 "custom",
906 event_receiver(vec![
907 StreamEvent::Chunk(Chunk(Bytes::from_static(b"a\nb\n"))),
908 StreamEvent::Eof,
909 ])
910 .await,
911 FailingWrite::new(0, io::ErrorKind::BrokenPipe),
912 LineParsingOptions::default(),
913 LineWriteMode::AppendLf,
914 WriteCollectionOptions::with_error_handler(move |err| {
915 handled_events.lock().unwrap().push((
916 err.stream_name(),
917 err.operation(),
918 err.attempted_len(),
919 err.source().kind(),
920 ));
921 SinkWriteErrorAction::Continue
922 }),
923 );
924
925 let write = collector.wait().await.unwrap().unwrap();
926 assert_that!(write.bytes_written).is_equal_to(0);
927 assert_that!(events.lock().unwrap().as_slice()).is_equal_to([
928 (
929 "custom",
930 SinkWriteOperation::Line,
931 1,
932 io::ErrorKind::BrokenPipe,
933 ),
934 (
935 "custom",
936 SinkWriteOperation::Line,
937 1,
938 io::ErrorKind::BrokenPipe,
939 ),
940 ]);
941 }
942
943 #[tokio::test]
944 async fn chunk_write_error_handler_can_continue_then_stop() {
945 let handled_count = Arc::new(Mutex::new(0_usize));
946 let count_for_handler = Arc::clone(&handled_count);
947 let collector = collect_chunks_into_write(
948 "custom",
949 event_receiver(vec![
950 StreamEvent::Chunk(Chunk(Bytes::from_static(b"a"))),
951 StreamEvent::Chunk(Chunk(Bytes::from_static(b"b"))),
952 StreamEvent::Eof,
953 ])
954 .await,
955 FailingWrite::new(0, io::ErrorKind::BrokenPipe),
956 WriteCollectionOptions::with_error_handler(move |err| {
957 assert_that!(err.operation()).is_equal_to(SinkWriteOperation::Chunk);
958 let mut count = count_for_handler.lock().unwrap();
959 *count += 1;
960 if *count == 1 {
961 SinkWriteErrorAction::Continue
962 } else {
963 SinkWriteErrorAction::Stop
964 }
965 }),
966 );
967
968 match collector.wait().await {
969 Ok(Err(err)) => {
970 assert_that!(err.source().kind()).is_equal_to(io::ErrorKind::BrokenPipe);
971 }
972 other => {
973 assert_that!(&other).fail(format_args!("expected sink write error, got {other:?}"));
974 }
975 }
976 assert_that!(*handled_count.lock().unwrap()).is_equal_to(2);
977 }
978}