use async_trait::async_trait;
use cano::prelude::*;
use std::time::Duration;
/// States of the ad-exchange workflow.
///
/// Each variant is either bound to a task (or split of tasks) in
/// `create_ad_exchange_workflow`, or is an exit state.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum AdExchangeState {
    /// Entry point; handled by `ValidateRequestTask`.
    Start,
    /// Parallel context lookup; handled by the `ContextTask` variants.
    GatherContext,
    /// Parallel bid requests; handled by one `ContactDSPTask` per partner.
    RequestBids,
    /// Parallel scoring of collected bids; handled by `ScoreBidTask`.
    ScoreBids,
    /// Winner selection; handled by `RunAuctionTask`.
    RunAuction,
    /// Post-auction bookkeeping for a filled auction; handled by `TrackingTask`.
    TrackResults,
    /// Bookkeeping for an unfilled or failed auction; handled by `ErrorTrackingTask`.
    ErrorTracking,
    /// Final summary output; handled by `BuildResponseTask`.
    BuildResponse,
    /// Reached when request validation fails; handled by `InvalidResponseTask`.
    InvalidResponse,
    /// Exit state reached by every terminal task in this file.
    Complete,
    /// Exit state; no task in this file transitions to it.
    Rejected,
    /// No-fill reporting; handled by `NoFillTask`.
    NoFill,
}
/// Incoming ad request; stored under the `"ad_request"` key before the workflow starts.
#[derive(Debug, Clone)]
struct AdRequest {
    /// Identifier echoed in the validation log line and the response summary.
    request_id: String,
    /// Placement being filled; validation rejects an empty value.
    placement_id: String,
    /// Minimum acceptable bid score; validation rejects values below 0.01.
    floor_price: f64,
}
/// Placeholder for user-profile data; `ContextTask::FetchUser` stores an
/// instance under the `"user_context"` key. It carries no fields yet.
#[derive(Debug, Clone)]
struct UserContext {}
/// Placeholder for geo data; `ContextTask::FetchGeo` stores an instance
/// under the `"geo_context"` key. It carries no fields yet.
#[derive(Debug, Clone)]
struct GeoContext {}
/// Placeholder for device data; `ContextTask::DetectDevice` stores an
/// instance under the `"device_context"` key. It carries no fields yet.
#[derive(Debug, Clone)]
struct DeviceContext {}
/// A single bid returned by a demand-side platform (DSP).
#[derive(Debug, Clone)]
struct BidResponse {
    /// Identifier of the DSP that produced the bid.
    partner_id: String,
    /// Raw bid price before quality scoring.
    price: f64,
    /// Identifier of the creative to serve if this bid wins.
    creative_id: String,
    /// Response latency in milliseconds; selects the quality multiplier during scoring.
    response_time_ms: u64,
}
/// A bid together with its quality-adjusted score and auction rank.
#[derive(Debug, Clone)]
struct ScoredBid {
    /// The underlying DSP bid.
    bid: BidResponse,
    /// Bid price multiplied by the latency-based quality multiplier.
    score: f64,
    /// 1-based position after the auction sorts bids; scoring initialises it to 0.
    rank: usize,
}
/// Outcome of the auction; stored under the `"auction_result"` key.
#[derive(Debug, Clone)]
struct AuctionResult {
    /// Highest-scoring bid at or above the floor price, or `None` on no fill.
    winner: Option<ScoredBid>,
    /// Number of bids that cleared the floor price (0 on no fill).
    total_bids: usize,
    /// Time spent inside the auction task, in milliseconds.
    auction_time_ms: u64,
}
/// Entry task of the workflow: sanity-checks the incoming ad request.
#[derive(Clone)]
struct ValidateRequestTask;

#[async_trait]
impl Task<AdExchangeState> for ValidateRequestTask {
    /// Reads the `AdRequest` stored under `"ad_request"` and chooses the next state.
    ///
    /// Returns `InvalidResponse` when the placement id is empty or the floor
    /// price is below 0.01, otherwise `GatherContext`.
    ///
    /// # Errors
    /// Propagates the store error when `"ad_request"` cannot be read.
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<AdExchangeState>, CanoError> {
        let ad_request: AdRequest = store.get("ad_request")?;
        println!("🔍 Validating request {}", ad_request.request_id);

        // The first failing check wins; the placement check takes precedence.
        let rejection = if ad_request.placement_id.is_empty() {
            Some("❌ Invalid placement ID")
        } else if ad_request.floor_price < 0.01 {
            Some("❌ Floor price too low")
        } else {
            None
        };

        let next_state = match rejection {
            Some(reason) => {
                println!("{}", reason);
                AdExchangeState::InvalidResponse
            }
            None => {
                println!("✅ Request validated");
                AdExchangeState::GatherContext
            }
        };
        Ok(TaskResult::Single(next_state))
    }
}
/// Context lookups that run side by side in the `GatherContext` split.
#[derive(Clone)]
enum ContextTask {
    /// Stores a `UserContext` under `"user_context"`.
    FetchUser,
    /// Stores a `GeoContext` under `"geo_context"`.
    FetchGeo,
    /// Stores a `DeviceContext` under `"device_context"`.
    DetectDevice,
}

