use futures::StreamExt;
use rs2_stream::{
state::config::StateConfigs,
state::{CustomKeyExtractor, StatefulStreamExt},
};
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Request {
id: String,
user_id: String,
endpoint: String,
timestamp: u64,
priority: u8,
}
#[tokio::main]
async fn main() {
println!("=== RS2 Stateful Throttle Example ===\n");
let requests = vec![
Request {
id: "req1".to_string(),
user_id: "user1".to_string(),
endpoint: "/api/data".to_string(),
timestamp: 1000,
priority: 1,
},
Request {
id: "req2".to_string(),
user_id: "user1".to_string(),
endpoint: "/api/data".to_string(),
timestamp: 1100,
priority: 2,
},
Request {
id: "req3".to_string(),
user_id: "user1".to_string(),
endpoint: "/api/data".to_string(),
timestamp: 1200,
priority: 1,
},
Request {
id: "req4".to_string(),
user_id: "user2".to_string(),
endpoint: "/api/users".to_string(),
timestamp: 1300,
priority: 3,
},
Request {
id: "req5".to_string(),
user_id: "user1".to_string(),
endpoint: "/api/users".to_string(),
timestamp: 1400,
priority: 1,
},
Request {
id: "req6".to_string(),
user_id: "user2".to_string(),
endpoint: "/api/data".to_string(),
timestamp: 1500,
priority: 2,
},
];
println!("1. Throttle by User (2 requests per second):");
let high_perf_config = StateConfigs::high_performance();
let user_throttled = futures::stream::iter(requests.clone()).stateful_throttle_rs2(
high_perf_config,
CustomKeyExtractor::new(|req: &Request| req.user_id.clone()),
2, Duration::from_secs(1), |req| req, );
let user_results: Vec<Request> = user_throttled
.collect::<Vec<_>>()
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
println!(" Original requests: {}", requests.len());
println!(" After throttling: {}", user_results.len());
println!(" Allowed requests:");
for req in user_results {
println!(
" {}: {} by {} (priority: {})",
req.id, req.endpoint, req.user_id, req.priority
);
}
println!("\n2. Throttle by Endpoint (3 requests per 2 seconds):");
let session_config = StateConfigs::session();
let endpoint_throttled = futures::stream::iter(requests.clone()).stateful_throttle_rs2(
session_config,
CustomKeyExtractor::new(|req: &Request| req.endpoint.clone()),
3, Duration::from_secs(2), |req| req, );
let endpoint_results: Vec<Request> = endpoint_throttled
.collect::<Vec<_>>()
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
println!(" Original requests: {}", requests.len());
println!(" After throttling: {}", endpoint_results.len());
println!(" Allowed requests by endpoint:");
for req in endpoint_results {
println!(
" {}: {} by {} (priority: {})",
req.id, req.endpoint, req.user_id, req.priority
);
}
println!("\n3. Throttle by User-Endpoint Combination (1 request per 500ms):");
let short_lived_config = StateConfigs::short_lived();
let user_endpoint_throttled = futures::stream::iter(requests.clone()).stateful_throttle_rs2(
short_lived_config,
CustomKeyExtractor::new(|req: &Request| format!("{}_{}", req.user_id, req.endpoint)),
1, Duration::from_millis(500), |req| req, );
let user_endpoint_results: Vec<Request> = user_endpoint_throttled
.collect::<Vec<_>>()
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
println!(" Original requests: {}", requests.len());
println!(" After throttling: {}", user_endpoint_results.len());
println!(" Allowed requests by user-endpoint:");
for req in user_endpoint_results {
println!(
" {}: {} by {} (priority: {})",
req.id, req.endpoint, req.user_id, req.priority
);
}
println!("\n4. Throttle by Priority (5 requests per 3 seconds):");
let long_lived_config = StateConfigs::long_lived();
let priority_throttled = futures::stream::iter(requests.clone()).stateful_throttle_rs2(
long_lived_config,
CustomKeyExtractor::new(|req: &Request| req.priority.to_string()),
5, Duration::from_secs(3), |req| req, );
let priority_results: Vec<Request> = priority_throttled
.collect::<Vec<_>>()
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
println!(" Original requests: {}", requests.len());
println!(" After throttling: {}", priority_results.len());
println!(" Allowed requests by priority:");
for req in priority_results {
println!(
" {}: {} by {} (priority: {})",
req.id, req.endpoint, req.user_id, req.priority
);
}
println!("\n5. Real-time Throttling Simulation:");
let realtime_config = StateConfigs::high_performance();
let rapid_requests = vec![
Request {
id: "rapid1".to_string(),
user_id: "user1".to_string(),
endpoint: "/api/stream".to_string(),
timestamp: 1000,
priority: 1,
},
Request {
id: "rapid2".to_string(),
user_id: "user1".to_string(),
endpoint: "/api/stream".to_string(),
timestamp: 1001,
priority: 1,
},
Request {
id: "rapid3".to_string(),
user_id: "user1".to_string(),
endpoint: "/api/stream".to_string(),
timestamp: 1002,
priority: 1,
},
Request {
id: "rapid4".to_string(),
user_id: "user1".to_string(),
endpoint: "/api/stream".to_string(),
timestamp: 1003,
priority: 1,
},
Request {
id: "rapid5".to_string(),
user_id: "user1".to_string(),
endpoint: "/api/stream".to_string(),
timestamp: 1004,
priority: 1,
},
];
let rapid_throttled = futures::stream::iter(rapid_requests).stateful_throttle_rs2(
realtime_config,
CustomKeyExtractor::new(|req: &Request| req.user_id.clone()),
2, Duration::from_millis(100), |req| req, );
let rapid_results: Vec<Request> = rapid_throttled
.collect::<Vec<_>>()
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
println!(" Rapid requests: 5");
println!(" After throttling: {}", rapid_results.len());
println!(" Throttled requests:");
for req in rapid_results {
println!(" {} at timestamp {}", req.id, req.timestamp);
}
println!("\n=== Stateful Throttle Example Complete ===");
}