1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0
//! Side-effect operator for debugging and troubleshooting streams.
//!
//! This module provides the [`tap`](TapExt::tap) operator that allows observing
//! stream items without modifying them, useful for debugging complex pipelines.
//!
//! # Overview
//!
//! The `tap` operator invokes a side-effect function for each value without
//! affecting the stream. It's the reactive equivalent of inserting `println!`
//! statements for debugging.
//!
//! # Basic Usage
//!
//! ```
//! use fluxion_stream::prelude::*;
//! use fluxion_test_utils::Sequenced;
//! use futures::StreamExt;
//!
//! # #[tokio::main]
//! # async fn main() {
//! let (tx, rx) = async_channel::unbounded();
//! let stream = rx.into_fluxion_stream();
//!
//! let mut tapped = stream
//!     .tap(|value| println!("Observed: {:?}", value));
//!
//! tx.try_send(Sequenced::new(42)).unwrap();
//! drop(tx);
//!
//! // Value passes through unchanged
//! let item = tapped.next().await.unwrap();
//! assert_eq!(item.unwrap().into_inner(), 42);
//! # }
//! ```
//!
//! # Debugging Pipelines
//!
//! Insert `tap` at any point in a pipeline to observe intermediate values:
//!
//! ```
//! use fluxion_stream::prelude::*;
//! use fluxion_test_utils::Sequenced;
//! use futures::StreamExt;
//!
//! # #[tokio::main]
//! # async fn main() {
//! let (tx, rx) = async_channel::unbounded::<Sequenced<i32>>();
//! let stream = rx.into_fluxion_stream();
//!
//! let mut processed = stream
//!     .tap(|v| println!("Before filter: {:?}", v))
//!     .filter_ordered(|x| *x > 10)
//!     .tap(|v| println!("After filter: {:?}", v))
//!     .map_ordered(|x| Sequenced::new(x.into_inner() * 2))
//!     .tap(|v| println!("Final value: {:?}", v));
//!
//! tx.try_send(Sequenced::new(42)).unwrap();
//! drop(tx);
//!
//! let result = processed.next().await.unwrap().unwrap();
//! assert_eq!(result.into_inner(), 84);
//! # }
//! ```
//!
//! # Error Handling
//!
//! The tap function is only called for values, not errors. Errors pass through
//! unchanged without invoking the tap function.
//!
//! # See Also
//!
//! - [`MapOrderedExt::map_ordered`](crate::MapOrderedExt::map_ordered) - Transform values
//! - [`FilterOrderedExt::filter_ordered`](crate::FilterOrderedExt::filter_ordered) - Filter values
//! - [`OnErrorExt::on_error`](crate::OnErrorExt::on_error) - Handle errors with side effects
pub use TapExt;
pub use TapExt;