tekhsi_rs 0.1.1

High-performance client for Tektronix TekHSI-enabled oscilloscopes
Documentation
use crate::errors::{AcquisitionError, TekHsiError};
use crate::tekscope::{
    WaveformRequest, WfmReplyStatus, raw_reply::data_or_header_access::Value as RawValue,
};
use crate::validation::expected_payload_len;
use smol_str::SmolStr;
use std::collections::HashMap;

use super::client::SymbolClients;
use super::result::{BatchDownloadResult, SymbolDownloadResult};

/// The byte buffers collected for one waveform download, one `Vec<u8>` per received chunk.
type DataChunks = Vec<Vec<u8>>;

/// Downloads waveform symbols and keeps a per-symbol pool of chunk buffers so
/// that allocations can be reused between downloads.
pub(super) struct SymbolFetcher {
    /// Recycled buffer sets keyed by symbol name. `fetch_symbols` pops one set
    /// per symbol download and `recycle` pushes sets back after use.
    pool: HashMap<SmolStr, Vec<DataChunks>>,
}

impl SymbolFetcher {
    /// Creates a fetcher whose buffer pool is pre-sized for at least
    /// `slot_count` distinct symbols. The pool starts out empty.
    pub(super) fn new(slot_count: usize) -> Self {
        let pool = HashMap::with_capacity(slot_count);
        Self { pool }
    }

    pub(super) fn preallocate_pool(&mut self, symbol: &SmolStr, slots: usize) {
        self.pool.insert(symbol.clone(), Vec::with_capacity(slots));
    }

    /// Returns the chunk buffers of a finished batch to the pool so later
    /// downloads of the same symbol can reuse their allocations.
    ///
    /// Only `SymbolDownloadResult::Success` entries are recycled; every other
    /// result in the batch is simply dropped. The buffers are stored as-is
    /// (their contents are not cleared here).
    pub(super) fn recycle(&mut self, batch: BatchDownloadResult) {
        for result in batch.results {
            if let SymbolDownloadResult::Success {
                symbol,
                data_chunks,
                ..
            } = result
            {
                // The entry API hashes `symbol` once, whether or not the
                // symbol already has a slot list in the pool.
                self.pool.entry(symbol).or_default().push(data_chunks);
            }
        }
    }

    pub(super) async fn fetch_symbols(
        &mut self,
        clients: &[SymbolClients],
        chunk_size: u32,
    ) -> Result<BatchDownloadResult, TekHsiError> {
        let mut join_set: tokio::task::JoinSet<Result<SymbolDownloadResult, AcquisitionError>> =
            tokio::task::JoinSet::new();

        for symbol_client in clients {
            let client = symbol_client.clone();

            let chunk_storage = if let Some(r) = self.pool.get_mut(&client.symbol) {
                r.pop().unwrap_or_else(Vec::new)
            } else {
                Vec::new()
            };

            join_set
                .spawn(async move { Self::fetch_symbol(client, chunk_size, chunk_storage).await });
        }

        let mut results = Vec::with_capacity(clients.len());
        while let Some(result) = join_set.join_next().await {
            let result = match result {
                Ok(Ok(fetch_result)) => fetch_result,
                Ok(Err(ae)) => ae.into(),
                Err(err) => err.into(),
            };

            results.push(result);
        }

        Ok(BatchDownloadResult {
            results,
            ..Default::default()
        })
    }

