use std::time::Duration;
use anyhow::{anyhow, Context};
use socketcan::{CANFrame, CANSocket, ShouldRetry};
use crate::{
api::{
error::{foreign_info::ForeignInfo, Result},
SpringError, SpringSourceReaderConfig,
},
pipeline::{CANOptions, Options},
stream_engine::autonomous_executor::{row::CANFrameSourceRow, SourceReader, SourceRow},
};
#[derive(Debug)]
pub(in crate::stream_engine) struct CANSourceReader {
interface: String,
can_socket: CANSocket,
}
impl SourceReader for CANSourceReader {
fn start(options: &Options, config: &SpringSourceReaderConfig) -> Result<Self> {
let options = CANOptions::try_from(options)?;
let interface = &options.interface;
let can_socket = CANSocket::open(interface)
.context(format!(
"failed to open socket CAN interface {}",
&interface
))
.map_err(|e| SpringError::ForeignIo {
source: e,
foreign_info: ForeignInfo::SocketCAN(interface.to_string()),
})?;
can_socket
.set_read_timeout(Duration::from_millis(config.can_read_timeout_msec as u64))
.context(format!(
"failed to set read timeout to CAN socket {}",
&interface
))
.map_err(|e| SpringError::ForeignIo {
source: e,
foreign_info: ForeignInfo::SocketCAN(interface.to_string()),
})?;
log::info!(
"[CANSourceReader] Ready to read CAN frames from {} socket",
&interface
);
Ok(Self {
interface: interface.to_string(),
can_socket,
})
}
fn next_row(&mut self) -> Result<SourceRow> {
let frame = self.can_socket.read_frame().map_err(|io_err| {
if io_err.should_retry() {
SpringError::ForeignSourceTimeout {
source: anyhow::Error::from(io_err),
foreign_info: ForeignInfo::SocketCAN(self.interface.clone()),
}
} else {
SpringError::ForeignIo {
source: anyhow::Error::from(io_err),
foreign_info: ForeignInfo::SocketCAN(self.interface.clone()),
}
}
})?;
self.can_frame_into_row(frame)
}
}
impl CANSourceReader {
fn can_frame_into_row(&self, frame: CANFrame) -> Result<SourceRow> {
if frame.is_rtr() {
unimplemented!("RTR (remote transmission request) frames are not supported");
} else if frame.is_extended() {
unimplemented!("Extended frame format is not supported");
} else if frame.is_error() {
Err(SpringError::ForeignIo {
source: anyhow!("got a error CAN frame (CAN ID: {})", frame.id()),
foreign_info: ForeignInfo::SocketCAN(self.interface.clone()),
})
} else {
Ok(Self::_can_frame_into_row(frame))
}
}
fn _can_frame_into_row(frame: CANFrame) -> SourceRow {
SourceRow::CANFrame(CANFrameSourceRow::new(frame))
}
}
#[cfg(test)]
mod tests {
use crate::stream_engine::{autonomous_executor::row::SchemalessRow, SqlValue};
use super::*;
#[test]
fn test_can_frame_into_row() {
let can_id = 1;
let can_data = &[0x00u8, 0x01];
let frame = CANFrame::new(can_id, can_data, false, false).unwrap();
let source_row = CANSourceReader::_can_frame_into_row(frame);
let row = SchemalessRow::try_from(source_row).unwrap();
if let SqlValue::NotNull(got_can_id) = row.get_by_index(0).unwrap() {
let got_can_id: u32 = got_can_id.unpack().unwrap();
assert_eq!(got_can_id, can_id);
} else {
unreachable!()
}
if let SqlValue::NotNull(got_can_data) = row.get_by_index(1).unwrap() {
let got_can_data: Vec<u8> = got_can_data.unpack().unwrap();
assert_eq!(got_can_data, can_data);
} else {
unreachable!()
}
}
}