#[async_trait]
impl Task<AdExchangeState> for ContextTask {
    /// Simulates one lookup with a fixed sleep, writes its placeholder context
    /// to the store and moves on to `RequestBids`.
    ///
    /// # Errors
    /// Propagates the store error when the context cannot be written.
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<AdExchangeState>, CanoError> {
        match self {
            Self::FetchUser => {
                println!(" 👤 Fetching user profile...");
                tokio::time::sleep(Duration::from_millis(50)).await;
                store.put("user_context", UserContext {})?;
                println!(" ✅ User profile loaded");
            }
            Self::FetchGeo => {
                println!(" 🌍 Fetching geo data...");
                tokio::time::sleep(Duration::from_millis(30)).await;
                store.put("geo_context", GeoContext {})?;
                println!(" ✅ Geo data loaded");
            }
            Self::DetectDevice => {
                println!(" 📱 Detecting device...");
                tokio::time::sleep(Duration::from_millis(20)).await;
                store.put("device_context", DeviceContext {})?;
                println!(" ✅ Device detected");
            }
        }
        // Every lookup hands over to the same follow-up state.
        Ok(TaskResult::Single(AdExchangeState::RequestBids))
    }
}
/// Simulated bid request to a single demand-side platform (DSP).
#[derive(Clone)]
struct ContactDSPTask {
    /// Identifier of the DSP; also used to derive the creative id.
    partner_id: String,
    /// Simulated response latency in milliseconds; also feeds the simulated price.
    response_delay_ms: u64,
}

