1extern crate alloc;
19use alloc::string::String;
20use alloc::vec::Vec;
21
22use crate::endpoint_security_info::EndpointSecurityInfo;
23use crate::error::WireError;
24use crate::parameter_list::{Parameter, ParameterList, pid};
25use crate::participant_data::{Duration, ENCAPSULATION_PL_CDR_LE};
26use crate::wire_types::Guid;
27
28pub use zerodds_qos::DurabilityKind;
33
34pub use zerodds_qos::ReliabilityKind;
38
39pub use zerodds_qos::ReliabilityQosPolicy as ReliabilityQos;
44
/// Data-representation identifiers and the writer/reader negotiation helpers
/// built on top of them.
pub mod data_representation {
    /// Identifier value for the `XCDR` representation.
    pub const XCDR: i16 = 0;
    /// Identifier value for the `XML` representation.
    pub const XML: i16 = 1;
    /// Identifier value for the `XCDR2` representation.
    pub const XCDR2: i16 = 2;

    /// Default offer list: `XCDR` first, then `XCDR2`.
    pub const DEFAULT_OFFER: [i16; 2] = [XCDR, XCDR2];

    /// Selects how [`negotiate`] picks a representation.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
    pub enum DataRepMatchMode {
        /// Only the writer's first offered id is considered.
        Strict,
        /// The first writer id (in offer order) that the reader accepts wins.
        #[default]
        Tolerant,
    }

    /// Picks the representation a writer/reader pair agrees on.
    ///
    /// An empty `writer_offered` or `reader_accepted` list is treated as
    /// `[XCDR]`. Returns `None` when no id satisfies the selected `mode`.
    #[must_use]
    pub fn negotiate(
        writer_offered: &[i16],
        reader_accepted: &[i16],
        mode: DataRepMatchMode,
    ) -> Option<i16> {
        // Stand-in for an empty list on either side.
        const FALLBACK: &[i16] = &[XCDR];

        let offered = if writer_offered.is_empty() {
            FALLBACK
        } else {
            writer_offered
        };
        let accepted = if reader_accepted.is_empty() {
            FALLBACK
        } else {
            reader_accepted
        };
        let reader_takes = |id: &i16| accepted.contains(id);

        match mode {
            DataRepMatchMode::Strict => offered.first().copied().filter(reader_takes),
            DataRepMatchMode::Tolerant => offered.iter().copied().find(reader_takes),
        }
    }

