tekhsi_rs 0.1.1

High-performance client for Tektronix TekHSI enabled oscilloscopes
Documentation
use super::{BatchDownloadResult, SymbolClients, SymbolFetcher};
use crate::errors::TekHsiError;
use crate::helpers::Backoff;
use crate::tekscope::new_connect_client;
use crate::tekscope::{ConnectRequest, ConnectStatus};
use smol_str::SmolStr;
use std::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};

/// Drives the acquisition download cycle until shutdown, failure, or loss of the consumer.
///
/// Each iteration:
/// 1. applies a pending active-symbol update from `symbol_updates`, rebuilding the
///    per-symbol clients and the fetcher;
/// 2. drains `recycle_rx` and hands every returned batch back to the fetcher;
/// 3. calls `wait_for_data_access` (racing it against `shutdown`);
/// 4. fetches all active symbols, stamps the batch with the measured wait and download
///    durations, and forwards it on `batch_sender`;
/// 5. calls `finished_with_data_access` to release the access obtained in step 3.
///
/// # Errors
///
/// Returns an error when `wait_for_data_access` or `finished_with_data_access` fails at
/// the RPC level (propagated with `?`). A failed symbol fetch is only logged.
///
/// Returns `Ok(())` when the loop ends because `shutdown` was cancelled, the wait retry
/// budget was exhausted, the batch receiver was closed, or `finished_with_data_access`
/// replied with a non-success status.
pub(super) async fn download_loop(
    channel: tonic::transport::Channel,
    client_name: SmolStr,
    config: super::AcquisitionConfig,
    batch_sender: kanal::AsyncSender<BatchDownloadResult>,
    recycle_rx: kanal::AsyncReceiver<BatchDownloadResult>,
    shutdown: CancellationToken,
    mut symbol_updates: tokio::sync::watch::Receiver<Vec<String>>,
) -> Result<(), TekHsiError> {
    // NOTE(review): the meaning of the three Backoff arguments is defined in
    // `crate::helpers::Backoff` and is not visible from here.
    let mut wait_backoff = Backoff::new(5, 100, 14);
    let mut client = new_connect_client(channel.clone(), &client_name);

    let mut symbol_clients = Vec::new();
    let mut symbol_fetcher = SymbolFetcher::new(0);

    rebuild_symbol_state(
        &config,
        &channel,
        &config.active_symbols,
        &mut symbol_clients,
        &mut symbol_fetcher,
    );

    loop {
        if shutdown.is_cancelled() {
            break;
        }

        // An error from `has_changed` is treated as "no change": the current symbol
        // set stays in use.
        if symbol_updates.has_changed().unwrap_or(false) {
            // The borrow guard is confined to this block so it is released before the
            // next `.await`.
            let active_symbols = symbol_updates.borrow_and_update();
            rebuild_symbol_state(
                &config,
                &channel,
                &active_symbols,
                &mut symbol_clients,
                &mut symbol_fetcher,
            );
        }

        debug_assert!(
            !symbol_clients.is_empty(),
            "Active symbol list should not be empty"
        );

        // Return every batch the consumer has handed back so the fetcher can reuse it.
        while let Ok(Some(batch)) = recycle_rx.try_recv() {
            symbol_fetcher.recycle(batch);
        }

        let wait_start = Instant::now();
        let wait_reply = tokio::select! {
            _ = shutdown.cancelled() => {
                break;
            }
            result = client.wait_for_data_access(ConnectRequest {
                name: client_name.to_string(),
            }) => {
                result?.into_inner()
            }
        };

        if wait_reply.status() != ConnectStatus::ConnectstatusSuccess {
            warn!("wait_for_data_access failed: {:?}", wait_reply.status());
            if !wait_backoff.sleep_next().await {
                warn!("wait_for_data_access exceeded retry limit");
                break;
            }
            continue;
        }
        let wait_time = wait_start.elapsed();
        wait_backoff.reset();

        let download_start = Instant::now();
        let result = symbol_fetcher
            .fetch_symbols(&symbol_clients, config.download_chunk_size)
            .await;
        let download_time = download_start.elapsed();

        // Remember a closed receiver instead of breaking right away: data access was
        // acquired above and must still be released below before the loop exits.
        let consumer_gone = match result {
            Ok(mut batch) => {
                batch.wait_time = wait_time;
                batch.download_time = download_time;
                batch_sender.send(batch).await.is_err()
            }
            Err(err) => {
                warn!("acquisition read failed: {}", err);
                false
            }
        };

        let finished = client
            .finished_with_data_access(ConnectRequest {
                name: client_name.to_string(),
            })
            .await;

        if consumer_gone {
            // Best-effort release: the loop is exiting anyway, so a failure here is
            // logged rather than turned into an error for the caller.
            match finished {
                Ok(reply) => {
                    let reply = reply.into_inner();
                    if reply.status() != ConnectStatus::ConnectstatusSuccess {
                        warn!("finished_with_data_access failed: {:?}", reply.status());
                    }
                }
                Err(status) => {
                    warn!("finished_with_data_access failed: {}", status);
                }
            }
            break;
        }

        let finished_reply = finished?.into_inner();

        if finished_reply.status() != ConnectStatus::ConnectstatusSuccess {
            warn!(
                "finished_with_data_access failed: {:?}",
                finished_reply.status()
            );
            break;
        }
    }

    // Drop the sender explicitly before logging the exit.
    drop(batch_sender);
    debug!("download loop exited");
    Ok(())
}

fn rebuild_symbol_state(
    config: &super::AcquisitionConfig,
    channel: &tonic::transport::Channel,
    active_symbols: &[String],
    symbol_clients: &mut Vec<SymbolClients>,
    symbol_fetcher: &mut SymbolFetcher,
) {
    symbol_clients.clear();
    symbol_clients.extend(active_symbols.iter().map(|symbol| {
        let name: SmolStr = symbol.as_str().into();
        SymbolClients::new(name, config.client_name.clone(), channel.clone())
    }));

    *symbol_fetcher = SymbolFetcher::new((config.decode_batch_buffer + 1) * symbol_clients.len());
    symbol_clients.iter().for_each(|client| {
        symbol_fetcher.preallocate_pool(&client.symbol, config.decode_batch_buffer + 1);
    });
}