1use cdr::{Bounded, CdrBe, Infinite};
21
22
23use serde::{de::DeserializeOwned, Serialize};
24use std::io::prelude::*;
25
26use std::ptr::NonNull;
27
28use std::{
29 ffi::{c_void, CStr},
30 marker::PhantomData,
31 ops::Deref,
32 sync::Arc,
33};
34
35use cyclonedds_sys::*;
36use murmur3::murmur3_32;
38use std::io::Cursor;
39
/// Wrapper that pairs a Cyclone DDS `ddsi_sertype` with the Rust type `T` it describes.
///
/// The struct is `#[repr(C)]` with `sertype` as its first field, which is what allows the
/// rest of this file to cast between `*mut ddsi_sertype` and `*mut SerType<T>`.
#[repr(C)]
pub struct SerType<T> {
    // Must stay the first field: pointers to this struct are handed out as `*mut ddsi_sertype`.
    sertype: ddsi_sertype,
    // Ties the wrapper to `T` without storing a value of it.
    _phantom: PhantomData<T>,
}
45
46pub trait TopicType: Serialize + DeserializeOwned {
47 fn hash(&self, basehash : u32) -> u32 {
50 let cdr = self.key_cdr();
51 let mut cursor = Cursor::new(cdr.as_slice());
52 murmur3_32(&mut cursor, 0).unwrap() ^ basehash
53 }
54
55 fn is_fixed_size() -> bool {
56 false
57 }
58 fn typename() -> std::ffi::CString {
60 let ty_name_parts: String = std::any::type_name::<Self>()
61 .split("::")
62 .skip(1)
63 .collect::<Vec<_>>()
64 .join("::");
65
66
67 std::ffi::CString::new(ty_name_parts).expect("Unable to create CString for type name")
69 }
70
71 fn topic_name(maybe_prefix: Option<&str>) -> String {
75 let topic_name_parts: String = format!(
76 "/{}",
77 std::any::type_name::<Self>()
78 .to_string()
79 .split("::")
80 .skip(1)
81 .collect::<Vec<_>>()
82 .join("/")
83 );
84
85 if let Some(prefix) = maybe_prefix {
86 let mut path = String::from(prefix);
87 path.push_str(&topic_name_parts);
88 path
89 } else {
90 topic_name_parts
91 }
92 }
93
94 fn has_key() -> bool;
95 fn key_cdr(&self) -> Vec<u8>;
101
102 fn force_md5_keyhash() -> bool;
105}
106
107impl<'a, T> SerType<T> {
108 pub fn new() -> Box<SerType<T>>
109 where
110 T: DeserializeOwned + Serialize + TopicType,
111 {
112 Box::<SerType<T>>::new(SerType {
113 sertype: {
114 let mut sertype = std::mem::MaybeUninit::uninit();
115 unsafe {
116 let type_name = T::typename();
117 ddsi_sertype_init(
118 sertype.as_mut_ptr(),
119 type_name.as_ptr(),
120 Box::into_raw(create_sertype_ops::<T>()),
121 Box::into_raw(create_serdata_ops::<T>()),
122 !T::has_key(),
123 );
124 let mut sertype = sertype.assume_init();
125 sertype.set_fixed_size(if T::is_fixed_size() { 1 } else { 0 });
126 sertype.iox_size = std::mem::size_of::<T>() as u32;
127 sertype
128 }
129 },
130 _phantom: PhantomData,
131 })
132 }
133
134 pub fn into_sertype(sertype: Box<SerType<T>>) -> *mut ddsi_sertype {
138 Box::<SerType<T>>::into_raw(sertype) as *mut ddsi_sertype
139 }
140
141 pub fn try_from_sertype(sertype: *const ddsi_sertype) -> Option<Box<SerType<T>>> {
142 let ptr = sertype as *mut SerType<T>;
143 if !ptr.is_null() {
144 Some(unsafe { Box::from_raw(ptr) })
145 } else {
146 None
147 }
148 }
149}
150
/// Storage behind a received or written sample.
#[derive(Clone)]
pub enum SampleStorage<T> {
    /// Reference-counted value held on the Rust side.
    Owned(Arc<T>),
    /// Reference-counted raw pointer to a `T` that lives outside Rust ownership.
    Loaned(Arc<NonNull<T>>),
}
156
157impl<T> Deref for SampleStorage<T> {
158 type Target = T;
159
160 fn deref(&self) -> &Self::Target {
161 match self {
162 SampleStorage::Owned(t) => t.deref(),
163 SampleStorage::Loaned(t) => unsafe { t.as_ref().as_ref() },
164 }
165 }
166}
167
168impl<T> Drop for SampleStorage<T> {
169 fn drop(&mut self) {
170 match self {
171 SampleStorage::Loaned(_t) => {
172 }
173 _ => {
174
175 }
176 }
177 }
178}
179
180
/// A sample slot exchanged with Cyclone DDS through raw pointers.
pub struct Sample<T> {
    // Serdata backing this sample, if any. `set_serdata` adds a reference and `Drop`
    // removes one.
    serdata: Option<*mut ddsi_serdata>,
    // Value set from the Rust side (`set`, `set_loaned`, `from`).
    sample: Option<SampleStorage<T>>,
}
188
189impl<'a,T> Sample<T>
190where
191 T: TopicType
192{
193 pub fn try_deref<>(&self) -> Option<&T> {
194 if let Some(serdata) = self.serdata {
195 let serdata = SerData::<T>::mut_ref_from_serdata(serdata);
196 match &serdata.sample {
197 SampleData::Uninitialized => None,
198 SampleData::SDKKey => None,
199 SampleData::SDKData(it) => Some(it.as_ref()),
200 SampleData::SHMData(it) => unsafe { Some(it.as_ref())},
201 }
202 } else {
203 None
204 }
205
206 }
207
208 pub fn get_sample(&self) -> Option<SampleStorage<T>> {
209 match self.sample.as_ref() {
211 Some(s) => match s {
212 SampleStorage::Owned(s) => Some(SampleStorage::Owned(s.clone())),
213 SampleStorage::Loaned(s) => Some(SampleStorage::Loaned(s.clone())),
214 },
215 None => None,
216 }
217 }
218
219 #[deprecated]
221 pub (crate)fn get(&self) -> Option<Arc<T>> {
222 match &self.sample {
224 Some(SampleStorage::Owned(t)) => Some(t.clone()),
225 Some(SampleStorage::Loaned(_t)) => {
226 None
227 }
228 None => {
229 None
230 }
231 }
232 }
233
234 pub(crate) fn set_serdata(&mut self,serdata:*mut ddsi_serdata) {
235 unsafe {ddsi_serdata_addref(serdata);}
237 self.serdata = Some(serdata)
238 }
239
240 pub fn set(&mut self, t: Arc<T>) {
241 self.sample.replace(SampleStorage::Owned(t));
243 }
244
245 pub fn set_loaned(&mut self, t: NonNull<T>) {
246 self.sample.replace(SampleStorage::Loaned(Arc::new(t)));
248 }
249
250 pub fn clear(&mut self) {
251 let t = self.sample.take();
253
254 match &t {
255 Some(SampleStorage::Owned(_o)) => {}
256 Some(SampleStorage::Loaned(_o)) => {}
257 None => {}
258 }
259 }
260
261 pub fn from(it: Arc<T>) -> Self {
262 Self {
263 serdata : None,
264 sample: Some(SampleStorage::Owned(it)),
265 }
266 }
267}
268
269impl<T> Default for Sample<T> {
270 fn default() -> Self {
271 Self {
272 serdata : None,
273 sample: None,
274 }
275 }
276}
277
278impl<T> Drop for Sample<T> {
279 fn drop(&mut self) {
280 if let Some(serdata) = self.serdata {
281 unsafe {ddsi_serdata_removeref(serdata)};
282 }
283 }
284}
285
286
287
288
// NOTE(review): this asserts `Send` for a type that stores raw pointers; soundness depends on
// how the pointed-to samples are shared, which is not visible here — confirm.
unsafe impl<T> Send for SampleBuffer<T> {}
/// A set of heap-allocated `Sample<T>` slots plus one `dds_sample_info` per slot.
pub struct SampleBuffer<T> {
    // Raw pointers produced by `Box::into_raw`; released again in `Drop`.
    pub(crate) buffer: Vec<*mut Sample<T>>,
    // Per-sample metadata, indexed in parallel with `buffer`.
    pub(crate) sample_info: Vec<cyclonedds_sys::dds_sample_info>,
}
307
308impl<'a, T:TopicType> SampleBuffer<T> {
309 pub fn new(len: usize) -> Self {
310 let mut buf = Self {
311 buffer: Vec::new(),
312 sample_info: vec![cyclonedds_sys::dds_sample_info::default(); len],
313 };
314
315 for _i in 0..len {
316 let p = Box::into_raw(Box::default());
317 buf.buffer.push(p);
318 }
319 buf
320 }
321
322 pub fn is_valid_sample(&self, index: usize) -> bool {
325 self.sample_info[index].valid_data
326 }
327
328 pub fn len(&self) -> usize {
329 self.buffer.len()
330 }
331
332 pub fn iter(&'a self) -> impl Iterator<Item = &T> {
333 let p = self.buffer.iter().filter_map(|p| {
334 let sample = unsafe { &*(*p) };
335 sample.try_deref()
336
337 });
338 p
339 }
340
341 pub fn get(&self, index: usize) -> &Sample<T> {
343 let p_sample = self.buffer[index];
344 unsafe { &*p_sample }
345 }
346
347 pub unsafe fn as_mut_ptr(&mut self) -> (*mut *mut Sample<T>, *mut dds_sample_info) {
351 (self.buffer.as_mut_ptr(), self.sample_info.as_mut_ptr())
352 }
353}
354
355impl<'a, T> Drop for SampleBuffer<T> {
356 fn drop(&mut self) {
357 for p in &self.buffer {
358 unsafe {
359 let _it = Box::from_raw(*p);
360 }
361 }
362 }
363}
364#[allow(dead_code)]
374unsafe extern "C" fn zero_samples<T>(
375 _sertype: *const ddsi_sertype,
376 _ptr: *mut std::ffi::c_void,
377 _len: size_t,
378) {
379} #[allow(dead_code)]
382extern "C" fn realloc_samples<T>(
383 ptrs: *mut *mut std::ffi::c_void,
384 _sertype: *const ddsi_sertype,
385 old: *mut std::ffi::c_void,
386 old_count: size_t,
387 new_count: size_t,
388) {
389 let old = unsafe {
391 Vec::<*mut Sample<T>>::from_raw_parts(
392 old as *mut *mut Sample<T>,
393 old_count as usize,
394 old_count as usize,
395 )
396 };
397 let mut new = Vec::<*mut Sample<T>>::with_capacity(new_count as usize);
398
399 if new_count >= old_count {
400 for entry in old {
401 new.push(entry);
402 }
403
404 for _i in 0..(new_count - old_count) {
405 new.push(Box::into_raw(Box::default()));
406 }
407 } else {
408 for e in old.into_iter().take(new_count as usize) {
409 new.push(e)
410 }
411 }
412
413 let leaked = new.leak();
414
415 let (raw, _length) = (leaked.as_ptr(), leaked.len());
416 unsafe {
419 *ptrs = raw as *mut std::ffi::c_void;
420 }
421}
422
/// `free_samples` callback for the sertype ops table.
///
/// Reads the first pointer stored in `ptrs` and treats it as the start of a `Vec<Sample<T>>`
/// with `len` elements. With `DDS_FREE_ALL_BIT` set the vector is dropped (samples and
/// storage). Otherwise `DDS_FREE_CONTENTS_BIT` must be set: every sample is cleared and the
/// storage is leaked again so it stays valid for the caller.
///
/// NOTE(review): `realloc_samples` in this file hands out an array of individually boxed
/// `*mut Sample<T>`, while this function interprets `*ptrs` as contiguous `Sample<T>`
/// values — confirm which layout the caller really supplies.
#[allow(dead_code)]
extern "C" fn free_samples<T>(
    _sertype: *const ddsi_sertype,
    ptrs: *mut *mut std::ffi::c_void,
    len: size_t,
    op: dds_free_op_t,
) where
    T: TopicType,
{
    let ptrs_v: *mut *mut Sample<T> = ptrs as *mut *mut Sample<T>;

    if (op & DDS_FREE_ALL_BIT) != 0 {
        // Taking ownership and letting the vector fall out of scope frees everything.
        let _samples =
            unsafe { Vec::<Sample<T>>::from_raw_parts(*ptrs_v, len as usize, len as usize) };
    } else {
        // Panics if neither of the two supported bits is set.
        assert_ne!(op & DDS_FREE_CONTENTS_BIT, 0);
        let mut samples =
            unsafe { Vec::<Sample<T>>::from_raw_parts(*ptrs_v, len as usize, len as usize) };
        for sample in samples.iter_mut() {
            sample.clear()
        }
        // Keep the storage alive: only the contents were to be freed.
        let _intentional_leak = samples.leak();
    }
}
450
/// `free` callback for the sertype ops table.
///
/// Finalises the embedded `ddsi_sertype`, then reclaims the two ops tables that
/// `SerType::new` leaked with `Box::into_raw`, and finally the `SerType<T>` allocation itself.
#[allow(dead_code)]
unsafe extern "C" fn free_sertype<T>(sertype: *mut cyclonedds_sys::ddsi_sertype) {
    ddsi_sertype_fini(sertype);

    // The ops pointers are still read after `ddsi_sertype_fini`.
    // NOTE(review): assumes fini leaves `ops` / `serdata_ops` intact — confirm.
    let _sertype_ops = Box::<ddsi_sertype_ops>::from_raw((*sertype).ops as *mut ddsi_sertype_ops);
    let _serdata_ops =
        Box::<ddsi_serdata_ops>::from_raw((*sertype).serdata_ops as *mut ddsi_serdata_ops);
    let sertype = sertype as *mut SerType<T>;
    // Dropping the box releases the wrapper allocation.
    let _it = Box::<SerType<T>>::from_raw(sertype);
}
464
/// `from_ser` callback: builds a serdata by deserializing a chain of received fragments.
///
/// Walks the `nn_rdata` linked list, collects the not-yet-covered bytes of each fragment into
/// a scatter/gather list and CDR-deserializes a `T` from it (bounded by `size`). On success
/// the key hash (keyed types only), the serdata hash and the decoded sample are stored in a
/// new boxed `SerData<T>` whose pointer is returned. Returns null when deserialization fails.
///
/// Panics if the first fragment does not start at offset 0 or if a fragment extends past
/// `size`.
///
/// NOTE(review): `fragchain` is dereferenced before the loop's null check — presumably the
/// caller never passes null; confirm.
#[allow(dead_code)]
unsafe extern "C" fn serdata_from_fragchain<T>(
    sertype: *const ddsi_sertype,
    kind: u32,
    mut fragchain: *const nn_rdata,
    size: size_t,
) -> *mut ddsi_serdata
where
    T: DeserializeOwned + TopicType,
{
    // Number of payload bytes already collected.
    let mut off: u32 = 0;
    let size = size as usize;
    let fragchain_ref = &*fragchain;

    let mut serdata = SerData::<T>::new(sertype, kind);

    assert_eq!(fragchain_ref.min, 0);
    assert!(fragchain_ref.maxp1 >= off);

    // Borrowed views into the fragment payloads, in stream order.
    let mut sg_list = Vec::new();

    while !fragchain.is_null() {
        let fragchain_ref = &*fragchain;
        // Only fragments that reach beyond what is already covered contribute bytes.
        if fragchain_ref.maxp1 > off {
            let payload =
                nn_rmsg_payload_offset(fragchain_ref.rmsg, nn_rdata_payload_offset(fragchain));
            // Skip the part of this fragment that overlaps bytes collected earlier.
            let src = payload.add((off - fragchain_ref.min) as usize);
            let n_bytes = fragchain_ref.maxp1 - off;
            sg_list.push(std::slice::from_raw_parts(src, n_bytes as usize));
            off = fragchain_ref.maxp1;
            assert!(off as usize <= size);
        }
        fragchain = fragchain_ref.nextfrag;
    }
    let reader = SGReader::new(&sg_list);
    if let Ok(decoded) = cdr::deserialize_from::<_, T, _>(reader, Bounded(size as u64)) {
        if T::has_key() {
            let key_cdr = decoded.key_cdr();
            // Skip the first four bytes of the key CDR before hashing.
            let key_cdr = &key_cdr[4..];
            compute_key_hash(key_cdr, &mut serdata);
        }
        serdata.serdata.hash = decoded.hash((*sertype).serdata_basehash);
        let sample = std::sync::Arc::new(decoded);
        serdata.sample = SampleData::SDKData(sample);
    } else {
        println!("Deserialization error!");
        return std::ptr::null_mut();
    }

    // Ownership passes to the caller; `free_serdata` reclaims it.
    let ptr = Box::into_raw(serdata);
    ptr as *mut ddsi_serdata
}
528
529fn copy_raw_key_hash<T>(key: &[u8], serdata: &mut Box<SerData<T>>) {
530 let mut raw_key = [0u8; 16];
531 for (i, data) in key.iter().enumerate() {
532 raw_key[i] = *data;
533 }
534 serdata.key_hash = KeyHash::RawKey(raw_key)
535}
536
/// Computes the key hash for `key_cdr` and stores it on `serdata` as `KeyHash::CdrKey`.
///
/// When `T::force_md5_keyhash()` is set or the key is longer than 16 bytes, the MD5 digest of
/// the key is written to the start of the 20-byte buffer. Otherwise the key bytes are copied
/// to the start of the buffer and the rest stays zero.
///
/// NOTE(review): this writes from offset 0, whereas `get_keyhash` reads `CdrKey` from offset 4
/// and `serdata_from_keyhash` writes at offset 4 — confirm the intended layout of the buffer.
fn compute_key_hash<T>(key_cdr: &[u8], serdata: &mut SerData<T>)
where
    T: TopicType,
{
    let mut cdr_key = [0u8; 20];

    if T::force_md5_keyhash() || key_cdr.len() > 16 {
        let mut md5st = ddsrt_md5_state_t::default();
        let md5set = &mut md5st as *mut ddsrt_md5_state_s;
        unsafe {
            ddsrt_md5_init(md5set);
            ddsrt_md5_append(md5set, key_cdr.as_ptr(), key_cdr.len() as u32);
            ddsrt_md5_finish(md5set, cdr_key.as_mut_ptr());
        }
    } else {
        // At most 16 bytes here, so the copy always fits in the buffer.
        for (i, data) in key_cdr.iter().enumerate() {
            cdr_key[i] = *data;
        }
    }
    serdata.key_hash = KeyHash::CdrKey(cdr_key)
}
558
/// `from_keyhash` callback: builds a key-only serdata from a received key hash.
///
/// Returns null when `T::force_md5_keyhash()` is set. Otherwise a new `SDK_KEY` serdata is
/// created whose `CdrKey` buffer holds the key hash bytes starting at offset 4 (the first four
/// bytes stay zero).
#[allow(dead_code)]
unsafe extern "C" fn serdata_from_keyhash<T>(
    sertype: *const ddsi_sertype,
    keyhash: *const ddsi_keyhash,
) -> *mut ddsi_serdata
where
    T: TopicType,
{
    let keyhash = (*keyhash).value;
    if T::force_md5_keyhash() {
        std::ptr::null_mut()
    } else {
        let mut serdata = SerData::<T>::new(sertype, ddsi_serdata_kind_SDK_KEY);
        serdata.sample = SampleData::SDKKey;

        let mut key_hash_buffer = [0u8; 20];
        // Leave room for four leading bytes in front of the key hash.
        let key_hash = &mut key_hash_buffer[4..];

        for (i, b) in keyhash.iter().enumerate() {
            key_hash[i] = *b;
        }

        serdata.key_hash = KeyHash::CdrKey(key_hash_buffer);

        // Ownership passes to the caller; `free_serdata` reclaims it.
        let ptr = Box::into_raw(serdata);
        ptr as *mut ddsi_serdata
    }
}
591
/// `from_sample` callback: wraps an application sample in a new serdata.
///
/// `sample` is interpreted as a `*const Sample<T>`. For `SDK_DATA` the owned value is taken
/// from the sample, its hash is stored and the value is shared with the serdata.
///
/// Panics for `SDK_KEY`, for any other kind, and when the sample holds no owned value.
#[allow(dead_code)]
#[allow(non_upper_case_globals)]
unsafe extern "C" fn serdata_from_sample<T>(
    sertype: *const ddsi_sertype,
    kind: u32,
    sample: *const c_void,
) -> *mut ddsi_serdata
where
    T: TopicType,
{
    let mut serdata = SerData::<T>::new(sertype, kind);
    let sample = sample as *const Sample<T>;
    let sample = &*sample;

    match kind {
        #[allow(non_upper_case_globals)]
        ddsi_serdata_kind_SDK_DATA => {
            // `get` only yields owned samples; loaned or empty samples panic here.
            let sample = sample.get().unwrap();
            serdata.serdata.hash = sample.hash((*sertype).serdata_basehash);
            serdata.sample = SampleData::SDKData(sample);
        }
        ddsi_serdata_kind_SDK_KEY => {
            panic!("Don't know how to create serdata from sample for SDK_KEY");
        }
        _ => panic!("Unexpected kind"),
    }

    // Ownership passes to the caller; `free_serdata` reclaims it.
    let ptr = Box::into_raw(serdata);
    ptr as *mut ddsi_serdata
}
624
/// `from_ser_iov` callback: builds a serdata by deserializing data spread over `niov` iovecs.
///
/// Each iovec is viewed as a byte slice, the slices are read in order through `SGReader`, and
/// a `T` is CDR-deserialized from them (bounded by `size`). On success the key hash (keyed
/// types only), the serdata hash and the decoded sample are stored in a new boxed
/// `SerData<T>`. Returns null when deserialization fails.
#[allow(dead_code)]
unsafe extern "C" fn serdata_from_iov<T>(
    sertype: *const ddsi_sertype,
    kind: u32,
    niov: size_t,
    iov: *const iovec,
    size: size_t,
) -> *mut ddsi_serdata
where
    T: DeserializeOwned + TopicType,
{
    let size = size as usize;
    let niov = niov as usize;
    let mut serdata = SerData::<T>::new(sertype, kind);

    let iovs = std::slice::from_raw_parts(iov as *const cyclonedds_sys::iovec, niov);

    // Borrowed byte views over the caller's buffers; nothing is copied here.
    let iov_slices: Vec<&[u8]> = iovs
        .iter()
        .map(|iov| {
            let iov = iov;

            std::slice::from_raw_parts(iov.iov_base as *const u8, iov.iov_len as usize)
        })
        .collect();

    let reader = SGReader::new(&iov_slices);

    if let Ok(decoded) = cdr::deserialize_from::<_, T, _>(reader, Bounded(size as u64)) {
        if T::has_key() {
            let key_cdr = decoded.key_cdr();
            // Skip the first four bytes of the key CDR before hashing.
            let key_cdr = &key_cdr[4..];
            compute_key_hash(key_cdr, &mut serdata);
        }
        serdata.serdata.hash = decoded.hash((*sertype).serdata_basehash);
        let sample = std::sync::Arc::new(decoded);
        serdata.sample = SampleData::SDKData(sample);
    } else {
        return std::ptr::null_mut();
    }

    // Ownership passes to the caller; `free_serdata` reclaims it.
    let ptr = Box::into_raw(serdata);
    ptr as *mut ddsi_serdata
}
678
/// `free` callback for the serdata ops table.
///
/// If the serdata references an iceoryx subscriber, its chunk is released through
/// `free_iox_chunk` first. The `SerData<T>` box is then reclaimed and dropped.
#[allow(dead_code)]
unsafe extern "C" fn free_serdata<T>(serdata: *mut ddsi_serdata) {
    let ptr = serdata as *mut SerData<T>;

    let serdata = &mut *ptr;

    if !serdata.serdata.iox_subscriber.is_null() {
        let iox_subscriber: *mut iox_sub_t = serdata.serdata.iox_subscriber as *mut iox_sub_t;
        let chunk = &mut serdata.serdata.iox_chunk;
        let chunk = chunk as *mut *mut c_void;
        free_iox_chunk(iox_subscriber, chunk);
    }

    // Re-own the allocation made by `SerData::new` so that it is freed here.
    let _data = Box::from_raw(ptr);
}
698
699#[allow(dead_code)]
700unsafe extern "C" fn get_size<T>(serdata: *const ddsi_serdata) -> u32
701where
702 T: Serialize + TopicType,
703{
704 let serdata = SerData::<T>::mut_ref_from_serdata(serdata);
705 let size = match &serdata.sample {
706 SampleData::Uninitialized => 0,
707 SampleData::SDKKey => serdata.key_hash.key_length() as u32,
708 SampleData::SDKData(sample) => {
710 serdata.serialized_size =
711 Some((cdr::calc_serialized_size::<T>(sample.deref())) as u32);
712 *serdata.serialized_size.as_ref().unwrap()
713 }
714 SampleData::SHMData(_sample) => {
715 0
717 }
722 };
723 size
724}
725
726#[allow(dead_code)]
727unsafe extern "C" fn eqkey<T>(
728 serdata_a: *const ddsi_serdata,
729 serdata_b: *const ddsi_serdata,
730) -> bool {
731 let a = SerData::<T>::mut_ref_from_serdata(serdata_a);
732 let b = SerData::<T>::mut_ref_from_serdata(serdata_b);
733 a.key_hash == b.key_hash
734}
735
/// `to_ser` callback: writes the serialized form of a serdata into `buf`.
///
/// Output starts at `buf + offset`. Nothing is written when `size` is 0. Key-only serdata
/// copies `size` bytes of the key hash buffer; data serdata (owned or shared-memory) is
/// CDR-serialized (big endian) into a `size`-byte window.
///
/// Panics on uninitialized serdata and when serialization fails.
///
/// NOTE(review): the key copy does not clamp `size` to the key buffer length (20 or 16
/// bytes), and the data branches serialize from the start of the value regardless of
/// `offset` — confirm the caller only uses `offset == 0` and a matching `size`.
#[allow(dead_code)]
unsafe extern "C" fn serdata_to_ser<T>(
    serdata: *const ddsi_serdata,
    size: size_t,
    offset: size_t,
    buf: *mut c_void,
) where
    T: Serialize + TopicType,
{
    let serdata = SerData::<T>::const_ref_from_serdata(serdata);
    let buf = buf as *mut u8;
    let buf = buf.add(offset as usize);

    if size == 0 {
        return;
    }

    match &serdata.sample {
        SampleData::Uninitialized => {
            panic!("Attempt to serialize uninitialized serdata")
        }
        SampleData::SDKKey => match &serdata.key_hash {
            KeyHash::None => {}
            KeyHash::CdrKey(k) => std::ptr::copy_nonoverlapping(k.as_ptr(), buf, size as usize),
            KeyHash::RawKey(k) => std::ptr::copy_nonoverlapping(k.as_ptr(), buf, size as usize),
        },
        SampleData::SDKData(serdata) => {
            // View the destination as a slice so the serializer can write into it directly.
            let buf_slice = std::slice::from_raw_parts_mut(buf, size as usize);
            if let Err(e) = cdr::serialize_into::<_, T, _, CdrBe>(
                buf_slice,
                serdata.deref(),
                Bounded(size),
            ) {
                panic!("Unable to serialize type {:?} due to {}", T::typename(), e);
            }
        }
        SampleData::SHMData(serdata) => {
            let buf_slice = std::slice::from_raw_parts_mut(buf, size as usize);
            if let Err(e) = cdr::serialize_into::<_, T, _, CdrBe>(
                buf_slice,
                serdata.as_ref(),
                Bounded(size),
            ) {
                panic!("Unable to serialize type {:?} due to {}", T::typename(), e);
            }
        }
    }
}
786
/// `to_ser_ref` callback: exposes the serialized bytes of a serdata through `iov`.
///
/// Key-only serdata points `iov` at its key hash buffer. Data serdata is serialized once,
/// cached in `serdata.cdr`, and `iov` is pointed at `offset` within that cache. On success a
/// reference is added to the serdata and its pointer is returned; null is returned when
/// serialization fails.
///
/// Panics on uninitialized serdata.
///
/// NOTE(review): in the `SDKData` branch `iov_len` is set to the requested `size` even when
/// the cached buffer is shorter (the slice end is clamped to `len - 1`), whereas the `SHMData`
/// branch slices without clamping and reports the slice length — confirm the intended
/// contract for padded sizes.
#[allow(dead_code)]
unsafe extern "C" fn serdata_to_ser_ref<T>(
    serdata: *const ddsi_serdata,
    offset: size_t,
    size: size_t,
    iov: *mut iovec,
) -> *mut ddsi_serdata
where
    T: Serialize + TopicType,
{
    let serdata = SerData::<T>::mut_ref_from_serdata(serdata);
    let iov = &mut *iov;

    match &serdata.sample {
        SampleData::Uninitialized => panic!("Attempt to serialize uninitialized Sample"),
        SampleData::SDKKey => {
            let (p, len) = match &serdata.key_hash {
                KeyHash::None => (std::ptr::null(), 0),
                KeyHash::CdrKey(k) => (k.as_ptr(), k.len()),
                KeyHash::RawKey(k) => (k.as_ptr(), k.len()),
            };

            iov.iov_base = p as *mut c_void;
            iov.iov_len = len as size_t;
        }
        SampleData::SDKData(sample) => {
            // Serialize lazily and keep the bytes for later calls.
            if serdata.cdr.is_none() {
                serdata.cdr = serialize_type::<T>(sample, serdata.serialized_size).ok();
            }
            if let Some(cdr) = &serdata.cdr {
                let offset = offset as usize;
                let mut last = offset + size as usize;
                if last > cdr.len() - 1 {
                    last = cdr.len() - 1;
                }
                let cdr = &cdr[offset..last];
                iov.iov_base = cdr.as_ptr() as *mut c_void;
                iov.iov_len = size; } else {
                println!("Serialization error!");
                return std::ptr::null_mut();
            }
        }

        SampleData::SHMData(sample) => {
            // Serialize lazily and keep the bytes for later calls.
            if serdata.cdr.is_none() {
                serdata.cdr = serialize_type::<T>(sample.as_ref(), serdata.serialized_size).ok();
            }
            if let Some(cdr) = &serdata.cdr {
                let offset = offset as usize;
                let last = offset + size as usize;
                let cdr = &cdr[offset..last];
                iov.iov_base = cdr.as_ptr() as *mut c_void;
                iov.iov_len = cdr.len() as size_t;
            } else {
                println!("Serialization error (SHM)!");
                return std::ptr::null_mut();
            }
        }
    }
    // The iovec borrows from the serdata, so hold a reference until `to_ser_unref`.
    ddsi_serdata_addref(&serdata.serdata)
}
852
853fn serialize_type<T: Serialize>(sample: &T, maybe_size: Option<u32>) -> Result<Vec<u8>, ()> {
854 if let Some(size) = maybe_size {
855 let size = (size + 3) & !3u32;
857 let mut buffer = Vec::<u8>::with_capacity(size as usize);
858 if let Ok(()) = cdr::serialize_into::<_, T, _, CdrBe>(&mut buffer, sample, Infinite) {
859 Ok(buffer)
860 } else {
861 Err(())
862 }
863 } else if let Ok(data) = cdr::serialize::<T, _, CdrBe>(sample, Infinite) {
864 Ok(data)
865 } else {
866 Err(())
867 }
868}
869
/// `to_ser_unref` callback: releases the reference taken by `serdata_to_ser_ref`.
#[allow(dead_code)]
unsafe extern "C" fn serdata_to_ser_unref<T>(serdata: *mut ddsi_serdata, _iov: *const iovec) {
    let serdata = SerData::<T>::mut_ref_from_serdata(serdata);
    ddsi_serdata_removeref(&mut serdata.serdata)
}
876
877fn deserialize_type<T>(data:&[u8]) -> Result<Arc<T>,()>
878 where
879 T: DeserializeOwned {
880 cdr::deserialize::<Box<T>>(data).map(Arc::from).map_err(|_e|())
881 }
882
883#[allow(dead_code)]
884unsafe extern "C" fn serdata_to_sample<T>(
885 serdata_ptr: *const ddsi_serdata,
886 sample: *mut c_void,
887 _bufptr: *mut *mut c_void,
888 _buflim: *mut c_void,
889) -> bool
890where
891 T: DeserializeOwned + TopicType,
892{
893 let mut serdata = SerData::<T>::mut_ref_from_serdata(serdata_ptr);
898 let mut s = Box::<Sample<T>>::from_raw(sample as *mut Sample<T>);
899 assert!(!sample.is_null());
900
901 let ret = if !serdata.serdata.iox_chunk.is_null() {
903 let hdr = iceoryx_header_from_chunk(serdata.serdata.iox_chunk);
905 if (*hdr).shm_data_state == iox_shm_data_state_t_IOX_CHUNK_CONTAINS_SERIALIZED_DATA {
906 let reader = std::slice::from_raw_parts(
908 serdata.serdata.iox_chunk as *const u8,
909 (*hdr).data_size as usize,
910 );
911 if serdata.serdata.kind == ddsi_serdata_kind_SDK_KEY {
912 compute_key_hash(reader, serdata);
913 serdata.sample = SampleData::SDKKey;
914 Ok(())
915 } else if let Ok(decoded) = deserialize_type::<T>(reader) {
916 if T::has_key() {
917 let key_cdr = decoded.key_cdr();
919 let key_cdr = &key_cdr[4..];
921 compute_key_hash(key_cdr, serdata);
922 }
923 s.set(decoded.clone());
926 serdata.sample = SampleData::SDKData(decoded);
927
928 Ok(())
929 } else {
930 println!("Deserialization error!");
931 Err(())
932 }
933 } else {
934 assert_eq!((*hdr).data_size as usize, std::mem::size_of::<T>());
936 if std::mem::size_of::<T>() == (*hdr).data_size as usize {
937 let p: *mut T = serdata.serdata.iox_chunk as *mut T;
941 serdata.sample = SampleData::SHMData(NonNull::new_unchecked(p));
942 Ok(())
943 } else {
944 Err(())
945 }
946 }
947 } else {
948 Ok(())
949 };
950
951 let ret = if let Ok(()) = ret {
952 match &serdata.sample {
953 SampleData::Uninitialized => true,
954 SampleData::SDKKey => true,
955 SampleData::SDKData(_data) => {
956 s.set_serdata(serdata_ptr as *mut ddsi_serdata);
957 false
959 }
960 SampleData::SHMData(_data) => {
961 s.set_serdata(serdata_ptr as *mut ddsi_serdata);
962 false
964 }
965 }
966 } else {
967 true
968 };
969
970 let _intentional_leak = Box::into_raw(s);
972 ret
973}
974
/// `to_untyped` callback: creates a key-only copy of a serdata with its type pointer cleared.
///
/// The new serdata is initialised as `SDK_KEY` with the source's sertype, after which its
/// `type_` is set to null and the key hash and hash value are copied from the source.
#[allow(dead_code)]
unsafe extern "C" fn serdata_to_untyped<T>(serdata: *const ddsi_serdata) -> *mut ddsi_serdata {
    let serdata = SerData::<T>::mut_ref_from_serdata(serdata);

    let mut untyped_serdata = SerData::<T>::new(serdata.serdata.type_, ddsi_serdata_kind_SDK_KEY);
    // "Untyped" means the serdata no longer points at a sertype.
    untyped_serdata.serdata.type_ = std::ptr::null_mut();
    untyped_serdata.sample = SampleData::SDKKey;

    untyped_serdata.key_hash = serdata.key_hash.clone();
    untyped_serdata.serdata.hash = serdata.serdata.hash;

    // Ownership passes to the caller; `free_serdata` reclaims it.
    let ptr = Box::into_raw(untyped_serdata);

    ptr as *mut ddsi_serdata
}
998
999#[allow(dead_code)]
1000unsafe extern "C" fn untyped_to_sample<T>(
1001 _sertype: *const ddsi_sertype,
1002 _serdata: *const ddsi_serdata,
1003 sample: *mut c_void,
1004 _buf: *mut *mut c_void,
1005 _buflim: *mut c_void,
1006) -> bool
1007where
1008 T: TopicType,
1009{
1010 if !sample.is_null() {
1012 let mut sample = Box::<Sample<T>>::from_raw(sample as *mut Sample<T>);
1013 sample.clear();
1016 let _leaked = Box::<Sample<T>>::into_raw(sample);
1018 true
1019 } else {
1020 false
1021 }
1022}
1023
1024#[allow(dead_code)]
1025unsafe extern "C" fn get_keyhash<T>(
1026 serdata: *const ddsi_serdata,
1027 keyhash: *mut ddsi_keyhash,
1028 _force_md5: bool,
1029) {
1030 let serdata = SerData::<T>::mut_ref_from_serdata(serdata);
1031 let keyhash = &mut *keyhash;
1032
1033 let src = match &serdata.key_hash {
1034 KeyHash::None => &[],
1035 KeyHash::CdrKey(k) => &k[4..],
1036 KeyHash::RawKey(k) => &k[..],
1037 };
1038
1039 for (i, b) in src.iter().enumerate() {
1041 keyhash.value[i] = *b;
1042 }
1043}
1044
/// `print` callback for the serdata ops table.
///
/// Writes nothing into `_buf` and always returns 0.
#[allow(dead_code)]
unsafe extern "C" fn print<T>(
    _sertype: *const ddsi_sertype,
    _serdata: *const ddsi_serdata,
    _buf: *mut std::os::raw::c_char,
    _bufsize: size_t,
) -> size_t {
    0
}
1054
/// Builds the `ddsi_sertype_ops` table for `T`, wiring in the callbacks defined in this file.
///
/// Fields not listed here keep their `Default` value.
fn create_sertype_ops<T>() -> Box<ddsi_sertype_ops>
where
    T: TopicType,
{
    Box::new(ddsi_sertype_ops {
        version: Some(ddsi_sertype_v0),
        arg: std::ptr::null_mut(),
        free: Some(free_sertype::<T>),
        zero_samples: Some(zero_samples::<T>),
        realloc_samples: Some(realloc_samples::<T>),
        free_samples: Some(free_samples::<T>),
        equal: Some(equal::<T>),
        hash: Some(hash::<T>),
        ..Default::default()
    })
}
1071
/// `get_sample_size` callback (shared-memory builds only): returns the `iox_size` recorded on
/// the serdata's sertype.
#[cfg(feature = "shm")]
#[allow(dead_code)]
unsafe extern "C" fn get_sample_size(serdata: *const ddsi_serdata) -> u32 {
    let sertype = (*serdata).type_;
    (*sertype).iox_size
}
1078
/// `from_iox_buffer` callback (shared-memory builds only): wraps an iceoryx buffer in a
/// serdata.
///
/// Returns null for a null `sertype`. The buffer is always recorded as the serdata's chunk.
/// When a subscriber is given, it is recorded too and the key hash is copied from the chunk
/// header. The sample is set to `SHMData` pointing at `buffer`.
///
/// NOTE(review): `NonNull::new_unchecked` assumes `buffer` is never null — confirm with the
/// caller.
#[cfg(feature = "shm")]
#[allow(dead_code)]
unsafe extern "C" fn from_iox_buffer<T>(
    sertype: *const ddsi_sertype,
    kind: ddsi_serdata_kind,
    sub: *mut ::std::os::raw::c_void,
    buffer: *mut ::std::os::raw::c_void,
) -> *mut ddsi_serdata {
    if sertype.is_null() {
        return std::ptr::null::<ddsi_serdata>() as *mut ddsi_serdata;
    }

    let mut d = SerData::<T>::new(sertype, kind);

    if sub.is_null() {
        // No subscriber: only the chunk is recorded.
        d.serdata.iox_chunk = buffer;
    } else {
        d.serdata.iox_chunk = buffer;
        d.serdata.iox_subscriber = sub;
        let hdr = iceoryx_header_from_chunk(buffer);
        copy_raw_key_hash(&(*hdr).keyhash.value, &mut d);
    }

    d.sample = SampleData::SHMData(NonNull::new_unchecked(buffer as *mut T));

    // Ownership passes to the caller; `free_serdata` reclaims it.
    let ptr = Box::into_raw(d);
    ptr as *mut ddsi_serdata
}
1116
/// Builds the `ddsi_serdata_ops` table for `T`, wiring in the callbacks defined in this file.
///
/// The two shared-memory callbacks are only set when the `shm` feature is enabled. Fields not
/// listed here keep their `Default` value.
fn create_serdata_ops<T>() -> Box<ddsi_serdata_ops>
where
    T: DeserializeOwned + TopicType + Serialize ,
{
    Box::new(ddsi_serdata_ops {
        eqkey: Some(eqkey::<T>),
        get_size: Some(get_size::<T>),
        from_ser: Some(serdata_from_fragchain::<T>),
        from_ser_iov: Some(serdata_from_iov::<T>),
        from_keyhash: Some(serdata_from_keyhash::<T>),
        from_sample: Some(serdata_from_sample::<T>),
        to_ser: Some(serdata_to_ser::<T>),
        to_ser_ref: Some(serdata_to_ser_ref::<T>),
        to_ser_unref: Some(serdata_to_ser_unref::<T>),
        to_sample: Some(serdata_to_sample::<T>),
        to_untyped: Some(serdata_to_untyped::<T>),
        untyped_to_sample: Some(untyped_to_sample::<T>),
        free: Some(free_serdata::<T>),
        print: Some(print::<T>),
        get_keyhash: Some(get_keyhash::<T>),
        #[cfg(feature = "shm")]
        get_sample_size: Some(get_sample_size),
        #[cfg(feature = "shm")]
        from_iox_buffer: Some(from_iox_buffer::<T>),
        ..Default::default()
    })
}
1144
/// `hash` callback for the sertype ops table.
///
/// Hashes the sertype's type name followed by the native-endian bytes of `size_of::<T>()`
/// with murmur3 (seed 0). Returns 0 for a null sertype or when hashing fails.
unsafe extern "C" fn hash<T: TopicType>(tp: *const ddsi_sertype) -> u32
{
    if let Some(ser_type) = SerType::<T>::try_from_sertype(tp) {
        let type_name = CStr::from_ptr(ser_type.sertype.type_name);
        let type_name_bytes = type_name.to_bytes();
        let type_size = core::mem::size_of::<T>().to_ne_bytes();
        // Feed both pieces to the hasher without concatenating them.
        let sg_list = [type_name_bytes,&type_size];
        let mut sg_buffer = SGReader::new(&sg_list);

        let hash = murmur3_32(&mut sg_buffer, 0);

        // The box was only borrowed from the caller; leak it again so it is not freed here.
        let _intentional_leak = SerType::<T>::into_sertype(ser_type);
        hash.unwrap_or(0)

    } else {
        0
    }
}
1180
1181unsafe extern "C" fn equal<T>(acmn: *const ddsi_sertype, bcmn: *const ddsi_sertype) -> bool {
1182 let acmn = CStr::from_ptr((*acmn).type_name as *mut std::os::raw::c_char);
1183 let bcmn = CStr::from_ptr((*bcmn).type_name as *mut std::os::raw::c_char);
1184 acmn == bcmn
1185}
1186
/// Payload state of a `SerData<T>`.
#[derive(Clone)]
enum SampleData<T> {
    /// Nothing has been stored yet (the default).
    Uninitialized,
    /// Key-only serdata; the key lives in `SerData::key_hash`.
    SDKKey,
    /// A value shared through an `Arc`.
    SDKData(std::sync::Arc<T>),
    /// A raw pointer to a `T` (set from an iceoryx chunk in this file).
    SHMData(NonNull<T>),
}

