use async_trait::async_trait;
use oxidizedgraph::prelude::*;
use std::sync::Arc;
use std::time::Duration;
struct WorkerNode {
id: String,
work_duration_ms: u64,
next: Option<String>,
}
#[async_trait]
impl NodeExecutor for WorkerNode {
fn id(&self) -> &str {
&self.id
}
async fn execute(&self, state: SharedState) -> Result<NodeOutput, NodeError> {
tokio::time::sleep(Duration::from_millis(self.work_duration_ms)).await;
{
let mut guard = state
.write()
.map_err(|e| NodeError::execution_failed(e.to_string()))?;
let count = guard.get_context::<i32>("processed").unwrap_or(0);
guard.set_context("processed", count + 1);
}
match &self.next {
Some(target) => Ok(NodeOutput::continue_to(target.clone())),
None => Ok(NodeOutput::finish()),
}
}
fn description(&self) -> Option<&str> {
Some("Simulates work by sleeping")
}
}
struct TimingHandler {
start_times: std::sync::RwLock<std::collections::HashMap<String, std::time::Instant>>,
}
impl TimingHandler {
fn new() -> Self {
Self {
start_times: std::sync::RwLock::new(std::collections::HashMap::new()),
}
}
}
#[async_trait]
impl EventHandler for TimingHandler {
async fn handle(&self, event: &Event) {
match &event.kind {
EventKind::Node(NodeEvent::Entered { node_id, .. }) => {
if let Ok(mut times) = self.start_times.write() {
times.insert(node_id.clone(), std::time::Instant::now());
}
println!("⏱️ Started: {}", node_id);
}
EventKind::Node(NodeEvent::Exited { node_id, duration_ms, .. }) => {
println!("✅ Finished: {} ({}ms)", node_id, duration_ms);
}
EventKind::Graph(GraphEvent::Completed { iterations, duration_ms }) => {
println!("\n📊 Graph completed:");
println!(" Iterations: {}", iterations);
println!(" Total time: {}ms", duration_ms);
}
_ => {}
}
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let graph = GraphBuilder::new()
.name("worker_pipeline")
.description("Simulated work pipeline with metrics")
.add_node(WorkerNode {
id: "fetch".to_string(),
work_duration_ms: 50,
next: Some("process".to_string()),
})
.add_node(WorkerNode {
id: "process".to_string(),
work_duration_ms: 100,
next: Some("transform".to_string()),
})
.add_node(WorkerNode {
id: "transform".to_string(),
work_duration_ms: 75,
next: Some("save".to_string()),
})
.add_node(WorkerNode {
id: "save".to_string(),
work_duration_ms: 25,
next: None,
})
.set_entry_point("fetch")
.compile()?;
println!("=== Streaming Events Example ===\n");
println!("Graph:\n{}", graph.to_mermaid());
let bus = Arc::new(EventBus::new());
let timing_handler = Arc::new(TimingHandler::new());
let timing_receiver = bus.subscribe();
let timing_handle = spawn_handler(timing_handler, timing_receiver);
let metrics = Arc::new(MetricsHandler::new());
let metrics_receiver = bus.subscribe();
let metrics_handle = spawn_handler(metrics.clone(), metrics_receiver);
let runner = StreamingRunner::new(graph, bus.clone());
println!("\n--- Execution Log ---\n");
let result = runner.invoke("demo-thread", AgentState::new()).await?;
tokio::time::sleep(Duration::from_millis(50)).await;
let summary = metrics.summary();
println!("\n--- Metrics Summary ---\n");
println!("Total events: {}", summary.total_events);
println!("Graphs completed: {}", summary.graphs_completed);
println!("\nNode Performance:");
for (node_id, node_metrics) in &summary.nodes {
println!(
" {}: {} executions, avg {:.1}ms",
node_id,
node_metrics.executions,
node_metrics.avg_duration_ms.unwrap_or(0.0)
);
}
println!("\n--- Final State ---");
println!("Items processed: {}", result.state().get_context::<i32>("processed").unwrap_or(0));
drop(bus); let _ = timing_handle.await;
let _ = metrics_handle.await;
Ok(())
}