use crate::error::{Error, Result};
use crate::rtps::submessages::Data;
use crate::rtps::{EntityId, GuidPrefix, RtpsHeader, GUID};
use crate::transport::Transport;
use crate::MAX_PACKET_SIZE;
#[derive(Debug)]
pub struct Sample<'a> {
pub writer_guid: GUID,
pub sequence_number: crate::rtps::SequenceNumber,
pub payload: &'a [u8],
}
#[derive(Debug, PartialEq)]
pub struct MicroReader {
guid: GUID,
topic_name: [u8; 64],
topic_len: usize,
rx_buffer: [u8; MAX_PACKET_SIZE],
}
impl MicroReader {
pub fn new(guid_prefix: GuidPrefix, entity_id: EntityId, topic_name: &str) -> Result<Self> {
if topic_name.len() > 63 {
return Err(Error::InvalidParameter);
}
let mut topic_name_buf = [0u8; 64];
topic_name_buf[0..topic_name.len()].copy_from_slice(topic_name.as_bytes());
Ok(Self {
guid: GUID::new(guid_prefix, entity_id),
topic_name: topic_name_buf,
topic_len: topic_name.len(),
rx_buffer: [0u8; MAX_PACKET_SIZE],
})
}
pub const fn guid(&self) -> GUID {
self.guid
}
pub fn topic_name(&self) -> &str {
core::str::from_utf8(&self.topic_name[0..self.topic_len]).unwrap_or("")
}
pub fn read<T: Transport>(&mut self, transport: &mut T) -> Result<Option<Sample<'_>>> {
let (bytes_received, _source_locator) = match transport.try_recv(&mut self.rx_buffer) {
Ok(result) => result,
Err(Error::ResourceExhausted) => return Ok(None), Err(e) => return Err(e),
};
let header = RtpsHeader::decode(&self.rx_buffer[0..bytes_received])?;
let (data, payload_offset) = Data::decode(&self.rx_buffer[RtpsHeader::SIZE..])?;
if data.reader_id != EntityId::UNKNOWN && data.reader_id != self.guid.entity_id {
return Ok(None); }
let payload_start = RtpsHeader::SIZE + payload_offset;
let payload_end = bytes_received;
if payload_start >= payload_end {
return Err(Error::DecodingError);
}
let payload = &self.rx_buffer[payload_start..payload_end];
let writer_guid = GUID::new(header.guid_prefix, data.writer_id);
Ok(Some(Sample {
writer_guid,
sequence_number: data.writer_sn,
payload,
}))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::transport::NullTransport;
#[test]
fn test_reader_creation() {
let reader = MicroReader::new(
GuidPrefix::new([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]),
EntityId::new([0, 0, 0, 0xc7]),
"TestTopic",
)
.unwrap();
assert_eq!(reader.topic_name(), "TestTopic");
}
#[test]
fn test_reader_no_data() {
let mut reader =
MicroReader::new(GuidPrefix::default(), EntityId::default(), "TestTopic").unwrap();
let mut transport = NullTransport::default();
let result = reader.read(&mut transport).unwrap();
assert!(result.is_none());
}
#[test]
fn test_reader_topic_name_too_long() {
let long_name = "a".repeat(100);
let result = MicroReader::new(GuidPrefix::default(), EntityId::default(), &long_name);
assert_eq!(result, Err(Error::InvalidParameter));
}
}