use std::{cmp::min, collections::BTreeSet, convert::TryInto, io};
#[allow(unused_imports)]
use log::{debug, error, trace, warn};
use speedy::{Context, Endianness, Readable, Writable, Writer};
use enumflags2::BitFlags;
use bytes::Bytes;
use crate::{
  dds::ddsdata::DDSData,
  messages::{
    header::Header,
    protocol_id::ProtocolId,
    protocol_version::ProtocolVersion,
    submessages::{
      elements::{
        parameter::Parameter, parameter_list::ParameterList, serialized_payload::SerializedPayload,
      },
      submessage::WriterSubmessage,
      submessages::{SubmessageKind, *},
    },
    vendor_id::VendorId,
  },
  rtps::{writer::Writer as RtpsWriter, Submessage, SubmessageBody},
  structure::{
    cache_change::CacheChange,
    entity::RTPSEntity,
    guid::{EntityId, EntityKind, GuidPrefix, GUID},
    parameter_id::ParameterId,
    sequence_number::{FragmentNumber, SequenceNumber, SequenceNumberSet},
    time::Timestamp,
  },
  RepresentationIdentifier,
};
#[cfg(feature = "security")]
use crate::{
  security::{security_plugins::SecurityPluginsHandle, SecurityError},
  security_error,
};
#[cfg(not(feature = "security"))]
use crate::no_security::SecurityPluginsHandle;
#[derive(Debug, Clone)]
pub struct Message {
  pub header: Header,
  pub submessages: Vec<Submessage>,
}
impl Message {
  pub fn add_submessage(&mut self, submessage: Submessage) {
    self.submessages.push(submessage);
  }
  #[cfg(test)]
  pub fn submessages(self) -> Vec<Submessage> {
    self.submessages
  }
  #[cfg(test)]
  pub fn set_header(&mut self, header: Header) {
    self.header = header;
  }
  pub fn read_from_buffer(buffer: &Bytes) -> io::Result<Self> {
    let rtps_header =
      Header::read_from_buffer(buffer).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
    let mut message = Self::new(rtps_header);
    let mut submessages_left: Bytes = buffer.slice(20..); while !submessages_left.is_empty() {
      if let Some(submessage) = Submessage::read_from_buffer(&mut submessages_left)? {
        message.submessages.push(submessage);
      }
    } Ok(message)
  }
}
impl Message {
  pub fn new(header: Header) -> Self {
    Self {
      header,
      submessages: vec![],
    }
  }
}
impl Default for Message {
  fn default() -> Self {
    Self {
      header: Header::new(GuidPrefix::UNKNOWN),
      submessages: vec![],
    }
  }
}
impl<C: Context> Writable<C> for Message {
  fn write_to<T: ?Sized + Writer<C>>(&self, writer: &mut T) -> Result<(), C::Error> {
    writer.write_value(&self.header)?;
    for x in &self.submessages {
      writer.write_value(&x)?;
    }
    Ok(())
  }
}
#[derive(Default, Clone)]
pub(crate) struct MessageBuilder {
  submessages: Vec<Submessage>,
}
impl MessageBuilder {
  pub fn new() -> Self {
    Self::default()
  }
  pub fn dst_submessage(mut self, endianness: Endianness, guid_prefix: GuidPrefix) -> Self {
    let flags = BitFlags::<INFODESTINATION_Flags>::from_endianness(endianness);
    let submessage_header = SubmessageHeader {
      kind: SubmessageKind::INFO_DST,
      flags: flags.bits(),
      content_length: 12u16,
      };
    let dst_submessage = Submessage {
      header: submessage_header,
      body: SubmessageBody::Interpreter(InterpreterSubmessage::InfoDestination(
        InfoDestination { guid_prefix },
        flags,
      )),
      original_bytes: None,
    };
    self.submessages.push(dst_submessage);
    self
  }
  pub fn ts_msg(mut self, endianness: Endianness, timestamp: Option<Timestamp>) -> Self {
    let mut flags = BitFlags::<INFOTIMESTAMP_Flags>::from_endianness(endianness);
    if timestamp.is_none() {
      flags |= INFOTIMESTAMP_Flags::Invalidate;
    }
    let content_length = match timestamp {
      Some(_) => 8, None => 0,    };
    let submessage_header = SubmessageHeader {
      kind: SubmessageKind::INFO_TS,
      flags: flags.bits(),
      content_length,
    };
    let submsg = Submessage {
      header: submessage_header,
      body: SubmessageBody::Interpreter(InterpreterSubmessage::InfoTimestamp(
        InfoTimestamp { timestamp },
        flags,
      )),
      original_bytes: None,
    };
    self.submessages.push(submsg);
    self
  }
  pub fn data_msg(
    mut self,
    cache_change: &CacheChange,
    reader_entity_id: EntityId, writer_guid: GUID,
    endianness: Endianness,
    _security_plugins: Option<&SecurityPluginsHandle>,
  ) -> Self {
    let writer_entity_id = writer_guid.entity_id;
    let mut param_list = ParameterList::new(); match cache_change.data_value {
      DDSData::Data { .. } | DDSData::DisposeByKey { .. } => (), DDSData::DisposeByKeyHash { key_hash, .. } => {
        param_list.push(Parameter {
          parameter_id: ParameterId::PID_KEY_HASH,
          value: key_hash.to_vec(),
        });
        let status_info = Parameter::create_pid_status_info_parameter(
          true, true, false,
        );
        param_list.push(status_info);
      }
    }
    if let Some(si) = cache_change.write_options.related_sample_identity() {
      let related_sample_identity_serialized = si.write_to_vec_with_ctx(endianness).unwrap();
      param_list.push(Parameter {
        parameter_id: ParameterId::PID_RELATED_SAMPLE_IDENTITY,
        value: related_sample_identity_serialized,
      });
    }
    let serialized_payload = match cache_change.data_value {
      DDSData::Data {
        ref serialized_payload,
      } => Some(serialized_payload.clone()), DDSData::DisposeByKey { ref key, .. } => Some(key.clone()),
      DDSData::DisposeByKeyHash { .. } => None,
    };
    let serialized_payload = serialized_payload.map(|serialized_payload| {
      if writer_entity_id.kind() == EntityKind::WRITER_WITH_KEY_BUILT_IN {
        SerializedPayload {
          representation_identifier: RepresentationIdentifier::PL_CDR_LE,
          ..serialized_payload
        }
      } else {
        serialized_payload
      }
    });
    #[cfg(not(feature = "security"))]
    let encoded_payload = serialized_payload;
    #[cfg(feature = "security")]
    let encoded_payload = match serialized_payload
      .map(|serialized_payload| {
        serialized_payload
          .write_to_vec()
          .map_err(|e| security_error!("{e:?}"))
          .and_then(|serialized_payload| {
            _security_plugins
              .map(SecurityPluginsHandle::get_plugins)
              .map_or(
                Ok(serialized_payload.clone()),
                |security_plugins| {
                  security_plugins
                    .encode_serialized_payload(serialized_payload, &writer_guid)
                    .map(|(encoded_payload, extra_inline_qos)| {
                      param_list.concat(extra_inline_qos);
                      encoded_payload
                    })
                },
              )
          })
      })
      .transpose()
    {
      Ok(encoded_payload) => encoded_payload,
      Err(e) => {
        error!("{e:?}");
        return self;
      }
    }; let have_inline_qos = !param_list.is_empty(); let inline_qos = if have_inline_qos {
      Some(param_list)
    } else {
      None
    };
    let data_message = Data {
      reader_id: reader_entity_id,
      writer_id: writer_entity_id,
      writer_sn: cache_change.sequence_number,
      inline_qos,
      encoded_payload: encoded_payload.map(Bytes::from),
    };
    let flags: BitFlags<DATA_Flags> = BitFlags::<DATA_Flags>::from_endianness(endianness)
      | (match cache_change.data_value {
        DDSData::Data { .. } => BitFlags::<DATA_Flags>::from_flag(DATA_Flags::Data),
        DDSData::DisposeByKey { .. } => BitFlags::<DATA_Flags>::from_flag(DATA_Flags::Key),
        DDSData::DisposeByKeyHash { .. } => {
          BitFlags::<DATA_Flags>::from_flag(DATA_Flags::InlineQos)
        }
      })
      | (if have_inline_qos {
        BitFlags::<DATA_Flags>::from_flag(DATA_Flags::InlineQos)
      } else {
        BitFlags::<DATA_Flags>::empty()
      });
    self.submessages.push(Submessage {
      header: SubmessageHeader {
        kind: SubmessageKind::DATA,
        flags: flags.bits(),
        content_length: data_message.len_serialized() as u16, },
      body: SubmessageBody::Writer(WriterSubmessage::Data(data_message, flags)),
      original_bytes: None,
    });
    self
  }
  #[allow(clippy::too_many_arguments)]
  pub fn data_frag_msg(
    mut self,
    cache_change: &CacheChange,
    reader_entity_id: EntityId,
    writer_guid: GUID,
    fragment_number: FragmentNumber, fragment_size: u16,
    sample_size: u32, endianness: Endianness,
    _security_plugins: Option<&SecurityPluginsHandle>,
  ) -> Self {
    let writer_entity_id = writer_guid.entity_id;
    let mut param_list = ParameterList::new(); match cache_change.data_value {
      DDSData::Data { .. } | DDSData::DisposeByKey { .. } => (), DDSData::DisposeByKeyHash { .. } => {
        error!(
          "data_frag_msg: Called with DDSData::DisposeByKeyHash. This is not legit! Discarding."
        );
        return self;
      }
    }
    if let Some(si) = cache_change.write_options.related_sample_identity() {
      let related_sample_identity_serialized = si.write_to_vec_with_ctx(endianness).unwrap();
      param_list.parameters.push(Parameter {
        parameter_id: ParameterId::PID_RELATED_SAMPLE_IDENTITY,
        value: related_sample_identity_serialized,
      });
    }
    let have_inline_qos = !param_list.is_empty(); let from_byte: usize = (usize::from(fragment_number) - 1) * usize::from(fragment_size);
    let up_to_before_byte: usize = min(
      usize::from(fragment_number) * usize::from(fragment_size),
      sample_size.try_into().unwrap(),
    );
    let serialized_payload = Vec::from(
      cache_change
        .data_value
        .bytes_slice(from_byte, up_to_before_byte),
    );
    #[cfg(not(feature = "security"))]
    let encoded_payload = serialized_payload;
    #[cfg(feature = "security")]
    let encoded_payload = match _security_plugins
      .map(SecurityPluginsHandle::get_plugins)
      .map_or(
        Ok(serialized_payload.clone()),
        |security_plugins| {
          security_plugins
            .encode_serialized_payload(serialized_payload, &writer_guid)
            .map(|(encoded_payload, extra_inline_qos)| {
              param_list.concat(extra_inline_qos);
              encoded_payload
            })
        },
      ) {
      Ok(encoded_payload) => encoded_payload,
      Err(e) => {
        error!("{e:?}");
        return self;
      }
    }; let data_message = DataFrag {
      reader_id: reader_entity_id,
      writer_id: writer_entity_id,
      writer_sn: cache_change.sequence_number,
      fragment_starting_num: fragment_number,
      fragments_in_submessage: 1,
      data_size: sample_size, fragment_size,
      inline_qos: if have_inline_qos {
        Some(param_list)
      } else {
        None
      },
      encoded_payload: Bytes::from(encoded_payload),
    };
    let flags: BitFlags<DATAFRAG_Flags> =
      BitFlags::<DATAFRAG_Flags>::from_endianness(endianness)
      | (match cache_change.data_value {
        DDSData::Data { .. } => BitFlags::<DATAFRAG_Flags>::empty(),
        DDSData::DisposeByKey { .. } => BitFlags::<DATAFRAG_Flags>::from_flag(DATAFRAG_Flags::Key),
        DDSData::DisposeByKeyHash { .. } => unreachable!(),
      })
      | (if have_inline_qos {
        BitFlags::<DATAFRAG_Flags>::from_flag(DATAFRAG_Flags::InlineQos)
      } else {
        BitFlags::<DATAFRAG_Flags>::empty()
      });
    self.submessages.push(Submessage {
      header: SubmessageHeader {
        kind: SubmessageKind::DATA_FRAG,
        flags: flags.bits(),
        content_length: data_message.len_serialized() as u16, },
      body: SubmessageBody::Writer(WriterSubmessage::DataFrag(data_message, flags)),
      original_bytes: None,
    });
    self
  }
  pub fn gap_msg(
    mut self,
    irrelevant_sns: &BTreeSet<SequenceNumber>,
    writer_entity_id: EntityId,
    writer_endianness: Endianness,
    reader_guid: GUID,
  ) -> Self {
    match (
      irrelevant_sns.iter().next(),
      irrelevant_sns.iter().next_back(),
    ) {
      (Some(&base), Some(&_top)) => {
        let gap_list = SequenceNumberSet::from_base_and_set(base, irrelevant_sns);
        let gap = Gap {
          reader_id: reader_guid.entity_id,
          writer_id: writer_entity_id,
          gap_start: base,
          gap_list,
        };
        let gap_flags = BitFlags::<GAP_Flags>::from_endianness(writer_endianness);
        gap
          .create_submessage(gap_flags)
          .map(|s| self.submessages.push(s));
      }
      (_, _) => error!("gap_msg called with empty SN set. Skipping GAP submessage"),
    }
    self
  }
  pub fn heartbeat_msg(
    mut self,
    writer: &RtpsWriter,
    reader_entity_id: EntityId,
    set_final_flag: bool,
    set_liveliness_flag: bool,
  ) -> Self {
    let first = writer.first_change_sequence_number;
    let last = writer.last_change_sequence_number;
    let heartbeat = Heartbeat {
      reader_id: reader_entity_id,
      writer_id: writer.entity_id(),
      first_sn: first,
      last_sn: last,
      count: writer.heartbeat_message_counter,
    };
    let mut flags = BitFlags::<HEARTBEAT_Flags>::from_endianness(writer.endianness);
    if set_final_flag {
      flags.insert(HEARTBEAT_Flags::Final);
    }
    if set_liveliness_flag {
      flags.insert(HEARTBEAT_Flags::Liveliness);
    }
    let submessage = heartbeat.create_submessage(flags);
    match submessage {
      Some(sm) => self.submessages.push(sm),
      None => return self,
    }
    self
  }
  pub fn add_header_and_build(self, guid_prefix: GuidPrefix) -> Message {
    Message {
      header: Header {
        protocol_id: ProtocolId::default(),
        protocol_version: ProtocolVersion::THIS_IMPLEMENTATION,
        vendor_id: VendorId::THIS_IMPLEMENTATION,
        guid_prefix,
      },
      submessages: self.submessages,
    }
  }
}
#[cfg(test)]
mod tests {
  #![allow(non_snake_case)]
  use log::info;
  use speedy::Writable;
  use super::*;
  #[test]
  fn rtps_message_test_shapes_demo_message_deserialization() {
    let bits1 = Bytes::from_static(&[
      0x52, 0x54, 0x50, 0x53, 0x02, 0x03, 0x01, 0x0f, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00,
      0x00, 0x01, 0x00, 0x00, 0x00, 0x0e, 0x01, 0x0c, 0x00, 0x01, 0x03, 0x00, 0x0c, 0x29, 0x2d,
      0x31, 0xa2, 0x28, 0x20, 0x02, 0x08, 0x09, 0x01, 0x08, 0x00, 0x1a, 0x15, 0xf3, 0x5e, 0x00,
      0xcc, 0xfb, 0x13, 0x15, 0x05, 0x2c, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x07,
      0x00, 0x00, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x5b, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00,
      0x00, 0x04, 0x00, 0x00, 0x00, 0x52, 0x45, 0x44, 0x00, 0x69, 0x00, 0x00, 0x00, 0x17, 0x00,
      0x00, 0x00, 0x1e, 0x00, 0x00, 0x00, 0x07, 0x01, 0x1c, 0x00, 0x00, 0x00, 0x00, 0x07, 0x00,
      0x00, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x5b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x5b, 0x00, 0x00, 0x00, 0x1f, 0x00, 0x00, 0x00,
    ]);
    let rtps = Message::read_from_buffer(&bits1).unwrap();
    info!("{:?}", rtps);
    let serialized = Bytes::from(
      rtps
        .write_to_vec_with_ctx(Endianness::LittleEndian)
        .unwrap(),
    );
    assert_eq!(bits1, serialized);
  }
  #[test]
  fn rtps_message_test_shapes_demo_DataP() {
    let bits2 = Bytes::from_static(&[
      0x52, 0x54, 0x50, 0x53, 0x02, 0x04, 0x01, 0x03, 0x01, 0x03, 0x00, 0x0c, 0x29, 0x2d, 0x31,
      0xa2, 0x28, 0x20, 0x02, 0x08, 0x15, 0x05, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00,
      0x00, 0x00, 0x00, 0x01, 0x00, 0xc2, 0x00, 0x00, 0x00, 0x00, 0x23, 0x00, 0x00, 0x00, 0x00,
      0x03, 0x00, 0x00, 0x77, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0f, 0x00, 0x04, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x15, 0x00, 0x04, 0x00, 0x02, 0x04, 0x00, 0x00, 0x50, 0x00, 0x10,
      0x00, 0x01, 0x03, 0x00, 0x0c, 0x29, 0x2d, 0x31, 0xa2, 0x28, 0x20, 0x02, 0x08, 0x00, 0x00,
      0x01, 0xc1, 0x16, 0x00, 0x04, 0x00, 0x01, 0x03, 0x00, 0x00, 0x44, 0x00, 0x04, 0x00, 0x3f,
      0x0c, 0x00, 0x00, 0x58, 0x00, 0x04, 0x00, 0x3f, 0x0c, 0x00, 0x00, 0x32, 0x00, 0x18, 0x00,
      0x01, 0x00, 0x00, 0x00, 0x9f, 0xa4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x50, 0x8e, 0xc9, 0x32, 0x00, 0x18, 0x00, 0x01, 0x00,
      0x00, 0x00, 0x9f, 0xa4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x00, 0x00, 0x00, 0xc0, 0xa8, 0x45, 0x14, 0x32, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00,
      0x9f, 0xa4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x00, 0xac, 0x11, 0x00, 0x01, 0x33, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0xea, 0x1c,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xef,
      0xff, 0x00, 0x01, 0x31, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0x39, 0x30, 0x00, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x7f, 0x00, 0x00,
      0x01, 0x48, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0x39, 0x30, 0x00, 0x00, 0x00, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x7f, 0x00, 0x00, 0x01, 0x34,
      0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0xb0, 0x04, 0x00, 0x01, 0x00, 0x00, 0x00,
      0x02, 0x00, 0x08, 0x00, 0x2c, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
      0x00,
    ]);
    let rtps_data = Message::read_from_buffer(&bits2).unwrap();
    let serialized_data = Bytes::from(
      rtps_data
        .write_to_vec_with_ctx(Endianness::LittleEndian)
        .unwrap(),
    );
    assert_eq!(bits2, serialized_data);
  }
  #[test]
  fn rtps_message_test_shapes_demo_info_TS_dataP() {
    let bits1 = Bytes::from_static(&[
      0x52, 0x54, 0x50, 0x53, 0x02, 0x03, 0x01, 0x0f, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00,
      0x00, 0x01, 0x00, 0x00, 0x00, 0x09, 0x01, 0x08, 0x00, 0x0e, 0x15, 0xf3, 0x5e, 0x00, 0x28,
      0x74, 0xd2, 0x15, 0x05, 0xa8, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x01, 0x00, 0xc7, 0x00,
      0x01, 0x00, 0xc2, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00,
      0x15, 0x00, 0x04, 0x00, 0x02, 0x03, 0x00, 0x00, 0x16, 0x00, 0x04, 0x00, 0x01, 0x0f, 0x00,
      0x00, 0x50, 0x00, 0x10, 0x00, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00, 0x00, 0x01, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x01, 0xc1, 0x32, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0xf4,
      0x1c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x0a, 0x50, 0x8e, 0x68, 0x31, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0xf5, 0x1c, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x50,
      0x8e, 0x68, 0x02, 0x00, 0x08, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x58,
      0x00, 0x04, 0x00, 0x3f, 0x0c, 0x3f, 0x0c, 0x62, 0x00, 0x18, 0x00, 0x14, 0x00, 0x00, 0x00,
      0x66, 0x61, 0x73, 0x74, 0x72, 0x74, 0x70, 0x73, 0x50, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69,
      0x70, 0x61, 0x6e, 0x74, 0x00, 0x01, 0x00, 0x00, 0x00,
    ]);
    let rtps = Message::read_from_buffer(&bits1).unwrap();
    info!("{:?}", rtps);
    let serialized = Bytes::from(
      rtps
        .write_to_vec_with_ctx(Endianness::LittleEndian)
        .unwrap(),
    );
    assert_eq!(bits1, serialized);
  }
  #[test]
  fn rtps_message_test_shapes_demo_info_TS_AckNack() {
    let bits1 = Bytes::from_static(&[
      0x52, 0x54, 0x50, 0x53, 0x02, 0x04, 0x01, 0x03, 0x01, 0x03, 0x00, 0x0c, 0x29, 0x2d, 0x31,
      0xa2, 0x28, 0x20, 0x02, 0x08, 0x0e, 0x01, 0x0c, 0x00, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34,
      0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x06, 0x03, 0x18, 0x00, 0x00, 0x00, 0x03, 0xc7, 0x00,
      0x00, 0x03, 0xc2, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x01, 0x00, 0x00, 0x00, 0x06, 0x03, 0x18, 0x00, 0x00, 0x00, 0x04, 0xc7, 0x00, 0x00, 0x04,
      0xc2, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00,
      0x00, 0x00, 0x06, 0x03, 0x18, 0x00, 0x00, 0x02, 0x00, 0xc7, 0x00, 0x02, 0x00, 0xc2, 0x00,
      0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
    ]);
    let rtps = Message::read_from_buffer(&bits1).unwrap();
    info!("{:?}", rtps);
    let serialized = Bytes::from(
      rtps
        .write_to_vec_with_ctx(Endianness::LittleEndian)
        .unwrap(),
    );
    assert_eq!(bits1, serialized);
  }
  #[test]
  fn rtps_message_info_ts_and_dataP() {
    let bits1 = Bytes::from_static(&[
      0x52, 0x54, 0x50, 0x53, 0x02, 0x03, 0x01, 0x0f, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00,
      0x00, 0x01, 0x00, 0x00, 0x00, 0x09, 0x01, 0x08, 0x00, 0x0e, 0x15, 0xf3, 0x5e, 0x00, 0x28,
      0x74, 0xd2, 0x15, 0x05, 0xa8, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x01, 0x00, 0xc7, 0x00,
      0x01, 0x00, 0xc2, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00,
      0x15, 0x00, 0x04, 0x00, 0x02, 0x03, 0x00, 0x00, 0x16, 0x00, 0x04, 0x00, 0x01, 0x0f, 0x00,
      0x00, 0x50, 0x00, 0x10, 0x00, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00, 0x00, 0x01, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x01, 0xc1, 0x32, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0xf4,
      0x1c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x0a, 0x50, 0x8e, 0x68, 0x31, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0xf5, 0x1c, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x50,
      0x8e, 0x68, 0x02, 0x00, 0x08, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x58,
      0x00, 0x04, 0x00, 0x3f, 0x0c, 0x3f, 0x0c, 0x62, 0x00, 0x18, 0x00, 0x14, 0x00, 0x00, 0x00,
      0x66, 0x61, 0x73, 0x74, 0x72, 0x74, 0x70, 0x73, 0x50, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69,
      0x70, 0x61, 0x6e, 0x74, 0x00, 0x01, 0x00, 0x00, 0x00,
    ]);
    let rtps = Message::read_from_buffer(&bits1).unwrap();
    info!("{:?}", rtps);
    let serialized = Bytes::from(
      rtps
        .write_to_vec_with_ctx(Endianness::LittleEndian)
        .unwrap(),
    );
    assert_eq!(bits1, serialized);
  }
  #[test]
  fn rtps_message_infoDST_infoTS_Data_w_heartbeat() {
    let bits1 = Bytes::from_static(&[
      0x52, 0x54, 0x50, 0x53, 0x02, 0x03, 0x01, 0x0f, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00,
      0x00, 0x01, 0x00, 0x00, 0x00, 0x0e, 0x01, 0x0c, 0x00, 0x01, 0x03, 0x00, 0x0c, 0x29, 0x2d,
      0x31, 0xa2, 0x28, 0x20, 0x02, 0x08, 0x09, 0x01, 0x08, 0x00, 0x12, 0x15, 0xf3, 0x5e, 0x00,
      0xc8, 0xa9, 0xfa, 0x15, 0x05, 0x0c, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x03, 0xc7,
      0x00, 0x00, 0x03, 0xc2, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00,
      0x00, 0x2f, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0xf5, 0x1c, 0x00, 0x00, 0x00, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x50, 0x8e, 0x68, 0x50,
      0x00, 0x10, 0x00, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
      0x00, 0x00, 0x01, 0xc1, 0x05, 0x00, 0x0c, 0x00, 0x07, 0x00, 0x00, 0x00, 0x53, 0x71, 0x75,
      0x61, 0x72, 0x65, 0x00, 0x00, 0x07, 0x00, 0x10, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x53, 0x68,
      0x61, 0x70, 0x65, 0x54, 0x79, 0x70, 0x65, 0x00, 0x00, 0x00, 0x70, 0x00, 0x10, 0x00, 0x01,
      0x0f, 0x99, 0x06, 0x78, 0x34, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02,
      0x5a, 0x00, 0x10, 0x00, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00, 0x00, 0x01, 0x00, 0x00,
      0x00, 0x00, 0x00, 0x01, 0x02, 0x60, 0x00, 0x04, 0x00, 0x5f, 0x01, 0x00, 0x00, 0x15, 0x00,
      0x04, 0x00, 0x02, 0x03, 0x00, 0x00, 0x16, 0x00, 0x04, 0x00, 0x01, 0x0f, 0x00, 0x00, 0x1d,
      0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x23, 0x00, 0x08, 0x00, 0xff, 0xff, 0xff, 0x7f,
      0xff, 0xff, 0xff, 0xff, 0x27, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x00, 0x1b, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff, 0x7f, 0xff, 0xff,
      0xff, 0xff, 0x1a, 0x00, 0x0c, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x9a,
      0x99, 0x99, 0x19, 0x2b, 0x00, 0x08, 0x00, 0xff, 0xff, 0xff, 0x7f, 0xff, 0xff, 0xff, 0xff,
      0x1f, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x25, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00,
      0x00, 0x01, 0x00, 0x00, 0x00, 0x07, 0x01, 0x1c, 0x00, 0x00, 0x00, 0x03, 0xc7, 0x00, 0x00,
      0x03, 0xc2, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
      0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
    ]);
    let rtps = Message::read_from_buffer(&bits1).unwrap();
    info!("{:?}", rtps);
    let data_submessage = match &rtps.submessages[2] {
      Submessage {
        header: _,
        body: SubmessageBody::Writer(WriterSubmessage::Data(d, _flags)),
        ..
      } => d,
      wtf => panic!("Unexpected message structure {wtf:?}"),
    };
    let serialized_payload = data_submessage
      .no_crypto_decoded()
      .serialized_payload
      .as_ref()
      .unwrap()
      .value
      .clone();
    info!("{:x?}", serialized_payload);
    let serialized = Bytes::from(
      rtps
        .write_to_vec_with_ctx(Endianness::LittleEndian)
        .unwrap(),
    );
    assert_eq!(bits1, serialized);
  }
  #[test]
  fn fuzz_rtps() {
    use hex_literal::hex;
    let bits = Bytes::copy_from_slice(&hex!(
      "
      52 54 50 53
      02 02 ff ff 01 0f 45 d2 b3 f5 58 b9 01 00 00 00
      15 0b 18 00 00 00 00 00 00 00 02 c2 00 00 00 00
      7d 00 00 00 00 01 00 00
    "
    ));
    info!("bytes = {bits:?}");
    let rtps = Message::read_from_buffer(&bits);
    info!("read_from_buffer() --> {rtps:?}");
    }
}