1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
//! # Drop Node
//!
//! A transform node that drops all items from the input stream.
//!
//! ## Ports
//!
//! - **Input**: `"configuration"` - Receives configuration (currently unused, for consistency)
//! - **Input**: `"in"` - Receives data items
//! - **Output**: `"out"` - No items are sent (stream is empty)
//! - **Output**: `"error"` - Sends errors that occur during processing
//!
//! ## Behavior
//!
//! The node consumes all items from the input stream but does not forward any items to the output.
//! This is useful for consuming streams without processing them, or for testing purposes.
//! The output stream will be empty.
use crate::node::{InputStreams, Node, NodeExecutionError, OutputStreams};
use crate::nodes::common::BaseNode;
use async_trait::async_trait;
use std::any::Any;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
/// A node that drops all items from the input stream.
///
/// The node consumes all items from the "in" port but does not forward any items
/// to the "out" port. The output stream will be empty.
pub struct DropNode {
/// Base node functionality.
pub(crate) base: BaseNode,
}
impl DropNode {
/// Creates a new DropNode with the given name.
///
/// # Arguments
///
/// * `name` - The name of the node.
///
/// # Returns
///
/// A new `DropNode` instance.
///
/// # Example
///
/// ```rust,no_run
/// use streamweave::nodes::stream::DropNode;
///
/// let node = DropNode::new("drop".to_string());
/// // Creates ports: configuration, in → out, error
/// ```
pub fn new(name: String) -> Self {
Self {
base: BaseNode::new(
name,
vec!["configuration".to_string(), "in".to_string()],
vec!["out".to_string(), "error".to_string()],
),
}
}
}
#[async_trait]
impl Node for DropNode {
fn name(&self) -> &str {
self.base.name()
}
fn set_name(&mut self, name: &str) {
self.base.set_name(name);
}
fn input_port_names(&self) -> &[String] {
self.base.input_port_names()
}
fn output_port_names(&self) -> &[String] {
self.base.output_port_names()
}
fn has_input_port(&self, name: &str) -> bool {
self.base.has_input_port(name)
}
fn has_output_port(&self, name: &str) -> bool {
self.base.has_output_port(name)
}
fn execute(
&self,
mut inputs: InputStreams,
) -> Pin<
Box<dyn std::future::Future<Output = Result<OutputStreams, NodeExecutionError>> + Send + '_>,
> {
Box::pin(async move {
// Extract input streams
let _config_stream = inputs.remove("configuration");
let in_stream = inputs.remove("in").ok_or("Missing 'in' input")?;
// Create output channels
let (_out_tx, out_rx) = tokio::sync::mpsc::channel(10);
let (error_tx, error_rx) = tokio::sync::mpsc::channel(10);
// Process the input stream and drop all items
let _error_tx_clone = error_tx.clone();
tokio::spawn(async move {
let mut in_stream = in_stream;
// Consume all items but don't forward them
while let Some(_item) = in_stream.next().await {
// Drop the item
}
// Stream ends, output stream is already empty
});
// Convert channels to streams
let mut outputs = HashMap::new();
outputs.insert(
"out".to_string(),
Box::pin(ReceiverStream::new(out_rx))
as Pin<Box<dyn tokio_stream::Stream<Item = Arc<dyn Any + Send + Sync>> + Send>>,
);
outputs.insert(
"error".to_string(),
Box::pin(ReceiverStream::new(error_rx))
as Pin<Box<dyn tokio_stream::Stream<Item = Arc<dyn Any + Send + Sync>> + Send>>,
);
Ok(outputs)
})
}
}