use rust_tango::{Consumer, DCache, Fctl, Fseq, MCache, Metrics, Producer};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;
fn main() {
let mcache = MCache::<64>::new();
let dcache = DCache::<64, 256>::new();
let fseq = Fseq::new(1);
let fctl = Fctl::new(16);
let metrics = Metrics::new();
println!("Starting producer/consumer with metrics tracking\n");
thread::scope(|s| {
let metrics_handle = s.spawn(|| {
for _ in 0..10 {
thread::sleep(Duration::from_millis(100));
let snapshot = metrics.snapshot();
println!(
"[Metrics] published={}, consumed={}, lag={}, backpressure={}",
snapshot.published,
snapshot.consumed,
snapshot.lag(),
snapshot.backpressure_events
);
}
});
let consumer_handle = s.spawn(|| {
let mut consumer =
Consumer::with_flow_control(&mcache, &dcache, &fctl, 1).with_metrics(&metrics);
let mut count = 0;
while count < 50 {
match consumer.poll() {
Ok(Some(_)) => {
count += 1;
thread::sleep(Duration::from_millis(10 + (count % 20)));
}
Ok(None) => thread::yield_now(),
Err(_) => break,
}
}
});
let producer_handle = s.spawn(|| {
let producer =
Producer::with_flow_control(&mcache, &dcache, &fseq, &fctl).with_metrics(&metrics);
for i in 0..50 {
let payload = format!("message-{}", i);
match producer.publish(payload.as_bytes(), i as u64, 0, 0) {
Ok(_) => {}
Err(rust_tango::TangoError::NoCredits) => {
thread::sleep(Duration::from_millis(5));
let _ = producer.publish_blocking(payload.as_bytes(), i as u64, 0, 0);
}
Err(e) => {
eprintln!("Error: {}", e);
break;
}
}
thread::sleep(Duration::from_millis(5));
}
});
producer_handle.join().unwrap();
consumer_handle.join().unwrap();
metrics_handle.join().unwrap();
});
let final_snapshot = metrics.snapshot();
println!("\n=== Final Metrics ===");
println!("{}", final_snapshot);
}