use crate::protocols::idn::dac::{Addressed, ServerInfo};
use crate::protocols::idn::error::{CommunicationError, ProtocolError, ResponseError, Result};
use crate::protocols::idn::protocol::{
AcknowledgeResponse, ChannelConfigHeader, ChannelMessageHeader, GroupRequest, GroupResponse,
PacketHeader, ParameterGetRequest, ParameterResponse, ParameterSetRequest, Point, ReadBytes,
ReadFromBytes, SampleChunkHeader, SizeBytes, WriteBytes, EXTENDED_SAMPLE_SIZE,
IDNCMD_GROUP_REQUEST, IDNCMD_GROUP_RESPONSE, IDNCMD_PING_REQUEST, IDNCMD_PING_RESPONSE,
IDNCMD_RT_ACKNOWLEDGE, IDNCMD_RT_CNLMSG, IDNCMD_RT_CNLMSG_ACKREQ, IDNCMD_RT_CNLMSG_CLOSE,
IDNCMD_RT_CNLMSG_CLOSE_ACKREQ, IDNCMD_SERVICE_PARAMS_REQUEST, IDNCMD_SERVICE_PARAMS_RESPONSE,
IDNCMD_UNIT_PARAMS_REQUEST, IDNCMD_UNIT_PARAMS_RESPONSE, IDNFLG_CHNCFG_CLOSE,
IDNFLG_CHNCFG_ROUTING, IDNFLG_CONTENTID_CHANNELMSG, IDNFLG_CONTENTID_CONFIG_LSTFRG,
IDNVAL_CNKTYPE_LPGRF_FRAME, IDNVAL_CNKTYPE_LPGRF_FRAME_FIRST,
IDNVAL_CNKTYPE_LPGRF_FRAME_SEQUEL, IDNVAL_CNKTYPE_LPGRF_WAVE, IDNVAL_CNKTYPE_VOID,
IDNVAL_SMOD_LPGRF_CONTINUOUS, IDNVAL_SMOD_LPGRF_DISCRETE, MAX_UDP_PAYLOAD, XYRGBI_SAMPLE_SIZE,
XYRGB_HIGHRES_SAMPLE_SIZE,
};
use log::{debug, trace, warn};
use std::io;
use std::net::UdpSocket;
use std::time::{Duration, Instant};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum FrameMode {
#[default]
Wave,
Frame,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PointFormat {
Xyrgbi,
XyrgbHighRes,
Extended,
}
impl PointFormat {
pub fn size_bytes(&self) -> usize {
match self {
PointFormat::Xyrgbi => XYRGBI_SAMPLE_SIZE,
PointFormat::XyrgbHighRes => XYRGB_HIGHRES_SAMPLE_SIZE,
PointFormat::Extended => EXTENDED_SAMPLE_SIZE,
}
}
fn word_count(&self) -> u8 {
(self.descriptors().len() / 2) as u8
}
fn descriptors(&self) -> &'static [u16] {
use crate::protocols::idn::protocol::channel_descriptors;
match self {
PointFormat::Xyrgbi => channel_descriptors::XYRGBI,
PointFormat::XyrgbHighRes => channel_descriptors::XYRGB_HIGHRES,
PointFormat::Extended => channel_descriptors::EXTENDED,
}
}
}
pub const LINK_TIMEOUT: Duration = Duration::from_secs(1);
pub const KEEPALIVE_INTERVAL: Duration = Duration::from_millis(500);
const MIN_SAMPLES_PER_FRAME: usize = 20;
pub struct Stream {
dac: Addressed,
socket: UdpSocket,
client_group: u8,
sequence: u16,
timestamp: u64,
scan_speed: u32,
point_format: PointFormat,
service_data_match: u8,
last_config_time: Option<Instant>,
previous_format: Option<PointFormat>,
frame_count: u64,
packet_buffer: Vec<u8>,
recv_buffer: [u8; 64],
last_send_time: Option<Instant>,
connect_time: Instant,
frame_mode: FrameMode,
}
pub fn connect(server: &ServerInfo, service_id: u8) -> io::Result<Stream> {
connect_with_group(server, service_id, 0)
}
pub fn connect_with_group(
server: &ServerInfo,
service_id: u8,
client_group: u8,
) -> io::Result<Stream> {
let address = server
.primary_address()
.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "server has no addresses"))?;
debug!(
"IDN stream: connecting to {} (service_id={}, group={}, address={})",
server.hostname, service_id, client_group, address
);
let service = server
.find_service(service_id)
.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "service not found"))?;
debug!(
"IDN stream: found service '{}' type={:?} flags=0x{:02x}",
service.name, service.service_type, service.flags
);
let socket = UdpSocket::bind("0.0.0.0:0")?;
let local_addr = socket.local_addr().ok();
socket.connect(address)?;
debug!(
"IDN stream: UDP socket bound to {:?}, connected to {}",
local_addr, address
);
let dac = Addressed::new(server.clone(), service.clone(), *address);
Ok(Stream {
dac,
socket,
client_group: client_group & 0x0F,
sequence: 0,
timestamp: 0,
scan_speed: 30000, point_format: PointFormat::Xyrgbi,
service_data_match: 0,
last_config_time: None,
previous_format: None,
frame_count: 0,
packet_buffer: Vec::with_capacity(MAX_UDP_PAYLOAD * 2),
recv_buffer: [0u8; 64],
last_send_time: None,
connect_time: Instant::now(),
frame_mode: FrameMode::Wave,
})
}
impl Stream {
fn needs_config(
frame_count: u64,
previous_format: Option<PointFormat>,
point_format: PointFormat,
) -> bool {
frame_count == 0 || previous_format != Some(point_format)
}
pub fn dac(&self) -> &Addressed {
&self.dac
}
pub fn scan_speed(&self) -> u32 {
self.scan_speed
}
pub fn set_scan_speed(&mut self, pps: u32) {
self.scan_speed = pps;
}
pub fn point_format(&self) -> PointFormat {
self.point_format
}
pub fn set_point_format(&mut self, format: PointFormat) {
self.point_format = format;
}
pub fn frame_mode(&self) -> FrameMode {
self.frame_mode
}
pub fn set_frame_mode(&mut self, mode: FrameMode) {
self.frame_mode = mode;
}
pub fn needs_keepalive(&self) -> bool {
match self.last_send_time {
Some(last) => last.elapsed() >= KEEPALIVE_INTERVAL,
None => self.frame_count > 0, }
}
pub fn time_since_last_send(&self) -> Option<Duration> {
self.last_send_time.map(|t| t.elapsed())
}
pub fn send_keepalive(&mut self) -> Result<()> {
let content_id =
IDNFLG_CONTENTID_CHANNELMSG | self.channel_id() | IDNVAL_CNKTYPE_VOID as u16;
trace!(
"IDN stream: sending keepalive (seq={}, timestamp={}, time_since_last={:?})",
self.sequence,
self.timestamp,
self.last_send_time.map(|t| t.elapsed()),
);
self.packet_buffer.clear();
let packet_header = PacketHeader {
command: IDNCMD_RT_CNLMSG,
flags: self.client_group,
sequence: self.next_sequence(),
};
self.packet_buffer.write_bytes(packet_header)?;
let channel_msg = ChannelMessageHeader {
total_size: ChannelMessageHeader::SIZE_BYTES as u16,
content_id,
timestamp: (self.timestamp & 0xFFFF_FFFF) as u32,
};
self.packet_buffer.write_bytes(channel_msg)?;
let sent_bytes = self.socket.send(&self.packet_buffer)?;
self.last_send_time = Some(Instant::now());
trace!(
"IDN stream: keepalive sent ({} bytes, content_id=0x{:04x})",
sent_bytes,
content_id
);
Ok(())
}
pub fn write_frame<P: Point>(&mut self, points: &[P]) -> Result<()> {
if points.is_empty() {
trace!("IDN stream: write_frame called with empty points, skipping");
return Ok(());
}
trace!(
"IDN stream: write_frame #{} - {} points, pps={}, format={:?}, time_since_last={:?}",
self.frame_count,
points.len(),
self.scan_speed,
self.point_format,
self.last_send_time.map(|t| t.elapsed()),
);
let padded;
let points = if points.len() < MIN_SAMPLES_PER_FRAME {
trace!(
"IDN stream: padding {} points to minimum {}",
points.len(),
MIN_SAMPLES_PER_FRAME
);
padded = Self::pad_points(points);
&padded[..]
} else {
points
};
if self.scan_speed == 0 {
warn!("IDN stream: scan_speed is 0, cannot send frame");
return Err(CommunicationError::Protocol(
ProtocolError::InvalidPointFormat,
));
}
let now = Instant::now();
let needs_config =
Self::needs_config(self.frame_count, self.previous_format, self.point_format);
if needs_config {
trace!(
"IDN stream: will send config (frame_count={}, prev_format={:?}, cur_format={:?}, last_config={:?})",
self.frame_count,
self.previous_format,
self.point_format,
self.last_config_time.map(|t| t.elapsed()),
);
}
if self.previous_format != Some(self.point_format) {
self.service_data_match = self.service_data_match.wrapping_add(1);
trace!(
"IDN stream: format changed, service_data_match now {}",
self.service_data_match
);
}
if self.frame_count == 0 {
self.timestamp = self.connect_time.elapsed().as_micros() as u64;
trace!(
"IDN stream: first frame, initial timestamp = {} us ({} ms since connect)",
self.timestamp,
self.timestamp / 1000
);
}
let bytes_per_sample = P::SIZE_BYTES;
let service_id = self.dac.service_id();
let mut content_id = IDNFLG_CONTENTID_CHANNELMSG | self.channel_id();
let config_size = if needs_config {
content_id |= IDNFLG_CONTENTID_CONFIG_LSTFRG;
ChannelConfigHeader::SIZE_BYTES + self.point_format.descriptors().len() * 2
} else {
0
};
let header_size = PacketHeader::SIZE_BYTES
+ ChannelMessageHeader::SIZE_BYTES
+ config_size
+ SampleChunkHeader::SIZE_BYTES;
let max_points_per_packet = (MAX_UDP_PAYLOAD - header_size) / bytes_per_sample;
let num_packets = points.len().div_ceil(max_points_per_packet);
let points_to_send = points.len() / num_packets;
let duration_us = ((points_to_send as u64) * 1_000_000) / (self.scan_speed as u64);
let is_only = num_packets == 1;
let cnk_type = self.chunk_type(true, is_only);
trace!(
"IDN stream: packet layout - service_id={}, channel_id=0x{:04x}, content_id=0x{:04x}, \
bytes_per_sample={}, header_size={}, max_pts/pkt={}, num_packets={}, pts_this_pkt={}, \
duration={}us, timestamp={}",
service_id,
self.channel_id(),
content_id | cnk_type as u16,
bytes_per_sample,
header_size,
max_points_per_packet,
num_packets,
points_to_send,
duration_us,
self.timestamp,
);
self.packet_buffer.clear();
let seq = self.next_sequence();
let packet_header = PacketHeader {
command: IDNCMD_RT_CNLMSG,
flags: self.client_group,
sequence: seq,
};
self.packet_buffer.write_bytes(packet_header)?;
let msg_size = ChannelMessageHeader::SIZE_BYTES
+ config_size
+ SampleChunkHeader::SIZE_BYTES
+ points_to_send * bytes_per_sample;
let channel_msg = ChannelMessageHeader {
total_size: msg_size as u16,
content_id: content_id | cnk_type as u16,
timestamp: (self.timestamp & 0xFFFF_FFFF) as u32,
};
self.packet_buffer.write_bytes(channel_msg)?;
trace!(
"IDN stream: channel_msg total_size={}, content_id=0x{:04x}, timestamp={}",
msg_size,
channel_msg.content_id,
channel_msg.timestamp,
);
if needs_config {
self.write_config(service_id, now)?;
}
let sdm = self.sdm_flags();
let chunk_header = SampleChunkHeader::new(sdm, duration_us as u32);
self.packet_buffer.write_bytes(chunk_header)?;
trace!(
"IDN stream: sample_chunk sdm_flags=0x{:02x}, duration={}us",
sdm,
duration_us
);
for point in points.iter().take(points_to_send) {
self.packet_buffer.write_bytes(point)?;
}
let total_bytes = self.packet_buffer.len();
let sent_bytes = self.socket.send(&self.packet_buffer)?;
self.last_send_time = Some(now);
if sent_bytes != total_bytes {
warn!(
"IDN stream: partial send! wanted {} bytes, sent {}",
total_bytes, sent_bytes
);
}
trace!(
"IDN stream: sent frame #{} packet - seq={}, {} bytes, {} points",
self.frame_count,
seq,
sent_bytes,
points_to_send,
);
self.timestamp += duration_us;
self.frame_count += 1;
if points_to_send < points.len() {
let remaining = points.len() - points_to_send;
trace!(
"IDN stream: {} remaining points, sending continuation",
remaining
);
self.write_frame_continuation(&points[points_to_send..])?;
}
Ok(())
}
fn write_frame_continuation<P: Point>(&mut self, points: &[P]) -> Result<()> {
if points.is_empty() {
return Ok(());
}
let bytes_per_sample = P::SIZE_BYTES;
let content_id = IDNFLG_CONTENTID_CHANNELMSG | self.channel_id();
let header_size = PacketHeader::SIZE_BYTES
+ ChannelMessageHeader::SIZE_BYTES
+ SampleChunkHeader::SIZE_BYTES;
let max_points_per_packet = (MAX_UDP_PAYLOAD - header_size) / bytes_per_sample;
let num_packets = points.len().div_ceil(max_points_per_packet);
let points_to_send = points.len() / num_packets;
let duration_us = ((points_to_send as u64) * 1_000_000) / (self.scan_speed as u64);
trace!(
"IDN stream: continuation - {} points remaining, sending {}, duration={}us, timestamp={}",
points.len(),
points_to_send,
duration_us,
self.timestamp,
);
self.packet_buffer.clear();
let seq = self.next_sequence();
let packet_header = PacketHeader {
command: IDNCMD_RT_CNLMSG,
flags: self.client_group,
sequence: seq,
};
self.packet_buffer.write_bytes(packet_header)?;
let msg_size = ChannelMessageHeader::SIZE_BYTES
+ SampleChunkHeader::SIZE_BYTES
+ points_to_send * bytes_per_sample;
let cnk_type = self.chunk_type(false, false);
let channel_msg = ChannelMessageHeader {
total_size: msg_size as u16,
content_id: content_id | cnk_type as u16,
timestamp: (self.timestamp & 0xFFFF_FFFF) as u32,
};
self.packet_buffer.write_bytes(channel_msg)?;
let chunk_header = SampleChunkHeader::new(self.sdm_flags(), duration_us as u32);
self.packet_buffer.write_bytes(chunk_header)?;
for point in points.iter().take(points_to_send) {
self.packet_buffer.write_bytes(point)?;
}
let sent_bytes = self.socket.send(&self.packet_buffer)?;
self.last_send_time = Some(Instant::now());
trace!(
"IDN stream: continuation sent - seq={}, {} bytes, {} points",
seq,
sent_bytes,
points_to_send,
);
self.timestamp += duration_us;
if points_to_send < points.len() {
self.write_frame_continuation(&points[points_to_send..])?;
}
Ok(())
}
pub fn write_frame_with_ack<P: Point>(
&mut self,
points: &[P],
timeout: Duration,
) -> Result<AcknowledgeResponse> {
if points.is_empty() {
return Err(CommunicationError::Protocol(ProtocolError::BufferTooSmall));
}
let padded;
let points = if points.len() < MIN_SAMPLES_PER_FRAME {
padded = Self::pad_points(points);
&padded[..]
} else {
points
};
if self.scan_speed == 0 {
return Err(CommunicationError::Protocol(
ProtocolError::InvalidPointFormat,
));
}
let now = Instant::now();
let needs_config =
Self::needs_config(self.frame_count, self.previous_format, self.point_format);
if self.previous_format != Some(self.point_format) {
self.service_data_match = self.service_data_match.wrapping_add(1);
}
if self.frame_count == 0 {
self.timestamp = self.connect_time.elapsed().as_micros() as u64;
}
let bytes_per_sample = P::SIZE_BYTES;
let service_id = self.dac.service_id();
let mut content_id = IDNFLG_CONTENTID_CHANNELMSG | self.channel_id();
let config_size = if needs_config {
content_id |= IDNFLG_CONTENTID_CONFIG_LSTFRG;
ChannelConfigHeader::SIZE_BYTES + self.point_format.descriptors().len() * 2
} else {
0
};
let header_size = PacketHeader::SIZE_BYTES
+ ChannelMessageHeader::SIZE_BYTES
+ config_size
+ SampleChunkHeader::SIZE_BYTES;
let max_points_per_packet = (MAX_UDP_PAYLOAD - header_size) / bytes_per_sample;
let num_packets = points.len().div_ceil(max_points_per_packet);
let points_to_send = points.len() / num_packets;
let duration_us = ((points_to_send as u64) * 1_000_000) / (self.scan_speed as u64);
self.packet_buffer.clear();
let ack_seq = self.next_sequence();
let packet_header = PacketHeader {
command: IDNCMD_RT_CNLMSG_ACKREQ,
flags: self.client_group,
sequence: ack_seq,
};
self.packet_buffer.write_bytes(packet_header)?;
let msg_size = ChannelMessageHeader::SIZE_BYTES
+ config_size
+ SampleChunkHeader::SIZE_BYTES
+ points_to_send * bytes_per_sample;
let is_only = num_packets == 1;
let cnk_type = self.chunk_type(true, is_only);
let channel_msg = ChannelMessageHeader {
total_size: msg_size as u16,
content_id: content_id | cnk_type as u16,
timestamp: (self.timestamp & 0xFFFF_FFFF) as u32,
};
self.packet_buffer.write_bytes(channel_msg)?;
if needs_config {
self.write_config(service_id, now)?;
}
let chunk_header = SampleChunkHeader::new(self.sdm_flags(), duration_us as u32);
self.packet_buffer.write_bytes(chunk_header)?;
for point in points.iter().take(points_to_send) {
self.packet_buffer.write_bytes(point)?;
}
self.socket.send(&self.packet_buffer)?;
self.last_send_time = Some(now);
self.timestamp += duration_us;
self.frame_count += 1;
let ack = self.recv_acknowledge(timeout, ack_seq)?;
if points_to_send < points.len() {
self.write_frame_continuation(&points[points_to_send..])?;
}
Ok(ack)
}
pub fn recv_acknowledge(
&mut self,
timeout: Duration,
expected_seq: u16,
) -> Result<AcknowledgeResponse> {
trace!(
"IDN stream: waiting for ack (expected_seq={}, timeout={:?})",
expected_seq,
timeout,
);
let ack: AcknowledgeResponse =
self.recv_response(timeout, IDNCMD_RT_ACKNOWLEDGE, expected_seq)?;
trace!(
"IDN stream: received ack - result_code={}, seq={}",
ack.result_code,
expected_seq,
);
if let Some(error) = ResponseError::from_ack_code(ack.result_code) {
warn!("IDN stream: ack error: {:?}", error);
return Err(CommunicationError::Response(error));
}
Ok(ack)
}
pub fn ping(&mut self, timeout: Duration) -> Result<Duration> {
let start = Instant::now();
let seq = self.send_request(IDNCMD_PING_REQUEST, |_| Ok(()))?;
self.recv_response_header(timeout, IDNCMD_PING_RESPONSE, seq)?;
Ok(start.elapsed())
}
pub fn get_client_group_mask(&mut self, timeout: Duration) -> Result<GroupResponse> {
let seq = self.send_request(IDNCMD_GROUP_REQUEST, |buf| {
buf.write_bytes(GroupRequest::get())
})?;
self.recv_response(timeout, IDNCMD_GROUP_RESPONSE, seq)
}
pub fn set_client_group_mask(&mut self, mask: u16, timeout: Duration) -> Result<GroupResponse> {
let seq = self.send_request(IDNCMD_GROUP_REQUEST, |buf| {
buf.write_bytes(GroupRequest::set(mask))
})?;
self.recv_response(timeout, IDNCMD_GROUP_RESPONSE, seq)
}
pub fn get_parameter(
&mut self,
service_id: u8,
param_id: u16,
timeout: Duration,
) -> Result<ParameterResponse> {
let (request_cmd, response_cmd) = param_commands(service_id);
let seq = self.send_request(request_cmd, |buf| {
buf.write_bytes(ParameterGetRequest {
service_id,
reserved: 0,
param_id,
})
})?;
let response: ParameterResponse = self.recv_response(timeout, response_cmd, seq)?;
check_parameter_response(&response)?;
Ok(response)
}
pub fn set_parameter(
&mut self,
service_id: u8,
param_id: u16,
value: u32,
timeout: Duration,
) -> Result<ParameterResponse> {
let (request_cmd, response_cmd) = param_commands(service_id);
let seq = self.send_request(request_cmd, |buf| {
buf.write_bytes(ParameterSetRequest {
service_id,
reserved: 0,
param_id,
value,
})
})?;
let response: ParameterResponse = self.recv_response(timeout, response_cmd, seq)?;
check_parameter_response(&response)?;
Ok(response)
}
pub fn close(&mut self) -> Result<()> {
debug!(
"IDN stream: closing connection (frames sent: {}, timestamp: {})",
self.frame_count, self.timestamp
);
self.send_channel_close()?;
self.packet_buffer.clear();
let close_header = PacketHeader {
command: IDNCMD_RT_CNLMSG_CLOSE,
flags: self.client_group,
sequence: self.next_sequence(),
};
self.packet_buffer.write_bytes(close_header)?;
self.socket.send(&self.packet_buffer)?;
debug!("IDN stream: close complete");
Ok(())
}
pub fn close_with_ack(&mut self, timeout: Duration) -> Result<AcknowledgeResponse> {
self.send_channel_close()?;
self.packet_buffer.clear();
let close_seq = self.next_sequence();
let close_header = PacketHeader {
command: IDNCMD_RT_CNLMSG_CLOSE_ACKREQ,
flags: self.client_group,
sequence: close_seq,
};
self.packet_buffer.write_bytes(close_header)?;
self.socket.send(&self.packet_buffer)?;
self.recv_acknowledge(timeout, close_seq)
}
fn send_channel_close(&mut self) -> Result<()> {
debug!("IDN stream: sending channel close");
let service_id = self.dac.service_id();
let channel_id = self.channel_id();
self.packet_buffer.clear();
let packet_header = PacketHeader {
command: IDNCMD_RT_CNLMSG,
flags: self.client_group,
sequence: self.next_sequence(),
};
self.packet_buffer.write_bytes(packet_header)?;
let content_id = IDNFLG_CONTENTID_CHANNELMSG
| IDNFLG_CONTENTID_CONFIG_LSTFRG
| channel_id
| IDNVAL_CNKTYPE_VOID as u16;
let msg_size = ChannelMessageHeader::SIZE_BYTES + ChannelConfigHeader::SIZE_BYTES;
let channel_msg = ChannelMessageHeader {
total_size: msg_size as u16,
content_id,
timestamp: (self.timestamp & 0xFFFF_FFFF) as u32,
};
self.packet_buffer.write_bytes(channel_msg)?;
let config = ChannelConfigHeader {
word_count: 0,
flags: IDNFLG_CHNCFG_CLOSE,
service_id,
service_mode: 0,
};
self.packet_buffer.write_bytes(config)?;
self.socket.send(&self.packet_buffer)?;
Ok(())
}
fn send_request(
&mut self,
command: u8,
write_body: impl FnOnce(&mut Vec<u8>) -> io::Result<()>,
) -> Result<u16> {
self.packet_buffer.clear();
let seq = self.next_sequence();
let header = PacketHeader {
command,
flags: self.client_group,
sequence: seq,
};
self.packet_buffer.write_bytes(header)?;
write_body(&mut self.packet_buffer)?;
self.socket.send(&self.packet_buffer)?;
self.last_send_time = Some(Instant::now());
Ok(seq)
}
fn recv_response_header(
&mut self,
timeout: Duration,
expected_cmd: u8,
expected_seq: u16,
) -> Result<PacketHeader> {
let len = self.recv_into_buffer(timeout)?;
let mut cursor = &self.recv_buffer[..len];
let header: PacketHeader = cursor.read_bytes()?;
if header.command != expected_cmd {
return Err(CommunicationError::Response(
ResponseError::UnexpectedResponse,
));
}
self.check_sequence(expected_seq, header.sequence)?;
Ok(header)
}
fn recv_response<T: ReadFromBytes + SizeBytes>(
&mut self,
timeout: Duration,
expected_cmd: u8,
expected_seq: u16,
) -> Result<T> {
let len = self.recv_into_buffer(timeout)?;
if len < PacketHeader::SIZE_BYTES + T::SIZE_BYTES {
return Err(CommunicationError::Protocol(ProtocolError::BufferTooSmall));
}
let mut cursor = &self.recv_buffer[..len];
let header: PacketHeader = cursor.read_bytes()?;
if header.command != expected_cmd {
return Err(CommunicationError::Response(
ResponseError::UnexpectedResponse,
));
}
self.check_sequence(expected_seq, header.sequence)?;
let response: T = cursor.read_bytes()?;
Ok(response)
}
fn recv_into_buffer(&mut self, timeout: Duration) -> Result<usize> {
trace!("IDN stream: waiting for response (timeout={:?})", timeout);
self.socket.set_read_timeout(Some(timeout))?;
let len = match self.socket.recv(&mut self.recv_buffer) {
Ok(len) => {
trace!(
"IDN stream: received {} bytes: {:02x?}",
len,
&self.recv_buffer[..len.min(32)]
);
len
}
Err(e)
if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
{
debug!("IDN stream: receive timeout ({:?})", timeout);
return Err(CommunicationError::Response(ResponseError::Timeout));
}
Err(e) => {
warn!("IDN stream: receive error: {}", e);
return Err(CommunicationError::Io(e));
}
};
if len < PacketHeader::SIZE_BYTES {
warn!(
"IDN stream: received packet too small ({} bytes, need {})",
len,
PacketHeader::SIZE_BYTES
);
return Err(CommunicationError::Protocol(ProtocolError::BufferTooSmall));
}
Ok(len)
}
fn write_config(&mut self, service_id: u8, now: Instant) -> Result<()> {
let flags = IDNFLG_CHNCFG_ROUTING | self.sdm_flags();
let service_mode = match self.frame_mode {
FrameMode::Wave => IDNVAL_SMOD_LPGRF_CONTINUOUS,
FrameMode::Frame => IDNVAL_SMOD_LPGRF_DISCRETE,
};
let config = ChannelConfigHeader {
word_count: self.point_format.word_count(),
flags,
service_id,
service_mode,
};
trace!(
"IDN stream: write_config - service_id={}, word_count={}, flags=0x{:02x}, \
service_mode=0x{:02x}, format={:?}, descriptors={:?}",
service_id,
config.word_count,
flags,
service_mode,
self.point_format,
self.point_format
.descriptors()
.iter()
.map(|d| format!("0x{:04x}", d))
.collect::<Vec<_>>(),
);
self.packet_buffer.write_bytes(config)?;
for &desc in self.point_format.descriptors() {
self.packet_buffer.push((desc >> 8) as u8);
self.packet_buffer.push(desc as u8);
}
self.last_config_time = Some(now);
self.previous_format = Some(self.point_format);
Ok(())
}
fn chunk_type(&self, is_first: bool, is_only: bool) -> u8 {
match self.frame_mode {
FrameMode::Wave => IDNVAL_CNKTYPE_LPGRF_WAVE,
FrameMode::Frame if is_only => IDNVAL_CNKTYPE_LPGRF_FRAME,
FrameMode::Frame if is_first => IDNVAL_CNKTYPE_LPGRF_FRAME_FIRST,
FrameMode::Frame => IDNVAL_CNKTYPE_LPGRF_FRAME_SEQUEL,
}
}
fn channel_id(&self) -> u16 {
let service_id = self.dac.service_id();
((service_id.saturating_sub(1)) as u16 & 0x3F) << 8
}
fn sdm_flags(&self) -> u8 {
((self.service_data_match & 1) | 2) << 4
}
fn next_sequence(&mut self) -> u16 {
let seq = self.sequence;
self.sequence = self.sequence.wrapping_add(1);
seq
}
fn check_sequence(&self, expected: u16, actual: u16) -> Result<()> {
if actual != expected {
return Err(CommunicationError::Protocol(
ProtocolError::SequenceMismatch { expected, actual },
));
}
Ok(())
}
fn pad_points<P: Point>(points: &[P]) -> Vec<P> {
let last = *points.last().unwrap();
let mut padded = Vec::with_capacity(MIN_SAMPLES_PER_FRAME);
padded.extend_from_slice(points);
padded.resize(MIN_SAMPLES_PER_FRAME, last);
padded
}
}
impl Drop for Stream {
fn drop(&mut self) {
debug!(
"IDN stream: dropping (frames sent: {}, timestamp: {})",
self.frame_count, self.timestamp
);
let _ = self.close();
}
}
fn param_commands(service_id: u8) -> (u8, u8) {
if service_id == 0 {
(IDNCMD_UNIT_PARAMS_REQUEST, IDNCMD_UNIT_PARAMS_RESPONSE)
} else {
(
IDNCMD_SERVICE_PARAMS_REQUEST,
IDNCMD_SERVICE_PARAMS_RESPONSE,
)
}
}
fn check_parameter_response(response: &ParameterResponse) -> Result<()> {
if !response.is_success() {
if let Some(error) = ResponseError::from_ack_code(response.result_code) {
return Err(CommunicationError::Response(error));
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn point_format_size_bytes() {
assert_eq!(PointFormat::Xyrgbi.size_bytes(), 8);
assert_eq!(PointFormat::XyrgbHighRes.size_bytes(), 10);
assert_eq!(PointFormat::Extended.size_bytes(), 20);
}
#[test]
fn point_format_word_count() {
assert_eq!(PointFormat::Xyrgbi.word_count(), 4);
assert_eq!(PointFormat::XyrgbHighRes.word_count(), 5);
assert_eq!(PointFormat::Extended.word_count(), 10);
}
#[test]
fn point_format_descriptors_length() {
assert_eq!(PointFormat::Xyrgbi.descriptors().len(), 8);
assert_eq!(PointFormat::XyrgbHighRes.descriptors().len(), 10);
assert_eq!(PointFormat::Extended.descriptors().len(), 20);
}
#[test]
fn point_format_descriptors_not_empty() {
for &desc in PointFormat::Xyrgbi.descriptors() {
assert_ne!(desc, 0);
}
for &desc in PointFormat::XyrgbHighRes.descriptors() {
assert_ne!(desc, 0);
}
for &desc in PointFormat::Extended.descriptors() {
assert_ne!(desc, 0);
}
}
#[test]
fn duration_calculation() {
let points = 1000u64;
let scan_speed = 30000u64;
let duration = (points * 1_000_000) / scan_speed;
assert_eq!(duration, 33333);
let points = 30000u64;
let duration = (points * 1_000_000) / scan_speed;
assert_eq!(duration, 1_000_000);
}
#[test]
fn max_points_per_packet_with_config() {
let header_size = PacketHeader::SIZE_BYTES
+ ChannelMessageHeader::SIZE_BYTES
+ ChannelConfigHeader::SIZE_BYTES
+ PointFormat::Xyrgbi.descriptors().len() * 2
+ SampleChunkHeader::SIZE_BYTES;
assert_eq!(header_size, 36);
let max_points = (MAX_UDP_PAYLOAD - header_size) / 8;
assert_eq!(max_points, 177);
}
#[test]
fn max_points_per_packet_without_config() {
let header_size = PacketHeader::SIZE_BYTES
+ ChannelMessageHeader::SIZE_BYTES
+ SampleChunkHeader::SIZE_BYTES;
assert_eq!(header_size, 16);
let max_points_xyrgbi = (MAX_UDP_PAYLOAD - header_size) / 8;
assert_eq!(max_points_xyrgbi, 179);
let max_points_highres = (MAX_UDP_PAYLOAD - header_size) / 10;
assert_eq!(max_points_highres, 143);
let max_points_extended = (MAX_UDP_PAYLOAD - header_size) / 20;
assert_eq!(max_points_extended, 71);
}
#[test]
fn content_id_construction() {
let service_id: u8 = 1;
let channel_id = ((service_id.saturating_sub(1)) as u16 & 0x3F) << 8;
assert_eq!(channel_id, 0x0000);
let service_id: u8 = 2;
let channel_id = ((service_id.saturating_sub(1)) as u16 & 0x3F) << 8;
assert_eq!(channel_id, 0x0100);
let content_id = IDNFLG_CONTENTID_CHANNELMSG | channel_id;
assert_eq!(content_id, 0x8100);
}
#[test]
fn sequence_number_wrapping() {
let mut seq: u16 = u16::MAX - 1;
seq = seq.wrapping_add(1);
assert_eq!(seq, u16::MAX);
seq = seq.wrapping_add(1);
assert_eq!(seq, 0);
seq = seq.wrapping_add(1);
assert_eq!(seq, 1);
}
#[test]
fn service_data_match_wrapping() {
let mut sdm: u8 = 254;
sdm = sdm.wrapping_add(1);
assert_eq!(sdm, 255);
sdm = sdm.wrapping_add(1);
assert_eq!(sdm, 0);
}
#[test]
fn config_needed_on_first_frame() {
assert!(Stream::needs_config(0, None, PointFormat::Xyrgbi));
}
#[test]
fn config_needed_on_format_change() {
assert!(Stream::needs_config(
42,
Some(PointFormat::Xyrgbi),
PointFormat::XyrgbHighRes
));
}
#[test]
fn config_not_needed_for_periodic_refresh_only() {
assert!(!Stream::needs_config(
42,
Some(PointFormat::Xyrgbi),
PointFormat::Xyrgbi
));
}
#[test]
fn timestamp_truncation_to_u32() {
let timestamp: u64 = 0x1_0000_ABCD; let truncated = (timestamp & 0xFFFF_FFFF) as u32;
assert_eq!(truncated, 0x0000_ABCD);
let timestamp: u64 = 0xFFFF_FFFF;
let truncated = (timestamp & 0xFFFF_FFFF) as u32;
assert_eq!(truncated, 0xFFFF_FFFF);
}
use crate::protocols::idn::protocol::PointXyrgbi;
#[test]
fn test_pad_points_pads_to_minimum() {
let points: Vec<PointXyrgbi> = (0..5)
.map(|i| PointXyrgbi::new(i as i16, i as i16, 255, 0, 0, 255))
.collect();
let padded = Stream::pad_points(&points);
assert_eq!(padded.len(), MIN_SAMPLES_PER_FRAME);
for i in 0..5 {
assert_eq!(padded[i], points[i]);
}
let last = points[4];
for p in &padded[5..] {
assert_eq!(*p, last);
}
}
#[test]
fn test_pad_points_single_point() {
let points = vec![PointXyrgbi::new(100, -200, 128, 64, 32, 255)];
let padded = Stream::pad_points(&points);
assert_eq!(padded.len(), MIN_SAMPLES_PER_FRAME);
for p in &padded {
assert_eq!(*p, points[0]);
}
}
#[test]
fn test_pad_points_nineteen() {
let points: Vec<PointXyrgbi> = (0..19)
.map(|i| PointXyrgbi::new(i as i16, 0, 0, 0, 0, 0))
.collect();
let padded = Stream::pad_points(&points);
assert_eq!(padded.len(), MIN_SAMPLES_PER_FRAME);
for i in 0..19 {
assert_eq!(padded[i], points[i]);
}
assert_eq!(padded[19], points[18]);
}
#[test]
fn test_even_distribution_300_points() {
let header_size = PacketHeader::SIZE_BYTES
+ ChannelMessageHeader::SIZE_BYTES
+ ChannelConfigHeader::SIZE_BYTES
+ PointFormat::Xyrgbi.descriptors().len() * 2
+ SampleChunkHeader::SIZE_BYTES;
let max_points_per_packet = (MAX_UDP_PAYLOAD - header_size) / PointXyrgbi::SIZE_BYTES;
assert_eq!(max_points_per_packet, 177);
let total = 300usize;
let num_packets = total.div_ceil(max_points_per_packet);
let points_to_send = total / num_packets;
assert_eq!(num_packets, 2);
assert_eq!(points_to_send, 150);
}
#[test]
fn test_even_distribution_500_points() {
let header_size = PacketHeader::SIZE_BYTES
+ ChannelMessageHeader::SIZE_BYTES
+ ChannelConfigHeader::SIZE_BYTES
+ PointFormat::Xyrgbi.descriptors().len() * 2
+ SampleChunkHeader::SIZE_BYTES;
let max_points_per_packet = (MAX_UDP_PAYLOAD - header_size) / PointXyrgbi::SIZE_BYTES;
let total = 500usize;
let num_packets = total.div_ceil(max_points_per_packet);
let points_to_send = total / num_packets;
assert_eq!(num_packets, 3);
assert_eq!(points_to_send, 166);
}
#[test]
fn test_even_distribution_small_frame() {
let header_size = PacketHeader::SIZE_BYTES
+ ChannelMessageHeader::SIZE_BYTES
+ ChannelConfigHeader::SIZE_BYTES
+ PointFormat::Xyrgbi.descriptors().len() * 2
+ SampleChunkHeader::SIZE_BYTES;
let max_points_per_packet = (MAX_UDP_PAYLOAD - header_size) / PointXyrgbi::SIZE_BYTES;
let total = 50usize;
let num_packets = total.div_ceil(max_points_per_packet);
let points_to_send = total / num_packets;
assert_eq!(num_packets, 1);
assert_eq!(points_to_send, 50);
}
#[test]
fn test_even_distribution_exact_max() {
let header_size = PacketHeader::SIZE_BYTES
+ ChannelMessageHeader::SIZE_BYTES
+ SampleChunkHeader::SIZE_BYTES;
let max_points_per_packet = (MAX_UDP_PAYLOAD - header_size) / PointXyrgbi::SIZE_BYTES;
assert_eq!(max_points_per_packet, 179);
let total = 179usize;
let num_packets = total.div_ceil(max_points_per_packet);
let points_to_send = total / num_packets;
assert_eq!(num_packets, 1);
assert_eq!(points_to_send, 179);
}
}