tekhsi_rs 0.1.0

High-performance client for Tektronix TekHSI-enabled oscilloscopes
Documentation
use std::env;
use std::time::{Duration, Instant};

use tekhsi_rs::{SubscribeOptions, TekHsiClient};
use tokio::sync::broadcast::error::RecvError;
use tracing::{info, warn};
use tracing_subscriber::EnvFilter;

/// Address used when neither a command-line argument nor the
/// `TEKHSI_ADDRESS` environment variable supplies one.
const DEFAULT_ADDRESS: &'static str = "127.0.0.1:5000";

/// Minimum time between two consecutive throughput log lines.
const REPORT_INTERVAL: Duration = Duration::from_millis(1_000);

/// Streams acquisitions from a TekHSI endpoint and logs throughput.
///
/// The address is taken from the first command-line argument, then the
/// `TEKHSI_ADDRESS` environment variable, and finally `DEFAULT_ADDRESS`.
/// Every symbol the client reports as available is subscribed to. Waveform
/// and acquisition rates are logged at most once per `REPORT_INTERVAL`, and
/// only when an acquisition arrives. The loop ends on Ctrl+C or when the
/// acquisition channel closes.
///
/// # Errors
///
/// Returns an error if connecting, listing symbols, subscribing, or
/// disconnecting fails.
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Honour RUST_LOG when set; otherwise default to info with debug for this crate.
    tracing_subscriber::fmt()
        .with_env_filter(
            EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| EnvFilter::new("info,tekhsi_rs=debug")),
        )
        .init();

    // Address precedence: CLI argument, then environment, then built-in default.
    let addr = env::args()
        .nth(1)
        .or_else(|| env::var("TEKHSI_ADDRESS").ok())
        .unwrap_or_else(|| DEFAULT_ADDRESS.to_string());

    info!("Connecting to {addr}");
    let client = TekHsiClient::connect(&addr).await?;
    let mut symbols = client.list_available_symbols().await?;
    symbols.sort();

    info!("Symbols: {}", symbols.join(", "));
    let mut receiver = client.subscribe(symbols, SubscribeOptions::default())?;

    // Counters cover the current reporting window only; they reset after each report.
    let mut waveforms_seen: u64 = 0;
    let mut acquisitions_seen: u64 = 0;
    let mut last_report = Instant::now();

    // Create the Ctrl+C future once and poll it by reference. Rebuilding it on
    // every iteration would leave no listener alive while an acquisition is
    // being handled, so a signal delivered in that window would be lost.
    let shutdown = tokio::signal::ctrl_c();
    tokio::pin!(shutdown);

    loop {
        tokio::select! {
            result = &mut shutdown => {
                match result {
                    Ok(()) => info!("Stopping on Ctrl+C"),
                    // Without a working signal listener there is no clean way to
                    // stop later, so report the failure and shut down now.
                    Err(err) => warn!("failed to listen for Ctrl+C, stopping: {err}"),
                }
                break;
            }
            message = receiver.recv() => match message {
                Ok(acquisition) => {
                    acquisitions_seen += 1;
                    waveforms_seen += acquisition.data.len() as u64;

                    // Sample the clock once so the threshold check and the rate
                    // computation use the same window length.
                    let elapsed = last_report.elapsed();
                    if elapsed >= REPORT_INTERVAL {
                        let secs = elapsed.as_secs_f64();
                        let waveform_rate = waveforms_seen as f64 / secs;
                        let acquisition_rate = acquisitions_seen as f64 / secs;
                        info!("waveforms_per_sec={waveform_rate:.2} | acquisitions_per_sec={acquisition_rate:.2}");
                        acquisitions_seen = 0;
                        waveforms_seen = 0;
                        last_report = Instant::now();
                    }
                }
                // The receiver fell behind the sender; keep going with newer data.
                Err(RecvError::Lagged(skipped)) => {
                    warn!("receiver lagged, skipped {skipped} acquisitions");
                }
                // The sending side is gone, so no further acquisitions can arrive.
                Err(RecvError::Closed) => {
                    warn!("acquisition channel closed");
                    break;
                }
            }
        }
    }

    // Release the subscription before disconnecting the client.
    drop(receiver);
    client.disconnect().await?;
    Ok(())
}