//! Sink Effect for streaming output with constant memory.
//!
//! The Sink Effect enables streaming logs, metrics, or events during computation
//! without accumulating them in memory, unlike the Writer Effect, which collects
//! all writes until execution completes.
//!
//! # Overview
//!
//! The Writer Effect accumulates all output:
//!
//! ```rust,ignore
//! // Writer Effect - O(n) memory, all logs collected
//! let (result, logs) = traverse_writer(million_items, process)
//!     .run_writer(&env).await;
//! // logs now contains 1M entries in memory
//! ```
//!
//! The Sink Effect streams each item immediately, using constant memory:
//!
//! ```rust
//! use stillwater::effect::sink::prelude::*;
//!
//! # tokio_test::block_on(async {
//! let items = vec![1, 2, 3];
//! let effect = traverse_sink(items, |n| {
//!     emit::<_, String, ()>(format!("Processing: {}", n))
//!         .map(move |_| n * 10)
//! });
//!
//! // Stream to console - O(1) memory regardless of item count
//! let result = effect.run_with_sink(&(), |log| async move {
//!     println!("{}", log);
//! }).await;
//!
//! assert_eq!(result, Ok(vec![10, 20, 30]));
//! # });
//! ```
//!
//! # Key Features
//!
//! - **Constant memory**: Items are streamed immediately, not accumulated
//! - **Real-time output**: Logs appear as they happen, not after completion
//! - **Flexible sinks**: Write to stdout, files, channels, databases
//! - **Testable**: `run_collecting` bridges to Writer semantics for assertions
//! - **Async sinks**: Support for async I/O operations
//!
//! # When to Use
//!
//! | Trait | Purpose | Memory | Best For |
//! |-------|---------|--------|----------|
//! | WriterEffect | Pure accumulation | O(n) | Testing, short chains, audit trails |
//! | SinkEffect | Streaming output | O(1) | Production, high-volume, real-time logs |
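//!
//! The memory difference in the table can be illustrated without any effect
//! machinery. The following is a minimal sketch in plain Rust, not the
//! stillwater API: `process_collecting` and `process_streaming` are
//! hypothetical helpers invented for this illustration only.
//!
//! ```rust
//! // Accumulating style (hypothetical helper): every log line is kept in a
//! // Vec until the caller reads it, so log memory grows with the item count.
//! fn process_collecting(items: &[i32]) -> (Vec<i32>, Vec<String>) {
//!     let mut logs = Vec::new();
//!     let results: Vec<i32> = items
//!         .iter()
//!         .map(|n| {
//!             logs.push(format!("Processing: {}", n));
//!             n * 10
//!         })
//!         .collect();
//!     (results, logs)
//! }
//!
//! // Streaming style (hypothetical helper): each log line is handed to
//! // `sink` and then dropped, so at most one log line is alive at a time.
//! fn process_streaming(items: &[i32], mut sink: impl FnMut(String)) -> Vec<i32> {
//!     items
//!         .iter()
//!         .map(|n| {
//!             sink(format!("Processing: {}", n));
//!             n * 10
//!         })
//!         .collect()
//! }
//! ```
//!
//! Calling `process_streaming(&[1, 2, 3], |line| println!("{}", line))` prints
//! each line as it is produced, whereas `process_collecting` returns all lines
//! together once the last item has been processed.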
//!
//! # Module Structure
//!
//! - [`SinkEffect`] - Core trait extending Effect with streaming
//! - [`SinkEffectExt`] - Extension trait providing combinator methods
//! - [`emit()`], [`emit_many()`] - Functions to emit items
//! - [`into_sink()`] - Lift regular Effects into SinkEffect
//!
//! # Example: Testing vs Production
//!
//! ```rust
//! use stillwater::effect::sink::prelude::*;
//!
//! # tokio_test::block_on(async {
//! let effect = emit::<_, String, ()>("step 1".to_string())
//!     .and_then(|_| emit("step 2".to_string()))
//!     .and_then(|_| emit("step 3".to_string()))
//!     .map(|_| "done");
//!
//! // Testing: collect for assertions
//! let (result, logs) = effect.run_collecting(&()).await;
//! assert_eq!(result, Ok("done"));
//! assert_eq!(logs, vec!["step 1", "step 2", "step 3"]);
//! # });
//! ```
// Re-export core trait
pub use SinkEffect;
// Re-export extension trait
pub use SinkEffectExt;
// Re-export constructors
pub use ;
// Re-export lifting function
pub use ;
// Re-export combinator types
pub use SinkAndThen;
pub use SinkMap;
pub use SinkMapErr;
pub use SinkOrElse;
pub use TapEmit;
pub use SinkZip;
// Re-export boxed types
pub use BoxedSinkEffect;
// Re-export collection combinators
pub use ;