    /// Downloads the header and the waveform data of a single symbol.
    ///
    /// The header request and the streaming data request are issued
    /// concurrently. Each received chunk is copied into a buffer taken from
    /// `chunk_storage` when one is available, otherwise into a freshly
    /// allocated buffer. Buffers left over in `chunk_storage` are dropped.
    ///
    /// The header's `sourcename` is lowercased (ASCII) before it is returned.
    ///
    /// # Errors
    ///
    /// - `AcquisitionError::HeaderRequestFailed` when the header RPC fails,
    ///   replies with a non-success status, or carries no header payload.
    /// - `AcquisitionError::DownloadFailed` when the data RPC cannot be opened
    ///   or the stream reports a gRPC error part-way through the download.
    /// - The error converted from the reply status when a streamed reply
    ///   carries a non-success status.
    async fn fetch_symbol(
        mut clients: SymbolClients,
        chunk_size: u32,
        mut chunk_storage: DataChunks,
    ) -> Result<SymbolDownloadResult, AcquisitionError> {
        let headers = clients.header_client.get_header(WaveformRequest {
            sourcename: clients.symbol.to_string(),
            chunksize: chunk_size,
        });
        let data = clients.data_client.get_waveform(WaveformRequest {
            sourcename: clients.symbol.to_string(),
            chunksize: chunk_size,
        });

        // Run both requests concurrently; the header is validated first below.
        let (header_reply, data_stream_reply) = tokio::join!(headers, data);

        let header_reply = header_reply
            .map_err(|e| AcquisitionError::HeaderRequestFailed {
                status: format!("gRPC error: {}", e),
            })?
            .into_inner();
        if header_reply.status() != WfmReplyStatus::WfmreplystatusSuccess {
            return Err(AcquisitionError::HeaderRequestFailed {
                status: format!("Reply status: {:?}", header_reply.status()),
            });
        }

        let mut data_stream = data_stream_reply
            .map_err(|e| AcquisitionError::DownloadFailed {
                message: format!("gRPC error: {}", e),
            })?
            .into_inner();

        let mut header = match header_reply.headerordata.map(|access| access.value) {
            Some(Some(RawValue::Header(header))) => header,
            Some(unexpected) => {
                return Err(AcquisitionError::HeaderRequestFailed {
                    status: format!("Unexpected response to header request: {:?}", unexpected),
                });
            }
            None => {
                return Err(AcquisitionError::HeaderRequestFailed {
                    status: "Server did not provide header payload".into(),
                });
            }
        };
        header.sourcename.make_ascii_lowercase();

        // The chunk count is only a capacity hint for `data_chunks`.
        let expected_len = expected_payload_len(&header);
        let expected_chunks = if expected_len > 0 {
            debug_assert!(chunk_size > 0, "Chunk size must be positive");
            // `div_ceil` panics on a zero divisor; in release builds (where the
            // assertion above is compiled out) fall back to no preallocation
            // rather than panicking over a mere hint.
            match chunk_size as usize {
                0 => 0,
                size => expected_len.div_ceil(size),
            }
        } else {
            0
        };

        // `Vec::with_capacity(0)` does not allocate, so no special case is needed.
        let mut data_chunks = Vec::with_capacity(expected_chunks);

        loop {
            let reply = match data_stream.message().await {
                Ok(Some(reply)) => reply,
                // The server closed the stream: the download is complete.
                Ok(None) => break,
                // A transport/gRPC failure mid-stream must not be reported as
                // a successful (but truncated) download.
                Err(e) => {
                    return Err(AcquisitionError::DownloadFailed {
                        message: format!("gRPC error: {}", e),
                    });
                }
            };

            if reply.status() != WfmReplyStatus::WfmreplystatusSuccess {
                return Err(AcquisitionError::from(reply.status()));
            }

            // Replies that do not carry a chunk are ignored.
            if let Some(access) = reply.headerordata
                && let Some(RawValue::Chunk(chunk)) = access.value
            {
                // Reuse a recycled buffer when available to avoid reallocating.
                let mut buffer = chunk_storage
                    .pop()
                    .unwrap_or_else(|| Vec::with_capacity(chunk.data.len()));

                // Recycled buffers still hold the bytes of a previous download.
                buffer.clear();
                buffer.extend_from_slice(&chunk.data);
                data_chunks.push(buffer);
            }
        }

        Ok(SymbolDownloadResult::Success {
            symbol: clients.symbol.clone(),
            header,
            data_chunks,
        })
    }
}

