use async_trait::async_trait;
use cano::prelude::*;
use std::time::Duration;
/// States that a request moves through while the workflow runs.
///
/// The workflow built in `main` starts at [`RequestState::Start`] and stops
/// once a node returns [`RequestState::Complete`].
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum RequestState {
    /// Entry state; `main` registers `MetricsNode` to handle it.
    Start,
    /// Returned by `MetricsNode::post`; `main` registers `ResponseNode` to handle it.
    MetricsExtracted,
    /// Declared but not referenced anywhere in the visible code.
    ResponseGenerated,
    /// Returned by `ResponseNode::post`; registered as the workflow's exit state.
    Complete,
}
/// Workflow node that turns a raw request into metrics.
///
/// It reads `request_id` and `request_body` from the store, classifies the
/// body by keyword, and writes `revenue`, `customer_id`, `transaction_count`
/// and `processing_time_ms` back to the store.
#[derive(Clone)]
pub struct MetricsNode;

#[async_trait]
impl Node<RequestState> for MetricsNode {
    /// `(request_id, request_body)` as read from the store.
    type PrepResult = (String, String);
    /// `(revenue, customer_id, transaction_count)` derived from the request body.
    type ExecResult = (f64, String, u32);

    /// Loads the request id and body from the store and logs the id.
    ///
    /// # Errors
    /// Propagates the error from `store.get` if either lookup fails.
    async fn prep(&self, store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
        let id: String = store.get("request_id")?;
        let body: String = store.get("request_body")?;
        println!("📊 MetricsNode: Processing request {}", id);
        Ok((id, body))
    }

    /// Classifies the request body and returns the matching metrics.
    ///
    /// A body containing `"purchase"` wins over one containing
    /// `"subscription"`; anything else yields zero revenue for an
    /// `"anonymous"` customer.
    async fn exec(&self, prep_res: Self::PrepResult) -> Self::ExecResult {
        let (request_id, request_body) = prep_res;
        println!(
            "🔍 MetricsNode: Extracting metrics from request {}",
            request_id
        );

        // Fixed 50 ms pause before classification; presumably a stand-in for real work.
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Guards are tried top to bottom, so "purchase" takes precedence.
        let (revenue, customer_id, transaction_count) = match request_body.as_str() {
            body if body.contains("purchase") => (99.99, "cust_12345".to_string(), 1),
            body if body.contains("subscription") => (29.99, "cust_67890".to_string(), 1),
            _ => (0.0, "anonymous".to_string(), 0),
        };

        println!(
            "💰 MetricsNode: Extracted metrics - Revenue: ${:.2}, Customer: {}, Transactions: {}",
            revenue, customer_id, transaction_count
        );
        (revenue, customer_id, transaction_count)
    }

    /// Writes the extracted metrics to the store and advances the workflow.
    ///
    /// # Errors
    /// Propagates the error from `store.put` if any write fails.
    async fn post(
        &self,
        store: &MemoryStore,
        exec_res: Self::ExecResult,
    ) -> Result<RequestState, CanoError> {
        let (revenue, customer_id, transaction_count) = exec_res;

        // The stored processing time is a fixed figure picked by revenue tier,
        // not a measurement.
        let processing_time_ms: u64 = if revenue > 50.0 { 150 } else { 75 };

        store.put("revenue", revenue)?;
        store.put("customer_id", customer_id)?;
        store.put("transaction_count", transaction_count)?;
        store.put("processing_time_ms", processing_time_ms)?;

        println!("✅ MetricsNode: Stored metrics in shared store");
        Ok(RequestState::MetricsExtracted)
    }
}
/// Workflow node that builds the response from metrics already in the store.
///
/// It reads `revenue`, `customer_id`, `transaction_count` and
/// `processing_time_ms`, then writes `response_message` and `status_code`.
#[derive(Clone)]
pub struct ResponseNode;

#[async_trait]
impl Node<RequestState> for ResponseNode {
    /// `(revenue, customer_id, transaction_count, processing_time_ms)` as read from the store.
    type PrepResult = (f64, String, u32, u64);
    /// `(response_message, status_code)`.
    type ExecResult = (String, u16);

    /// Reads the four metric values from the store.
    ///
    /// # Errors
    /// Propagates the error from `store.get` if any lookup fails.
    async fn prep(&self, store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
        let metrics = (
            store.get::<f64>("revenue")?,
            store.get::<String>("customer_id")?,
            store.get::<u32>("transaction_count")?,
            store.get::<u64>("processing_time_ms")?,
        );
        println!("📖 ResponseNode: Read metrics from shared store");
        Ok(metrics)
    }

