use effect_rs::{Effect, MetricLabel, REGISTRY, Runtime, Schedule};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
/// In-memory stand-in for a flaky database used by the request handlers.
///
/// `Debug` and `Default` are derived so the type can be logged and built
/// through the standard traits; `Default` gives a counter starting at zero.
#[derive(Debug, Default)]
struct Database {
    /// Shared call counter. Despite its name it is incremented on every
    /// `fetch_user` call, and its previous value decides whether that call
    /// succeeds or fails.
    failures: AtomicU64,
}
impl Database {
fn new() -> Self {
Self {
failures: AtomicU64::new(0),
}
}
fn fetch_user(&self, id: u32) -> Effect<(), String, String> {
let failures = self.failures.fetch_add(1, Ordering::Relaxed);
Effect::sync(move || {
if failures % 3 != 0 {
Effect::fail(format!("DB Connection Timeout for user {}", id))
} else {
Effect::succeed(format!("User Profile({})", id))
}
})
.flat_map(|e| e)
}
}
/// Builds the effect that serves one simulated request.
///
/// The pipeline logs the request, calls `Database::fetch_user` for `req_id`,
/// applies the retry `policy`, formats and prints the response on success,
/// attaches the `http_requests_total` (label `status=200`) and
/// `http_request_duration` metrics, and finally logs the error if the
/// effect still fails.
///
/// * `db` - shared database handle; ownership moves into the effect.
/// * `req_id` - request number, also used as the user id to fetch.
///
/// Returns an effect that produces the formatted response string or the
/// last error message.
fn handle_request(db: Arc<Database>, req_id: u32) -> Effect<(), String, String> {
    // `db` is already an owned handle, so it is moved straight into the
    // closure below; cloning the `Arc` again here would be redundant.
    //
    // NOTE(review): presumably exponential backoff starting at 10ms with
    // factor 2.0, capped at 5 recurrences - confirm against effect_rs docs.
    let policy = Schedule::<String, Duration>::exponential(Duration::from_millis(10), 2.0)
        .intersect(Schedule::<String, usize>::recurs(5));
    Effect::succeed(())
        .flat_map(move |_| {
            println!("[Req {}] Processing...", req_id);
            db.fetch_user(req_id)
        })
        .retry(policy)
        .map(move |user| {
            let res = format!("[Req {}] Response: {}", req_id, user);
            println!("{}", res);
            res
        })
        .with_metric_increment(
            "http_requests_total",
            vec![MetricLabel::new("status", "200")],
        )
        .with_metric_duration("http_request_duration", vec![])
        .map_error(move |e| {
            println!("[Req {}] Failed after retries: {}", req_id, e);
            e
        })
}
/// Runs 20 simulated requests through `Effect::collect_all_par` against one
/// shared `Database`, then prints the `http_requests_total{status="200"}`
/// counter.
fn main() {
    let runtime = Runtime::new();
    let database = Arc::new(Database::new());

    // One effect per request id, each holding its own handle to the database.
    let batch: Vec<_> = (0..20)
        .map(|req_id| handle_request(Arc::clone(&database), req_id))
        .collect();

    let program = Effect::collect_all_par(batch).map(|_| println!("All requests completed"));
    // NOTE(review): the outcome of the batch is discarded here, so a failure
    // is only visible through the per-request log lines.
    let _ = runtime.block_on(program, ());

    let success_labels = vec![MetricLabel::new("status", "200")];
    let counter = REGISTRY.get_counter("http_requests_total", success_labels);
    println!("\n=== Metrics ===");
    println!("Total Successful Requests: {}", counter.get());
}