#[cfg(test)]
#[cfg_attr(coverage, coverage(off))]
mod tests {
    use super::*;
    use crate::tekscope::WaveformHeader;

    /// Builds a minimal header for channel `ch1` that reports having data;
    /// every field not set here keeps its default value.
    fn base_header() -> WaveformHeader {
        let sourcename = String::from("ch1");
        let verticalunits = String::from("V");
        let horizontal_units = String::from("s");

        WaveformHeader {
            sourcename,
            verticalunits,
            horizontal_units,
            hasdata: true,
            ..Default::default()
        }
    }

    /// Preallocating a symbol registers an empty slot list with the requested capacity.
    #[test]
    fn preallocate_pool_adds_to_map() {
        let ch1 = SmolStr::from("ch1");
        let mut fetcher = SymbolFetcher::new(10);
        fetcher.preallocate_pool(&ch1, 2);

        let pool = &fetcher.pool;
        assert_eq!(pool.len(), 1);
        assert!(pool.capacity() >= 10);

        let slots = pool
            .get(&ch1)
            .expect("preallocated symbol should be present in the pool");
        assert_eq!(slots.len(), 0);
        assert_eq!(slots.capacity(), 2);
    }

    /// Recycling a successful download stores its buffer set under the symbol.
    #[test]
    fn recycle_adds_to_pool() {
        let ch1 = SmolStr::from("ch1");
        let mut fetcher = SymbolFetcher::new(10);
        fetcher.preallocate_pool(&ch1, 2);
        assert_eq!(fetcher.pool[&ch1].len(), 0);

        let success = SymbolDownloadResult::Success {
            symbol: ch1.clone(),
            header: base_header(),
            data_chunks: vec![vec![1, 2, 3], vec![4, 5, 6]],
        };
        fetcher.recycle(BatchDownloadResult {
            results: vec![success],
            ..Default::default()
        });

        let recycled = &fetcher.pool[&ch1];
        assert_eq!(recycled.len(), 1);
        assert_eq!(recycled[0].len(), 2);
        assert_eq!(recycled[0][0].len(), 3);
    }

    /// Recycling a batch with several symbols files each buffer set under its own symbol.
    #[test]
    fn recycle_multiple_symbols() {
        let ch1 = SmolStr::from("ch1");
        let ch2 = SmolStr::from("ch2");

        let mut fetcher = SymbolFetcher::new(10);
        for symbol in [&ch1, &ch2] {
            fetcher.preallocate_pool(symbol, 2);
        }
        for symbol in [&ch1, &ch2] {
            assert_eq!(fetcher.pool[symbol].len(), 0);
        }

        let success = |symbol: &SmolStr, data_chunks: DataChunks| SymbolDownloadResult::Success {
            symbol: symbol.clone(),
            header: base_header(),
            data_chunks,
        };
        let batch = BatchDownloadResult {
            results: vec![
                success(&ch1, vec![vec![1, 2, 3], vec![4, 5, 6]]),
                success(&ch2, vec![vec![4, 5, 6, 7], vec![7, 8, 9]]),
            ],
            ..Default::default()
        };

        fetcher.recycle(batch);

        // (symbol, length of the first recycled chunk)
        for (symbol, first_chunk_len) in [(&ch1, 3), (&ch2, 4)] {
            let recycled = &fetcher.pool[symbol];
            assert_eq!(recycled.len(), 1);
            assert_eq!(recycled[0].len(), 2);
            assert_eq!(recycled[0][0].len(), first_chunk_len);
        }
    }

    /// A new fetcher starts with an empty pool that has the requested capacity.
    #[test]
    fn symbol_fetcher_new_with_capacity() {
        let SymbolFetcher { pool } = SymbolFetcher::new(10);

        assert!(pool.is_empty());
        assert!(pool.capacity() >= 10);
    }
}