use super::decode_batch::decode_batch;
use crate::data::Acquisition;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
pub(super) async fn decode_loop(
batch_receiver: kanal::AsyncReceiver<super::BatchDownloadResult>,
recycle_tx: kanal::AsyncSender<super::BatchDownloadResult>,
sender: tokio::sync::broadcast::Sender<Acquisition>,
shutdown: CancellationToken,
) {
while let Ok(mut batch) = batch_receiver.recv().await {
match decode_batch(&mut batch) {
Ok(acquisition) => {
if sender.receiver_count() == 0 || sender.send(acquisition).is_err() {
shutdown.cancel();
break;
}
}
Err(err) => {
warn!("decode failed: {}", err);
}
}
let _ = recycle_tx.try_send(batch);
}
shutdown.cancel();
debug!("decode loop exited");
}
#[cfg(test)]
#[cfg_attr(coverage, coverage(off))]
mod tests {
use super::decode_loop;
use crate::acquisition::result::{BatchDownloadResult, SymbolDownloadResult};
use crate::errors::AcquisitionError;
use crate::tekscope::WaveformHeader;
use smol_str::SmolStr;
use tokio::time::{Duration, timeout};
use tokio_util::sync::CancellationToken;
fn base_header() -> WaveformHeader {
WaveformHeader {
sourcename: "ch1".to_string(),
verticalunits: "V".to_string(),
horizontal_units: "s".to_string(),
hasdata: true,
..Default::default()
}
}
#[tokio::test]
async fn decode_loop_cancels_when_no_receivers() {
let (batch_tx, batch_rx) = kanal::bounded_async(1);
let (recycle_tx, _recycle_rx) = kanal::bounded_async(1);
let (sender, receiver) = tokio::sync::broadcast::channel(1);
let shutdown = CancellationToken::new();
drop(receiver);
let task = tokio::spawn(decode_loop(batch_rx, recycle_tx, sender, shutdown.clone()));
let header = WaveformHeader {
wfmtype: 1,
sourcewidth: 1,
noofsamples: 1,
dataid: 1,
..base_header()
};
let batch = BatchDownloadResult {
results: vec![SymbolDownloadResult::Success {
symbol: SmolStr::from("ch1"),
header,
data_chunks: vec![vec![1u8]],
}],
..Default::default()
};
batch_tx.send(batch).await.unwrap();
timeout(Duration::from_secs(2), task)
.await
.expect("decode loop timed out")
.expect("decode loop task failed");
assert!(shutdown.is_cancelled());
}
#[tokio::test]
async fn decode_loop_logs_decode_errors() {
let (batch_tx, batch_rx) = kanal::bounded_async(1);
let (recycle_tx, recycle_rx) = kanal::bounded_async(1);
let (sender, _receiver) = tokio::sync::broadcast::channel(1);
let shutdown = CancellationToken::new();
let task = tokio::spawn(decode_loop(
batch_rx,
recycle_tx.clone(),
sender,
shutdown.clone(),
));
let batch = BatchDownloadResult {
results: vec![SymbolDownloadResult::Failure {
symbol: SmolStr::from("ch1"),
error: AcquisitionError::DownloadFailed {
message: "test".to_string(),
},
}],
..Default::default()
};
batch_tx.send(batch).await.unwrap();
drop(batch_tx);
timeout(Duration::from_secs(2), task)
.await
.expect("decode loop timed out")
.expect("decode loop task failed");
assert!(shutdown.is_cancelled());
let recycled = recycle_rx.recv().await.expect("expected recycled batch");
assert_eq!(recycled.results.len(), 1);
}
}