tekhsi_rs 0.1.1

High-performance client for Tektronix TekHSI enabled oscilloscopes
Documentation
use crate::helpers::{short_uuid, Backoff};
use crate::tekscope::new_connect_client;
use smol_str::SmolStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
use tokio::sync::{broadcast, watch};
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, warn};

use crate::acquisition::{AcquisitionConfig, AcquisitionRunner};
use crate::client::options::DEFAULT_CAPACITY;
use crate::data::Acquisition;
use crate::errors::{ConnectionError, SubscriptionError, SubscriptionUpdateError, TekHsiError};
use crate::tekscope::{ConnectRequest, ConnectStatus};
use crate::SubscribeOptions;

struct SubscriptionHandle {
    shutdown: CancellationToken,
    update_tx: watch::Sender<Vec<String>>,
}

/// TekHSI client for connecting, subscribing, and listing symbols.
///
/// Use [`connect`](TekHsiClient::connect) to establish a session, then call
/// [`subscribe`](TekHsiClient::subscribe) to stream acquisitions.
pub struct TekHsiClient {
    client_name: SmolStr,
    channel: tonic::transport::Channel,
    disconnect_token: AtomicBool,
    subscription: Mutex<Option<SubscriptionHandle>>,
}

impl TekHsiClient {
    fn take_subscription(&self) -> Option<SubscriptionHandle> {
        self.subscription.lock().ok()?.take()
    }

    /// Connect to a TekHSI server using the given address (e.g. `127.0.0.1:5000`).
    ///
    /// ```no_run
    /// # use tekhsi_rs::TekHsiClient;
    /// # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
    /// let _client = TekHsiClient::connect("127.0.0.1:5000").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect(addr: &str) -> Result<TekHsiClient, TekHsiError> {
        let addr = if addr.contains("://") {
            addr.to_string()
        } else {
            format!("http://{addr}")
        };

        debug!("connecting to {}", addr);
        let channel = tonic::transport::Endpoint::from_shared(addr)?
            .connect()
            .await?;
        debug!("transport channel created");

        let client_name: SmolStr = short_uuid().into();

        let mut backoff = Backoff::new(5, 250, 10);
        let mut connect = new_connect_client(channel.clone(), &client_name);

        debug!("connect client created");

        loop {
            let reply = connect
                .connect(ConnectRequest {
                    name: client_name.to_string(),
                })
                .await?
                .into_inner();

            debug!("connect reply status: {:?}", reply.status());
            if reply.status() == ConnectStatus::ConnectstatusSuccess {
                break;
            }

            let connection_error: ConnectionError = reply.status().into();
            match connection_error {
                ConnectionError::Timeout | ConnectionError::ScopeBusy => {
                    if !backoff.sleep_next().await {
                        error!(
                            "final connection attempt failed with status: {:?}",
                            connection_error
                        );
                        return Err(TekHsiError::Connection(connection_error));
                    }

                    warn!(
                        "connection attempt failed with status: {:?}, retrying...",
                        connection_error
                    );
                }
                ConnectionError::Fatal { .. } | ConnectionError::LostSync => {
                    error!(
                        "connection attempt failed with status: {:?}",
                        connection_error
                    );
                    return Err(TekHsiError::Connection(connection_error));
                }
            }
        }

