use crate::{Error, LabjackErrorCode, Result};
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use tokio::sync::mpsc::Sender;
use tokio::time::{timeout, Duration};
pub async fn process_stream(
mut stream: TcpStream,
tx: &Sender<u16>,
timeout_duration: Duration,
) -> Result<()> {
let mut header_buf = [0; 16];
loop {
if let Err(e) = timeout(timeout_duration, stream.read_exact(&mut header_buf)).await? {
return Err(Error::TokioModbusError(tokio_modbus::Error::Transport(e)));
}
#[cfg(debug_assertions)]
{
tracing::debug!("stream header: {:?}", header_buf);
}
let function_code = header_buf[7];
if function_code != 76 {
return Err(Error::Other(format!(
"Unexpected function_code: {}, expected to be 76.",
function_code
)));
}
let status_code = u16::from_be_bytes([header_buf[12], header_buf[13]]);
match status_code.try_into() {
Ok(LabjackErrorCode::LjSuccess) => {}
Ok(LabjackErrorCode::StreamAutoRecoverActive) => {
#[cfg(debug_assertions)]
{
let backlog_bytes = u16::from_be_bytes([header_buf[10], header_buf[11]]);
tracing::debug!(
"Stream buffer overload occured. In auto recovery mode, but continuing scan.
Number of backlog bytes = {}",
backlog_bytes,
);
}
}
Ok(LabjackErrorCode::StreamAutoRecoverEnd) => {
#[cfg(debug_assertions)]
{
let num_scans_skipped = u16::from_be_bytes([header_buf[14], header_buf[15]]);
tracing::debug!(
"Auto recover mode has ended. The number of skipped scans = {}.",
num_scans_skipped,
);
}
}
Ok(LabjackErrorCode::StreamScanOverlap) => {
return Err(Error::from(LabjackErrorCode::StreamScanOverlap));
}
Ok(LabjackErrorCode::StreamAutoRecoverEndOverflow) => {
return Err(Error::from(LabjackErrorCode::StreamAutoRecoverEndOverflow));
}
Ok(LabjackErrorCode::StreamBurstComplete) => {
let num_samples_remaining = u16::from_be_bytes([header_buf[14], header_buf[15]]);
tracing::debug!(
"Burst stream mode ended successfully. Remaining samples to read: {}",
num_samples_remaining
);
let num_bytes_remaining = num_samples_remaining * 2;
let mut data_buf = vec![0; num_bytes_remaining as usize];
if let Err(e) = timeout(timeout_duration, stream.read_exact(&mut data_buf)).await? {
return Err(Error::TokioModbusError(tokio_modbus::Error::Transport(e)));
}
for data_byte in data_buf
.chunks_exact(2)
.map(|chunk| u16::from_be_bytes([chunk[0], chunk[1]]))
{
tx.send(data_byte).await?;
}
return Ok(());
}
Ok(LabjackErrorCode::StreamBufferFull) => {
return Err(Error::from(LabjackErrorCode::StreamBufferFull));
}
Ok(_) => {
tracing::debug!(
"Received unexpected status code from stream: {}",
status_code
);
return Err(Error::UnknownStatusCode(status_code));
}
Err(e) => {
return Err(e);
}
}
let num_bytes = u16::from_be_bytes([header_buf[4], header_buf[5]]) - 10;
#[cfg(debug_assertions)]
{
tracing::debug!("num bytes to read in stream data: {:?}", num_bytes);
}
let mut data_buf = vec![0; num_bytes as usize];
if let Err(e) = timeout(timeout_duration, stream.read_exact(&mut data_buf)).await? {
return Err(Error::TokioModbusError(tokio_modbus::Error::Transport(e)));
}
for data_byte in data_buf
.chunks_exact(2)
.map(|chunk| u16::from_be_bytes([chunk[0], chunk[1]]))
{
tx.send(data_byte).await?;
}
}
}