use async_trait::async_trait;
use std::any::Any;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use streamweave::graph;
use streamweave::graph::Graph;
use streamweave::node::{InputStreams, Node, NodeExecutionError, OutputStreams};
use tokio_stream::{Stream, StreamExt};
/// A single-input, single-output node used by the graph I/O examples below.
struct TransformNode {
    /// Name of this node inside the graph.
    name: String,
    /// Names of the input ports this node declares.
    input_port_names: Vec<String>,
    /// Names of the output ports this node declares.
    output_port_names: Vec<String>,
}
impl TransformNode {
    /// Creates a node called `name` with one input port (`"in"`) and one
    /// output port (`"out"`).
    ///
    /// `name` may be anything convertible into a `String` (for example a
    /// `&str` or an owned `String`), so callers are not forced to allocate
    /// at the call site. Passing a `String` moves it in without copying.
    fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            input_port_names: vec!["in".to_string()],
            output_port_names: vec!["out".to_string()],
        }
    }
}
#[async_trait]
impl Node for TransformNode {
    /// Returns the node's current name.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Replaces the node's name with an owned copy of `name`.
    fn set_name(&mut self, name: &str) {
        self.name = String::from(name);
    }

    /// Returns the declared input port names.
    fn input_port_names(&self) -> &[String] {
        self.input_port_names.as_slice()
    }

    /// Returns the declared output port names.
    fn output_port_names(&self) -> &[String] {
        self.output_port_names.as_slice()
    }

    /// Returns `true` only for the `"in"` port.
    fn has_input_port(&self, name: &str) -> bool {
        matches!(name, "in")
    }

    /// Returns `true` only for the `"out"` port.
    fn has_output_port(&self, name: &str) -> bool {
        matches!(name, "out")
    }

    /// Takes the `"in"` stream out of `inputs` and returns a map whose
    /// `"out"` entry re-yields every item of that stream unchanged.
    ///
    /// # Errors
    ///
    /// Fails with `"Missing 'in' input"` when `inputs` has no `"in"` entry.
    fn execute(
        &self,
        mut inputs: InputStreams,
    ) -> Pin<
        Box<dyn std::future::Future<Output = Result<OutputStreams, NodeExecutionError>> + Send + '_>,
    > {
        Box::pin(async move {
            let upstream = inputs.remove("in").ok_or("Missing 'in' input")?;

            // Pass-through: each item received from upstream is yielded as-is.
            let forwarded = Box::pin(async_stream::stream! {
                let mut upstream = upstream;
                while let Some(element) = upstream.next().await {
                    yield element;
                }
            }) as Pin<Box<dyn Stream<Item = Arc<dyn Any + Send + Sync>> + Send>>;

            let mut outputs = HashMap::new();
            outputs.insert(String::from("out"), forwarded);
            Ok(outputs)
        })
    }
}
/// Walks through three graph-level I/O patterns built with the `graph!`
/// macro, printing a description of each and asserting that the expected
/// graph ports exist.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Shared formatter for failures reported by `expose_output_port`.
    fn expose_failure(err: impl std::fmt::Display) -> String {
        format!("Failed to expose output: {}", err)
    }

    println!("Graph I/O Patterns Example");
    println!("===========================");
    println!("\nThis example demonstrates graph-level input and output ports.");

    // Pattern 1: a graph input port that carries an initial value.
    println!("\n1. Graph input with initial value:");
    println!(" graph.config: 42i32 => transform.in");
    let mut valued_input_graph: Graph = graph! {
        transform: TransformNode::new("transform".to_string()),
        graph.config: 42i32 => transform.in
    };
    valued_input_graph
        .expose_output_port("transform", "out", "result")
        .map_err(expose_failure)?;
    println!(" ✓ Graph created with input port 'config' (value: 42)");
    println!(" ✓ Graph has output port 'result'");
    assert!(valued_input_graph.has_input_port("config"));
    assert!(valued_input_graph.has_output_port("result"));

    // Pattern 2: a graph input port declared without a value.
    println!("\n2. Graph input without value:");
    println!(" graph.input => transform.in");
    let mut bare_input_graph: Graph = graph! {
        transform: TransformNode::new("transform".to_string()),
        graph.input => transform.in
    };
    bare_input_graph
        .expose_output_port("transform", "out", "output")
        .map_err(expose_failure)?;
    println!(" ✓ Graph created with input port 'input' (no initial value)");
    println!(" ✓ Graph has output port 'output'");
    assert!(bare_input_graph.has_input_port("input"));
    assert!(bare_input_graph.has_output_port("output"));

    // Pattern 3: a graph output port declared directly inside the macro.
    println!("\n3. Graph output:");
    println!(" transform.out => graph.result");
    let output_graph: Graph = graph! {
        transform: TransformNode::new("transform".to_string()),
        transform.out => graph.result
    };
    println!(" ✓ Graph created with output port 'result'");
    assert!(output_graph.has_output_port("result"));

    println!("\n✓ All graph I/O patterns demonstrated successfully!");
    println!("\nNote: Graph inputs with values send data via channels when the graph is built.");
    println!("Graph inputs without values require external channels to be connected at runtime.");
    Ok(())
}