use crate::errors::{AcquisitionError, TekHsiError};
use crate::tekscope::{
WaveformRequest, WfmReplyStatus, raw_reply::data_or_header_access::Value as RawValue,
};
use crate::validation::expected_payload_len;
use smol_str::SmolStr;
use std::collections::HashMap;
use super::client::SymbolClients;
use super::result::{BatchDownloadResult, SymbolDownloadResult};
/// A list of byte buffers, one per waveform data chunk.
///
/// Used both for the chunks of a finished download and for the recycled
/// buffers handed to a new download so their allocations can be reused.
type DataChunks = Vec<Vec<u8>>;
/// Fetches waveform headers and data for a set of symbols and keeps the
/// byte buffers of earlier downloads around so later downloads can reuse them.
pub(super) struct SymbolFetcher {
// Recycled buffer sets keyed by symbol. `recycle` pushes the `data_chunks`
// of successful downloads here; `fetch_symbols` pops one set per symbol and
// hands it to the download task.
pool: HashMap<SmolStr, Vec<DataChunks>>,
}
impl SymbolFetcher {
/// Creates a fetcher with an empty buffer pool.
///
/// `slot_count` is the number of distinct symbols the pool map is sized for
/// up front; the map can still grow beyond it.
pub(super) fn new(slot_count: usize) -> Self {
    let pool = HashMap::with_capacity(slot_count);
    Self { pool }
}
pub(super) fn preallocate_pool(&mut self, symbol: &SmolStr, slots: usize) {
self.pool.insert(symbol.clone(), Vec::with_capacity(slots));
}
/// Returns the buffers of a finished batch to the pool for later reuse.
///
/// Only `SymbolDownloadResult::Success` entries are recycled: their
/// `data_chunks` are pushed onto the pool entry of their symbol, creating the
/// entry if the symbol has none yet. Every other result variant is dropped.
pub(super) fn recycle(&mut self, batch: BatchDownloadResult) {
    for result in batch.results {
        if let SymbolDownloadResult::Success {
            symbol,
            data_chunks,
            ..
        } = result
        {
            // The entry API finds or creates the slot with a single hash lookup.
            self.pool.entry(symbol).or_default().push(data_chunks);
        }
    }
}
/// Downloads every symbol in `clients` concurrently, one spawned task each.
///
/// Each task receives a recycled buffer set for its symbol when the pool has
/// one, otherwise an empty one. `chunk_size` is forwarded to every request.
///
/// Results are collected in the order `JoinSet::join_next` yields them, so
/// they are not guaranteed to follow the order of `clients`.
///
/// # Errors
///
/// As written this always returns `Ok`: a failed download or a failed task is
/// converted into a `SymbolDownloadResult` entry via `Into` instead of
/// aborting the batch.
pub(super) async fn fetch_symbols(
    &mut self,
    clients: &[SymbolClients],
    chunk_size: u32,
) -> Result<BatchDownloadResult, TekHsiError> {
    let mut tasks =
        tokio::task::JoinSet::<Result<SymbolDownloadResult, AcquisitionError>>::new();

    for symbol_client in clients {
        // Take a previously recycled buffer set for this symbol if there is one.
        let chunk_storage = self
            .pool
            .get_mut(&symbol_client.symbol)
            .and_then(|slots| slots.pop())
            .unwrap_or_default();
        let client = symbol_client.clone();
        tasks.spawn(Self::fetch_symbol(client, chunk_size, chunk_storage));
    }

    let mut results: Vec<SymbolDownloadResult> = Vec::with_capacity(clients.len());
    while let Some(joined) = tasks.join_next().await {
        results.push(match joined {
            Ok(Ok(downloaded)) => downloaded,
            // The download itself reported an error.
            Ok(Err(acquisition_error)) => acquisition_error.into(),
            // The task did not run to completion.
            Err(join_error) => join_error.into(),
        });
    }

    Ok(BatchDownloadResult {
        results,
        ..Default::default()
    })
}
/// Downloads the header and all data chunks of a single symbol.
///
/// The header request and the waveform stream request are issued together
/// and awaited concurrently. Chunk payloads are copied into buffers taken
/// from `chunk_storage` while it has any, so allocations from earlier
/// downloads are reused; once it is empty, new buffers are allocated.
///
/// The returned header has its `sourcename` lowercased (ASCII).
///
/// # Errors
///
/// * `AcquisitionError::HeaderRequestFailed` if the header call fails, its
///   reply status is not success, or the reply carries no header payload.
/// * `AcquisitionError::DownloadFailed` if the data stream cannot be opened
///   or reports an error while it is being read.
/// * The error converted from the reply status if a data message reports a
///   non-success status.
async fn fetch_symbol(
    mut clients: SymbolClients,
    chunk_size: u32,
    mut chunk_storage: DataChunks,
) -> Result<SymbolDownloadResult, AcquisitionError> {
    let headers = clients.header_client.get_header(WaveformRequest {
        sourcename: clients.symbol.to_string(),
        chunksize: chunk_size,
    });
    let data = clients.data_client.get_waveform(WaveformRequest {
        sourcename: clients.symbol.to_string(),
        chunksize: chunk_size,
    });
    let (header_reply, data_stream_reply) = tokio::join!(headers, data);

    let header_reply = header_reply
        .map_err(|e| AcquisitionError::HeaderRequestFailed {
            status: format!("gRPC error: {}", e),
        })?
        .into_inner();
    if header_reply.status() != WfmReplyStatus::WfmreplystatusSuccess {
        return Err(AcquisitionError::HeaderRequestFailed {
            status: format!("Reply status: {:?}", header_reply.status()),
        });
    }

    let mut data_stream = data_stream_reply
        .map_err(|e| AcquisitionError::DownloadFailed {
            message: format!("gRPC error: {}", e),
        })?
        .into_inner();

    let Some(access) = header_reply.headerordata else {
        return Err(AcquisitionError::HeaderRequestFailed {
            status: "Server did not provide header payload".into(),
        });
    };
    let mut header = match access.value {
        Some(RawValue::Header(header)) => header,
        other => {
            return Err(AcquisitionError::HeaderRequestFailed {
                status: format!("Unexpected response to header request: {:?}", other),
            });
        }
    };
    header.sourcename.make_ascii_lowercase();

    let expected_len = expected_payload_len(&header);
    debug_assert!(
        expected_len == 0 || chunk_size > 0,
        "Chunk size must be positive"
    );
    // The chunk count is only a capacity hint, so a zero chunk size must not
    // turn into a division-by-zero panic in release builds.
    let expected_chunks = if expected_len > 0 && chunk_size > 0 {
        expected_len.div_ceil(chunk_size as usize)
    } else {
        0
    };
    let mut data_chunks: DataChunks = Vec::with_capacity(expected_chunks);

    loop {
        let reply = match data_stream.message().await {
            Ok(Some(reply)) => reply,
            // The server closed the stream: the download is complete.
            Ok(None) => break,
            // A stream error must fail the download; ending the loop here
            // would report truncated data as a success.
            Err(e) => {
                return Err(AcquisitionError::DownloadFailed {
                    message: format!("gRPC error: {}", e),
                });
            }
        };
        if reply.status() != WfmReplyStatus::WfmreplystatusSuccess {
            return Err(AcquisitionError::from(reply.status()));
        }
        // Messages without a chunk payload are skipped.
        if let Some(access) = reply.headerordata
            && let Some(RawValue::Chunk(chunk)) = access.value
        {
            let mut buffer = chunk_storage
                .pop()
                .unwrap_or_else(|| Vec::with_capacity(chunk.data.len()));
            // Recycled buffers still hold the bytes of an earlier download.
            buffer.clear();
            buffer.extend_from_slice(&chunk.data);
            data_chunks.push(buffer);
        }
    }

    Ok(SymbolDownloadResult::Success {
        symbol: clients.symbol.clone(),
        header,
        data_chunks,
    })
}
}
#[cfg(test)]
#[cfg_attr(coverage, coverage(off))]
mod tests {
use super::*;
use crate::tekscope::WaveformHeader;
fn base_header() -> WaveformHeader {
WaveformHeader {
sourcename: "ch1".to_string(),
verticalunits: "V".to_string(),
horizontal_units: "s".to_string(),
hasdata: true,
..Default::default()
}
}
/// Preallocating a symbol creates one empty pool entry with the requested capacity.
#[test]
fn preallocate_pool_adds_to_map() {
    let ch1 = SmolStr::from("ch1");
    let mut fetcher = SymbolFetcher::new(10);

    fetcher.preallocate_pool(&ch1, 2);

    assert_eq!(fetcher.pool.len(), 1);
    assert!(fetcher.pool.capacity() >= 10);
    assert!(fetcher.pool.contains_key(&ch1));
    let slots = &fetcher.pool[&ch1];
    assert_eq!(slots.len(), 0);
    assert_eq!(slots.capacity(), 2);
}
/// Recycling a successful result stores its chunk buffers under its symbol.
#[test]
fn recycle_adds_to_pool() {
    let ch1 = SmolStr::from("ch1");
    let mut fetcher = SymbolFetcher::new(10);
    fetcher.preallocate_pool(&ch1, 2);

    let downloaded = SymbolDownloadResult::Success {
        symbol: ch1.clone(),
        header: base_header(),
        data_chunks: vec![vec![1, 2, 3], vec![4, 5, 6]],
    };
    let batch = BatchDownloadResult {
        results: vec![downloaded],
        ..Default::default()
    };

    assert_eq!(fetcher.pool[&ch1].len(), 0);
    fetcher.recycle(batch);

    let recycled = &fetcher.pool[&ch1];
    assert_eq!(recycled.len(), 1);
    assert_eq!(recycled[0].len(), 2);
    assert_eq!(recycled[0][0].len(), 3);
}
/// Recycling a batch with several symbols files each result under its own symbol.
#[test]
fn recycle_multiple_symbols() {
    let ch1 = SmolStr::from("ch1");
    let ch2 = SmolStr::from("ch2");
    let mut fetcher = SymbolFetcher::new(10);
    for symbol in [&ch1, &ch2] {
        fetcher.preallocate_pool(symbol, 2);
    }

    // Builds a successful download result for `symbol` holding `data_chunks`.
    let success = |symbol: &SmolStr, data_chunks: Vec<Vec<u8>>| SymbolDownloadResult::Success {
        symbol: symbol.clone(),
        header: base_header(),
        data_chunks,
    };
    let batch = BatchDownloadResult {
        results: vec![
            success(&ch1, vec![vec![1, 2, 3], vec![4, 5, 6]]),
            success(&ch2, vec![vec![4, 5, 6, 7], vec![7, 8, 9]]),
        ],
        ..Default::default()
    };

    for symbol in [&ch1, &ch2] {
        assert_eq!(fetcher.pool[symbol].len(), 0);
    }

    fetcher.recycle(batch);

    // Each symbol holds one recycled set of two chunks; the length of the
    // first chunk tells the two sets apart.
    for (symbol, first_chunk_len) in [(&ch1, 3), (&ch2, 4)] {
        let recycled = &fetcher.pool[symbol];
        assert_eq!(recycled.len(), 1);
        assert_eq!(recycled[0].len(), 2);
        assert_eq!(recycled[0][0].len(), first_chunk_len);
    }
}
/// A new fetcher starts with an empty pool sized for the requested symbol count.
#[test]
fn symbol_fetcher_new_with_capacity() {
    let SymbolFetcher { pool } = SymbolFetcher::new(10);
    assert!(pool.capacity() >= 10);
    assert!(pool.is_empty());
}
}