use futures::StreamExt;
use oris_runtime::graph::{
function_node, MessagesState, StateGraph, StreamChunk, StreamMode, END, START,
};
use oris_runtime::schemas::messages::Message;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let node1 = function_node("node1", |_state: &MessagesState| async move {
use std::collections::HashMap;
let mut update = HashMap::new();
update.insert(
"messages".to_string(),
serde_json::to_value(vec![Message::new_ai_message("Hello from node1")])?,
);
Ok(update)
});
let node2 = function_node("node2", |_state: &MessagesState| async move {
use std::collections::HashMap;
let mut update = HashMap::new();
update.insert(
"messages".to_string(),
serde_json::to_value(vec![Message::new_ai_message("Hello from node2")])?,
);
Ok(update)
});
let mut graph = StateGraph::<MessagesState>::new();
graph.add_node("node1", node1)?;
graph.add_node("node2", node2)?;
graph.add_edge(START, "node1");
graph.add_edge("node1", "node2");
graph.add_edge("node2", END);
let compiled = graph.compile()?;
let initial_state = MessagesState::with_messages(vec![Message::new_human_message("start")]);
println!("=== Example 1: Updates mode ===");
let mut stream = compiled.stream_with_mode(initial_state.clone(), StreamMode::Updates);
while let Some(chunk) = stream.next().await {
match chunk {
StreamChunk::Updates { node, update } => {
println!("Node: {}, Update: {:?}", node, update);
}
_ => {}
}
}
println!("\n=== Example 2: Values mode ===");
let mut stream = compiled.stream_with_mode(initial_state.clone(), StreamMode::Values);
while let Some(chunk) = stream.next().await {
match chunk {
StreamChunk::Values { state } => {
println!("State messages count: {}", state.messages.len());
}
_ => {}
}
}
println!("\n=== Example 3: Multiple modes ===");
let mut stream =
compiled.stream_with_modes(initial_state, vec![StreamMode::Updates, StreamMode::Values]);
while let Some((mode, chunk)) = stream.next().await {
println!("Mode: {:?}, Chunk: {:?}", mode, chunk.mode());
}
Ok(())
}