    /// Returns the four header bytes associated with representation `id`:
    /// `[0x00, 0x07, 0x00, 0x00]` for [`XCDR2`], `[0x00, 0x01, 0x00, 0x00]`
    /// for every other value.
    #[must_use]
    pub fn encap_for_final_le(id: i16) -> [u8; 4] {
        let kind = if id == XCDR2 { 0x07 } else { 0x01 };
        [0x00, kind, 0x00, 0x00]
    }
}
164
165#[derive(Debug, Clone, PartialEq, Eq)]
167pub struct PublicationBuiltinTopicData {
168 pub key: Guid,
170 pub participant_key: Guid,
172 pub topic_name: String,
174 pub type_name: String,
176 pub durability: DurabilityKind,
178 pub reliability: ReliabilityQos,
180 pub ownership: zerodds_qos::OwnershipKind,
182 pub ownership_strength: i32,
185 pub liveliness: zerodds_qos::LivelinessQosPolicy,
187 pub deadline: zerodds_qos::DeadlineQosPolicy,
189 pub lifespan: zerodds_qos::LifespanQosPolicy,
191 pub partition: Vec<String>,
193 pub user_data: Vec<u8>,
196 pub topic_data: Vec<u8>,
199 pub group_data: Vec<u8>,
202 pub type_information: Option<Vec<u8>>,
207 pub data_representation: Vec<i16>,
213 pub security_info: Option<EndpointSecurityInfo>,
218 pub service_instance_name: Option<String>,
222 pub related_entity_guid: Option<Guid>,
228 pub topic_aliases: Option<Vec<String>>,
231 pub type_identifier: zerodds_types::TypeIdentifier,
235}
236
237impl PublicationBuiltinTopicData {
238 pub fn to_pl_cdr_le(&self) -> Result<Vec<u8>, WireError> {
245 let mut params = ParameterList::new();
246
247 params.push(Parameter::new(
249 pid::PARTICIPANT_GUID,
250 self.participant_key.to_bytes().to_vec(),
251 ));
252
253 params.push(Parameter::new(
255 pid::ENDPOINT_GUID,
256 self.key.to_bytes().to_vec(),
257 ));
258
259 params.push(Parameter::new(
261 pid::TOPIC_NAME,
262 encode_cdr_string_le(&self.topic_name)?,
263 ));
264
265 params.push(Parameter::new(
267 pid::TYPE_NAME,
268 encode_cdr_string_le(&self.type_name)?,
269 ));
270
271 params.push(Parameter::new(
273 pid::DURABILITY,
274 (self.durability as u32).to_le_bytes().to_vec(),
275 ));
276
277 let mut rel = Vec::with_capacity(12);
279 rel.extend_from_slice(&(self.reliability.kind as u32).to_le_bytes());
280 rel.extend_from_slice(&self.reliability.max_blocking_time.to_bytes_le());
281 params.push(Parameter::new(pid::RELIABILITY, rel));
282
283 params.push(Parameter::new(
285 pid::OWNERSHIP,
286 encode_u32_le(self.ownership as u32).to_vec(),
287 ));
288
289 params.push(Parameter::new(
292 pid::OWNERSHIP_STRENGTH,
293 encode_u32_le(self.ownership_strength as u32).to_vec(),
294 ));
295
296 params.push(Parameter::new(
298 pid::LIVELINESS,
299 encode_liveliness_le(self.liveliness),
300 ));
301
302 params.push(Parameter::new(
304 pid::DEADLINE,
305 encode_duration_le(self.deadline.period).to_vec(),
306 ));
307
308 params.push(Parameter::new(
310 pid::LIFESPAN,
311 encode_duration_le(self.lifespan.duration).to_vec(),
312 ));
313
314 if !self.partition.is_empty() {
316 params.push(Parameter::new(
317 pid::PARTITION,
318 encode_partition_le(&self.partition)?,
319 ));
320 }
321
322 if !self.user_data.is_empty() {
326 params.push(Parameter::new(
327 pid::USER_DATA,
328 encode_octet_seq_le(&self.user_data)?,
329 ));
330 }
331 if !self.topic_data.is_empty() {
332 params.push(Parameter::new(
333 pid::TOPIC_DATA,
334 encode_octet_seq_le(&self.topic_data)?,
335 ));
336 }
337 if !self.group_data.is_empty() {
338 params.push(Parameter::new(
339 pid::GROUP_DATA,
340 encode_octet_seq_le(&self.group_data)?,
341 ));
342 }
343
344 if let Some(ti) = &self.type_information {
347 params.push(Parameter::new(pid::TYPE_INFORMATION, ti.clone()));
348 }
349
350 if let Some(info) = self.security_info {
354 params.push(Parameter::new(
355 pid::ENDPOINT_SECURITY_INFO,
356 info.to_bytes(true).to_vec(),
357 ));
358 }
359
360 if let Some(name) = &self.service_instance_name {
364 params.push(Parameter::new(
365 pid::SERVICE_INSTANCE_NAME,
366 encode_cdr_string_le(name)?,
367 ));
368 }
369 if let Some(guid) = self.related_entity_guid {
370 params.push(Parameter::new(
371 pid::RELATED_ENTITY_GUID,
372 guid.to_bytes().to_vec(),
373 ));
374 }
375 if let Some(aliases) = &self.topic_aliases {
376 params.push(Parameter::new(
377 pid::TOPIC_ALIASES,
378 encode_partition_le(aliases)?,
379 ));
380 }
381
382 if self.type_identifier != zerodds_types::TypeIdentifier::None {
384 let mut w = zerodds_cdr::BufferWriter::new(zerodds_cdr::Endianness::Little);
385 self.type_identifier
386 .encode_into(&mut w)
387 .map_err(|_| WireError::ValueOutOfRange {
388 message: "type_identifier encoding failed",
389 })?;
390 params.push(Parameter::new(pid::ZERODDS_TYPE_ID, w.into_bytes()));
391 }
392
393 if !self.data_representation.is_empty() {
395 let mut dr = Vec::with_capacity(4 + 2 * self.data_representation.len());
396 let len = u32::try_from(self.data_representation.len()).map_err(|_| {
397 WireError::ValueOutOfRange {
398 message: "data_representation length exceeds u32::MAX",
399 }
400 })?;
401 dr.extend_from_slice(&len.to_le_bytes());
402 for rep in &self.data_representation {
403 dr.extend_from_slice(&rep.to_le_bytes());
404 }
405 params.push(Parameter::new(pid::DATA_REPRESENTATION, dr));
406 }
407
408 let mut out = Vec::with_capacity(params.parameters.len() * 24 + 16);
409 out.extend_from_slice(&ENCAPSULATION_PL_CDR_LE);
410 out.extend_from_slice(&[0, 0]); out.extend_from_slice(¶ms.to_bytes(true));
412 Ok(out)
413 }
414
415 pub fn from_pl_cdr_le(bytes: &[u8]) -> Result<Self, WireError> {
423 if bytes.len() < 4 {
424 return Err(WireError::UnexpectedEof {
425 needed: 4,
426 offset: 0,
427 });
428 }
429 let little_endian = match &bytes[..2] {
430 b if b == ENCAPSULATION_PL_CDR_LE => true,
431 [0x00, 0x02] => false,
432 other => {
433 return Err(WireError::UnsupportedEncapsulation {
434 kind: [other[0], other[1]],
435 });
436 }
437 };
438 let pl = ParameterList::from_bytes(&bytes[4..], little_endian)?;
439
440 let key = pl
441 .find(pid::ENDPOINT_GUID)
442 .and_then(guid_from_param)
443 .ok_or(WireError::ValueOutOfRange {
444 message: "ENDPOINT_GUID missing or wrong length",
445 })?;
446
447 let participant_key = pl
450 .find(pid::PARTICIPANT_GUID)
451 .and_then(guid_from_param)
452 .unwrap_or_else(|| {
453 Guid::new(key.prefix, crate::wire_types::EntityId::PARTICIPANT)
455 });
456
457 let topic_name = pl
458 .find(pid::TOPIC_NAME)
459 .map(|p| decode_cdr_string(&p.value, little_endian))
460 .transpose()?
461 .ok_or(WireError::ValueOutOfRange {
462 message: "TOPIC_NAME missing",
463 })?;
464
465 let type_name = pl
466 .find(pid::TYPE_NAME)
467 .map(|p| decode_cdr_string(&p.value, little_endian))
468 .transpose()?
469 .ok_or(WireError::ValueOutOfRange {
470 message: "TYPE_NAME missing",
471 })?;
472
473 let durability = pl
474 .find(pid::DURABILITY)
475 .and_then(|p| {
476 if p.value.len() >= 4 {
477 let mut b = [0u8; 4];
478 b.copy_from_slice(&p.value[..4]);
479 Some(DurabilityKind::from_u32(if little_endian {
480 u32::from_le_bytes(b)
481 } else {
482 u32::from_be_bytes(b)
483 }))
484 } else {
485 None
486 }
487 })
488 .unwrap_or_default();
489
490 let reliability = pl
491 .find(pid::RELIABILITY)
492 .and_then(|p| {
493 if p.value.len() >= 12 {
494 let mut k = [0u8; 4];
495 k.copy_from_slice(&p.value[..4]);
496 let kind = ReliabilityKind::from_u32(if little_endian {
497 u32::from_le_bytes(k)
498 } else {
499 u32::from_be_bytes(k)
500 });
501 let mut d = [0u8; 8];
502 d.copy_from_slice(&p.value[4..12]);
503 let max_blocking_time = if little_endian {
504 Duration::from_bytes_le(d)
505 } else {
506 let mut s = [0u8; 4];
508 s.copy_from_slice(&d[..4]);
509 let mut f = [0u8; 4];
510 f.copy_from_slice(&d[4..]);
511 Duration {
512 seconds: i32::from_be_bytes(s),
513 fraction: u32::from_be_bytes(f),
514 }
515 };
516 Some(ReliabilityQos {
517 kind,
518 max_blocking_time,
519 })
520 } else {
521 None
522 }
523 })
524 .unwrap_or_default();
525
526 let ownership = pl
527 .find(pid::OWNERSHIP)
528 .and_then(|p| decode_u32(&p.value, little_endian))
529 .map(zerodds_qos::OwnershipKind::from_u32)
530 .unwrap_or_default();
531
532 let ownership_strength = pl
533 .find(pid::OWNERSHIP_STRENGTH)
534 .and_then(|p| decode_i32(&p.value, little_endian))
535 .unwrap_or(0);
536
537 let liveliness = pl
538 .find(pid::LIVELINESS)
539 .and_then(|p| decode_liveliness(&p.value, little_endian))
540 .unwrap_or_default();
541
542 let deadline = pl
543 .find(pid::DEADLINE)
544 .and_then(|p| decode_duration(&p.value, little_endian))
545 .map(|period| zerodds_qos::DeadlineQosPolicy { period })
546 .unwrap_or_default();
547
548 let lifespan = pl
549 .find(pid::LIFESPAN)
550 .and_then(|p| decode_duration(&p.value, little_endian))
551 .map(|duration| zerodds_qos::LifespanQosPolicy { duration })
552 .unwrap_or_default();
553
554 let partition = pl
555 .find(pid::PARTITION)
556 .and_then(|p| decode_partition(&p.value, little_endian))
557 .unwrap_or_default();
558
559 let user_data = pl
560 .find(pid::USER_DATA)
561 .and_then(|p| decode_octet_seq(&p.value, little_endian))
562 .unwrap_or_default();
563 let topic_data = pl
564 .find(pid::TOPIC_DATA)
565 .and_then(|p| decode_octet_seq(&p.value, little_endian))
566 .unwrap_or_default();
567 let group_data = pl
568 .find(pid::GROUP_DATA)
569 .and_then(|p| decode_octet_seq(&p.value, little_endian))
570 .unwrap_or_default();
571
572 let type_information = pl.find(pid::TYPE_INFORMATION).map(|p| p.value.clone());
573
574 let security_info = pl
575 .find(pid::ENDPOINT_SECURITY_INFO)
576 .and_then(|p| EndpointSecurityInfo::from_bytes(&p.value, little_endian).ok());
577
578 let service_instance_name = pl
579 .find(pid::SERVICE_INSTANCE_NAME)
580 .map(|p| decode_cdr_string(&p.value, little_endian))
581 .transpose()
582 .ok()
583 .flatten();
584 let related_entity_guid = pl.find(pid::RELATED_ENTITY_GUID).and_then(guid_from_param);
585 let topic_aliases = pl
586 .find(pid::TOPIC_ALIASES)
587 .and_then(|p| decode_partition(&p.value, little_endian));
588
589 let type_identifier = pl
590 .find(pid::ZERODDS_TYPE_ID)
591 .and_then(|p| {
592 let mut r =
593 zerodds_cdr::BufferReader::new(&p.value, zerodds_cdr::Endianness::Little);
594 zerodds_types::TypeIdentifier::decode_from(&mut r).ok()
595 })
596 .unwrap_or_default();
597
598 let data_representation = pl
599 .find(pid::DATA_REPRESENTATION)
600 .map(|p| {
601 let v = &p.value;
602 if v.len() < 4 {
603 return Vec::new();
604 }
605 let mut n_bytes = [0u8; 4];
606 n_bytes.copy_from_slice(&v[..4]);
607 let n = if little_endian {
608 u32::from_le_bytes(n_bytes)
609 } else {
610 u32::from_be_bytes(n_bytes)
611 } as usize;
612 let cap = n.min(v.len().saturating_sub(4) / 2);
616 let mut reps = Vec::with_capacity(cap);
617 for i in 0..n {
618 let off = 4 + i * 2;
619 if off + 2 > v.len() {
620 break;
621 }
622 let mut b = [0u8; 2];
623 b.copy_from_slice(&v[off..off + 2]);
624 reps.push(if little_endian {
625 i16::from_le_bytes(b)
626 } else {
627 i16::from_be_bytes(b)
628 });
629 }
630 reps
631 })
632 .unwrap_or_default();
633
634 Ok(Self {
635 key,
636 participant_key,
637 topic_name,
638 type_name,
639 durability,
640 reliability,
641 ownership,
642 ownership_strength,
643 liveliness,
644 deadline,
645 lifespan,
646 partition,
647 user_data,
648 topic_data,
649 group_data,
650 type_information,
651 data_representation,
652 security_info,
653 service_instance_name,
654 related_entity_guid,
655 topic_aliases,
656 type_identifier,
657 })
658 }
659}
660
661pub fn inject_pid_shm_locator(bytes: &mut Vec<u8>, locator_bytes: &[u8]) -> Result<(), WireError> {
686 use crate::parameter_list::pid;
687 if bytes.len() < 4 {
688 return Err(WireError::ValueOutOfRange {
689 message: "inject_pid_shm_locator: bytes too short",
690 });
691 }
692 let sentinel_pos = bytes.len() - 4;
693 if bytes[sentinel_pos..] != [0x01, 0x00, 0x00, 0x00] {
695 return Err(WireError::ValueOutOfRange {
696 message: "inject_pid_shm_locator: missing PID_SENTINEL trailer",
697 });
698 }
699 let padded_len = (locator_bytes.len() + 3) & !3;
701 if padded_len > u16::MAX as usize {
702 return Err(WireError::ValueOutOfRange {
703 message: "inject_pid_shm_locator: locator > u16::MAX",
704 });
705 }
706 let mut inject = Vec::with_capacity(4 + padded_len + 4);
707 inject.extend_from_slice(&pid::SHM_LOCATOR.to_le_bytes());
708 inject.extend_from_slice(&(padded_len as u16).to_le_bytes());
709 inject.extend_from_slice(locator_bytes);
710 inject.resize(inject.len() + (padded_len - locator_bytes.len()), 0);
712 inject.extend_from_slice(&bytes[sentinel_pos..]);
714 bytes.truncate(sentinel_pos);
715 bytes.extend_from_slice(&inject);
716 Ok(())
717}
718
719pub(crate) fn guid_from_param(p: &Parameter) -> Option<Guid> {
720 if p.value.len() == 16 {
721 let mut g = [0u8; 16];
722 g.copy_from_slice(&p.value);
723 Some(Guid::from_bytes(g))
724 } else {
725 None
726 }
727}
728
729pub(crate) fn encode_duration_le(d: Duration) -> [u8; 8] {
735 let mut out = [0u8; 8];
736 out[..4].copy_from_slice(&d.seconds.to_le_bytes());
737 out[4..].copy_from_slice(&d.fraction.to_le_bytes());
738 out
739}
740
741pub(crate) fn decode_duration(value: &[u8], little_endian: bool) -> Option<Duration> {
742 if value.len() < 8 {
743 return None;
744 }
745 let mut s = [0u8; 4];
746 s.copy_from_slice(&value[..4]);
747 let mut f = [0u8; 4];
748 f.copy_from_slice(&value[4..8]);
749 if little_endian {
750 Some(Duration {
751 seconds: i32::from_le_bytes(s),
752 fraction: u32::from_le_bytes(f),
753 })
754 } else {
755 Some(Duration {
756 seconds: i32::from_be_bytes(s),
757 fraction: u32::from_be_bytes(f),
758 })
759 }
760}
761
/// Returns the four little-endian bytes of `v`.
pub(crate) fn encode_u32_le(v: u32) -> [u8; 4] {
    u32::to_le_bytes(v)
}
766
/// Reads a `u32` from the first four bytes of `value` in the requested byte
/// order.
///
/// Returns `None` when fewer than four bytes are available; bytes past the
/// fourth are ignored.
pub(crate) fn decode_u32(value: &[u8], little_endian: bool) -> Option<u32> {
    match value {
        [b0, b1, b2, b3, ..] => {
            let word = [*b0, *b1, *b2, *b3];
            Some(if little_endian {
                u32::from_le_bytes(word)
            } else {
                u32::from_be_bytes(word)
            })
        }
        _ => None,
    }
}
779
780pub(crate) fn decode_i32(value: &[u8], little_endian: bool) -> Option<i32> {
781 decode_u32(value, little_endian).map(|u| u as i32)
782}
783
784pub(crate) fn encode_liveliness_le(l: zerodds_qos::LivelinessQosPolicy) -> Vec<u8> {
786 let mut out = Vec::with_capacity(12);
787 out.extend_from_slice(&(l.kind as u32).to_le_bytes());
788 out.extend_from_slice(&encode_duration_le(l.lease_duration));
789 out
790}
791
792pub(crate) fn decode_liveliness(
793 value: &[u8],
794 little_endian: bool,
795) -> Option<zerodds_qos::LivelinessQosPolicy> {
796 if value.len() < 12 {
797 return None;
798 }
799 let kind_u = decode_u32(&value[..4], little_endian)?;
800 let lease = decode_duration(&value[4..12], little_endian)?;
801 Some(zerodds_qos::LivelinessQosPolicy {
802 kind: zerodds_qos::LivelinessKind::from_u32(kind_u),
803 lease_duration: lease,
804 })
805}
806
807pub fn encode_octet_seq_le(data: &[u8]) -> Result<Vec<u8>, WireError> {
812 let len = u32::try_from(data.len()).map_err(|_| WireError::ValueOutOfRange {
813 message: "octet sequence length exceeds u32::MAX",
814 })?;
815 let mut out = Vec::with_capacity(4 + data.len() + 3);
816 out.extend_from_slice(&len.to_le_bytes());
817 out.extend_from_slice(data);
818 while out.len() % 4 != 0 {
819 out.push(0);
820 }
821 Ok(out)
822}
823
824pub fn decode_octet_seq(value: &[u8], little_endian: bool) -> Option<Vec<u8>> {
826 let n = decode_u32(value, little_endian)? as usize;
827 if 4 + n > value.len() {
828 return None;
829 }
830 Some(value[4..4 + n].to_vec())
831}
832
833pub(crate) fn encode_partition_le(partitions: &[String]) -> Result<Vec<u8>, WireError> {
834 let mut out = Vec::new();
835 let len = u32::try_from(partitions.len()).map_err(|_| WireError::ValueOutOfRange {
836 message: "partition count exceeds u32::MAX",
837 })?;
838 out.extend_from_slice(&len.to_le_bytes());
839 for p in partitions {
840 out.extend_from_slice(&encode_cdr_string_le(p)?);
845 }
846 Ok(out)
847}
848
849pub(crate) fn decode_partition(value: &[u8], little_endian: bool) -> Option<Vec<String>> {
850 let n = decode_u32(value, little_endian)? as usize;
851 let cap = n.min(value.len().saturating_sub(4) / 5);
855 let mut out = Vec::with_capacity(cap);
856 let mut pos = 4;
857 for _ in 0..n {
858 if pos + 4 > value.len() {
859 return None;
860 }
861 let mut lb = [0u8; 4];
862 lb.copy_from_slice(&value[pos..pos + 4]);
863 let slen = if little_endian {
864 u32::from_le_bytes(lb)
865 } else {
866 u32::from_be_bytes(lb)
867 } as usize;
868 let next_raw_end = pos + 4 + slen;
869 if next_raw_end > value.len() {
870 return None;
871 }
872 let s =
873 decode_cdr_string(&value[pos..next_raw_end.min(value.len())], little_endian).ok()?;
874 out.push(s);
875 let padded_end = (next_raw_end + 3) & !3;
877 pos = padded_end;
878 }
879 Some(out)
880}
881
882pub(crate) fn encode_cdr_string_le(s: &str) -> Result<Vec<u8>, WireError> {
883 let bytes = s.as_bytes();
884 let len =
885 u32::try_from(bytes.len().saturating_add(1)).map_err(|_| WireError::ValueOutOfRange {
886 message: "CDR string length exceeds u32::MAX",
887 })?;
888 let mut out = Vec::with_capacity(4 + bytes.len() + 4);
889 out.extend_from_slice(&len.to_le_bytes());
890 out.extend_from_slice(bytes);
891 out.push(0); while out.len() % 4 != 0 {
894 out.push(0);
895 }
896 Ok(out)
897}
898
899pub(crate) fn decode_cdr_string(value: &[u8], little_endian: bool) -> Result<String, WireError> {
901 if value.len() < 4 {
902 return Err(WireError::UnexpectedEof {
903 needed: 4,
904 offset: 0,
905 });
906 }
907 let mut lb = [0u8; 4];
908 lb.copy_from_slice(&value[..4]);
909 let len = if little_endian {
910 u32::from_le_bytes(lb)
911 } else {
912 u32::from_be_bytes(lb)
913 } as usize;
914 if len == 0 {
915 return Err(WireError::ValueOutOfRange {
916 message: "CDR string length 0 (missing null terminator)",
917 });
918 }
919 if value.len() < 4 + len {
920 return Err(WireError::UnexpectedEof {
921 needed: 4 + len,
922 offset: 0,
923 });
924 }
925 let raw = &value[4..4 + len];
926 if raw[len - 1] != 0 {
927 return Err(WireError::ValueOutOfRange {
928 message: "CDR string missing null terminator",
929 });
930 }
931 String::from_utf8(raw[..len - 1].to_vec()).map_err(|_| WireError::ValueOutOfRange {
932 message: "CDR string is not valid UTF-8",
933 })
934}
935
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::wire_types::{EntityId, GuidPrefix};
    use alloc::vec;

    /// Baseline record shared by the tests: reliable, volatile, with every
    /// optional field empty or `None`.
    fn sample_data() -> PublicationBuiltinTopicData {
        let prefix = GuidPrefix::from_bytes([1; 12]);
        PublicationBuiltinTopicData {
            key: Guid::new(prefix, EntityId::user_writer_with_key([0x10, 0x20, 0x30])),
            participant_key: Guid::new(GuidPrefix::from_bytes([1; 12]), EntityId::PARTICIPANT),
            topic_name: "ChatterTopic".into(),
            type_name: "std_msgs::String".into(),
            durability: DurabilityKind::Volatile,
            reliability: ReliabilityQos {
                kind: ReliabilityKind::Reliable,
                max_blocking_time: Duration::from_secs(10),
            },
            ownership: zerodds_qos::OwnershipKind::Shared,
            ownership_strength: 0,
            liveliness: zerodds_qos::LivelinessQosPolicy::default(),
            deadline: zerodds_qos::DeadlineQosPolicy::default(),
            lifespan: zerodds_qos::LifespanQosPolicy::default(),
            partition: Vec::new(),
            user_data: Vec::new(),
            topic_data: Vec::new(),
            group_data: Vec::new(),
            type_information: None,
            data_representation: Vec::new(),
            security_info: None,
            service_instance_name: None,
            related_entity_guid: None,
            topic_aliases: None,
            type_identifier: zerodds_types::TypeIdentifier::None,
        }
    }

    /// Wraps a parameter list in the PL_CDR_LE encapsulation header.
    fn frame_le(pl: ParameterList) -> Vec<u8> {
        let mut framed = Vec::new();
        framed.extend_from_slice(&ENCAPSULATION_PL_CDR_LE);
        framed.extend_from_slice(&[0, 0]);
        framed.extend_from_slice(&pl.to_bytes(true));
        framed
    }

    /// Encodes `data` and decodes the result again.
    fn reencode(data: &PublicationBuiltinTopicData) -> PublicationBuiltinTopicData {
        let wire = data.to_pl_cdr_le().unwrap();
        PublicationBuiltinTopicData::from_pl_cdr_le(&wire).unwrap()
    }

    #[test]
    fn durability_try_from_u32_rejects_unknown() {
        let cases = [
            (0, Some(DurabilityKind::Volatile)),
            (1, Some(DurabilityKind::TransientLocal)),
            (3, Some(DurabilityKind::Persistent)),
            (99, None),
        ];
        for (raw, expected) in cases {
            assert_eq!(DurabilityKind::try_from_u32(raw), expected);
        }
    }

    #[test]
    fn reliability_try_from_u32_rejects_unknown() {
        let cases = [
            (1, Some(ReliabilityKind::BestEffort)),
            (2, Some(ReliabilityKind::Reliable)),
            (0, None),
            (42, None),
        ];
        for (raw, expected) in cases {
            assert_eq!(ReliabilityKind::try_from_u32(raw), expected);
        }
    }

    #[test]
    fn legacy_from_u32_still_defaults_for_sedp_forward_compat() {
        assert_eq!(DurabilityKind::from_u32(99), DurabilityKind::Volatile);
        assert_eq!(ReliabilityKind::from_u32(99), ReliabilityKind::BestEffort);
    }

    #[test]
    fn roundtrip_le() {
        let original = sample_data();
        let wire = original.to_pl_cdr_le().unwrap();
        // The payload starts with the PL_CDR_LE encapsulation id.
        assert_eq!(&wire[..2], &[0x00, 0x03]);
        assert_eq!(
            PublicationBuiltinTopicData::from_pl_cdr_le(&wire).unwrap(),
            original
        );
    }

    #[test]
    fn security_info_roundtrip() {
        use crate::endpoint_security_info::{EndpointSecurityInfo, attrs, plugin_attrs};
        let original = PublicationBuiltinTopicData {
            security_info: Some(EndpointSecurityInfo {
                endpoint_security_attributes: attrs::IS_VALID | attrs::IS_SUBMESSAGE_PROTECTED,
                plugin_endpoint_security_attributes: plugin_attrs::IS_VALID
                    | plugin_attrs::IS_SUBMESSAGE_ENCRYPTED,
            }),
            ..sample_data()
        };
        assert_eq!(reencode(&original).security_info, original.security_info);
    }

    #[test]
    fn legacy_peer_without_security_info_parses_ok() {
        let original = sample_data();
        assert!(original.security_info.is_none());
        assert!(reencode(&original).security_info.is_none());
    }

    #[test]
    fn roundtrip_utf8_topic_name() {
        let original = PublicationBuiltinTopicData {
            topic_name: "Zählung".into(),
            ..sample_data()
        };
        assert_eq!(reencode(&original).topic_name, "Zählung");
    }

    #[test]
    fn decode_rejects_unknown_encapsulation() {
        // 4 header bytes with an unknown id, followed by 16 zero bytes.
        let mut wire = vec![0u8; 20];
        wire[0] = 0xFF;
        wire[1] = 0xFF;
        let outcome = PublicationBuiltinTopicData::from_pl_cdr_le(&wire);
        assert!(matches!(
            outcome,
            Err(WireError::UnsupportedEncapsulation { .. })
        ));
    }

    #[test]
    fn decode_rejects_missing_topic_name() {
        let mut pl = ParameterList::new();
        pl.push(Parameter::new(pid::ENDPOINT_GUID, vec![0u8; 16]));
        pl.push(Parameter::new(
            pid::TYPE_NAME,
            encode_cdr_string_le("T").unwrap(),
        ));
        let outcome = PublicationBuiltinTopicData::from_pl_cdr_le(&frame_le(pl));
        assert!(
            matches!(outcome, Err(WireError::ValueOutOfRange { message }) if message.contains("TOPIC_NAME"))
        );
    }

    #[test]
    fn decode_rejects_missing_type_name() {
        let mut pl = ParameterList::new();
        pl.push(Parameter::new(pid::ENDPOINT_GUID, vec![0u8; 16]));
        pl.push(Parameter::new(
            pid::TOPIC_NAME,
            encode_cdr_string_le("T").unwrap(),
        ));
        let outcome = PublicationBuiltinTopicData::from_pl_cdr_le(&frame_le(pl));
        assert!(
            matches!(outcome, Err(WireError::ValueOutOfRange { message }) if message.contains("TYPE_NAME"))
        );
    }

    #[test]
    fn decode_rejects_missing_endpoint_guid() {
        let mut pl = ParameterList::new();
        pl.push(Parameter::new(
            pid::TOPIC_NAME,
            encode_cdr_string_le("T").unwrap(),
        ));
        pl.push(Parameter::new(
            pid::TYPE_NAME,
            encode_cdr_string_le("U").unwrap(),
        ));
        let outcome = PublicationBuiltinTopicData::from_pl_cdr_le(&frame_le(pl));
        assert!(
            matches!(outcome, Err(WireError::ValueOutOfRange { message }) if message.contains("ENDPOINT_GUID"))
        );
    }

    #[test]
    fn unknown_pids_are_skipped() {
        let mut wire = sample_data().to_pl_cdr_le().unwrap();
        // Detach the 4-byte sentinel, append an unknown parameter
        // (PID 0x7FFF, length 4), then put the sentinel back.
        let sentinel = wire.split_off(wire.len() - 4);
        wire.extend_from_slice(&[0xFFu8, 0x7F, 4, 0, 0xDE, 0xAD, 0xBE, 0xEF]);
        wire.extend_from_slice(&sentinel);
        let decoded = PublicationBuiltinTopicData::from_pl_cdr_le(&wire).unwrap();
        assert_eq!(decoded, sample_data());
    }

    #[test]
    fn inject_pid_shm_locator_appends_before_sentinel() {
        let path = "/dev/shm/zd-1";
        // Four header words, then the path as a length-prefixed,
        // NUL-terminated string.
        let mut locator = Vec::new();
        for word in [0xDEAD_BEEFu32, 1000, 64, 4096, (path.len() as u32) + 1] {
            locator.extend_from_slice(&word.to_le_bytes());
        }
        locator.extend_from_slice(path.as_bytes());
        locator.push(0);
        while locator.len() % 4 != 0 {
            locator.push(0);
        }

        let mut wire = sample_data().to_pl_cdr_le().unwrap();
        let len_before = wire.len();
        inject_pid_shm_locator(&mut wire, &locator).unwrap();
        assert!(wire.len() > len_before);

        // The decoder must skip the injected parameter.
        let decoded = PublicationBuiltinTopicData::from_pl_cdr_le(&wire).unwrap();
        assert_eq!(decoded, sample_data());

        let pid_found = wire.windows(2).any(|w| w == 0x8001u16.to_le_bytes());
        assert!(pid_found, "PID_SHM_LOCATOR should appear in bytes");
    }

    #[test]
    fn inject_pid_shm_locator_rejects_missing_sentinel() {
        let mut wire = vec![0u8; 8];
        assert!(inject_pid_shm_locator(&mut wire, &[0u8; 16]).is_err());
    }

    #[test]
    fn inject_pid_shm_locator_rejects_too_short() {
        let mut wire = vec![0u8, 1u8];
        assert!(inject_pid_shm_locator(&mut wire, &[0u8; 16]).is_err());
    }

    #[test]
    fn participant_key_fallback_when_pid_missing() {
        let original = sample_data();
        // Only the three mandatory parameters; no PARTICIPANT_GUID.
        let mut pl = ParameterList::new();
        pl.push(Parameter::new(
            pid::ENDPOINT_GUID,
            original.key.to_bytes().to_vec(),
        ));
        pl.push(Parameter::new(
            pid::TOPIC_NAME,
            encode_cdr_string_le(&original.topic_name).unwrap(),
        ));
        pl.push(Parameter::new(
            pid::TYPE_NAME,
            encode_cdr_string_le(&original.type_name).unwrap(),
        ));
        let decoded = PublicationBuiltinTopicData::from_pl_cdr_le(&frame_le(pl)).unwrap();
        assert_eq!(decoded.participant_key.prefix, original.key.prefix);
        assert_eq!(decoded.participant_key.entity_id, EntityId::PARTICIPANT);
    }

    #[test]
    fn durability_kind_from_u32_unknown_defaults_volatile() {
        let cases = [
            (0, DurabilityKind::Volatile),
            (1, DurabilityKind::TransientLocal),
            (999, DurabilityKind::Volatile),
        ];
        for (raw, expected) in cases {
            assert_eq!(DurabilityKind::from_u32(raw), expected);
        }
    }

    #[test]
    fn rpc_discovery_pids_roundtrip() {
        let original = PublicationBuiltinTopicData {
            service_instance_name: Some("CalcInstance-1".into()),
            related_entity_guid: Some(Guid::new(
                GuidPrefix::from_bytes([7; 12]),
                EntityId::user_reader_with_key([0xAA, 0xBB, 0xCC]),
            )),
            topic_aliases: Some(vec!["LegacyCalc_Request".into(), "v2_Req".into()]),
            ..sample_data()
        };

        let decoded = reencode(&original);
        assert_eq!(decoded.service_instance_name, original.service_instance_name);
        assert_eq!(decoded.related_entity_guid, original.related_entity_guid);
        assert_eq!(decoded.topic_aliases, original.topic_aliases);
    }

    #[test]
    fn rpc_pids_optional_legacy_peer_parses_ok() {
        let original = sample_data();
        assert!(original.service_instance_name.is_none());
        assert!(original.related_entity_guid.is_none());
        assert!(original.topic_aliases.is_none());

        let decoded = reencode(&original);
        assert!(decoded.service_instance_name.is_none());
        assert!(decoded.related_entity_guid.is_none());
        assert!(decoded.topic_aliases.is_none());
    }

    #[test]
    fn rpc_pid_constants_in_emitted_bytes() {
        let populated = PublicationBuiltinTopicData {
            service_instance_name: Some("X".into()),
            related_entity_guid: Some(Guid::new(
                GuidPrefix::from_bytes([1; 12]),
                EntityId::PARTICIPANT,
            )),
            topic_aliases: Some(vec!["A".into()]),
            ..sample_data()
        };
        let wire = populated.to_pl_cdr_le().unwrap();
        // Each of the three byte pairs must occur somewhere in the payload.
        for pid_le in [[0x80u8, 0x00], [0x81, 0x00], [0x82, 0x00]] {
            assert!(wire.windows(2).any(|w| w == pid_le));
        }
    }

    #[test]
    fn reliability_kind_from_u32_unknown_defaults_best_effort() {
        let cases = [
            (1, ReliabilityKind::BestEffort),
            (2, ReliabilityKind::Reliable),
            (999, ReliabilityKind::BestEffort),
        ];
        for (raw, expected) in cases {
            assert_eq!(ReliabilityKind::from_u32(raw), expected);
        }
    }
}