Skip to main content

tokio_process_tools/output_stream/visitors/
write.rs

1use crate::output_stream::Next;
2use crate::output_stream::consumer::Sink;
3use crate::output_stream::event::Chunk;
4use crate::output_stream::line::adapter::AsyncLineVisitor;
5use crate::output_stream::visitor::AsyncStreamVisitor;
6use std::borrow::Cow;
7use std::io;
8use tokio::io::AsyncWriteExt;
9use typed_builder::TypedBuilder;
10
/// Controls how line-based write helpers delimit successive lines.
///
/// The mode is consulted after a line's bytes were written successfully: with
/// [`LineWriteMode::AppendLf`] a single `\n` byte is written next, with
/// [`LineWriteMode::AsIs`] nothing further is written for that line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LineWriteMode {
    /// Write lines exactly as parsed, without appending any delimiter.
    ///
    /// Use this when your mapper already includes delimiters or when the downstream format does
    /// not want line separators reintroduced.
    AsIs,

    /// Append a trailing `\n` after each emitted line.
    ///
    /// This reconstructs conventional line-oriented output after parsing removed the original
    /// newline byte. The delimiter is skipped for a line whose own write failed and was
    /// tolerated by the error handler.
    AppendLf,
}
26
/// Action to take after an async writer sink rejects collected output.
///
/// Returned by a [`SinkWriteErrorHandler`] for every failed write it is shown.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SinkWriteErrorAction {
    /// Stop collection and surface the [`SinkWriteError`] as the consumer's output. The
    /// writer-backed consumer's `wait` returns `Ok(Err(sink_write_error))` in that case.
    Stop,

    /// Accept the individual write failure and keep collecting later stream output. The
    /// tolerated error is not retained by the visitor.
    Continue,
}
37
/// The write operation that failed while forwarding collected output into an async writer.
///
/// Carried by [`SinkWriteError`] so handlers can tell which kind of write was rejected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SinkWriteOperation {
    /// A raw output chunk failed to write.
    Chunk,

    /// Parsed line bytes failed to write.
    Line,

    /// The line delimiter requested by [`LineWriteMode::AppendLf`] failed to write.
    LineDelimiter,
}
50
/// Details about a failed async write into a collector sink.
///
/// Each field is exposed through an accessor of the same name.
#[derive(Debug)]
pub struct SinkWriteError {
    /// Name of the stream whose output was being forwarded.
    stream_name: &'static str,
    /// Which kind of write (chunk, line, or line delimiter) failed.
    operation: SinkWriteOperation,
    /// Length in bytes of the buffer handed to the failed write.
    attempted_len: usize,
    /// The I/O error reported by the writer.
    source: io::Error,
}
59
60impl SinkWriteError {
61    pub(crate) fn new(
62        stream_name: &'static str,
63        operation: SinkWriteOperation,
64        attempted_len: usize,
65        source: io::Error,
66    ) -> Self {
67        Self {
68            stream_name,
69            operation,
70            attempted_len,
71            source,
72        }
73    }
74
75    /// The name of the stream this collector operates on.
76    #[must_use]
77    pub fn stream_name(&self) -> &'static str {
78        self.stream_name
79    }
80
81    /// The write operation that failed.
82    #[must_use]
83    pub fn operation(&self) -> SinkWriteOperation {
84        self.operation
85    }
86
87    /// Number of bytes passed to the failed `write_all` call.
88    #[must_use]
89    pub fn attempted_len(&self) -> usize {
90        self.attempted_len
91    }
92
93    /// The underlying async writer error.
94    #[must_use]
95    pub fn source(&self) -> &io::Error {
96        &self.source
97    }
98}
99
100impl std::fmt::Display for SinkWriteError {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        write!(
103            f,
104            "Failed to write consumed output from stream '{}' to sink: {}",
105            self.stream_name, self.source
106        )
107    }
108}
109
110impl std::error::Error for SinkWriteError {
111    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
112        Some(&self.source)
113    }
114}
115
/// Handles async writer sink failures observed by writer collectors.
///
/// Any `FnMut(&SinkWriteError) -> SinkWriteErrorAction + Send + 'static` closure or function
/// implements this trait through a blanket implementation.
pub trait SinkWriteErrorHandler: Send + 'static {
    /// Decide whether collection should continue after a sink write failure.
    ///
    /// `error` describes the failed write. Return [`SinkWriteErrorAction::Stop`] to end
    /// collection with that error, or [`SinkWriteErrorAction::Continue`] to tolerate it.
    fn handle(&mut self, error: &SinkWriteError) -> SinkWriteErrorAction;
}
121
122impl<F> SinkWriteErrorHandler for F
123where
124    F: FnMut(&SinkWriteError) -> SinkWriteErrorAction + Send + 'static,
125{
126    fn handle(&mut self, error: &SinkWriteError) -> SinkWriteErrorAction {
127        self(error)
128    }
129}
130
/// Options for forwarding collected stream output into an async writer.
///
/// Use [`WriteCollectionOptions::fail_fast`] to stop on the first sink write failure,
/// [`WriteCollectionOptions::log_and_continue`] to preserve best-effort logging behavior, or
/// [`WriteCollectionOptions::with_error_handler`] to make a custom per-error decision.
///
/// The handler type `H` defaults to a plain function pointer, which is what the two preset
/// constructors produce.
#[derive(Debug, Clone, Copy)]
pub struct WriteCollectionOptions<H = fn(&SinkWriteError) -> SinkWriteErrorAction> {
    /// Callback that decides how a failed sink write is treated.
    error_handler: H,
}
140
141impl WriteCollectionOptions {
142    /// Creates writer collection options that fail on the first sink write error.
143    #[must_use]
144    pub fn fail_fast() -> Self {
145        Self {
146            error_handler: |_| SinkWriteErrorAction::Stop,
147        }
148    }
149
150    /// Creates writer collection options that log sink write errors and keep collecting.
151    #[must_use]
152    pub fn log_and_continue() -> Self {
153        Self {
154            error_handler: |error| {
155                tracing::warn!(
156                    stream = error.stream_name(),
157                    operation = ?error.operation(),
158                    attempted_len = error.attempted_len(),
159                    source = %error.source(),
160                    "Could not write collected output to write sink; continuing"
161                );
162                SinkWriteErrorAction::Continue
163            },
164        }
165    }
166
167    /// Creates writer collection options with a custom sink write error handler.
168    #[must_use]
169    pub fn with_error_handler<H>(handler: H) -> WriteCollectionOptions<H>
170    where
171        H: FnMut(&SinkWriteError) -> SinkWriteErrorAction + Send + 'static,
172    {
173        WriteCollectionOptions {
174            error_handler: handler,
175        }
176    }
177}
178
179impl<H> WriteCollectionOptions<H> {
180    /// Consumes these options and returns the configured error handler. Useful when wiring a
181    /// [`WriteChunks`] / [`WriteLines`] visitor manually instead of via a built-in
182    /// constructor.
183    pub fn into_error_handler(self) -> H {
184        self.error_handler
185    }
186}
187
/// Asynchronous [`StreamVisitor`](AsyncStreamVisitor) that runs each observed chunk
/// through `mapper`, writes the resulting bytes via `writer`, and routes failures through
/// `error_handler`. The visitor surfaces the writer (or the sink write error that stopped it)
/// via [`AsyncStreamVisitor::into_output`](AsyncStreamVisitor::into_output).
///
/// The two common shapes have dedicated constructors:
/// - [`WriteChunks::passthrough`] writes each chunk's raw bytes unchanged.
/// - [`WriteChunks::mapped`] runs each chunk through a user mapper first.
///
/// If you need full control over every field (e.g. injecting a pre-existing
/// [`SinkWriteError`] into the `error` slot for tests), use
/// [`builder`](Self::builder) directly.
///
/// `WriteChunks` is [`AsyncStreamVisitor`]-only because the underlying writer is
/// [`AsyncWriteExt`]. There is no synchronous counterpart; reach for
/// [`CollectChunks`](crate::visitors::collect::CollectChunks) (with a sink that performs the
/// blocking write) if you need a sync chunk-level visitor.
#[derive(TypedBuilder)]
pub struct WriteChunks<W, H, F, B>
where
    W: Sink + AsyncWriteExt + Unpin,
    H: SinkWriteErrorHandler,
    B: AsRef<[u8]> + Send + 'static,
    F: Fn(Chunk) -> B + Send + Sync + 'static,
{
    /// Stream name used to label any [`SinkWriteError`] this visitor surfaces.
    pub stream_name: &'static str,
    /// Async writer chunks are forwarded into.
    pub writer: W,
    /// Routing for sink write failures.
    pub error_handler: H,
    /// Per-chunk mapper that produces the bytes actually written to `writer`.
    pub mapper: F,
    /// The sink write error that stopped this visitor, if any. Constructors initialize this
    /// to `None`. The visitor stores the error here before signaling
    /// [`Next::Break`](Next::Break); errors the handler chose to continue past are not kept.
    #[builder(default)]
    pub error: Option<SinkWriteError>,
}
226
227impl<W, H> WriteChunks<W, H, fn(Chunk) -> Chunk, Chunk>
228where
229    W: Sink + AsyncWriteExt + Unpin,
230    H: SinkWriteErrorHandler,
231{
232    /// Identity-mapper constructor. Each chunk's raw bytes are written to `writer` unchanged.
233    /// `stream_name` labels any [`SinkWriteError`] this visitor surfaces.
234    pub fn passthrough(
235        stream_name: &'static str,
236        writer: W,
237        options: WriteCollectionOptions<H>,
238    ) -> Self {
239        Self {
240            stream_name,
241            writer,
242            error_handler: options.into_error_handler(),
243            mapper: identity_chunk,
244            error: None,
245        }
246    }
247}
248
249impl<W, H, F, B> WriteChunks<W, H, F, B>
250where
251    W: Sink + AsyncWriteExt + Unpin,
252    H: SinkWriteErrorHandler,
253    B: AsRef<[u8]> + Send + 'static,
254    F: Fn(Chunk) -> B + Send + Sync + 'static,
255{
256    /// Mapped constructor. Each chunk is run through `mapper` before being written. Use this
257    /// when the bytes you want to forward differ from the raw chunk bytes (for example, a
258    /// rendered line, an envelope-wrapped frame, or a transcoded representation).
259    pub fn mapped(
260        stream_name: &'static str,
261        writer: W,
262        options: WriteCollectionOptions<H>,
263        mapper: F,
264    ) -> Self {
265        Self {
266            stream_name,
267            writer,
268            error_handler: options.into_error_handler(),
269            mapper,
270            error: None,
271        }
272    }
273}
274
275fn identity_chunk(chunk: Chunk) -> Chunk {
276    chunk
277}
278
279impl<W, H, F, B> AsyncStreamVisitor for WriteChunks<W, H, F, B>
280where
281    W: Sink + AsyncWriteExt + Unpin,
282    H: SinkWriteErrorHandler,
283    B: AsRef<[u8]> + Send + 'static,
284    F: Fn(Chunk) -> B + Send + Sync + 'static,
285{
286    type Output = Result<W, SinkWriteError>;
287
288    async fn on_chunk(&mut self, chunk: Chunk) -> Next {
289        let mapped_output = (self.mapper)(chunk);
290        let bytes = mapped_output.as_ref();
291        let attempted_len = bytes.len();
292        let result = self.writer.write_all(bytes).await;
293        match handle_write_result(
294            self.stream_name,
295            &mut self.error_handler,
296            SinkWriteOperation::Chunk,
297            attempted_len,
298            result,
299        ) {
300            Ok(_) => Next::Continue,
301            Err(err) => {
302                self.error = Some(err);
303                Next::Break
304            }
305        }
306    }
307
308    fn into_output(self) -> Self::Output {
309        match self.error {
310            Some(err) => Err(err),
311            None => Ok(self.writer),
312        }
313    }
314}
315
/// [`AsyncLineVisitor`] that maps each parsed line through `mapper`, writes the result via
/// `writer`, and routes failures through `error_handler`. Compose with
/// [`ParseLines`](crate::output_stream::line::adapter::ParseLines) (its
/// [`AsyncStreamVisitor`] impl is selected automatically when the inner sink is an
/// [`AsyncLineVisitor`]) and pass the resulting visitor to
/// [`Consumable::consume_async`](crate::Consumable::consume_async).
///
/// The common identity-mapper shape has a dedicated constructor: [`WriteLines::passthrough`].
/// For mappers that change the line bytes, use [`WriteLines::new`] with a typed mapper
/// closure.
pub struct WriteLines<W, H, F, B>
where
    W: Sink + AsyncWriteExt + Unpin,
    H: SinkWriteErrorHandler,
    B: AsRef<[u8]> + Send + 'static,
    F: Fn(Cow<'_, str>) -> B + Send + Sync + 'static,
{
    /// Stream name used to label any [`SinkWriteError`] this visitor surfaces.
    stream_name: &'static str,
    /// Async writer the mapped lines are forwarded into.
    writer: W,
    /// Routing for sink write failures.
    error_handler: H,
    /// Per-line mapper that produces the bytes actually written to `writer`.
    mapper: F,
    /// Whether a `\n` is written after each successfully written line.
    mode: LineWriteMode,
    /// The sink write error that stopped this visitor, if any; starts out as `None`.
    error: Option<SinkWriteError>,
}
340
341impl<W, H, F, B> WriteLines<W, H, F, B>
342where
343    W: Sink + AsyncWriteExt + Unpin,
344    H: SinkWriteErrorHandler,
345    B: AsRef<[u8]> + Send + 'static,
346    F: Fn(Cow<'_, str>) -> B + Send + Sync + 'static,
347{
348    /// Creates a new sink that maps each parsed line through `mapper`, writes the result to
349    /// `writer` with the requested `mode`, and routes failures through `error_handler`.
350    /// `stream_name` labels the stream in any [`SinkWriteError`] this sink emits.
351    pub fn new(
352        stream_name: &'static str,
353        writer: W,
354        error_handler: H,
355        mapper: F,
356        mode: LineWriteMode,
357    ) -> Self {
358        Self {
359            stream_name,
360            writer,
361            error_handler,
362            mapper,
363            mode,
364            error: None,
365        }
366    }
367}
368
369impl<W, H> WriteLines<W, H, fn(Cow<'_, str>) -> String, String>
370where
371    W: Sink + AsyncWriteExt + Unpin,
372    H: SinkWriteErrorHandler,
373{
374    /// Identity-mapper constructor. Each parsed line is written to `writer` (with `mode`)
375    /// without transformation. `stream_name` labels any [`SinkWriteError`] this sink emits.
376    pub fn passthrough(
377        stream_name: &'static str,
378        writer: W,
379        options: WriteCollectionOptions<H>,
380        mode: LineWriteMode,
381    ) -> Self {
382        Self::new(
383            stream_name,
384            writer,
385            options.into_error_handler(),
386            identity_line,
387            mode,
388        )
389    }
390}
391
/// Mapper used by the passthrough constructor: turns the parsed line into an owned `String`
/// with the same contents.
fn identity_line(line: Cow<'_, str>) -> String {
    match line {
        Cow::Owned(text) => text,
        Cow::Borrowed(text) => text.to_owned(),
    }
}
395
396impl<W, H, F, B> AsyncLineVisitor for WriteLines<W, H, F, B>
397where
398    W: Sink + AsyncWriteExt + Unpin,
399    H: SinkWriteErrorHandler,
400    B: AsRef<[u8]> + Send + 'static,
401    F: Fn(Cow<'_, str>) -> B + Send + Sync + 'static,
402{
403    type Output = Result<W, SinkWriteError>;
404
405    async fn on_line<'a>(&'a mut self, line: Cow<'a, str>) -> Next {
406        let mapped_output = (self.mapper)(line);
407        let bytes = mapped_output.as_ref();
408        match write_line(
409            self.stream_name,
410            &mut self.writer,
411            &mut self.error_handler,
412            bytes,
413            self.mode,
414        )
415        .await
416        {
417            Ok(()) => Next::Continue,
418            Err(err) => {
419                self.error = Some(err);
420                Next::Break
421            }
422        }
423    }
424
425    fn into_output(self) -> Self::Output {
426        match self.error {
427            Some(err) => Err(err),
428            None => Ok(self.writer),
429        }
430    }
431}
432
433async fn write_line<W, H>(
434    stream_name: &'static str,
435    write: &mut W,
436    error_handler: &mut H,
437    line: &[u8],
438    mode: LineWriteMode,
439) -> Result<(), SinkWriteError>
440where
441    W: AsyncWriteExt + Unpin,
442    H: SinkWriteErrorHandler,
443{
444    let line_write = write.write_all(line).await;
445    let line_written = handle_write_result(
446        stream_name,
447        error_handler,
448        SinkWriteOperation::Line,
449        line.len(),
450        line_write,
451    )?;
452    if !line_written || !matches!(mode, LineWriteMode::AppendLf) {
453        return Ok(());
454    }
455
456    handle_write_result(
457        stream_name,
458        error_handler,
459        SinkWriteOperation::LineDelimiter,
460        1,
461        write.write_all(b"\n").await,
462    )?;
463
464    Ok(())
465}
466
467fn handle_write_result<H>(
468    stream_name: &'static str,
469    error_handler: &mut H,
470    operation: SinkWriteOperation,
471    attempted_len: usize,
472    result: io::Result<()>,
473) -> Result<bool, SinkWriteError>
474where
475    H: SinkWriteErrorHandler,
476{
477    match result {
478        Ok(()) => Ok(true),
479        Err(source) => {
480            let error = SinkWriteError::new(stream_name, operation, attempted_len, source);
481            match error_handler.handle(&error) {
482                SinkWriteErrorAction::Stop => Err(error),
483                SinkWriteErrorAction::Continue => Ok(false),
484            }
485        }
486    }
487}
488
489#[cfg(test)]
490mod tests {
491    use super::*;
492    use crate::output_stream::Subscription;
493    use crate::output_stream::consumer::Consumer;
494    use crate::output_stream::consumer::driver::spawn_consumer_async;
495    use crate::output_stream::event::StreamEvent;
496    use crate::output_stream::event::tests::event_receiver;
497    use crate::output_stream::line::adapter::ParseLines;
498    use crate::output_stream::line::options::LineParsingOptions;
499    use assertr::prelude::*;
500    use bytes::Bytes;
501    use std::cell::Cell;
502    use std::io;
503    use std::pin::Pin;
504    use std::sync::{Arc, Mutex};
505    use std::task::{Context, Poll};
506    use tokio::io::AsyncWrite;
507
508    // Test-only helpers replacing the deleted factory functions. They build the visitor via
509    // its constructor and spawn a consumer task, the same shape every test wants without
510    // each test repeating the boilerplate.
511    fn collect_chunks_into_write<S, W, H>(
512        stream_name: &'static str,
513        subscription: S,
514        write: W,
515        write_options: WriteCollectionOptions<H>,
516    ) -> Consumer<Result<W, SinkWriteError>>
517    where
518        S: Subscription,
519        W: Sink + AsyncWriteExt + Unpin,
520        H: SinkWriteErrorHandler,
521    {
522        spawn_consumer_async(
523            stream_name,
524            subscription,
525            WriteChunks::builder()
526                .stream_name(stream_name)
527                .writer(write)
528                .error_handler(write_options.into_error_handler())
529                .mapper((|chunk: Chunk| chunk) as fn(Chunk) -> Chunk)
530                .error(None)
531                .build(),
532        )
533    }
534
535    fn collect_chunks_into_write_mapped<S, W, B, F, H>(
536        stream_name: &'static str,
537        subscription: S,
538        write: W,
539        mapper: F,
540        write_options: WriteCollectionOptions<H>,
541    ) -> Consumer<Result<W, SinkWriteError>>
542    where
543        S: Subscription,
544        W: Sink + AsyncWriteExt + Unpin,
545        B: AsRef<[u8]> + Send + 'static,
546        F: Fn(Chunk) -> B + Send + Sync + 'static,
547        H: SinkWriteErrorHandler,
548    {
549        spawn_consumer_async(
550            stream_name,
551            subscription,
552            WriteChunks::builder()
553                .stream_name(stream_name)
554                .writer(write)
555                .error_handler(write_options.into_error_handler())
556                .mapper(mapper)
557                .error(None)
558                .build(),
559        )
560    }
561
562    fn collect_lines_into_write<S, W, H>(
563        stream_name: &'static str,
564        subscription: S,
565        write: W,
566        options: LineParsingOptions,
567        mode: LineWriteMode,
568        write_options: WriteCollectionOptions<H>,
569    ) -> Consumer<Result<W, SinkWriteError>>
570    where
571        S: Subscription,
572        W: Sink + AsyncWriteExt + Unpin,
573        H: SinkWriteErrorHandler,
574    {
575        spawn_consumer_async(
576            stream_name,
577            subscription,
578            ParseLines::new(
579                options,
580                WriteLines::new(
581                    stream_name,
582                    write,
583                    write_options.into_error_handler(),
584                    (|line: Cow<'_, str>| line.into_owned()) as fn(Cow<'_, str>) -> String,
585                    mode,
586                ),
587            ),
588        )
589    }
590
591    fn collect_lines_into_write_mapped<S, W, B, F, H>(
592        stream_name: &'static str,
593        subscription: S,
594        write: W,
595        mapper: F,
596        options: LineParsingOptions,
597        mode: LineWriteMode,
598        write_options: WriteCollectionOptions<H>,
599    ) -> Consumer<Result<W, SinkWriteError>>
600    where
601        S: Subscription,
602        W: Sink + AsyncWriteExt + Unpin,
603        B: AsRef<[u8]> + Send + 'static,
604        F: Fn(Cow<'_, str>) -> B + Send + Sync + 'static,
605        H: SinkWriteErrorHandler,
606    {
607        spawn_consumer_async(
608            stream_name,
609            subscription,
610            ParseLines::new(
611                options,
612                WriteLines::new(
613                    stream_name,
614                    write,
615                    write_options.into_error_handler(),
616                    mapper,
617                    mode,
618                ),
619            ),
620        )
621    }
622
    /// Test writer that accepts a configured number of `poll_write` calls and fails every
    /// call after that with an injected error.
    #[derive(Debug)]
    struct FailingWrite {
        /// How many `poll_write` calls succeed before failures start.
        fail_after_successful_writes: usize,
        /// Kind of the `io::Error` produced by failing calls.
        error_kind: io::ErrorKind,
        /// Total number of `poll_write` calls seen so far, failed ones included.
        write_calls: usize,
        /// Sum of the buffer lengths of all successful calls.
        bytes_written: usize,
    }
630
631    impl FailingWrite {
632        fn new(fail_after_successful_writes: usize, error_kind: io::ErrorKind) -> Self {
633            Self {
634                fail_after_successful_writes,
635                error_kind,
636                write_calls: 0,
637                bytes_written: 0,
638            }
639        }
640    }
641
642    impl AsyncWrite for FailingWrite {
643        fn poll_write(
644            mut self: Pin<&mut Self>,
645            _cx: &mut Context<'_>,
646            buf: &[u8],
647        ) -> Poll<io::Result<usize>> {
648            self.write_calls += 1;
649            if self.write_calls > self.fail_after_successful_writes {
650                return Poll::Ready(Err(io::Error::new(
651                    self.error_kind,
652                    "injected write failure",
653                )));
654            }
655
656            self.bytes_written += buf.len();
657            Poll::Ready(Ok(buf.len()))
658        }
659
660        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
661            Poll::Ready(Ok(()))
662        }
663
664        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
665            Poll::Ready(Ok(()))
666        }
667    }
668
    /// Test writer that records everything written to it. The `Cell` field keeps the type
    /// from being `Sync`, so tests using it show that writers are not required to be `Sync`.
    #[derive(Default)]
    struct SendOnlyWrite {
        /// All bytes received so far, in write order.
        bytes: Vec<u8>,
        /// Number of `poll_write` calls seen so far.
        write_calls: Cell<usize>,
    }
674
675    impl AsyncWrite for SendOnlyWrite {
676        fn poll_write(
677            mut self: Pin<&mut Self>,
678            _cx: &mut Context<'_>,
679            buf: &[u8],
680        ) -> Poll<io::Result<usize>> {
681            self.write_calls.set(self.write_calls.get() + 1);
682            self.bytes.extend_from_slice(buf);
683            Poll::Ready(Ok(buf.len()))
684        }
685
686        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
687            Poll::Ready(Ok(()))
688        }
689
690        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
691            Poll::Ready(Ok(()))
692        }
693    }
694
695    #[tokio::test]
696    async fn chunk_writer_reports_and_can_handle_sink_write_errors() {
697        let collector = collect_chunks_into_write(
698            "custom",
699            event_receiver(vec![
700                StreamEvent::Chunk(Chunk(Bytes::from_static(b"abc"))),
701                StreamEvent::Eof,
702            ])
703            .await,
704            FailingWrite::new(0, io::ErrorKind::BrokenPipe),
705            WriteCollectionOptions::fail_fast(),
706        );
707
708        match collector.wait().await {
709            Ok(Err(err)) => {
710                assert_that!(err.stream_name()).is_equal_to("custom");
711                assert_that!(err.source().kind()).is_equal_to(io::ErrorKind::BrokenPipe);
712            }
713            other => {
714                assert_that!(&other).fail(format_args!("expected sink write error, got {other:?}"));
715            }
716        }
717
718        let handled_count = Arc::new(Mutex::new(0_usize));
719        let count_for_handler = Arc::clone(&handled_count);
720        let collector = collect_chunks_into_write(
721            "custom",
722            event_receiver(vec![
723                StreamEvent::Chunk(Chunk(Bytes::from_static(b"abc"))),
724                StreamEvent::Eof,
725            ])
726            .await,
727            FailingWrite::new(0, io::ErrorKind::BrokenPipe),
728            WriteCollectionOptions::with_error_handler(move |err| {
729                assert_that!(err.stream_name()).is_equal_to("custom");
730                assert_that!(err.source().kind()).is_equal_to(io::ErrorKind::BrokenPipe);
731                *count_for_handler.lock().unwrap() += 1;
732                SinkWriteErrorAction::Continue
733            }),
734        );
735
736        let write = collector.wait().await.unwrap().unwrap();
737        assert_that!(write.bytes_written).is_equal_to(0);
738        assert_that!(*handled_count.lock().unwrap()).is_equal_to(1);
739    }
740
741    #[tokio::test]
742    async fn line_writer_reports_line_and_delimiter_write_errors() {
743        let line_error = collect_lines_into_write(
744            "custom",
745            event_receiver(vec![
746                StreamEvent::Chunk(Chunk(Bytes::from_static(b"line\n"))),
747                StreamEvent::Eof,
748            ])
749            .await,
750            FailingWrite::new(0, io::ErrorKind::BrokenPipe),
751            LineParsingOptions::default(),
752            LineWriteMode::AppendLf,
753            WriteCollectionOptions::fail_fast(),
754        )
755        .wait()
756        .await;
757        match line_error {
758            Ok(Err(err)) => {
759                assert_that!(err.source().kind()).is_equal_to(io::ErrorKind::BrokenPipe);
760            }
761            other => {
762                assert_that!(&other).fail(format_args!("expected line write error, got {other:?}"));
763            }
764        }
765
766        let delimiter_error = collect_lines_into_write(
767            "custom",
768            event_receiver(vec![
769                StreamEvent::Chunk(Chunk(Bytes::from_static(b"line\n"))),
770                StreamEvent::Eof,
771            ])
772            .await,
773            FailingWrite::new(1, io::ErrorKind::WriteZero),
774            LineParsingOptions::default(),
775            LineWriteMode::AppendLf,
776            WriteCollectionOptions::fail_fast(),
777        )
778        .wait()
779        .await;
780        match delimiter_error {
781            Ok(Err(err)) => {
782                assert_that!(err.source().kind()).is_equal_to(io::ErrorKind::WriteZero);
783            }
784            other => {
785                assert_that!(&other).fail(format_args!(
786                    "expected delimiter write error, got {other:?}"
787                ));
788            }
789        }
790    }
791
792    #[tokio::test]
793    async fn line_writer_respects_requested_delimiter_mode() {
794        let collector = collect_lines_into_write(
795            "custom",
796            event_receiver(vec![
797                StreamEvent::Chunk(Chunk(Bytes::from_static(
798                    b"Cargo.lock\nCargo.toml\nREADME.md\nsrc\ntarget\n",
799                ))),
800                StreamEvent::Eof,
801            ])
802            .await,
803            SendOnlyWrite::default(),
804            LineParsingOptions::default(),
805            LineWriteMode::AsIs,
806            WriteCollectionOptions::fail_fast(),
807        );
808
809        let writer = collector.wait().await.unwrap().unwrap();
810        assert_that!(writer.bytes).is_equal_to(b"Cargo.lockCargo.tomlREADME.mdsrctarget".to_vec());
811    }
812
813    #[tokio::test]
814    async fn chunk_writer_accepts_send_only_writer() {
815        let collector = collect_chunks_into_write(
816            "custom",
817            event_receiver(vec![
818                StreamEvent::Chunk(Chunk(Bytes::from_static(b"abc"))),
819                StreamEvent::Chunk(Chunk(Bytes::from_static(b"def"))),
820                StreamEvent::Eof,
821            ])
822            .await,
823            SendOnlyWrite::default(),
824            WriteCollectionOptions::fail_fast(),
825        );
826
827        let writer = collector.wait().await.unwrap().unwrap();
828        assert_that!(writer.bytes).is_equal_to(b"abcdef".to_vec());
829        assert_that!(writer.write_calls.get()).is_greater_than(0);
830    }
831
832    #[tokio::test]
833    async fn chunk_writer_mapped_writes_mapped_output() {
834        let collector = collect_chunks_into_write_mapped(
835            "custom",
836            event_receiver(vec![
837                StreamEvent::Chunk(Chunk(Bytes::from_static(b"Cargo.lock\n"))),
838                StreamEvent::Chunk(Chunk(Bytes::from_static(b"Cargo.toml\n"))),
839                StreamEvent::Eof,
840            ])
841            .await,
842            SendOnlyWrite::default(),
843            |chunk| String::from_utf8_lossy(chunk.as_ref()).to_string(),
844            WriteCollectionOptions::fail_fast(),
845        );
846
847        let writer = collector.wait().await.unwrap().unwrap();
848        assert_that!(writer.bytes).is_equal_to(b"Cargo.lock\nCargo.toml\n".to_vec());
849    }
850
851    #[tokio::test]
852    async fn mapped_writers_return_sink_write_errors() {
853        let chunk_error = collect_chunks_into_write_mapped(
854            "custom",
855            event_receiver(vec![
856                StreamEvent::Chunk(Chunk(Bytes::from_static(b"abc"))),
857                StreamEvent::Eof,
858            ])
859            .await,
860            FailingWrite::new(0, io::ErrorKind::ConnectionReset),
861            |chunk| chunk,
862            WriteCollectionOptions::fail_fast(),
863        )
864        .wait()
865        .await;
866        match chunk_error {
867            Ok(Err(err)) => {
868                assert_that!(err.source().kind()).is_equal_to(io::ErrorKind::ConnectionReset);
869            }
870            other => {
871                assert_that!(&other).fail(format_args!("expected sink write error, got {other:?}"));
872            }
873        }
874
875        let line_error = collect_lines_into_write_mapped(
876            "custom",
877            event_receiver(vec![
878                StreamEvent::Chunk(Chunk(Bytes::from_static(b"one\n"))),
879                StreamEvent::Eof,
880            ])
881            .await,
882            FailingWrite::new(0, io::ErrorKind::BrokenPipe),
883            |line| line.into_owned().into_bytes(),
884            LineParsingOptions::default(),
885            LineWriteMode::AsIs,
886            WriteCollectionOptions::fail_fast(),
887        )
888        .wait()
889        .await;
890        match line_error {
891            Ok(Err(err)) => {
892                assert_that!(err.source().kind()).is_equal_to(io::ErrorKind::BrokenPipe);
893            }
894            other => {
895                assert_that!(&other).fail(format_args!("expected sink write error, got {other:?}"));
896            }
897        }
898    }
899
900    #[tokio::test]
901    async fn line_write_error_handler_can_continue_after_sink_write_errors() {
902        let events = Arc::new(Mutex::new(Vec::new()));
903        let handled_events = Arc::clone(&events);
904        let collector = collect_lines_into_write(
905            "custom",
906            event_receiver(vec![
907                StreamEvent::Chunk(Chunk(Bytes::from_static(b"a\nb\n"))),
908                StreamEvent::Eof,
909            ])
910            .await,
911            FailingWrite::new(0, io::ErrorKind::BrokenPipe),
912            LineParsingOptions::default(),
913            LineWriteMode::AppendLf,
914            WriteCollectionOptions::with_error_handler(move |err| {
915                handled_events.lock().unwrap().push((
916                    err.stream_name(),
917                    err.operation(),
918                    err.attempted_len(),
919                    err.source().kind(),
920                ));
921                SinkWriteErrorAction::Continue
922            }),
923        );
924
925        let write = collector.wait().await.unwrap().unwrap();
926        assert_that!(write.bytes_written).is_equal_to(0);
927        assert_that!(events.lock().unwrap().as_slice()).is_equal_to([
928            (
929                "custom",
930                SinkWriteOperation::Line,
931                1,
932                io::ErrorKind::BrokenPipe,
933            ),
934            (
935                "custom",
936                SinkWriteOperation::Line,
937                1,
938                io::ErrorKind::BrokenPipe,
939            ),
940        ]);
941    }
942
943    #[tokio::test]
944    async fn chunk_write_error_handler_can_continue_then_stop() {
945        let handled_count = Arc::new(Mutex::new(0_usize));
946        let count_for_handler = Arc::clone(&handled_count);
947        let collector = collect_chunks_into_write(
948            "custom",
949            event_receiver(vec![
950                StreamEvent::Chunk(Chunk(Bytes::from_static(b"a"))),
951                StreamEvent::Chunk(Chunk(Bytes::from_static(b"b"))),
952                StreamEvent::Eof,
953            ])
954            .await,
955            FailingWrite::new(0, io::ErrorKind::BrokenPipe),
956            WriteCollectionOptions::with_error_handler(move |err| {
957                assert_that!(err.operation()).is_equal_to(SinkWriteOperation::Chunk);
958                let mut count = count_for_handler.lock().unwrap();
959                *count += 1;
960                if *count == 1 {
961                    SinkWriteErrorAction::Continue
962                } else {
963                    SinkWriteErrorAction::Stop
964                }
965            }),
966        );
967
968        match collector.wait().await {
969            Ok(Err(err)) => {
970                assert_that!(err.source().kind()).is_equal_to(io::ErrorKind::BrokenPipe);
971            }
972            other => {
973                assert_that!(&other).fail(format_args!("expected sink write error, got {other:?}"));
974            }
975        }
976        assert_that!(*handled_count.lock().unwrap()).is_equal_to(2);
977    }
978}