impl<T> Default for SampleData<T> {
    /// New serdata starts out without a payload.
    fn default() -> Self {
        Self::Uninitialized
    }
}
1200
1201
/// Key hash stored alongside a serdata.
#[derive(PartialEq, Clone)]
enum KeyHash {
    /// No key hash has been set.
    None,
    /// 20-byte buffer used for CDR-derived keys.
    CdrKey([u8; 20]),
    /// 16-byte buffer holding a raw key hash.
    RawKey([u8; 16]),
}

impl Default for KeyHash {
    /// The default is "no key hash".
    fn default() -> Self {
        KeyHash::None
    }
}

impl KeyHash {
    /// Borrows the full backing buffer; empty for `KeyHash::None`.
    fn get_key_hash(&self) -> &[u8] {
        match self {
            Self::CdrKey(bytes) => &bytes[..],
            Self::RawKey(bytes) => &bytes[..],
            Self::None => &[],
        }
    }

    /// Length in bytes of the backing buffer; 0 for `KeyHash::None`.
    fn key_length(&self) -> usize {
        match self {
            Self::None => 0,
            Self::CdrKey(bytes) => bytes.len(),
            Self::RawKey(bytes) => bytes.len(),
        }
    }
}
1231
/// Rust-side serdata: a `ddsi_serdata` header followed by the sample state.
///
/// `#[repr(C)]` with `serdata` as the first field is what allows the casts between
/// `*mut ddsi_serdata` and `*mut SerData<T>` used throughout this file.
#[repr(C)]
pub (crate)struct SerData<T> {
    // Must stay the first field; see the struct documentation.
    serdata: ddsi_serdata,
    // The payload, if any.
    sample: SampleData<T>,
    // Serialized form of the sample, filled lazily by `serdata_to_ser_ref`.
    cdr: Option<Vec<u8>>,
    // Key hash compared by `eqkey` and reported by `get_keyhash`.
    key_hash: KeyHash,
    // Serialized size cached by `get_size`.
    serialized_size: Option<u32>,
}
1247
impl<'a, T> SerData<T> {
    /// Allocates a serdata whose header is initialised with `ddsi_serdata_init` for the given
    /// sertype and kind; every other field starts empty.
    fn new(sertype: *const ddsi_sertype, kind: u32) -> Box<SerData<T>> {
        Box::<SerData<T>>::new(SerData {
            serdata: {
                let mut data = std::mem::MaybeUninit::uninit();
                unsafe {
                    ddsi_serdata_init(data.as_mut_ptr(), sertype, kind);
                    data.assume_init()
                }
            },
            sample: SampleData::default(),
            cdr: None,
            key_hash: KeyHash::default(),
            serialized_size: None,
        })
    }

    /// Reinterprets a raw serdata pointer as a shared reference to `SerData<T>`.
    ///
    /// The pointer is not checked, and the returned lifetime is chosen by the caller.
    fn const_ref_from_serdata(serdata: *const ddsi_serdata) -> &'a Self {
        let ptr = serdata as *const SerData<T>;
        unsafe { &*ptr }
    }

    /// Reinterprets a raw serdata pointer as a mutable reference to `SerData<T>`.
    ///
    /// The pointer is not checked, constness is cast away, and the returned lifetime is chosen
    /// by the caller.
    fn mut_ref_from_serdata(serdata: *const ddsi_serdata) -> &'a mut Self {
        let ptr = serdata as *mut SerData<T>;
        unsafe { &mut *ptr }
    }
}
1275
impl <T>Clone for SerData<T> {
    /// Copies the header and duplicates the sample state without requiring `T: Clone`.
    ///
    /// NOTE(review): `ddsi_serdata_addref` is applied to the local copy of the header, not to
    /// the original serdata — confirm that this is the intended reference-count behaviour.
    fn clone(&self) -> Self {
        Self {
            serdata: {
                let mut newdata = self.serdata;
                unsafe {ddsi_serdata_addref(&mut newdata)};
                newdata
            }, sample: match &self.sample {
                SampleData::Uninitialized => SampleData::Uninitialized,
                SampleData::SDKKey => SampleData::SDKKey,
                SampleData::SDKData(d) => SampleData::SDKData(d.clone()),
                SampleData::SHMData(d) => SampleData::SHMData(*d),
            }, cdr: self.cdr.clone(), key_hash: self.key_hash.clone(), serialized_size: self.serialized_size }
    }
}
1291
1292
1293
/// Returns the `payload_zoff` field of `rdata` as a `usize`.
///
/// `rdata` is dereferenced without a null check.
fn nn_rdata_payload_offset(rdata: *const nn_rdata) -> usize {
    unsafe { (*rdata).payload_zoff as usize }
}
1302
/// Returns a byte pointer to the memory immediately after the `nn_rmsg` header at `rmsg`.
fn nn_rmsg_payload(rmsg: *const nn_rmsg) -> *const u8 {
    // `add(1)` advances by one whole `nn_rmsg`, i.e. to just past the header.
    unsafe { rmsg.add(1) as *const u8 }
}
1306
/// Returns a pointer `offset` bytes past the position computed by `nn_rmsg_payload(rmsg)`.
fn nn_rmsg_payload_offset(rmsg: *const nn_rmsg, offset: usize) -> *const u8 {
    unsafe { nn_rmsg_payload(rmsg).add(offset) }
}
1310
/// `Read` adapter over a scatter/gather list: a sequence of byte slices presented as one
/// continuous stream.
struct SGReader<'a> {
    // Remaining slice list; `None` once every slice has been consumed.
    sc_list: Option<&'a [&'a [u8]]>,
    // Index of the slice currently being read.
    slice_cursor: usize,
    // Number of bytes already consumed from the current slice.
    slice_offset: usize,
}

