rustdds 0.11.8

Native Rust DDS implementation with RTPS
Documentation
use std::io;

#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use speedy::{Context, Error, Readable, Writable, Writer};
use enumflags2::BitFlags;
use bytes::Bytes;

use crate::{
  messages::submessages::{elements::parameter_list::ParameterList, submessages::*},
  structure::{
    guid::EntityId,
    sequence_number::{FragmentNumber, SequenceNumber},
  },
};

/// The DataFrag Submessage extends the Data Submessage by enabling the
/// serializedData to be fragmented and sent as multiple DataFrag Submessages.
/// The fragments contained in the DataFrag Submessages are then re-assembled by
/// the RTPS Reader.
#[derive(Debug, PartialEq, Eq, Clone)]
#[cfg_attr(test, derive(Default))]
pub struct DataFrag {
  /// Identifies the RTPS Reader entity that is being informed of the change
  /// to the data-object.
  pub reader_id: EntityId,

  /// Identifies the RTPS Writer entity that made the change to the
  /// data-object.
  pub writer_id: EntityId,

  /// Uniquely identifies the change and the relative order for all changes
  /// made by the RTPS Writer identified by the writerGuid.
  /// Each change gets a consecutive sequence number.
  /// Each RTPS Writer maintains is own sequence number.
  pub writer_sn: SequenceNumber,

  /// Indicates the starting fragment for the series of fragments in
  /// serialized_data. Fragment numbering starts with number 1.
  pub fragment_starting_num: FragmentNumber,

  /// The number of consecutive fragments contained in this Submessage,
  /// starting at fragment_starting_num.
  pub fragments_in_submessage: u16,

  /// The total size in bytes of the original data before fragmentation.
  pub data_size: u32,

  /// The size of an individual fragment in bytes. The maximum fragment size
  /// equals 64K.
  pub fragment_size: u16,

  /// Contains QoS that may affect the interpretation of the message.
  /// Present only if the InlineQosFlag is set in the header.
  pub inline_qos: Option<ParameterList>,

  /// Encapsulation of a consecutive series of fragments, starting at
  /// fragment_starting_num for a total of fragments_in_submessage.
  /// Represents part of the new value of the data-object
  /// after the change.
  ///
  /// If payloads are protected, contains the buffer that decodes to the series
  /// of fragments. In particular, a CryptoHeader, a plaintext buffer or
  /// CryptoContent depending on the transformation kind, and
  /// CryptoFooter, which have been serialized and concatenated.
  ///
  /// Note: RTPS spec says the serialized_payload is of type SerializedPayload,
  /// but that is technically incorrect. It is a fragment of
  /// SerializedPayload. The headers at the beginning of SerializedPayload
  /// appear only at the first fragment. The fragmentation mechanism here
  /// should treat serialized_payload as an opaque stream of bytes.
  pub serialized_payload: Bytes,
}

impl DataFrag {
  // Serialized length of DataFrag submessage without submessage header.
  // This is compatible with the definition of the definition of
  // "octetsToNextHeader" field in RTPS spec v2.5 Section "9.4.5.1 Submessage
  // Header".
  pub fn len_serialized(&self) -> usize {
    2 + // extraFlags (unused in RTPS v2.5)
    2 + // octetsToInlineSos
    4 + // readerId
    4 + // writerId
    8 + // writerSN
    4 + // fragmentStartingNum
    2 + // fragmentsInSubmessage
    2 + // fragmentSize
    4 + // sampleSize
    self.inline_qos.as_ref().map(|q| q.len_serialized() ).unwrap_or(0) + // QoS ParameterList
    self.serialized_payload.len()
  }

  /// Spec talks about (expected) total number of fragments.
  /// This is technically the last (Expected) fragment number, which should be
  /// the same value.
  pub fn total_number_of_fragments(&self) -> FragmentNumber {
    // RTPS spec v2.5 Section "8.3.8.3.5 Logical Interpretation" defines this as
    // follows "The total number of fragments to expect equals:
    // (dataSize / fragmentSize) + ((dataSize % fragmentSize) ? 1 : 0) "
    //
    // Note: The above formula is a bit suspect, since fragmentSize == 0 seems to
    // be allowed by the spec.

    // This is a integer division with rounding up.
    let frag_size = self.fragment_size as u32;
    if frag_size < 1 {
      FragmentNumber::INVALID
    } else {
      FragmentNumber::new(self.data_size.div_ceil(frag_size))
    }
  }

