hdds_micro/core/
reader.rs1use crate::error::{Error, Result};
7use crate::rtps::submessages::Data;
8use crate::rtps::{EntityId, GuidPrefix, RtpsHeader, GUID};
9use crate::transport::Transport;
10use crate::MAX_PACKET_SIZE;
11
12#[derive(Debug)]
14pub struct Sample<'a> {
15 pub writer_guid: GUID,
17
18 pub sequence_number: crate::rtps::SequenceNumber,
20
21 pub payload: &'a [u8],
23}
24
25#[derive(Debug, PartialEq)]
52pub struct MicroReader {
53 guid: GUID,
55
56 topic_name: [u8; 64],
58 topic_len: usize,
59
60 rx_buffer: [u8; MAX_PACKET_SIZE],
62}
63
64impl MicroReader {
65 pub fn new(guid_prefix: GuidPrefix, entity_id: EntityId, topic_name: &str) -> Result<Self> {
73 if topic_name.len() > 63 {
74 return Err(Error::InvalidParameter);
75 }
76
77 let mut topic_name_buf = [0u8; 64];
78 topic_name_buf[0..topic_name.len()].copy_from_slice(topic_name.as_bytes());
79
80 Ok(Self {
81 guid: GUID::new(guid_prefix, entity_id),
82 topic_name: topic_name_buf,
83 topic_len: topic_name.len(),
84 rx_buffer: [0u8; MAX_PACKET_SIZE],
85 })
86 }
87
88 pub const fn guid(&self) -> GUID {
90 self.guid
91 }
92
93 pub fn topic_name(&self) -> &str {
95 core::str::from_utf8(&self.topic_name[0..self.topic_len]).unwrap_or("")
96 }
97
98 pub fn read<T: Transport>(&mut self, transport: &mut T) -> Result<Option<Sample<'_>>> {
102 let (bytes_received, _source_locator) = match transport.try_recv(&mut self.rx_buffer) {
104 Ok(result) => result,
105 Err(Error::ResourceExhausted) => return Ok(None), Err(e) => return Err(e),
107 };
108
109 let header = RtpsHeader::decode(&self.rx_buffer[0..bytes_received])?;
111
112 let (data, payload_offset) = Data::decode(&self.rx_buffer[RtpsHeader::SIZE..])?;
114
115 if data.reader_id != EntityId::UNKNOWN && data.reader_id != self.guid.entity_id {
117 return Ok(None); }
119
120 let payload_start = RtpsHeader::SIZE + payload_offset;
122 let payload_end = bytes_received;
123
124 if payload_start >= payload_end {
125 return Err(Error::DecodingError);
126 }
127
128 let payload = &self.rx_buffer[payload_start..payload_end];
129
130 let writer_guid = GUID::new(header.guid_prefix, data.writer_id);
132
133 Ok(Some(Sample {
134 writer_guid,
135 sequence_number: data.writer_sn,
136 payload,
137 }))
138 }
139
140 }
144
145#[cfg(test)]
146mod tests {
147 use super::*;
148 use crate::transport::NullTransport;
149
150 #[test]
151 fn test_reader_creation() {
152 let reader = MicroReader::new(
153 GuidPrefix::new([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]),
154 EntityId::new([0, 0, 0, 0xc7]),
155 "TestTopic",
156 )
157 .unwrap();
158
159 assert_eq!(reader.topic_name(), "TestTopic");
160 }
161
162 #[test]
163 fn test_reader_no_data() {
164 let mut reader =
165 MicroReader::new(GuidPrefix::default(), EntityId::default(), "TestTopic").unwrap();
166
167 let mut transport = NullTransport::default();
168
169 let result = reader.read(&mut transport).unwrap();
170 assert!(result.is_none());
171 }
172
173 #[test]
174 fn test_reader_topic_name_too_long() {
175 let long_name = "a".repeat(100);
176 let result = MicroReader::new(GuidPrefix::default(), EntityId::default(), &long_name);
177
178 assert_eq!(result, Err(Error::InvalidParameter));
179 }
180}