Skip to main content

hdds_micro/core/
reader.rs

1// SPDX-License-Identifier: Apache-2.0 OR MIT
2// Copyright (c) 2025-2026 naskel.com
3
4//! MicroReader - DDS DataReader for embedded
5
6use crate::error::{Error, Result};
7use crate::rtps::submessages::Data;
8use crate::rtps::{EntityId, GuidPrefix, RtpsHeader, GUID};
9use crate::transport::Transport;
10use crate::MAX_PACKET_SIZE;
11
/// A single data sample returned by [`MicroReader::read`].
///
/// The sample does not own its payload: `payload` borrows from the reader's
/// internal receive buffer, so the sample must be dropped before the reader
/// can be read from again.
#[derive(Debug)]
pub struct Sample<'a> {
    /// Source writer GUID, built from the GUID prefix in the received RTPS
    /// header and the writer entity ID in the DATA submessage.
    pub writer_guid: GUID,

    /// Sequence number the writer assigned to this sample (taken from the
    /// DATA submessage).
    pub sequence_number: crate::rtps::SequenceNumber,

    /// Payload (CDR-encoded), borrowed from the reader's receive buffer.
    pub payload: &'a [u8],
}
24
/// MicroReader - DDS DataReader
///
/// Receives data samples from a topic.
///
/// # Design
///
/// - BEST_EFFORT QoS (no acknowledgments)
/// - No history cache (process immediately)
/// - Fixed-size receive buffer
///
/// # Example
///
/// ```ignore
/// // `new` is fallible and `read` needs `&mut self`, hence `let mut` and `?`.
/// let mut reader = MicroReader::new(
///     participant.guid_prefix(),
///     reader_entity_id,
///     "Temperature",
/// )?;
///
/// // Read sample
/// if let Some(sample) = reader.read(participant.transport_mut())? {
///     let mut decoder = CdrDecoder::new(sample.payload);
///     let temp: f32 = decoder.decode_f32()?;
///     let timestamp: i64 = decoder.decode_i64()?;
/// }
/// ```
#[derive(Debug, PartialEq)]
pub struct MicroReader {
    /// Reader GUID (participant GUID prefix + this reader's entity ID)
    guid: GUID,

    /// Topic name bytes, zero-padded; only the first `topic_len` bytes are
    /// meaningful.
    topic_name: [u8; 64],
    /// Number of valid bytes at the start of `topic_name`.
    topic_len: usize,

    /// Receive buffer (reusable); samples returned by `read` borrow from it.
    rx_buffer: [u8; MAX_PACKET_SIZE],
}
63
64impl MicroReader {
65    /// Create a new reader
66    ///
67    /// # Arguments
68    ///
69    /// * `guid_prefix` - Participant's GUID prefix
70    /// * `entity_id` - Reader's entity ID
71    /// * `topic_name` - Topic name (max 63 chars)
72    pub fn new(guid_prefix: GuidPrefix, entity_id: EntityId, topic_name: &str) -> Result<Self> {
73        if topic_name.len() > 63 {
74            return Err(Error::InvalidParameter);
75        }
76
77        let mut topic_name_buf = [0u8; 64];
78        topic_name_buf[0..topic_name.len()].copy_from_slice(topic_name.as_bytes());
79
80        Ok(Self {
81            guid: GUID::new(guid_prefix, entity_id),
82            topic_name: topic_name_buf,
83            topic_len: topic_name.len(),
84            rx_buffer: [0u8; MAX_PACKET_SIZE],
85        })
86    }
87
88    /// Get reader GUID
89    pub const fn guid(&self) -> GUID {
90        self.guid
91    }
92
93    /// Get topic name
94    pub fn topic_name(&self) -> &str {
95        core::str::from_utf8(&self.topic_name[0..self.topic_len]).unwrap_or("")
96    }
97
98    /// Read a sample (non-blocking)
99    ///
100    /// Returns `None` if no sample available.
101    pub fn read<T: Transport>(&mut self, transport: &mut T) -> Result<Option<Sample<'_>>> {
102        // Try to receive packet
103        let (bytes_received, _source_locator) = match transport.try_recv(&mut self.rx_buffer) {
104            Ok(result) => result,
105            Err(Error::ResourceExhausted) => return Ok(None), // No packet available
106            Err(e) => return Err(e),
107        };
108
109        // Parse RTPS header
110        let header = RtpsHeader::decode(&self.rx_buffer[0..bytes_received])?;
111
112        // Parse DATA submessage
113        let (data, payload_offset) = Data::decode(&self.rx_buffer[RtpsHeader::SIZE..])?;
114
115        // Filter by entity ID (if specified)
116        if data.reader_id != EntityId::UNKNOWN && data.reader_id != self.guid.entity_id {
117            return Ok(None); // Not for us
118        }
119
120        // Extract payload
121        let payload_start = RtpsHeader::SIZE + payload_offset;
122        let payload_end = bytes_received;
123
124        if payload_start >= payload_end {
125            return Err(Error::DecodingError);
126        }
127
128        let payload = &self.rx_buffer[payload_start..payload_end];
129
130        // Build writer GUID
131        let writer_guid = GUID::new(header.guid_prefix, data.writer_id);
132
133        Ok(Some(Sample {
134            writer_guid,
135            sequence_number: data.writer_sn,
136            payload,
137        }))
138    }
139
140    // Note: Blocking read is not provided due to borrow checker limitations.
141    // In embedded environments, you should call `read()` in a loop with
142    // appropriate sleep/yield between iterations.
143}
144
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transport::NullTransport;

    /// Builds a reader with default identifiers for tests that only care
    /// about the topic name.
    fn reader_for(topic: &str) -> Result<MicroReader> {
        MicroReader::new(GuidPrefix::default(), EntityId::default(), topic)
    }

    #[test]
    fn test_reader_creation() {
        let reader = MicroReader::new(
            GuidPrefix::new([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]),
            EntityId::new([0, 0, 0, 0xc7]),
            "TestTopic",
        )
        .unwrap();

        assert_eq!(reader.topic_name(), "TestTopic");

        // The reader GUID must be exactly the prefix/entity pair it was given.
        let expected_guid = GUID::new(
            GuidPrefix::new([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]),
            EntityId::new([0, 0, 0, 0xc7]),
        );
        assert_eq!(reader.guid(), expected_guid);
    }

    #[test]
    fn test_reader_no_data() {
        let mut reader = reader_for("TestTopic").unwrap();

        let mut transport = NullTransport::default();

        let result = reader.read(&mut transport).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_reader_topic_name_too_long() {
        let long_name = "a".repeat(100);
        let result = reader_for(&long_name);

        assert_eq!(result, Err(Error::InvalidParameter));
    }

    /// Pins the exact limit: 63 bytes is accepted, 64 bytes is rejected.
    #[test]
    fn test_reader_topic_name_length_boundary() {
        let longest_ok = "a".repeat(63);
        let reader = reader_for(&longest_ok).unwrap();
        assert_eq!(reader.topic_name(), longest_ok.as_str());

        let one_too_many = "a".repeat(64);
        assert_eq!(reader_for(&one_too_many), Err(Error::InvalidParameter));
    }

    /// An empty topic name is accepted and read back unchanged.
    #[test]
    fn test_reader_empty_topic_name() {
        let reader = reader_for("").unwrap();
        assert_eq!(reader.topic_name(), "");
    }
}