//! # Unified Node Architecture - Pure Stream Implementation
//!
//! This module defines the core `Node` trait for StreamWeave's unified streaming architecture.
//! All streaming operations (sources, transforms, sinks) are implemented as Nodes with
//! zero-copy data passing via `Arc<T>`.
//!
//! ## Design Principles
//!
//! - **Unified Interface**: One `Node` trait for all operations (sources, transforms, sinks)
//! - **Zero-Copy**: All data flows as `Arc<T>` for efficient sharing
//! - **Stream-Based**: Nodes consume and produce streams - no channels exposed
//! - **Pure Functional**: Stream composition enables clean, functional programming style
//!
//! ## Node Types
//!
//! Nodes can have different port configurations:
//!
//! - **Source Nodes**: 0 inputs, 1+ outputs (generate data)
//! - **Transform Nodes**: 1+ inputs, 1+ outputs (process data)
//! - **Sink Nodes**: 1+ inputs, 0 outputs (consume data)
//!
//! ## Zero-Copy Architecture
//!
//! All data flows as `Arc<T>`:
//!
//! - **Single Output**: Data moved directly (zero-cost move of Arc)
//! - **Fan-Out**: Data shared via `Arc::clone()` (atomic refcount increment, ~1-2ns)
//! - **No Serialization**: Items are passed in-process by pointer, with no encode/decode step
//! - **No Wrapping**: Raw payload types, no message envelopes
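//!
//! The move and fan-out cases above can be sketched with the standard library alone.
//! This is a minimal illustration, not StreamWeave's internal code: `Payload` and
//! `fan_out` are hypothetical names introduced here.
//!
//! ```rust
//! use std::any::Any;
//! use std::sync::Arc;
//!
//! /// Type-erased item, matching the item type of the port streams.
//! type Payload = Arc<dyn Any + Send + Sync>;
//!
//! /// Produce `n` handles that all point at the same allocation.
//! /// (Hypothetical helper, not part of the StreamWeave API.)
//! fn fan_out(item: Payload, n: usize) -> Vec<Payload> {
//!     let mut handles = Vec::with_capacity(n);
//!     // One refcount increment per extra consumer; the payload is never copied
//!     for _ in 1..n {
//!         handles.push(Arc::clone(&item));
//!     }
//!     // The last consumer receives the original handle by move
//!     if n > 0 {
//!         handles.push(item);
//!     }
//!     handles
//! }
//! ```
//!
//! With `n == 1` the helper degenerates to a plain move and the refcount stays at 1;
//! with `n == 3` every handle reports an `Arc::strong_count` of 3.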
//!
//! ## Stream-Based Port System
//!
//! All nodes work with streams:
//!
//! - Input ports: `HashMap<String, InputStream>` where `InputStream = Pin<Box<dyn Stream<Item = Arc<dyn Any + Send + Sync>> + Send>>`
//! - Output ports: `HashMap<String, OutputStream>` where `OutputStream = Pin<Box<dyn Stream<Item = Arc<dyn Any + Send + Sync>> + Send>>`
//! - Nodes consume input streams and produce output streams
//! - Graph execution engine connects streams between nodes
//! - Channels are used internally for backpressure, but never exposed to nodes
//!
//! ## Execution Model
//!
//! The graph execution engine:
//! 1. Collects input streams for each node from connected upstream nodes
//! 2. Calls `execute(inputs)` which returns output streams
//! 3. Connects output streams to downstream nodes' input streams
//! 4. Drives all streams to completion
//!
//! Nodes process streams functionally - no channels, no Mutex locking, pure stream composition.
//!
//! ## Example
//!
//! ```rust,no_run
//! use std::any::Any;
//! use std::collections::HashMap;
//! use std::future::Future;
//! use std::pin::Pin;
//! use std::sync::Arc;
//!
//! use async_stream::stream;
//! use streamweave::node::{InputStreams, Node, NodeExecutionError, OutputStream, OutputStreams};
//! use tokio_stream::StreamExt;
//!
//! // Transform node that doubles integers
//! struct DoubleNode {
//!     name: String,
//!     // Port names are stored so the accessors can return borrowed slices
//!     input_ports: Vec<String>,
//!     output_ports: Vec<String>,
//! }
//!
//! impl Node for DoubleNode {
//!     fn name(&self) -> &str { &self.name }
//!     fn set_name(&mut self, name: &str) { self.name = name.to_string(); }
//!     fn input_port_names(&self) -> &[String] { &self.input_ports }
//!     fn output_port_names(&self) -> &[String] { &self.output_ports }
//!     fn has_input_port(&self, name: &str) -> bool { name == "in" }
//!     fn has_output_port(&self, name: &str) -> bool { name == "out" }
//!
//!     fn execute(
//!         &self,
//!         mut inputs: InputStreams,
//!     ) -> Pin<Box<dyn Future<Output = Result<OutputStreams, NodeExecutionError>> + Send + '_>> {
//!         Box::pin(async move {
//!             let input_stream = inputs.remove("in")
//!                 .ok_or("Missing 'in' input")?;
//!
//!             let output_stream: OutputStream = Box::pin(stream! {
//!                 let mut input = input_stream;
//!                 while let Some(item) = input.next().await {
//!                     // Items that are not `i32` are skipped
//!                     if let Ok(arc_i32) = item.downcast::<i32>() {
//!                         let doubled = *arc_i32 * 2;
//!                         yield Arc::new(doubled) as Arc<dyn Any + Send + Sync>;
//!                     }
//!                 }
//!             });
//!
//!             let mut outputs = HashMap::new();
//!             outputs.insert("out".to_string(), output_stream);
//!             Ok(outputs)
//!         })
//!     }
//! }
//! ```
use async_trait::async_trait;
use std::any::Any;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio_stream::Stream;
/// Type alias for input streams.
///
/// Input streams are pinned, boxed streams that yield `Arc<dyn Any + Send + Sync>` items.
/// Nodes consume these streams to process data.
pub type InputStream = Pin<Box<dyn Stream<Item = Arc<dyn Any + Send + Sync>> + Send>>;
/// Type alias for output streams.
///
/// Output streams are pinned, boxed streams that yield `Arc<dyn Any + Send + Sync>` items.
/// Nodes produce these streams as their output.
pub type OutputStream = Pin<Box<dyn Stream<Item = Arc<dyn Any + Send + Sync>> + Send>>;
/// Type alias for a collection of input streams, keyed by port name.
pub type InputStreams = HashMap<String, InputStream>;
/// Type alias for a collection of output streams, keyed by port name.
pub type OutputStreams = HashMap<String, OutputStream>;
/// Error type for node execution operations.
pub type NodeExecutionError = Box<dyn std::error::Error + Send + Sync>;
/// The unified Node trait for all streaming operations.
///
/// All nodes in StreamWeave implement this trait, whether they are sources,
/// transforms, or sinks. The `execute` method consumes input streams and
/// produces output streams.
///
/// # Stream-Based Architecture
///
/// Nodes work purely with streams:
///
/// - **Input**: `HashMap<String, InputStream>` - named input streams
/// - **Output**: `HashMap<String, OutputStream>` - named output streams
/// - **No Channels**: Channels are used internally by the graph execution engine
/// for backpressure, but nodes never see them
/// - **No Mutex**: Streams are async-native, no locking needed
///
/// # Execution Model
///
/// 1. Graph execution engine collects input streams for a node from upstream nodes
/// 2. Calls `execute(inputs)` with those streams
/// 3. Node processes streams and returns output streams
/// 4. Graph execution engine connects output streams to downstream nodes
///
/// # Port Naming Convention
///
/// - Single input port: `"in"`
/// - Single output port: `"out"`
/// - Multiple ports: `"in_0"`, `"in_1"`, `"out_0"`, `"out_1"`, etc.
/// - Mandatory ports: `"configuration"` (input), `"error"` (output)
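///
/// The single-port and multi-port rules above can be expressed as a small helper.
/// This is only a sketch: `port_names` is a hypothetical function introduced for
/// illustration and is not part of the `Node` trait.
///
/// ```rust
/// /// Build port names for a given prefix (`"in"` or `"out"`).
/// /// A single port uses the bare prefix; several ports get a numeric suffix.
/// fn port_names(prefix: &str, count: usize) -> Vec<String> {
///     match count {
///         0 => Vec::new(),
///         1 => vec![prefix.to_string()],
///         n => (0..n).map(|i| format!("{}_{}", prefix, i)).collect(),
///     }
/// }
/// ```
///
/// The mandatory `"configuration"` and `"error"` ports are not generated by this
/// helper and would need to be added separately.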
///
/// # Zero-Copy Execution
///
/// Data is passed as `Arc<T>`, type-erased to `Arc<dyn Any + Send + Sync>`:
///
/// - **Single output**: The `Arc` is moved directly, with no refcount change
/// - **Fan-out**: One `Arc::clone()` per extra consumer (atomic refcount increment, ~1-2ns); all consumers share the same data
/// - **No serialization**: Direct in-process passing
/// - **No wrapping**: Raw payload types, no message envelopes
///
/// # Type Safety
///
/// Nodes are responsible for downcasting to their expected types when receiving
/// data from input streams.
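///
/// A minimal sketch of that downcast step, using only the standard library.
/// `Payload` and `double_if_i32` are hypothetical names introduced here; a real
/// node would apply the same pattern to each item pulled from an input stream.
///
/// ```rust
/// use std::any::Any;
/// use std::sync::Arc;
///
/// /// Type-erased item, matching the item type of the port streams.
/// type Payload = Arc<dyn Any + Send + Sync>;
///
/// /// Double the payload if it holds an `i32`; return `None` for any other type.
/// fn double_if_i32(item: Payload) -> Option<Payload> {
///     // `Arc::downcast` consumes the handle and hands it back in `Err` on a mismatch
///     let value: Arc<i32> = item.downcast::<i32>().ok()?;
///     Some(Arc::new(*value * 2) as Payload)
/// }
/// ```
///
/// Whether a mismatched item is dropped, forwarded unchanged, or reported on the
/// `"error"` port is a per-node decision; this sketch simply returns `None`.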