    /// Formats the response message and pairs it with a status code.
    ///
    /// Positive revenue produces a success summary; otherwise the message
    /// says no transactions were processed. The status code is `200` in both
    /// cases.
    async fn exec(&self, prep_res: Self::PrepResult) -> Self::ExecResult {
        let (revenue, customer_id, transaction_count, processing_time_ms) = prep_res;
        println!("✏️ ResponseNode: Generating response");

        // Fixed 30 ms pause before formatting; presumably a stand-in for real work.
        tokio::time::sleep(Duration::from_millis(30)).await;

        // Only the message depends on the outcome; the status is the same either way.
        let status_code: u16 = 200;
        let response_message = if revenue > 0.0 {
            format!(
                "Transaction successful! Customer {} processed {} transaction(s) totaling ${:.2}. Processing time: {}ms",
                customer_id, transaction_count, revenue, processing_time_ms
            )
        } else {
            format!(
                "No transactions processed for customer {}. Processing time: {}ms",
                customer_id, processing_time_ms
            )
        };

        println!(
            "📝 ResponseNode: Generated response with status {}",
            status_code
        );
        (response_message, status_code)
    }

    /// Stores the response, prints it, and finishes the workflow.
    ///
    /// # Errors
    /// Propagates the error from `store.put` if either write fails; nothing
    /// is printed in that case.
    async fn post(
        &self,
        store: &MemoryStore,
        exec_res: Self::ExecResult,
    ) -> Result<RequestState, CanoError> {
        let (response_message, status_code) = exec_res;

        // The message is cloned into the store because it is printed after the writes.
        store.put("response_message", response_message.clone())?;
        store.put("status_code", status_code)?;

        println!("✅ ResponseNode: Stored response in shared store");
        println!("📤 Response: {}", response_message);
        Ok(RequestState::Complete)
    }
}
/// Runs one request through the two-node workflow and prints the results.
///
/// `heading` and `underline` are printed verbatim as the section title.
/// `request_id` and `request_body` are placed in a fresh `MemoryStore` before
/// the workflow starts at `RequestState::Start`.
///
/// The workflow receives `store.clone()` while the results are read back
/// through `store`; this presumes clones of `MemoryStore` share the same
/// underlying data.
///
/// # Errors
/// Returns the error from seeding the store or from `orchestrate`. Missing
/// result keys are not errors: they print as `0.00` or an empty string.
async fn run_example(
    heading: &str,
    underline: &str,
    request_id: &str,
    request_body: &str,
) -> CanoResult<()> {
    println!("{}", heading);
    println!("{}", underline);

    let store = MemoryStore::new();
    store.put("request_id", request_id.to_string())?;
    store.put("request_body", request_body.to_string())?;

    let workflow = Workflow::new(store.clone())
        .register(RequestState::Start, MetricsNode)
        .register(RequestState::MetricsExtracted, ResponseNode)
        .add_exit_state(RequestState::Complete);

    println!("\n🎬 Starting workflow...\n");
    let final_state = workflow.orchestrate(RequestState::Start).await?;

    println!("\n📊 Final Results:");
    println!(" State: {:?}", final_state);
    println!(
        " Revenue: ${:.2}",
        store.get::<f64>("revenue").unwrap_or(0.0)
    );
    println!(
        " Customer: {}",
        store.get::<String>("customer_id").unwrap_or_default()
    );
    println!(
        " Response: {}",
        store.get::<String>("response_message").unwrap_or_default()
    );
    println!();
    Ok(())
}

/// Entry point: runs the purchase, subscription and unknown-request examples
/// in order, each with its own store, then prints a summary.
///
/// # Errors
/// Stops at the first example that fails and returns its error.
#[tokio::main]
async fn main() -> CanoResult<()> {
    println!("🚀 Workflow Stack Store Example");
    println!("=================================\n");
    println!("This example demonstrates how nodes communicate through MemoryStore\n");

    run_example(
        "📋 Example 1: Purchase Request",
        "-------------------------------",
        "req_001",
        "purchase: laptop",
    )
    .await?;
    run_example(
        "📋 Example 2: Subscription Request",
        "-----------------------------------",
        "req_002",
        "subscription: premium plan",
    )
    .await?;
    run_example(
        "📋 Example 3: Unknown Request Type",
        "-----------------------------------",
        "req_003",
        "query: account balance",
    )
    .await?;

    println!("✅ Workflow Stack Store example completed successfully!");
    println!("\n💡 Key Takeaways:");
    println!(" • MemoryStore provides shared state between workflow nodes");
    println!(" • Nodes can read data stored by previous nodes");
    println!(" • Type-safe get/put operations ensure data consistency");
    println!(" • Clean separation of concerns between nodes");
    Ok(())
}