impl<'a> SGReader<'a> {
    /// Creates a reader positioned at the start of the first slice of `sc_list`.
    pub fn new(sc_list: &'a [&'a [u8]]) -> Self {
        SGReader {
            sc_list: Some(sc_list),
            slice_cursor: 0,
            slice_offset: 0,
        }
    }
}

impl<'a> Read for SGReader<'a> {
    /// Copies bytes from the current slice into `buf`.
    ///
    /// A single call never crosses a slice boundary, so it returns at most the remainder of
    /// the current slice. Empty slices are skipped, so `Ok(0)` is only returned when `buf`
    /// is empty or the whole list is exhausted. An empty slice list reads as end-of-stream
    /// instead of panicking.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let slices = match self.sc_list {
            Some(slices) => slices,
            None => return Ok(0),
        };

        while let Some(current) = slices.get(self.slice_cursor) {
            let remaining = &current[self.slice_offset..];

            // An empty slice must not be reported as `Ok(0)`: callers treat that as EOF
            // even though later slices may still hold data.
            if remaining.is_empty() {
                self.slice_cursor += 1;
                self.slice_offset = 0;
                continue;
            }

            let copy_length = std::cmp::min(remaining.len(), buf.len());
            buf[..copy_length].copy_from_slice(&remaining[..copy_length]);

            if copy_length == remaining.len() {
                // Current slice fully consumed; move on to the next one.
                self.slice_cursor += 1;
                self.slice_offset = 0;

                if self.slice_cursor >= slices.len() {
                    self.sc_list = None;
                }
            } else {
                self.slice_offset += copy_length;
            }

            return Ok(copy_length);
        }

        // Ran off the end of the list (it was empty or ended in empty slices).
        self.sc_list = None;
        Ok(0)
    }
}
1365
1366#[cfg(test)]
1367mod test {
1368 use super::*;
1369 use crate::{DdsListener, DdsParticipant, DdsQos, DdsTopic};
1370 use cdds_derive::Topic;
1371 use serde_derive::{Deserialize, Serialize};
1372 use std::ffi::CString;
1373
/// Checks that `SGReader` yields the bytes of a scatter/gather list in order,
/// that a single `read` never crosses a slice boundary, and that the stream
/// ends after the last slice.
#[test]
fn scatter_gather() {
    let a: Vec<u8> = vec![1, 2, 3, 4, 5, 6];
    let b: Vec<u8> = vec![7, 8, 9, 10, 11];
    let c: Vec<u8> = vec![12, 13, 14, 15];
    let d: Vec<u8> = vec![16, 17, 18, 19, 20, 21];

    // Plain borrows are enough here; no `unsafe` pointer reconstruction needed.
    let sc_list: Vec<&[u8]> = vec![a.as_slice(), b.as_slice(), c.as_slice(), d.as_slice()];

    let mut reader = SGReader::new(&sc_list);
    let mut buf = vec![0u8; 5];

    // The first read fills the buffer from the first slice.
    let n = reader
        .read(&mut buf)
        .expect("reading from in-memory slices should not fail");
    assert_eq!(&buf[..n], &[1u8, 2, 3, 4, 5][..]);

    // The second read returns only the single byte left in the first slice.
    let n = reader
        .read(&mut buf)
        .expect("reading from in-memory slices should not fail");
    assert_eq!(&buf[..n], &[6u8][..]);

    // Everything that remains comes out in order, followed by end of stream.
    let mut rest = Vec::new();
    reader
        .read_to_end(&mut rest)
        .expect("reading from in-memory slices should not fail");
    assert_eq!(rest, (7u8..=21).collect::<Vec<u8>>());
    assert_eq!(
        reader
            .read(&mut buf)
            .expect("reading from in-memory slices should not fail"),
        0
    );
}
1402
/// With a single `#[topic_key]` field, `key_cdr()` yields four zero bytes
/// followed by the big-endian bytes of that field.
#[test]
fn keyhash_basic() {
    #[derive(Serialize, Deserialize, Topic, Default)]
    struct Foo {
        #[topic_key]
        id: i32,
        x: u32,
        y: u32,
    }

    let sample = Foo {
        id: 0x1234_5678,
        x: 10,
        y: 20,
    };

    // NOTE(review): the leading four zero bytes are presumably the CDR
    // encapsulation header; confirm against the derive macro.
    let expected: Vec<u8> = vec![0x00, 0x00, 0x00, 0x00, 0x12, 0x34, 0x56, 0x78];
    assert_eq!(sample.key_cdr(), expected);
}
/// With an `i32` key and a `String` key, `key_cdr()` contains both key fields
/// in declaration order and none of the non-key fields.
#[test]
fn keyhash_simple() {
    #[derive(Serialize, Deserialize, Topic, Default)]
    struct Foo {
        #[topic_key]
        id: i32,
        x: u32,
        #[topic_key]
        s: String,
        y: u32,
    }

    let sample = Foo {
        id: 0x1234_5678,
        x: 10,
        s: "boo".to_string(),
        y: 20,
    };

    let expected: Vec<u8> = vec![
        0, 0, 0, 0, // leading zero bytes
        0x12, 0x34, 0x56, 0x78, // `id`, big-endian
        0, 0, 0, 4, // length of `s` including the trailing NUL
        b'b', b'o', b'o', 0, // "boo" plus NUL terminator
    ];
    assert_eq!(sample.key_cdr(), expected);
}
1443
/// Checks the derived type name of a locally declared topic type, and that a
/// nested struct used as a key contributes only its own `#[topic_key]` field
/// to the outer `key_cdr()`.
#[test]
fn keyhash_nested() {
    #[derive(Serialize, Deserialize, Topic, Default)]
    struct NestedFoo {
        name: String,
        val: u64,
        #[topic_key]
        instance: u32,
    }

    #[derive(Serialize, Deserialize, Topic, Default)]
    struct Foo {
        #[topic_key]
        id: i32,
        x: u32,
        #[topic_key]
        s: String,
        y: u32,
        #[topic_key]
        inner: NestedFoo,
    }

    let expected_name = CString::new("serdes::test::keyhash_nested::NestedFoo").unwrap();
    assert_eq!(NestedFoo::typename(), expected_name);

    let sample = Foo {
        id: 0x1234_5678,
        x: 10,
        s: "boo".to_string(),
        y: 20,
        inner: NestedFoo {
            name: "my name".to_owned(),
            val: 42,
            instance: 25,
        },
    };

    let expected: Vec<u8> = vec![
        0, 0, 0, 0, // leading zero bytes
        0x12, 0x34, 0x56, 0x78, // `id`, big-endian
        0, 0, 0, 4, // length of `s` including the trailing NUL
        b'b', b'o', b'o', 0, // "boo" plus NUL terminator
        0, 0, 0, 25, // `inner.instance`, big-endian
    ];
    assert_eq!(sample.key_cdr(), expected);
}
1493
/// An eight-byte array as the only key produces a twelve-byte key CDR and
/// does not force the MD5 key hash.
#[test]
fn primitive_array_as_key() {
    #[derive(Serialize, Deserialize, Topic, Default)]
    struct Foo {
        #[topic_key]
        a: [u8; 8],
        b: u32,
        c: String,
    }

    let sample = Foo {
        a: [0; 8],
        b: 42,
        c: String::from("foo"),
    };

    // Twelve zero bytes in total: the key array is all zeros as well.
    let expected = vec![0u8; 12];
    assert_eq!(sample.key_cdr(), expected);
    assert!(!Foo::force_md5_keyhash());
}
1514
/// An eight-byte array key combined with a `String` key produces a key CDR
/// holding both fields, and forces the MD5 key hash.
#[test]
fn primitive_array_and_string_as_key() {
    #[derive(Serialize, Deserialize, Topic, Default)]
    struct Foo {
        #[topic_key]
        a: [u8; 8],
        b: u32,
        #[topic_key]
        c: String,
    }

    let sample = Foo {
        a: [0; 8],
        b: 42,
        c: String::from("foo"),
    };

    let expected: Vec<u8> = vec![
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // leading zero bytes and the all-zero `a`
        0, 0, 0, 4, // length of `c` including the trailing NUL
        b'f', b'o', b'o', 0, // "foo" plus NUL terminator
    ];
    assert_eq!(sample.key_cdr(), expected);
    assert!(Foo::force_md5_keyhash());
}
1539
/// Smoke test: builds a `SerType<Foo>`, hands it to Cyclone DDS by creating a
/// topic on a fresh participant, and tears everything down again.
///
/// Both entity handles are checked, so the test fails if participant or topic
/// creation fails. The participant is deleted as well as the topic, so no DDS
/// entity outlives the test.
#[test]
fn basic() {
    #[derive(Serialize, Deserialize, Topic, Default)]
    struct NestedFoo {
        name: String,
        val: u64,
        #[topic_key]
        instance: u32,
    }

    impl NestedFoo {
        fn new() -> Self {
            Self {
                name: "my name".to_owned(),
                val: 42,
                instance: 25,
            }
        }
    }

    #[derive(Serialize, Deserialize, Topic, Default)]
    struct Foo {
        #[topic_key]
        id: i32,
        x: u32,
        #[topic_key]
        s: String,
        y: u32,
        #[topic_key]
        inner: NestedFoo,
    }
    // Constructed only to prove the type can be instantiated; not published.
    let _foo = Foo {
        id: 0x12345678,
        x: 10,
        s: String::from("boo"),
        y: 20,
        inner: NestedFoo::new(),
    };
    let t = SerType::<Foo>::new();
    let mut t = SerType::into_sertype(t);
    // `dds_create_topic_sertype` takes a pointer to the sertype pointer.
    let tt = &mut t as *mut *mut ddsi_sertype;
    // SAFETY: `tt` points at the local `t`, and `topic_name` is a valid
    // NUL-terminated string; both outlive the calls below. The remaining
    // arguments are null, as in the original test.
    unsafe {
        let p = dds_create_participant(0, std::ptr::null_mut(), std::ptr::null_mut());
        assert!(p > 0, "dds_create_participant failed with code {}", p);

        let topic_name = CString::new("topic_name").unwrap();
        let topic = dds_create_topic_sertype(
            p,
            topic_name.as_ptr(),
            tt,
            std::ptr::null_mut(),
            std::ptr::null_mut(),
            std::ptr::null_mut(),
        );

        // Release the entities before asserting so that a failed topic
        // creation does not leave the participant behind.
        if topic > 0 {
            dds_delete(topic);
        }
        dds_delete(p);

        assert!(
            topic > 0,
            "dds_create_topic_sertype failed with code {}",
            topic
        );
    }
}
1596}