#[async_trait]
impl Task<AdExchangeState> for ContactDSPTask {
    /// Waits for the simulated latency, builds a bid and adds it to the
    /// `"bids"` collection in the store, then moves on to `ScoreBids`.
    ///
    /// The bid is added with a single `append` call rather than a separate
    /// get / push / put sequence, so sibling tasks of the same split cannot
    /// overwrite each other's bids between the read and the write.
    /// NOTE(review): assumes the store's `append` creates the collection when
    /// the key is absent and performs the push atomically — confirm against
    /// the cano store documentation.
    ///
    /// # Errors
    /// Propagates the store error when the bid cannot be appended, including
    /// the case where `"bids"` already holds a value of another type. The
    /// previous code silently replaced such a value.
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<AdExchangeState>, CanoError> {
        println!(" 📡 Requesting bid from {}...", self.partner_id);
        tokio::time::sleep(Duration::from_millis(self.response_delay_ms)).await;
        // Partners slower than 180ms are given another 100ms of simulated delay.
        if self.response_delay_ms > 180 {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        let bid = BidResponse {
            partner_id: self.partner_id.clone(),
            // Simulated price: slower partners bid higher.
            price: 2.50 + (self.response_delay_ms as f64 / 100.0),
            creative_id: format!("creative_{}", self.partner_id),
            response_time_ms: self.response_delay_ms,
        };
        // Keep the price for the log line; the bid itself moves into the store.
        let price = bid.price;
        store.append("bids", bid)?;

        println!(" ✅ {} bid: ${:.2}", self.partner_id, price);
        Ok(TaskResult::Single(AdExchangeState::ScoreBids))
    }
}
/// Scores one collected bid, selected by its position in the `"bids"` collection.
#[derive(Clone)]
struct ScoreBidTask {
    /// Index into the `"bids"` vector of the bid this task scores.
    bid_index: usize,
}

#[async_trait]
impl Task<AdExchangeState> for ScoreBidTask {
    /// Applies a latency-based quality multiplier to the selected bid, adds
    /// the resulting `ScoredBid` to `"scored_bids"` and moves on to `RunAuction`.
    ///
    /// The scored bid is added with a single `append` call rather than a
    /// separate get / push / put sequence, so sibling scoring tasks cannot
    /// overwrite each other's results between the read and the write.
    /// NOTE(review): assumes the store's `append` creates the collection when
    /// the key is absent and performs the push atomically — confirm against
    /// the cano store documentation.
    ///
    /// # Errors
    /// * Propagates the store error when `"bids"` cannot be read or the scored
    ///   bid cannot be appended.
    /// * Returns a task-execution error when `bid_index` is past the end of
    ///   the collected bids.
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<AdExchangeState>, CanoError> {
        let bids: Vec<BidResponse> = store.get("bids")?;
        // Checked lookup instead of a manual bounds test followed by indexing.
        let bid = bids
            .get(self.bid_index)
            .ok_or_else(|| CanoError::task_execution("Bid index out of range"))?;

        println!(" 📊 Scoring bid from {}...", bid.partner_id);
        tokio::time::sleep(Duration::from_millis(10)).await;

        // Faster responses earn a higher multiplier.
        let quality_multiplier = match bid.response_time_ms {
            0..=50 => 1.1,
            51..=100 => 1.0,
            101..=150 => 0.95,
            _ => 0.9,
        };
        let score = bid.price * quality_multiplier;

        store.append(
            "scored_bids",
            ScoredBid {
                bid: bid.clone(),
                score,
                // The rank is assigned later by the auction.
                rank: 0,
            },
        )?;

        println!(" ✅ Bid scored: ${:.2}", score);
        Ok(TaskResult::Single(AdExchangeState::RunAuction))
    }
}
/// Picks the winning bid among the scored bids.
#[derive(Clone)]
struct RunAuctionTask;

#[async_trait]
impl Task<AdExchangeState> for RunAuctionTask {
    /// Drops bids scoring below the request's floor price, ranks the rest by
    /// descending score and stores an `AuctionResult` under `"auction_result"`.
    ///
    /// Returns `TrackResults` when at least one bid cleared the floor, and
    /// `ErrorTracking` otherwise. In the latter case the stored result has no
    /// winner and `total_bids` is 0.
    ///
    /// # Errors
    /// Propagates the store error when `"scored_bids"` or `"ad_request"`
    /// cannot be read, or when the result cannot be written.
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<AdExchangeState>, CanoError> {
        println!("\n 🎯 Running auction...");
        let start = tokio::time::Instant::now();

        let mut scored_bids: Vec<ScoredBid> = store.get("scored_bids")?;
        let request: AdRequest = store.get("ad_request")?;
        scored_bids.retain(|b| b.score >= request.floor_price);

        // `total_cmp` is a total order over f64, so the comparator has no
        // panic path, unlike `partial_cmp(..).unwrap()`.
        scored_bids.sort_by(|a, b| b.score.total_cmp(&a.score));
        for (i, bid) in scored_bids.iter_mut().enumerate() {
            bid.rank = i + 1;
        }

        let next_state = match scored_bids.first() {
            Some(winner) => {
                println!(
                    " 🏆 Winner: {} at ${:.2}",
                    winner.bid.partner_id, winner.score
                );
                AdExchangeState::TrackResults
            }
            None => {
                println!(" ❌ No valid bids above floor price");
                AdExchangeState::ErrorTracking
            }
        };

        // One construction covers both outcomes: an empty list yields
        // `winner: None` and `total_bids: 0`.
        let result = AuctionResult {
            winner: scored_bids.first().cloned(),
            total_bids: scored_bids.len(),
            auction_time_ms: start.elapsed().as_millis() as u64,
        };
        store.put("auction_result", result)?;
        Ok(TaskResult::Single(next_state))
    }
}
/// Post-auction bookkeeping steps that run side by side in the `TrackResults` split.
#[derive(Clone)]
enum TrackingTask {
    /// Reports the number of bids from the stored auction result.
    LogAnalytics,
    /// Simulated metrics update.
    UpdateMetrics,
    /// Notifies the winning partner, if the stored result has one.
    NotifyWinner,
    /// Simulated persistence of the auction data.
    StoreAuction,
}

#[async_trait]
impl Task<AdExchangeState> for TrackingTask {
    /// Runs one simulated tracking step and moves on to `BuildResponse`.
    ///
    /// # Errors
    /// `LogAnalytics` and `NotifyWinner` propagate the store error when
    /// `"auction_result"` cannot be read.
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<AdExchangeState>, CanoError> {
        match self {
            Self::LogAnalytics => {
                println!(" 📈 Logging to analytics...");
                tokio::time::sleep(Duration::from_millis(30)).await;
                let outcome: AuctionResult = store.get("auction_result")?;
                println!(" ✅ Analytics logged: {} bids", outcome.total_bids);
            }
            Self::UpdateMetrics => {
                println!(" 📊 Updating metrics...");
                tokio::time::sleep(Duration::from_millis(25)).await;
                println!(" ✅ Metrics updated");
            }
            Self::NotifyWinner => {
                println!(" 📬 Notifying winner...");
                let outcome: AuctionResult = store.get("auction_result")?;
                // Without a winner there is nobody to notify and no delay.
                if let Some(winning_bid) = outcome.winner {
                    tokio::time::sleep(Duration::from_millis(40)).await;
                    println!(" ✅ Winner {} notified", winning_bid.bid.partner_id);
                }
            }
            Self::StoreAuction => {
                println!(" 💾 Storing auction data...");
                tokio::time::sleep(Duration::from_millis(35)).await;
                println!(" ✅ Auction data stored");
            }
        }
        // Every tracking step hands over to the same follow-up state.
        Ok(TaskResult::Single(AdExchangeState::BuildResponse))
    }
}
/// Prints the final response summary for the ad request.
#[derive(Clone)]
struct BuildResponseTask;

#[async_trait]
impl Task<AdExchangeState> for BuildResponseTask {
    /// Reads the request and the auction result from the store, prints a
    /// summary and moves on to `Complete`.
    ///
    /// # Errors
    /// Propagates the store error when `"ad_request"` or `"auction_result"`
    /// cannot be read.
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<AdExchangeState>, CanoError> {
        println!("\n 📦 Building response...");
        let ad_request: AdRequest = store.get("ad_request")?;
        let outcome: AuctionResult = store.get("auction_result")?;

        println!("\n🎯 Ad Exchange Response Summary:");
        println!(" Request ID: {}", ad_request.request_id);
        println!(" Total Bids: {}", outcome.total_bids);
        println!(" Auction Time: {}ms", outcome.auction_time_ms);

        match outcome.winner {
            Some(winning_bid) => {
                println!(" Winner: {}", winning_bid.bid.partner_id);
                println!(" Winning Price: ${:.2}", winning_bid.score);
                println!(" Creative: {}", winning_bid.bid.creative_id);
            }
            None => println!(" Result: No Fill"),
        }

        Ok(TaskResult::Single(AdExchangeState::Complete))
    }
}
/// Reports that the ad request could not be filled.
#[derive(Clone)]
struct NoFillTask;

#[async_trait]
impl Task<AdExchangeState> for NoFillTask {
    /// Prints the no-fill notice and moves on to `Complete`; the store is not used.
    async fn run(&self, _store: &MemoryStore) -> Result<TaskResult<AdExchangeState>, CanoError> {
        println!("\n⚠️ No Fill Response");
        println!("Unable to complete ad request due to timeout or insufficient data.\n");

        let next_state = AdExchangeState::Complete;
        Ok(TaskResult::Single(next_state))
    }
}
/// Reports that the ad request failed validation.
#[derive(Clone)]
struct InvalidResponseTask;

#[async_trait]
impl Task<AdExchangeState> for InvalidResponseTask {
    /// Prints the invalid-request notice and moves on to `Complete`; the store is not used.
    async fn run(&self, _store: &MemoryStore) -> Result<TaskResult<AdExchangeState>, CanoError> {
        println!("\n⚠️ Invalid Request");
        println!("Request validation failed.\n");

        let next_state = AdExchangeState::Complete;
        Ok(TaskResult::Single(next_state))
    }
}
/// Bookkeeping steps that run side by side in the `ErrorTracking` split.
#[derive(Clone)]
enum ErrorTrackingTask {
    /// Logs the error category derived from the store contents.
    LogError,
    /// Simulated error-metrics update.
    UpdateErrorMetrics,
}

#[async_trait]
impl Task<AdExchangeState> for ErrorTrackingTask {
    /// Runs one simulated error-tracking step and moves on to `NoFill`.
    ///
    /// `LogError` reports `"NoFill"` when an `AuctionResult` is readable under
    /// `"auction_result"` and `"Rejected"` otherwise. A failed read is treated
    /// as information here, not as an error.
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<AdExchangeState>, CanoError> {
        match self {
            Self::LogError => {
                println!(" 📝 Logging error...");
                tokio::time::sleep(Duration::from_millis(20)).await;
                let category = match store.get::<AuctionResult>("auction_result") {
                    Ok(_) => "NoFill",
                    Err(_) => "Rejected",
                };
                println!(" ✅ Error logged: {}", category);
            }
            Self::UpdateErrorMetrics => {
                println!(" 📊 Updating error metrics...");
                tokio::time::sleep(Duration::from_millis(25)).await;
                println!(" ✅ Error metrics updated");
            }
        }
        // Both steps hand over to the same follow-up state.
        Ok(TaskResult::Single(AdExchangeState::NoFill))
    }
}
fn create_ad_exchange_workflow(store: MemoryStore) -> Workflow<AdExchangeState> {
Workflow::new(store.clone())
.register(AdExchangeState::Start, ValidateRequestTask)
.register(AdExchangeState::InvalidResponse, InvalidResponseTask)
.register_split(
AdExchangeState::GatherContext,
vec![
ContextTask::FetchUser,
ContextTask::FetchGeo,
ContextTask::DetectDevice,
],
JoinConfig::new(JoinStrategy::All, AdExchangeState::RequestBids)
.with_timeout(Duration::from_millis(100)),
)
.register_split(
AdExchangeState::RequestBids,
vec![
ContactDSPTask {
partner_id: "DSP-FastBidder".to_string(),
response_delay_ms: 45,
},
ContactDSPTask {
partner_id: "DSP-Premium".to_string(),
response_delay_ms: 80,
},
ContactDSPTask {
partner_id: "DSP-Global".to_string(),
response_delay_ms: 120,
},
ContactDSPTask {
partner_id: "DSP-Slow".to_string(),
response_delay_ms: 190,
},
ContactDSPTask {
partner_id: "DSP-TooSlow".to_string(),
response_delay_ms: 250,
},
],
JoinConfig::new(JoinStrategy::PartialTimeout, AdExchangeState::ScoreBids)
.with_timeout(Duration::from_millis(200))
.with_store_partial_results(true),
)
.register_split(
AdExchangeState::ScoreBids,
vec![
ScoreBidTask { bid_index: 0 },
ScoreBidTask { bid_index: 1 },
ScoreBidTask { bid_index: 2 },
],
JoinConfig::new(JoinStrategy::All, AdExchangeState::RunAuction)
.with_timeout(Duration::from_millis(50)),
)
.register(AdExchangeState::RunAuction, RunAuctionTask)
.register_split(
AdExchangeState::TrackResults,
vec![
TrackingTask::LogAnalytics,
TrackingTask::UpdateMetrics,
TrackingTask::NotifyWinner,
TrackingTask::StoreAuction,
],
JoinConfig::new(JoinStrategy::All, AdExchangeState::BuildResponse)
.with_timeout(Duration::from_millis(100)),
)
.register(AdExchangeState::BuildResponse, BuildResponseTask)
.register(AdExchangeState::NoFill, NoFillTask)
.register_split(
AdExchangeState::ErrorTracking,
vec![
ErrorTrackingTask::LogError,
ErrorTrackingTask::UpdateErrorMetrics,
],
JoinConfig::new(JoinStrategy::All, AdExchangeState::NoFill)
.with_timeout(Duration::from_millis(50)),
)
.add_exit_states(vec![AdExchangeState::Complete, AdExchangeState::Rejected])
}
/// Demo entry point: seeds the store with a sample request, runs the workflow
/// from `Start`, and reports the final state and elapsed time.
///
/// When the first run fails, the error is printed and the workflow is run
/// again from `ErrorTracking` so the request ends as a no fill. An error from
/// that second run is returned to the caller.
#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let divider = "=".repeat(60);

    println!("🚀 Real-Time Ad Exchange Workflow\n");
    println!("{}", divider);

    let store = MemoryStore::new();
    store.put(
        "ad_request",
        AdRequest {
            request_id: "req_abc123".to_string(),
            placement_id: "placement_728x90_top".to_string(),
            floor_price: 1.50,
        },
    )?;

    let workflow = create_ad_exchange_workflow(store.clone());
    println!("\n🎬 Starting ad exchange workflow...\n");

    let started_at = tokio::time::Instant::now();
    let primary_run = workflow.orchestrate(AdExchangeState::Start).await;
    let final_state = match primary_run {
        Ok(state) => state,
        Err(e) => {
            eprintln!("❌ Workflow error: {}", e);
            println!("\n⚠️ Handling as No Fill due to error\n");
            workflow.orchestrate(AdExchangeState::ErrorTracking).await?
        }
    };
    let total_time = started_at.elapsed();

    println!("\n{}", divider);
    println!("✅ Workflow completed in {:?}", total_time);
    println!(" Final State: {:?}", final_state);
    Ok(())
}