tekhsi_rs 0.1.1

High-performance client for Tektronix TekHSI-enabled oscilloscopes
Documentation
use super::decode_batch::decode_batch;
use crate::data::Acquisition;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};

/// Decodes downloaded batches and broadcasts the resulting acquisitions.
///
/// Batches are pulled from `batch_receiver` until `recv` returns an error.
/// Each batch is handed to [`decode_batch`]:
///
/// * On success the acquisition is broadcast through `sender`. If `sender`
///   reports zero receivers, or the send fails, `shutdown` is cancelled and
///   the loop stops immediately (the current batch is not recycled).
/// * On failure the error is logged at `warn` level and the loop continues.
///
/// After a batch has been processed it is offered back through `recycle_tx`
/// with a non-blocking `try_send`; the outcome of that call is ignored.
///
/// `shutdown` is always cancelled before the function returns, whichever way
/// the loop ended.
pub(super) async fn decode_loop(
    batch_receiver: kanal::AsyncReceiver<super::BatchDownloadResult>,
    recycle_tx: kanal::AsyncSender<super::BatchDownloadResult>,
    sender: tokio::sync::broadcast::Sender<Acquisition>,
    shutdown: CancellationToken,
) {
    loop {
        // A receive error ends the loop.
        let Ok(mut batch) = batch_receiver.recv().await else {
            break;
        };

        match decode_batch(&mut batch) {
            Err(err) => {
                warn!("decode failed: {}", err);
            }
            Ok(acquisition) => {
                // Short-circuits: `send` is only attempted when at least one
                // receiver is currently subscribed.
                let delivered = sender.receiver_count() != 0 && sender.send(acquisition).is_ok();
                if !delivered {
                    shutdown.cancel();
                    break;
                }
            }
        }

        // Best-effort hand-back of the batch; the result is deliberately ignored.
        let _ = recycle_tx.try_send(batch);
    }

    shutdown.cancel();
    debug!("decode loop exited");
}

#[cfg(test)]
#[cfg_attr(coverage, coverage(off))]
mod tests {
    use super::decode_loop;
    use crate::acquisition::result::{BatchDownloadResult, SymbolDownloadResult};
    use crate::errors::AcquisitionError;
    use crate::tekscope::WaveformHeader;
    use smol_str::SmolStr;
    use tokio::time::{Duration, timeout};
    use tokio_util::sync::CancellationToken;

    /// Upper bound for every await in these tests, so that a regression shows
    /// up as a test failure instead of a hung test run.
    const TEST_TIMEOUT: Duration = Duration::from_secs(2);

    /// Builds a header for source `ch1` with `hasdata` set; all remaining
    /// fields take their `Default` values and are overridden per test.
    fn base_header() -> WaveformHeader {
        WaveformHeader {
            sourcename: "ch1".to_string(),
            verticalunits: "V".to_string(),
            horizontal_units: "s".to_string(),
            hasdata: true,
            ..Default::default()
        }
    }

    /// With no broadcast receivers left, the loop must cancel `shutdown` and exit.
    #[tokio::test]
    async fn decode_loop_cancels_when_no_receivers() {
        let (batch_tx, batch_rx) = kanal::bounded_async(1);
        let (recycle_tx, _recycle_rx) = kanal::bounded_async(1);
        let (sender, receiver) = tokio::sync::broadcast::channel(1);
        let shutdown = CancellationToken::new();

        // Drop the only receiver before the loop starts.
        drop(receiver);

        let task = tokio::spawn(decode_loop(batch_rx, recycle_tx, sender, shutdown.clone()));

        let header = WaveformHeader {
            wfmtype: 1,
            sourcewidth: 1,
            noofsamples: 1,
            dataid: 1,
            ..base_header()
        };
        let batch = BatchDownloadResult {
            results: vec![SymbolDownloadResult::Success {
                symbol: SmolStr::from("ch1"),
                header,
                data_chunks: vec![vec![1u8]],
            }],
            ..Default::default()
        };

        // `batch_tx` stays alive for the whole test, so the loop is expected to
        // stop through the no-receiver path rather than through channel closure.
        // NOTE(review): this assumes the batch above decodes successfully;
        // `decode_batch` is not visible from this module.
        batch_tx.send(batch).await.unwrap();

        timeout(TEST_TIMEOUT, task)
            .await
            .expect("decode loop timed out")
            .expect("decode loop task failed");
        assert!(shutdown.is_cancelled());
    }

    /// A batch that fails to decode must not stop the loop: the batch is still
    /// recycled, and the loop only exits once the batch sender is dropped.
    #[tokio::test]
    async fn decode_loop_logs_decode_errors() {
        let (batch_tx, batch_rx) = kanal::bounded_async(1);
        let (recycle_tx, recycle_rx) = kanal::bounded_async(1);
        let (sender, _receiver) = tokio::sync::broadcast::channel(1);
        let shutdown = CancellationToken::new();

        let task = tokio::spawn(decode_loop(
            batch_rx,
            recycle_tx.clone(),
            sender,
            shutdown.clone(),
        ));

        let batch = BatchDownloadResult {
            results: vec![SymbolDownloadResult::Failure {
                symbol: SmolStr::from("ch1"),
                error: AcquisitionError::DownloadFailed {
                    message: "test".to_string(),
                },
            }],
            ..Default::default()
        };

        batch_tx.send(batch).await.unwrap();
        // Dropping the sender lets the loop terminate after this batch.
        drop(batch_tx);

        timeout(TEST_TIMEOUT, task)
            .await
            .expect("decode loop timed out")
            .expect("decode loop task failed");
        assert!(shutdown.is_cancelled());

        // This test still holds `recycle_tx`, so the recycle channel never
        // closes on its own; bound the wait so a missing batch fails the test
        // instead of blocking it forever.
        let recycled = timeout(TEST_TIMEOUT, recycle_rx.recv())
            .await
            .expect("timed out waiting for recycled batch")
            .expect("expected recycled batch");
        assert_eq!(recycled.results.len(), 1);
    }
}