tekhsi_rs 0.1.1

High-performance client for Tektronix TekHSI enabled oscilloscopes
Documentation
use smol_str::SmolStr;
use std::time::Duration;
use tokio::sync::{broadcast, watch};
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::debug;

use crate::data::Acquisition;

/// Settings for an acquisition run; stored in [`AcquisitionRunner`] and handed
/// as a whole to the download loop when the run starts.
pub(crate) struct AcquisitionConfig {
    /// Name passed (as a clone) to the download loop alongside the config.
    pub(crate) client_name: SmolStr,
    /// NOTE(review): not read anywhere in this file; presumably the symbols
    /// requested at start-up — confirm against the download loop.
    pub(crate) active_symbols: Vec<String>,
    /// Must be non-zero; `AcquisitionRunner::new` checks this with a
    /// `debug_assert!` (debug builds only).
    pub(crate) download_chunk_size: u32,
    /// Capacity used for each of the two bounded channels that connect the
    /// download and decode tasks (batch channel and recycle channel).
    pub(crate) decode_batch_buffer: usize,
}

/// Holds everything needed to start the download and decode tasks of an
/// acquisition run; consumed by `spawn`/`run`.
pub(crate) struct AcquisitionRunner {
    /// tonic transport channel, moved into the download loop.
    channel: tonic::transport::Channel,
    /// Run settings, moved into the download loop.
    config: AcquisitionConfig,
    /// Broadcast sender whose clone is given to the decode loop; the run does
    /// not start until it has at least one receiver.
    sender: broadcast::Sender<Acquisition>,
    /// Cancellation token given to both loops and polled while waiting for
    /// the first receiver.
    shutdown: CancellationToken,
    /// Watch receiver of symbol lists, moved into the download loop.
    symbol_updates: watch::Receiver<Vec<String>>,
}

impl AcquisitionRunner {
    pub(crate) fn new(
        channel: tonic::transport::Channel,
        config: AcquisitionConfig,
        sender: broadcast::Sender<Acquisition>,
        shutdown: CancellationToken,
        symbol_updates: watch::Receiver<Vec<String>>,
    ) -> Self {
        debug_assert!(
            config.download_chunk_size != 0,
            "Download chunk size must be greater than 0"
        );

        Self {
            channel,
            config,
            sender,
            shutdown,
            symbol_updates,
        }
    }

    pub(crate) fn spawn(self) -> JoinHandle<()> {
        tokio::spawn(async move {
            self.run().await;
        })
    }

    async fn run(self) {
        // Initial wait for at least one receiver
        while self.sender.receiver_count() == 0 {
            if self.shutdown.is_cancelled() {
                debug!("acquisition loop cancelled before start");
                return;
            }
            sleep(Duration::from_millis(1)).await;
        }

        let (batch_sender, batch_receiver) =
            kanal::bounded_async::<super::BatchDownloadResult>(self.config.decode_batch_buffer);
        let (recycle_tx, recycle_rx) =
            kanal::bounded_async::<super::BatchDownloadResult>(self.config.decode_batch_buffer);

        let download_handle = tokio::spawn(super::download_loop::download_loop(
            self.channel,
            self.config.client_name.clone(),
            self.config,
            batch_sender,
            recycle_rx,
            self.shutdown.clone(),
            self.symbol_updates,
        ));

        let decode_handle = tokio::spawn(super::decode_loop::decode_loop(
            batch_receiver,
            recycle_tx,
            self.sender.clone(),
            self.shutdown,
        ));

        let _ = decode_handle.await;
        let _ = download_handle.await;
        debug!("acquisition loop exited");
    }
}