  pub fn deserialize(buffer: &Bytes, flags: BitFlags<DATAFRAG_Flags>) -> io::Result<Self> {
    let mut cursor = io::Cursor::new(&buffer);
    let endianness = endianness_flag(flags.bits());
    let map_speedy_err = |p: Error| io::Error::new(io::ErrorKind::Other, p);

    let _extra_flags =
      u16::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor).map_err(map_speedy_err)?;
    let octets_to_inline_qos =
      u16::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor).map_err(map_speedy_err)?;
    let reader_id = EntityId::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor)
      .map_err(map_speedy_err)?;
    let writer_id = EntityId::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor)
      .map_err(map_speedy_err)?;
    let writer_sn = SequenceNumber::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor)
      .map_err(map_speedy_err)?;
    let fragment_starting_num =
      FragmentNumber::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor)
        .map_err(map_speedy_err)?;
    let fragments_in_submessage =
      u16::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor).map_err(map_speedy_err)?;
    let fragment_size =
      u16::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor).map_err(map_speedy_err)?;
    let data_size =
      u32::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor).map_err(map_speedy_err)?;

    let expect_qos = flags.contains(DATAFRAG_Flags::InlineQos);
    // let expect_key = flags.contains(DATAFRAG_Flags::Key);

    // Size of header after "octets_to_inline_qos" field:
    // reader_id: 4
    // writer_id: 4
    // writer_sn: 8
    // fragment_starting_num: 4
    // fragments_in_submessage: 2
    // fragment_size: 2
    // data_size: 4
    //
    // Total: 28

    // Skip any possible fields we do not know about.
    let rtps_v25_header_size: u16 = 28;
    if octets_to_inline_qos < rtps_v25_header_size {
      return Err(io::Error::new(
        io::ErrorKind::Other,
        format!("DataFrag has too low octetsToInlineQos = {octets_to_inline_qos}"),
      ));
    }
    // condition to avoid subtract overflow
    if octets_to_inline_qos > rtps_v25_header_size {
      let extra_octets = octets_to_inline_qos - rtps_v25_header_size;
      cursor.set_position(cursor.position() + u64::from(extra_octets));

      if cursor.position() > buffer.len().try_into().unwrap() {
        // octets_to_inline_qos told us to skip past the end of the message.
        // This is a malformed message.
        return Err(io::Error::new(
          io::ErrorKind::InvalidData,
          format!(
            "DATAFRAG submessage octets_to_inline_qos points to byte {}, but message len={}.",
            cursor.position(),
            buffer.len()
          ),
        ));
      }
    }

    let inline_qos = if expect_qos {
      Some(
        ParameterList::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor)
          .map_err(map_speedy_err)?,
      )
    } else {
      None
    };

    // Validity checks from RTPS spec v2.5 Section 8.3.8.3.3 "Validity"

    // writer_sn strictly positive
    if writer_sn < SequenceNumber::new(1) {
      return Err(io::Error::new(
        io::ErrorKind::Other,
        "DataFrag SequenceNumber < 1. Discarding as invalid.",
      ));
    }

    // Fragment size == 0 is not strictly forbidden in the RTPS spec, but
    // it smells so fishy that we just drop it to avoid confusion later.
    // TODO:
    // Is there any valid use case for sending zero-sized fragments?
    // Sending zero-sized data may be ok, but it could be sent as zero fragments of
    // some positive size, or preferably as non-fragmented DATA submessage.
    if fragment_size < 1 || (fragment_size as u32) > data_size {
      return Err(io::Error::new(
        io::ErrorKind::Other,
        format!(
          "Invalid DataFrag. fragment_size={fragment_size} data_size={data_size}  Expected 1 <= \
           fragment_size <= data_size."
        ),
      ));
    }

    // Payload should be always present, be it data or key fragments.
    let serialized_payload = buffer.clone().split_off(cursor.position() as usize);

    let datafrag = Self {
      reader_id,
      writer_id,
      writer_sn,
      fragment_starting_num,
      fragments_in_submessage,
      data_size,
      fragment_size,
      inline_qos,
      serialized_payload,
    };

    // fragment_starting_num strictly positive and must not exceed total number of
    // fragments

    let expected_total = datafrag.total_number_of_fragments();
    if fragment_starting_num < FragmentNumber::new(1) || fragment_starting_num > expected_total {
      return Err(io::Error::new(
        io::ErrorKind::Other,
        format!(
          "DataFrag fragmentStartingNum={fragment_starting_num:?} \
           expected_total={expected_total:?}.  Expected 1 <= fragmentStartingNum <= \
           expected_total.  Discarding as invalid."
        ),
      ));
    }

    Ok(datafrag)
  }
}

impl<C: Context> Writable<C> for DataFrag {
  fn write_to<T: ?Sized + Writer<C>>(&self, writer: &mut T) -> Result<(), C::Error> {
    writer.write_u16(0)?; // extraflags
    writer.write_u16(28)?; // See calculation of this value in deserialization above.
                           // We always write constant 28 here, because this implementation does not (yet)
                           // write any fields between sampleSize ( = data_size)
                           // and inline QoS. If some future protocol version adds fields there, then this
                           // must be changed.
    writer.write_value(&self.reader_id)?;
    writer.write_value(&self.writer_id)?;
    writer.write_value(&self.writer_sn)?;
    writer.write_value(&self.fragment_starting_num)?;
    writer.write_value(&self.fragments_in_submessage)?;
    writer.write_value(&self.fragment_size)?;
    writer.write_value(&self.data_size)?;
    if self.inline_qos.is_some() && !self.inline_qos.as_ref().unwrap().parameters.is_empty() {
      writer.write_value(&self.inline_qos)?;
    }
    writer.write_bytes(&self.serialized_payload)?;
    Ok(())
  }
}

impl HasEntityIds for DataFrag {
  fn receiver_entity_id(&self) -> EntityId {
    self.reader_id
  }
  fn sender_entity_id(&self) -> EntityId {
    self.writer_id
  }
}