        Ok(Self {
            client_name,
            channel,
            disconnect_token: AtomicBool::new(false),
            subscription: Mutex::new(None),
        })
    }

    /// Disconnect from the scope and stop any active acquisition loops.
    ///
    /// This method:
    /// - Sets disconnect token to prevent new operations
    /// - Cancels all running acquisition download/decode loops
    /// - Waits 500ms for loops to clean up gracefully
    /// - Sends a formal disconnect request to the TekHSI service
    ///
    /// Note: The client also implements [`Drop`] for automatic cleanup,
    /// but explicit disconnection is recommended for deterministic shutdown.
    pub async fn disconnect(&self) -> Result<(), TekHsiError> {
        self.disconnect_token.store(true, Ordering::SeqCst);
        debug!("disconnecting");

        // Stop the download/decode loops
        if let Some(handle) = self.take_subscription() {
            handle.shutdown.cancel();
            sleep(Duration::from_millis(500)).await;
        }

        let mut connect = new_connect_client(self.channel.clone(), &self.client_name);
        let _ = connect
            .disconnect(ConnectRequest {
                name: self.client_name.to_string(),
            })
            .await?;
        Ok(())
    }

    /// Start a subscription for the provided symbols.
    ///
    /// Symbols are normalized to lowercase to match headers from real scopes.
    ///
    /// ```no_run
    /// # use tekhsi_rs::{SubscribeOptions, TekHsiClient};
    /// # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = TekHsiClient::connect("127.0.0.1:5000").await?;
    /// let symbols = client.list_available_symbols().await?;
    /// let mut rx = client.subscribe(symbols, SubscribeOptions::default())?;
    /// let _acquisition = rx.recv().await?;
    /// client.disconnect().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn subscribe(
        &self,
        mut active_symbols: Vec<String>,
        opts: SubscribeOptions,
    ) -> Result<Receiver<Acquisition>, TekHsiError> {
        if active_symbols.is_empty() {
            return Err(SubscriptionError::EmptySymbols.into());
        }

        active_symbols
            .iter_mut()
            .for_each(|s| s.make_ascii_lowercase());

        active_symbols.dedup();

        let mut subscription = self
            .subscription
            .lock()
            .map_err(|_| SubscriptionError::Fatal)?;

        if let Some(handle) = subscription.as_ref() {
            if !handle.shutdown.is_cancelled() && !handle.update_tx.is_closed() {
                return Err(SubscriptionError::AlreadyActive.into());
            }
            subscription.take();
        }

        let capacity = if opts.capacity == 0 {
            DEFAULT_CAPACITY
        } else {
            opts.capacity
        };

        debug!(
            "starting acquisition loop for symbols: {:?}",
            active_symbols
        );

        let shutdown = CancellationToken::new();
        let (sender, receiver) = broadcast::channel(capacity);
        let (update_tx, update_rx) = watch::channel(active_symbols.clone());
        let config = AcquisitionConfig {
            client_name: self.client_name.clone(),
            active_symbols,
            download_chunk_size: opts.download_chunk_size,
            decode_batch_buffer: opts.decode_buffer_capacity,
        };

        AcquisitionRunner::new(
            self.channel.clone(),
            config,
            sender,
            shutdown.clone(),
            update_rx,
        )
        .spawn();

        // Record the new subscription
        *subscription = Some(SubscriptionHandle {
            shutdown,
            update_tx,
        });

        Ok(receiver)
    }

    /// Update the active symbols for an existing subscription.
    ///
    /// Symbols are normalized to lowercase to match headers from real scopes.
    pub fn update_symbols(&self, mut active_symbols: Vec<String>) -> Result<(), TekHsiError> {
        if active_symbols.is_empty() {
            return Err(SubscriptionUpdateError::EmptySymbols.into());
        }

        active_symbols
            .iter_mut()
            .for_each(|s| s.make_ascii_lowercase());

        active_symbols.dedup();

        let mut subscription = self
            .subscription
            .lock()
            .map_err(|_| SubscriptionUpdateError::Fatal)?;

        // We can't update if there is no subscription
        let Some(handle) = subscription.as_ref() else {
            return Err(SubscriptionUpdateError::NotActive.into());
        };

        // Refuse to update if we are tearing down
        if handle.shutdown.is_cancelled() || handle.update_tx.is_closed() {
            *subscription = None;
            return Err(SubscriptionUpdateError::NotActive.into());
        }

        debug!("updating symbols for subscription: {:?}", active_symbols);

        if handle.update_tx.send(active_symbols).is_err() {
            *subscription = None;
            return Err(SubscriptionUpdateError::UpdateFailed.into());
        }

        Ok(())
    }

    /// Fetch the list of available symbols from the scope.
    ///
    /// The returned symbols are normalized to lowercase.
    pub async fn list_available_symbols(&self) -> Result<Vec<String>, TekHsiError> {
        let mut connect = new_connect_client(self.channel.clone(), &self.client_name);
        let reply = connect
            .request_available_names(ConnectRequest {
                name: self.client_name.to_string(),
            })
            .await?
            .into_inner();
        if reply.status() != ConnectStatus::ConnectstatusSuccess {
            return Err(TekHsiError::Connection(ConnectionError::from(
                reply.status(),
            )));
        }

        let mut symbols = reply.symbolnames;
        symbols.iter_mut().for_each(|s| s.make_ascii_lowercase());
        Ok(symbols)
    }
}

impl Drop for TekHsiClient {
    fn drop(&mut self) {
        if self.disconnect_token.swap(true, Ordering::SeqCst) {
            return;
        }

        if let Some(handle) = self.take_subscription() {
            handle.shutdown.cancel();
        }

        let name = self.client_name.clone();
        let channel = self.channel.clone();
        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            handle.spawn(async move {
                if let Err(err) = new_connect_client(channel, &name)
                    .disconnect(ConnectRequest {
                        name: name.to_string(),
                    })
                    .await
                {
                    warn!("disconnect failed: {}", err);
                }
            });
        }
    }
}