use std::env;
use std::time::{Duration, Instant};
use tekhsi_rs::{SubscribeOptions, TekHsiClient};
use tokio::sync::broadcast::error::RecvError;
use tracing::{info, warn};
use tracing_subscriber::EnvFilter;
/// Address used when neither a command-line argument nor the `TEKHSI_ADDRESS`
/// environment variable supplies one.
const DEFAULT_ADDRESS: &str = "127.0.0.1:5000";
/// Minimum time that must pass between two throughput log lines.
const REPORT_INTERVAL: Duration = Duration::from_secs(1);
/// Connects a `TekHsiClient`, subscribes to every available symbol, and logs
/// acquisition throughput until interrupted.
///
/// The address is resolved from, in order: the first command-line argument,
/// the `TEKHSI_ADDRESS` environment variable, then `DEFAULT_ADDRESS`.
/// Waveform and acquisition rates are logged when an acquisition arrives and
/// at least `REPORT_INTERVAL` has passed since the previous report. The loop
/// ends on Ctrl+C or when the acquisition channel closes, after which the
/// client is disconnected.
///
/// # Errors
///
/// Returns an error if connecting, listing symbols, subscribing, or
/// disconnecting fails.
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Honour RUST_LOG when set; otherwise default to info with debug output
    // for the tekhsi_rs crate.
    tracing_subscriber::fmt()
        .with_env_filter(
            EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| EnvFilter::new("info,tekhsi_rs=debug")),
        )
        .init();

    let addr = env::args()
        .nth(1)
        .or_else(|| env::var("TEKHSI_ADDRESS").ok())
        .unwrap_or_else(|| DEFAULT_ADDRESS.to_string());

    info!("Connecting to {addr}");
    let client = TekHsiClient::connect(&addr).await?;

    let mut symbols = client.list_available_symbols().await?;
    symbols.sort();
    info!("Symbols: {}", symbols.join(", "));

    let mut receiver = client.subscribe(symbols, SubscribeOptions::default())?;

    // Create the Ctrl+C future once and poll it by reference. Building a new
    // one inside `select!` on every iteration drops the listener whenever the
    // receive branch wins, which leaves a window in which a Ctrl+C can go
    // unnoticed between iterations.
    let ctrl_c = tokio::signal::ctrl_c();
    tokio::pin!(ctrl_c);

    let mut waveforms_seen: u64 = 0;
    let mut acquisitions_seen: u64 = 0;
    let mut last_report = Instant::now();

    loop {
        tokio::select! {
            result = &mut ctrl_c => {
                match result {
                    Ok(()) => info!("Stopping on Ctrl+C"),
                    // Without a working handler we cannot be interrupted
                    // cleanly later, so stop now and still disconnect below.
                    Err(err) => warn!("failed to listen for Ctrl+C, stopping: {err}"),
                }
                break;
            }
            message = receiver.recv() => match message {
                Ok(acquisition) => {
                    acquisitions_seen += 1;
                    waveforms_seen += acquisition.data.len() as u64;

                    // Sample the clock once so the threshold check and the
                    // rate divisor agree on the same interval.
                    let elapsed = last_report.elapsed();
                    if elapsed >= REPORT_INTERVAL {
                        let secs = elapsed.as_secs_f64();
                        let waveform_rate = waveforms_seen as f64 / secs;
                        let acquisition_rate = acquisitions_seen as f64 / secs;
                        info!("waveforms_per_sec={waveform_rate:.2} | acquisitions_per_sec={acquisition_rate:.2}");
                        acquisitions_seen = 0;
                        waveforms_seen = 0;
                        last_report = Instant::now();
                    }
                }
                // The receiver fell behind; skipped acquisitions are not
                // counted in the rates, so keep receiving.
                Err(RecvError::Lagged(skipped)) => {
                    warn!("receiver lagged, skipped {skipped} acquisitions");
                }
                Err(RecvError::Closed) => {
                    warn!("acquisition channel closed");
                    break;
                }
            }
        }
    }

    // Release the subscription before tearing down the connection.
    drop(receiver);
    client.disconnect().await?;
    Ok(())
}