use futures::StreamExt;
use rs2_stream::state::config::StateConfig;
use rs2_stream::state::{CustomKeyExtractor, StatefulStreamExt};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Clone, Serialize, Deserialize)]
struct UserActivity {
user_id: String,
action: String,
timestamp: u64,
duration: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct UserProfile {
total_actions: u64,
last_action: String,
average_duration: f64,
action_counts: HashMap<String, u64>,
last_seen: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct UserState {
visit_count: u64,
last_visit: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct EnrichedUser {
id: String,
name: String,
visit_count: u64,
last_visit: u64,
is_returning: bool,
}
#[tokio::main]
async fn main() {
println!("\n=== Stateful Map Example ===\n");
let activities = vec![
UserActivity {
user_id: "alice".to_string(),
action: "login".to_string(),
timestamp: 1000,
duration: 5,
},
UserActivity {
user_id: "alice".to_string(),
action: "browse".to_string(),
timestamp: 1020,
duration: 120,
},
UserActivity {
user_id: "alice".to_string(),
action: "purchase".to_string(),
timestamp: 1150,
duration: 30,
},
UserActivity {
user_id: "bob".to_string(),
action: "login".to_string(),
timestamp: 1100,
duration: 3,
},
UserActivity {
user_id: "bob".to_string(),
action: "search".to_string(),
timestamp: 1110,
duration: 45,
},
UserActivity {
user_id: "charlie".to_string(),
action: "login".to_string(),
timestamp: 1200,
duration: 8,
},
];
println!("1. Basic Stateful Mapping - Enriching Activities with User Profile:");
let stream = futures::stream::iter(activities.clone());
let enriched_stream = stream.stateful_map_rs2(
StateConfig::new(),
CustomKeyExtractor::new(|activity: &UserActivity| activity.user_id.clone()),
|activity, state_access| {
Box::pin(async move {
let fut = async move {
let mut state = if let Some(bytes) = state_access.get().await {
serde_json::from_slice(&bytes).unwrap_or(UserState {
visit_count: 0,
last_visit: 0,
})
} else {
UserState {
visit_count: 0,
last_visit: 0,
}
};
state.visit_count += 1;
state.last_visit = activity.timestamp;
let state_bytes = serde_json::to_vec(&state).unwrap();
state_access.set(&state_bytes).await.unwrap();
EnrichedUser {
id: activity.user_id.clone(),
name: String::new(),
visit_count: state.visit_count,
last_visit: state.last_visit,
is_returning: state.visit_count > 1,
}
};
Ok(fut.await)
})
},
);
let enriched_results: Vec<EnrichedUser> = enriched_stream
.collect::<Vec<Result<EnrichedUser, _>>>()
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
for result in enriched_results {
println!(" {:?}", result);
}
println!("\n2. Stateful Mapping with Session Tracking:");
let stream = futures::stream::iter(activities.clone());
let session_stream = stream.stateful_map_rs2(
StateConfig::new(),
CustomKeyExtractor::new(|activity: &UserActivity| activity.user_id.clone()),
|activity, state_access| {
Box::pin(async move {
let state_bytes = state_access.get().await.unwrap_or(Vec::new());
let mut session_data: HashMap<String, u64> = if state_bytes.is_empty() {
HashMap::new()
} else {
serde_json::from_slice(&state_bytes).unwrap_or(HashMap::new())
};
if !session_data.contains_key("session_start") {
session_data.insert("session_start".to_string(), activity.timestamp);
}
session_data.insert("last_activity".to_string(), activity.timestamp);
let session_start = session_data
.get("session_start")
.unwrap_or(&activity.timestamp);
let session_duration = activity.timestamp - session_start;
let state_bytes = serde_json::to_vec(&session_data).unwrap();
state_access.set(&state_bytes).await.unwrap();
Ok(format!(
"User: {} | Action: {} | Session Duration: {}s | Timestamp: {}",
activity.user_id, activity.action, session_duration, activity.timestamp
))
})
},
);
let session_results: Vec<String> = session_stream
.collect::<Vec<_>>()
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
for result in session_results {
println!(" {}", result);
}
println!("\n3. Stateful Mapping with Rate Limiting:");
let stream = futures::stream::iter(activities.clone());
let rate_limited_stream = stream.stateful_map_rs2(
StateConfig::new(),
CustomKeyExtractor::new(|activity: &UserActivity| activity.user_id.clone()),
|activity, state_access| {
Box::pin(async move {
let state_bytes = state_access.get().await.unwrap_or(Vec::new());
let mut rate_data: HashMap<String, u64> = if state_bytes.is_empty() {
HashMap::new()
} else {
serde_json::from_slice(&state_bytes).unwrap_or(HashMap::new())
};
let current_count = rate_data.get("count").unwrap_or(&0);
let window_start = rate_data.get("window_start").unwrap_or(&activity.timestamp);
let window_duration = 60;
let new_count = if activity.timestamp - window_start > window_duration {
1
} else {
current_count + 1
};
rate_data.insert("count".to_string(), new_count);
if new_count == 1 {
rate_data.insert("window_start".to_string(), activity.timestamp);
}
let state_bytes = serde_json::to_vec(&rate_data).unwrap();
state_access.set(&state_bytes).await.unwrap();
if new_count > 3 {
Ok(format!(
"User: {} | Action: {} | STATUS: RATE LIMITED ({} actions in window)",
activity.user_id, activity.action, new_count
))
} else {
Ok(format!(
"User: {} | Action: {} | Rate: {}/{} actions per minute",
activity.user_id, activity.action, new_count, window_duration
))
}
})
},
);
let rate_results: Vec<String> = rate_limited_stream
.collect::<Vec<_>>()
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
for result in rate_results {
println!(" {}", result);
}
println!("\n4. Stateful Mapping with Anomaly Detection:");
let stream = futures::stream::iter(activities.clone());
let anomaly_stream = stream.stateful_map_rs2(
StateConfig::new(),
CustomKeyExtractor::new(|activity: &UserActivity| activity.user_id.clone()),
|activity, state_access| {
Box::pin(async move {
let state_bytes = state_access.get().await.unwrap_or(Vec::new());
let mut anomaly_data: HashMap<String, f64> = if state_bytes.is_empty() {
HashMap::new()
} else {
serde_json::from_slice(&state_bytes).unwrap_or(HashMap::new())
};
let current_avg = anomaly_data.get("avg_duration").unwrap_or(&0.0);
let count = anomaly_data.get("count").unwrap_or(&0.0);
let new_count = count + 1.0;
let new_avg = (current_avg * count + activity.duration as f64) / new_count;
anomaly_data.insert("avg_duration".to_string(), new_avg);
anomaly_data.insert("count".to_string(), new_count);
let state_bytes = serde_json::to_vec(&anomaly_data).unwrap();
state_access.set(&state_bytes).await.unwrap();
let threshold = new_avg * 2.0;
let is_anomaly = activity.duration as f64 > threshold && new_count > 1.0;
if is_anomaly {
Ok(format!(
"ANOMALY DETECTED: User: {} | Action: {} | Score: {:.2} | Avg: {:.2}",
activity.user_id, activity.action, activity.duration as f64, new_avg
))
} else {
Ok(format!(
"Normal: User: {} | Action: {} | Duration: {}s | Avg: {:.2}s",
activity.user_id, activity.action, activity.duration, new_avg
))
}
})
},
);
let anomaly_results: Vec<String> = anomaly_stream
.collect::<Vec<_>>()
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
for result in anomaly_results {
println!(" {}", result);
}
println!("\n=== Stateful Map Example Complete ===");
}