cyclonedds_rs/
serdes.rs

1/*
2    Copyright 2021 Sojan James
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15*/
16
17// Rust deserializer for CycloneDDS.
18// See discussion at https://github.com/eclipse-cyclonedds/cyclonedds/issues/830
19
20use cdr::{Bounded, CdrBe, Infinite};
21
22
23use serde::{de::DeserializeOwned, Serialize};
24use std::io::prelude::*;
25
26use std::ptr::NonNull;
27
28use std::{
29    ffi::{c_void, CStr},
30    marker::PhantomData,
31    ops::Deref,
32    sync::Arc,
33};
34
35use cyclonedds_sys::*;
36//use fasthash::{murmur3::Hasher32, FastHasher};
37use murmur3::murmur3_32;
38use std::io::Cursor;
39
/// Rust-side wrapper around a CycloneDDS `ddsi_sertype` for the topic type `T`.
///
/// The struct is `#[repr(C)]` with `sertype` as its first field, which is what
/// the pointer casts in `into_sertype` / `try_from_sertype` rely on when they
/// convert between `*mut SerType<T>` and `*mut ddsi_sertype`.
#[repr(C)]
pub struct SerType<T> {
    // The embedded CycloneDDS sertype. Keep this as the first field.
    sertype: ddsi_sertype,
    // Zero-sized marker that ties the sertype to the Rust type `T`.
    _phantom: PhantomData<T>,
}
45
46pub trait TopicType: Serialize + DeserializeOwned {
47    // generate a non-cryptographic hash of the key values to be used internally
48    // in cyclonedds
49    fn hash(&self, basehash : u32) -> u32 {
50        let cdr = self.key_cdr();
51        let mut cursor = Cursor::new(cdr.as_slice());
52        murmur3_32(&mut cursor, 0).unwrap() ^ basehash
53    }
54
55    fn is_fixed_size() -> bool {
56        false
57    }
58    /// The type name for this topic
59    fn typename() -> std::ffi::CString {
60        let ty_name_parts: String = std::any::type_name::<Self>()
61            .split("::")
62            .skip(1)
63            .collect::<Vec<_>>()
64            .join("::");
65
66        
67        //println!("Typename:{:?}", &typename);
68        std::ffi::CString::new(ty_name_parts).expect("Unable to create CString for type name")
69    }
70
71    /// The default topic_name to use when creating a topic of this type. The default
72    /// implementation uses '/' instead of '::' to form a unix like path.
73    /// A prefix can optionally be added
74    fn topic_name(maybe_prefix: Option<&str>) -> String {
75        let topic_name_parts: String = format!(
76            "/{}",
77            std::any::type_name::<Self>()
78                .to_string()
79                .split("::")
80                .skip(1)
81                .collect::<Vec<_>>()
82                .join("/")
83        );
84
85        if let Some(prefix) = maybe_prefix {
86            let mut path = String::from(prefix);
87            path.push_str(&topic_name_parts);
88            path
89        } else {
90            topic_name_parts
91        }
92    }
93
94    fn has_key() -> bool;
95    // this is the key as defined in the DDS-RTPS spec.
96    // KeyHash (PID_KEY_HASH). This function does not
97    // hash the key. Use the force_md5_keyhash to know
98    // whether to use md5 even if the the key cdr is 16 bytes
99    // or shorter.
100    fn key_cdr(&self) -> Vec<u8>;
101
102    // force the use of md5 even if the serialized size is less than 16
103    // as per the standard, we need to check the potential field size and not the actual.
104    fn force_md5_keyhash() -> bool;
105}
106
107impl<'a, T> SerType<T> {
108    pub fn new() -> Box<SerType<T>>
109    where
110        T: DeserializeOwned + Serialize + TopicType,
111    {
112        Box::<SerType<T>>::new(SerType {
113            sertype: {
114                let mut sertype = std::mem::MaybeUninit::uninit();
115                unsafe {
116                    let type_name = T::typename();
117                    ddsi_sertype_init(
118                        sertype.as_mut_ptr(),
119                        type_name.as_ptr(),
120                        Box::into_raw(create_sertype_ops::<T>()),
121                        Box::into_raw(create_serdata_ops::<T>()),
122                        !T::has_key(),
123                    );
124                    let mut sertype = sertype.assume_init();
125                    sertype.set_fixed_size(if T::is_fixed_size() { 1 } else { 0 });
126                    sertype.iox_size = std::mem::size_of::<T>() as u32;
127                    sertype
128                }
129            },
130            _phantom: PhantomData,
131        })
132    }
133
134    // cast into cyclone dds sertype.  Rust relinquishes ownership here.
135    // Cyclone DDS will free this. But if you need to free this pointer
136    // before handing it over to cyclone, make sure you explicitly free it
137    pub fn into_sertype(sertype: Box<SerType<T>>) -> *mut ddsi_sertype {
138        Box::<SerType<T>>::into_raw(sertype) as *mut ddsi_sertype
139    }
140
141    pub fn try_from_sertype(sertype: *const ddsi_sertype) -> Option<Box<SerType<T>>> {
142        let ptr = sertype as *mut SerType<T>;
143        if !ptr.is_null() {
144            Some(unsafe { Box::from_raw(ptr) })
145        } else {
146            None
147        }
148    }
149}
150
/// Storage for the value carried by a `Sample`.
#[derive(Clone)]
pub enum SampleStorage<T> {
    /// A value shared through an `Arc`.
    Owned(Arc<T>),
    /// A shared pointer to a value stored outside of this enum.
    Loaned(Arc<NonNull<T>>),
}

impl<T> Deref for SampleStorage<T> {
    type Target = T;

    /// Borrows the stored value, whichever variant holds it.
    fn deref(&self) -> &Self::Target {
        match self {
            Self::Owned(shared) => &**shared,
            Self::Loaned(shared_ptr) => {
                let ptr: NonNull<T> = **shared_ptr;
                // The loaned pointer is dereferenced without any further check.
                unsafe { &*ptr.as_ptr() }
            }
        }
    }
}

impl<T> Drop for SampleStorage<T> {
    fn drop(&mut self) {
        // Intentionally empty: no variant performs extra work on drop; the
        // contained `Arc`s are released by their own destructors.
    }
}
179
180
/// A sample slot that is either bound to an incoming `ddsi_serdata` or carries
/// an outgoing value.
pub struct Sample<T> {
    // `serdata` is used for incoming samples. We hold a reference to the
    // ddsi_serdata which contains the sample; the reference is released again
    // when the `Sample` is dropped.
    serdata: Option<*mut ddsi_serdata>,
    // `sample` is used for outgoing samples.
    sample: Option<SampleStorage<T>>,
}
188
189impl<'a,T> Sample<T>
190where
191    T: TopicType
192{
193    pub fn try_deref<>(&self) -> Option<&T> {       
194            if let Some(serdata) = self.serdata {
195                let serdata = SerData::<T>::mut_ref_from_serdata(serdata);
196                match &serdata.sample {
197                    SampleData::Uninitialized => None,
198                    SampleData::SDKKey => None,
199                    SampleData::SDKData(it) => Some(it.as_ref()),
200                    SampleData::SHMData(it) => unsafe { Some(it.as_ref())},
201                }
202            } else {
203                None
204            }
205  
206    }
207
208    pub fn get_sample(&self) -> Option<SampleStorage<T>> {
209        //if let Ok(t) = self.sample.write() {
210            match self.sample.as_ref() {
211                Some(s) => match s {
212                    SampleStorage::Owned(s) => Some(SampleStorage::Owned(s.clone())),
213                    SampleStorage::Loaned(s) => Some(SampleStorage::Loaned(s.clone())),
214                },
215                None => None,
216            }
217    }
218
219    // Deprecated as this function can panic
220    #[deprecated]
221    pub (crate)fn get(&self) -> Option<Arc<T>> {
222        //let t = self.sample;
223        match &self.sample {
224            Some(SampleStorage::Owned(t)) => Some(t.clone()),
225            Some(SampleStorage::Loaned(_t)) => {
226                None
227            }
228            None => {
229                None
230            }
231        }
232    }
233
234    pub(crate) fn set_serdata(&mut self,serdata:*mut ddsi_serdata) {
235        // Increment the reference count
236        unsafe {ddsi_serdata_addref(serdata);}
237        self.serdata = Some(serdata)
238    }
239
240    pub fn set(&mut self, t: Arc<T>) {
241        //let mut sample = self.sample.write().unwrap();
242        self.sample.replace(SampleStorage::Owned(t));
243    }
244
245    pub fn set_loaned(&mut self, t: NonNull<T>) {
246        //let mut sample = self.sample.write().unwrap();
247        self.sample.replace(SampleStorage::Loaned(Arc::new(t)));
248    }
249
250    pub fn clear(&mut self) {
251        //let mut sample = self.sample.write().unwrap();
252        let t = self.sample.take();
253
254        match &t {
255            Some(SampleStorage::Owned(_o)) => {}
256            Some(SampleStorage::Loaned(_o)) => {}
257            None => {}
258        }
259    }
260
261    pub fn from(it: Arc<T>) -> Self {
262        Self {
263            serdata : None,
264            sample: Some(SampleStorage::Owned(it)),
265        }
266    }
267}
268
269impl<T> Default for Sample<T> {
270    fn default() -> Self {
271        Self {
272            serdata : None,
273            sample: None,
274        }
275    }
276}
277
278impl<T> Drop for Sample<T> {
279    fn drop(&mut self) {
280        if let Some(serdata) = self.serdata {
281            unsafe {ddsi_serdata_removeref(serdata)};
282        }
283    }
284}
285
286
287
288
///
/// TODO: UNSAFE WARNING - review needed. `SampleBuffer<T>` is forced to be `Send`.
///
/// The DDS read API uses an array of `void*` pointers. The `SampleBuffer<T>`
/// structure is used to create the sample array in the necessary format.
/// The `Sample<T>` structures are allocated by `SampleBuffer::new` and
/// deallocated when the buffer is dropped; Cyclone does not allocate the
/// samples, it only fills in their contents through the serdes callbacks.
/// So this structure always points to valid sample memory, but the serdes
/// callbacks can change the value of a sample under us.
/// To be absolutely sure, each sample would probably have to be put into an
/// `RwLock<Arc<T>>` instead of an `Arc<T>`; this is the cost paid for zero copy.

unsafe impl<T> Send for SampleBuffer<T> {}
/// Buffer of sample slots plus the matching sample-info entries, laid out the
/// way the CycloneDDS read API expects them.
pub struct SampleBuffer<T> {
    /// This is !Send. This is the only way to punch through the Cyclone API as we need an array of pointers
    pub(crate) buffer: Vec<*mut Sample<T>>,
    /// One `dds_sample_info` per slot in `buffer`.
    pub(crate) sample_info: Vec<cyclonedds_sys::dds_sample_info>,
}
307
308impl<'a, T:TopicType> SampleBuffer<T> {
309    pub fn new(len: usize) -> Self {
310        let mut buf = Self {
311            buffer: Vec::new(),
312            sample_info: vec![cyclonedds_sys::dds_sample_info::default(); len],
313        };
314
315        for _i in 0..len {
316            let p = Box::into_raw(Box::default());
317            buf.buffer.push(p);
318        }
319        buf
320    }
321
322    /// Check if sample is valid. Will panic if out of
323    /// bounds.
324    pub fn is_valid_sample(&self, index: usize) -> bool {
325        self.sample_info[index].valid_data
326    }
327
328    pub fn len(&self) -> usize {
329        self.buffer.len()
330    }
331
332    pub fn iter(&'a self) -> impl Iterator<Item = &T> {
333        let p = self.buffer.iter().filter_map(|p| {
334            let sample = unsafe { &*(*p) };
335            sample.try_deref()
336            
337        });
338        p
339    }
340
341    /// Get a sample
342    pub fn get(&self, index: usize) -> &Sample<T> {
343        let p_sample = self.buffer[index];
344        unsafe { &*p_sample }
345    }
346
347    /// return a raw pointer to the buffer and the sample info
348    /// to be used in unsafe code that calls the CycloneDDS
349    /// API
350    pub unsafe fn as_mut_ptr(&mut self) -> (*mut *mut Sample<T>, *mut dds_sample_info) {
351        (self.buffer.as_mut_ptr(), self.sample_info.as_mut_ptr())
352    }
353}
354
355impl<'a, T> Drop for SampleBuffer<T> {
356    fn drop(&mut self) {
357        for p in &self.buffer {
358            unsafe {
359                let _it = Box::from_raw(*p);
360            }
361        }
362    }
363}
364/*
365impl <'a,T>Index<usize> for SampleBuffer<T> {
366    type Output = &'a Sample<T>;
367    fn index<'a>(&'a self, i: usize) -> &'a Sample<T> {
368        &self.e[i]
369    }
370}
371*/
372
/// `zero_samples` callback with the sertype-operation signature.
///
/// Intentionally does nothing: all three arguments are ignored.
#[allow(dead_code)]
unsafe extern "C" fn zero_samples<T>(
    _sertype: *const ddsi_sertype,
    _ptr: *mut std::ffi::c_void,
    _len: size_t,
) {
} // empty implementation
380
/// `realloc_samples` callback: resizes an array of `*mut Sample<T>` pointers.
///
/// `old` is taken over as a `Vec<*mut Sample<T>>` with length and capacity
/// `old_count`. A new pointer array with capacity `new_count` is built from
/// it, leaked, and its address is written to `*ptrs`.
///
/// NOTE(review): `Vec::from_raw_parts` requires `old` to be a non-null
/// allocation made by the Rust allocator with exactly this capacity - confirm
/// that this holds for every caller, including a first call with
/// `old_count == 0`.
#[allow(dead_code)]
extern "C" fn realloc_samples<T>(
    ptrs: *mut *mut std::ffi::c_void,
    _sertype: *const ddsi_sertype,
    old: *mut std::ffi::c_void,
    old_count: size_t,
    new_count: size_t,
) {
    //println!("realloc");
    // Take ownership of the old pointer array; it is consumed below and its
    // allocation is released once the iteration over it has finished.
    let old = unsafe {
        Vec::<*mut Sample<T>>::from_raw_parts(
            old as *mut *mut Sample<T>,
            old_count as usize,
            old_count as usize,
        )
    };
    let mut new = Vec::<*mut Sample<T>>::with_capacity(new_count as usize);

    if new_count >= old_count {
        // Growing: keep every existing sample pointer ...
        for entry in old {
            new.push(entry);
        }

        // ... and append freshly boxed, default-constructed samples.
        for _i in 0..(new_count - old_count) {
            new.push(Box::into_raw(Box::default()));
        }
    } else {
        // Shrinking: only the first `new_count` pointers are carried over.
        // NOTE(review): the remaining pointers are discarded without freeing
        // the `Sample<T>` they point to - confirm whether that is intended.
        for e in old.into_iter().take(new_count as usize) {
            new.push(e)
        }
    }

    // Hand the new array over to the caller; Rust no longer owns it.
    let leaked = new.leak();

    let (raw, _length) = (leaked.as_ptr(), leaked.len());
    // if the length and allocated length are not equal, we messed up above.
    //assert_eq!(length, allocated_length);
    unsafe {
        *ptrs = raw as *mut std::ffi::c_void;
    }
}
422
/// `free_samples` callback: frees samples or only their contents, depending
/// on `op`.
///
/// * `DDS_FREE_ALL_BIT` set: the memory starting at `*ptrs` is taken over as
///   a `Vec<Sample<T>>` of `len` elements and dropped.
/// * otherwise `DDS_FREE_CONTENTS_BIT` must be set (asserted): every sample is
///   cleared via `Sample::clear` and the memory itself is kept.
///
/// NOTE(review): this treats `*ptrs` as the start of one contiguous array of
/// `len` `Sample<T>` values, whereas `SampleBuffer::new` and
/// `realloc_samples` in this file box every sample individually - confirm
/// which layout the caller really provides.
#[allow(dead_code)]
extern "C" fn free_samples<T>(
    _sertype: *const ddsi_sertype,
    ptrs: *mut *mut std::ffi::c_void,
    len: size_t,
    op: dds_free_op_t,
) where
    T: TopicType,
{
    let ptrs_v: *mut *mut Sample<T> = ptrs as *mut *mut Sample<T>;

    if (op & DDS_FREE_ALL_BIT) != 0 {
        let _samples =
            unsafe { Vec::<Sample<T>>::from_raw_parts(*ptrs_v, len as usize, len as usize) };
        // all samples will get freed when samples goes out of scope
    } else {
        assert_ne!(op & DDS_FREE_CONTENTS_BIT, 0);
        let mut samples =
            unsafe { Vec::<Sample<T>>::from_raw_parts(*ptrs_v, len as usize, len as usize) };
        for sample in samples.iter_mut() {
            //let _old_sample = std::mem::take(sample);
            sample.clear()
            //_old_sample goes out of scope and the content is freed. The pointer is replaced with a default constructed sample
        }
        // Give the memory back without freeing it: only the contents were
        // supposed to be released.
        let _intentional_leak = samples.leak();
    }
}
450
451#[allow(dead_code)]
452unsafe extern "C" fn free_sertype<T>(sertype: *mut cyclonedds_sys::ddsi_sertype) {
453    ddsi_sertype_fini(sertype);
454
455    let _sertype_ops = Box::<ddsi_sertype_ops>::from_raw((*sertype).ops as *mut ddsi_sertype_ops);
456    let _serdata_ops =
457        Box::<ddsi_serdata_ops>::from_raw((*sertype).serdata_ops as *mut ddsi_serdata_ops);
458    // this sertype is always constructed in Rust. During destruction,
459    // the Box takes over the pointer and frees it when it goes out
460    // of scope.
461    let sertype = sertype as *mut SerType<T>;
462    let _it = Box::<SerType<T>>::from_raw(sertype);
463}
464
465// create ddsi_serdata from a fragchain
466#[allow(dead_code)]
467unsafe extern "C" fn serdata_from_fragchain<T>(
468    sertype: *const ddsi_sertype,
469    kind: u32,
470    mut fragchain: *const nn_rdata,
471    size: size_t,
472) -> *mut ddsi_serdata
473where
474    T: DeserializeOwned + TopicType,
475{
476    //println!("serdata_from_fragchain");
477    let mut off: u32 = 0;
478    let size = size as usize;
479    let fragchain_ref = &*fragchain;
480
481    let mut serdata = SerData::<T>::new(sertype, kind);
482
483    assert_eq!(fragchain_ref.min, 0);
484    assert!(fragchain_ref.maxp1 >= off);
485
486    // The scatter gather list
487    let mut sg_list = Vec::new();
488
489    while !fragchain.is_null() {
490        let fragchain_ref = &*fragchain;
491        if fragchain_ref.maxp1 > off {
492            let payload =
493                nn_rmsg_payload_offset(fragchain_ref.rmsg, nn_rdata_payload_offset(fragchain));
494            let src = payload.add((off - fragchain_ref.min) as usize);
495            let n_bytes = fragchain_ref.maxp1 - off;
496            sg_list.push(std::slice::from_raw_parts(src, n_bytes as usize));
497            off = fragchain_ref.maxp1;
498            assert!(off as usize <= size);
499        }
500        fragchain = fragchain_ref.nextfrag;
501    }
502    // make a reader out of the sg_list
503    let reader = SGReader::new(&sg_list);
504    if let Ok(decoded) = cdr::deserialize_from::<_, T, _>(reader, Bounded(size as u64)) {
505        if T::has_key() {
506            // compute the 16byte key hash
507            let key_cdr = decoded.key_cdr();
508            // skip the four byte header
509            let key_cdr = &key_cdr[4..];
510            compute_key_hash(key_cdr, &mut serdata);
511        }
512        serdata.serdata.hash = decoded.hash((*sertype).serdata_basehash);
513        let sample = std::sync::Arc::new(decoded);
514        //store the deserialized sample in the serdata. We don't need to deserialize again
515        serdata.sample = SampleData::SDKData(sample);
516    } else {
517        println!("Deserialization error!");
518        return std::ptr::null_mut();
519    }
520
521    //store the hash into the serdata
522
523    // convert into raw pointer and forget about it (for now). Cyclone will take ownership.
524    let ptr = Box::into_raw(serdata);
525    // only we know this ddsi_serdata is really of type SerData
526    ptr as *mut ddsi_serdata
527}
528
529fn copy_raw_key_hash<T>(key: &[u8], serdata: &mut Box<SerData<T>>) {
530    let mut raw_key = [0u8; 16];
531    for (i, data) in key.iter().enumerate() {
532        raw_key[i] = *data;
533    }
534    serdata.key_hash = KeyHash::RawKey(raw_key)
535}
536
537fn compute_key_hash<T>(key_cdr: &[u8], serdata: &mut SerData<T>)
538where
539    T: TopicType,
540{
541    let mut cdr_key = [0u8; 20];
542
543    if T::force_md5_keyhash() || key_cdr.len() > 16 {
544        let mut md5st = ddsrt_md5_state_t::default();
545        let md5set = &mut md5st as *mut ddsrt_md5_state_s;
546        unsafe {
547            ddsrt_md5_init(md5set);
548            ddsrt_md5_append(md5set, key_cdr.as_ptr(), key_cdr.len() as u32);
549            ddsrt_md5_finish(md5set, cdr_key.as_mut_ptr());
550        }
551    } else {
552        for (i, data) in key_cdr.iter().enumerate() {
553            cdr_key[i] = *data;
554        }
555    }
556    serdata.key_hash = KeyHash::CdrKey(cdr_key)
557}
558
559#[allow(dead_code)]
560unsafe extern "C" fn serdata_from_keyhash<T>(
561    sertype: *const ddsi_sertype,
562    keyhash: *const ddsi_keyhash,
563) -> *mut ddsi_serdata
564where
565    T: TopicType,
566{
567    let keyhash = (*keyhash).value;
568    //println!("serdata_from_keyhash");
569
570    if T::force_md5_keyhash() {
571        // this means keyhas fits in 16 bytes
572        std::ptr::null_mut()
573    } else {
574        let mut serdata = SerData::<T>::new(sertype, ddsi_serdata_kind_SDK_KEY);
575        serdata.sample = SampleData::SDKKey;
576
577        let mut key_hash_buffer = [0u8; 20];
578        let key_hash = &mut key_hash_buffer[4..];
579
580        for (i, b) in keyhash.iter().enumerate() {
581            key_hash[i] = *b;
582        }
583
584        serdata.key_hash = KeyHash::CdrKey(key_hash_buffer);
585
586        let ptr = Box::into_raw(serdata);
587        // only we know this ddsi_serdata is really of type SerData
588        ptr as *mut ddsi_serdata
589    }
590}
591
592#[allow(dead_code)]
593#[allow(non_upper_case_globals)]
594unsafe extern "C" fn serdata_from_sample<T>(
595    sertype: *const ddsi_sertype,
596    kind: u32,
597    sample: *const c_void,
598) -> *mut ddsi_serdata
599where
600    T: TopicType,
601{
602    //println!("Serdata from sample {:?}", sample);
603    let mut serdata = SerData::<T>::new(sertype, kind);
604    let sample = sample as *const Sample<T>;
605    let sample = &*sample;
606
607    match kind {
608        #[allow(non_upper_case_globals)]
609        ddsi_serdata_kind_SDK_DATA => {
610            let sample = sample.get().unwrap();
611            serdata.serdata.hash = sample.hash((*sertype).serdata_basehash);
612            serdata.sample = SampleData::SDKData(sample);
613        }
614        ddsi_serdata_kind_SDK_KEY => {
615            panic!("Don't know how to create serdata from sample for SDK_KEY");
616        }
617        _ => panic!("Unexpected kind"),
618    }
619
620    let ptr = Box::into_raw(serdata);
621    // only we know this ddsi_serdata is really of type SerData
622    ptr as *mut ddsi_serdata
623}
624
625#[allow(dead_code)]
626unsafe extern "C" fn serdata_from_iov<T>(
627    sertype: *const ddsi_sertype,
628    kind: u32,
629    niov: size_t,
630    iov: *const iovec,
631    size: size_t,
632) -> *mut ddsi_serdata
633where
634    T: DeserializeOwned + TopicType,
635{
636    let size = size as usize;
637    let niov = niov as usize;
638    //println!("serdata_from_iov");
639
640    let mut serdata = SerData::<T>::new(sertype, kind);
641
642    let iovs = std::slice::from_raw_parts(iov as *const cyclonedds_sys::iovec, niov);
643
644    let iov_slices: Vec<&[u8]> = iovs
645        .iter()
646        .map(|iov| {
647            let iov = iov;
648
649            std::slice::from_raw_parts(iov.iov_base as *const u8, iov.iov_len as usize)
650        })
651        .collect();
652
653    // make a reader out of the sg_list
654    let reader = SGReader::new(&iov_slices);
655
656    if let Ok(decoded) = cdr::deserialize_from::<_, T, _>(reader, Bounded(size as u64)) {
657        if T::has_key() {
658            // compute the 16byte key hash
659            let key_cdr = decoded.key_cdr();
660            // skip the four byte header
661            let key_cdr = &key_cdr[4..];
662            compute_key_hash(key_cdr, &mut serdata);
663        }
664        serdata.serdata.hash = decoded.hash((*sertype).serdata_basehash);
665        let sample = std::sync::Arc::new(decoded);
666        //store the deserialized sample in the serdata. We don't need to deserialize again
667        serdata.sample = SampleData::SDKData(sample);
668    } else {
669        //println!("Deserialization error!");
670        return std::ptr::null_mut();
671    }
672
673    // convert into raw pointer and forget about it as ownership is passed into cyclonedds
674    let ptr = Box::into_raw(serdata);
675    // only we know this ddsi_serdata is really of type SerData
676    ptr as *mut ddsi_serdata
677}
678
679#[allow(dead_code)]
680unsafe extern "C" fn free_serdata<T>(serdata: *mut ddsi_serdata) {
681    //println!("free_serdata");
682    // the pointer is really a *mut SerData
683    let ptr = serdata as *mut SerData<T>;
684
685    let serdata = &mut *ptr;
686
687    if !serdata.serdata.iox_subscriber.is_null() {
688        let iox_subscriber: *mut iox_sub_t = serdata.serdata.iox_subscriber as *mut iox_sub_t;
689        let chunk = &mut serdata.serdata.iox_chunk;
690        let chunk = chunk as *mut *mut c_void;
691        //println!("Free iox chunk");
692        free_iox_chunk(iox_subscriber, chunk);
693    }
694
695    let _data = Box::from_raw(ptr);
696    // _data goes out of scope and frees the SerData. Nothing more to do here.
697}
698
699#[allow(dead_code)]
700unsafe extern "C" fn get_size<T>(serdata: *const ddsi_serdata) -> u32
701where
702    T: Serialize + TopicType,
703{
704    let serdata = SerData::<T>::mut_ref_from_serdata(serdata);
705    let size = match &serdata.sample {
706        SampleData::Uninitialized => 0,
707        SampleData::SDKKey => serdata.key_hash.key_length() as u32,
708        // This function asks for the serialized size so we do this even for SHM Data
709        SampleData::SDKData(sample) => {
710            serdata.serialized_size =
711                Some((cdr::calc_serialized_size::<T>(sample.deref())) as u32);
712            *serdata.serialized_size.as_ref().unwrap()
713        }
714        SampleData::SHMData(_sample) => {
715            // we refuse to serialize SHM data so return 0
716            0
717            /*
718            serdata.serialized_size = Some((cdr::calc_serialized_size::<T>(sample.as_ref())) as u32);
719            *serdata.serialized_size.as_ref().unwrap()
720            */
721        }
722    };
723    size
724}
725
726#[allow(dead_code)]
727unsafe extern "C" fn eqkey<T>(
728    serdata_a: *const ddsi_serdata,
729    serdata_b: *const ddsi_serdata,
730) -> bool {
731    let a = SerData::<T>::mut_ref_from_serdata(serdata_a);
732    let b = SerData::<T>::mut_ref_from_serdata(serdata_b);
733    a.key_hash == b.key_hash
734}
735
736#[allow(dead_code)]
737unsafe extern "C" fn serdata_to_ser<T>(
738    serdata: *const ddsi_serdata,
739    size: size_t,
740    offset: size_t,
741    buf: *mut c_void,
742) where
743    T: Serialize + TopicType,
744{
745    //println!("serdata_to_ser");
746    let serdata = SerData::<T>::const_ref_from_serdata(serdata);
747    let buf = buf as *mut u8;
748    let buf = buf.add(offset as usize);
749
750    if size == 0 {
751        return;
752    }
753
754    match &serdata.sample {
755        SampleData::Uninitialized => {
756            panic!("Attempt to serialize uninitialized serdata")
757        }
758        SampleData::SDKKey => match &serdata.key_hash {
759            KeyHash::None => {}
760            KeyHash::CdrKey(k) => std::ptr::copy_nonoverlapping(k.as_ptr(), buf, size as usize),
761            KeyHash::RawKey(k) => std::ptr::copy_nonoverlapping(k.as_ptr(), buf, size as usize),
762        },
763        // We may serialize both SDK data as well as SHM Data
764        SampleData::SDKData(serdata) => {
765            let buf_slice = std::slice::from_raw_parts_mut(buf, size as usize);
766            if let Err(e) = cdr::serialize_into::<_, T, _, CdrBe>(
767                buf_slice,
768                serdata.deref(),
769                Bounded(size),
770            ) {
771                panic!("Unable to serialize type {:?} due to {}", T::typename(), e);
772            }
773        }
774        SampleData::SHMData(serdata) => {
775            let buf_slice = std::slice::from_raw_parts_mut(buf, size as usize);
776            if let Err(e) = cdr::serialize_into::<_, T, _, CdrBe>(
777                buf_slice,
778                serdata.as_ref(),
779                Bounded(size),
780            ) {
781                panic!("Unable to serialize type {:?} due to {}", T::typename(), e);
782            }
783        }
784    }
785}
786
787#[allow(dead_code)]
788unsafe extern "C" fn serdata_to_ser_ref<T>(
789    serdata: *const ddsi_serdata,
790    offset: size_t,
791    size: size_t,
792    iov: *mut iovec,
793) -> *mut ddsi_serdata
794where
795    T: Serialize + TopicType,
796{
797    //println!("serdata_to_ser_ref");
798    let serdata = SerData::<T>::mut_ref_from_serdata(serdata);
799    let iov = &mut *iov;
800
801    match &serdata.sample {
802        SampleData::Uninitialized => panic!("Attempt to serialize uninitialized Sample"),
803        SampleData::SDKKey => {
804            let (p, len) = match &serdata.key_hash {
805                KeyHash::None => (std::ptr::null(), 0),
806                KeyHash::CdrKey(k) => (k.as_ptr(), k.len()),
807                KeyHash::RawKey(k) => (k.as_ptr(), k.len()),
808            };
809
810            iov.iov_base = p as *mut c_void;
811            iov.iov_len = len as size_t;
812        }
813        SampleData::SDKData(sample) => {
814            if serdata.cdr.is_none() {
815                serdata.cdr = serialize_type::<T>(sample, serdata.serialized_size).ok();
816            }
817            if let Some(cdr) = &serdata.cdr {
818                let offset = offset as usize;
819                let mut last = offset + size as usize;
820                if last > cdr.len() - 1 {
821                    last = cdr.len() - 1;
822                }
823                let cdr = &cdr[offset..last];
824                // cdds rounds up the length into multiple of 4. We mirror that by allocating extra in the
825                // ``serialize_type`` function.
826                iov.iov_base = cdr.as_ptr() as *mut c_void;
827                iov.iov_len = size; //cdr.len() as size_t;
828            } else {
829                println!("Serialization error!");
830                return std::ptr::null_mut();
831            }
832        }
833
834        SampleData::SHMData(sample) => {
835            if serdata.cdr.is_none() {
836                serdata.cdr = serialize_type::<T>(sample.as_ref(), serdata.serialized_size).ok();
837            }
838            if let Some(cdr) = &serdata.cdr {
839                let offset = offset as usize;
840                let last = offset + size as usize;
841                let cdr = &cdr[offset..last];
842                iov.iov_base = cdr.as_ptr() as *mut c_void;
843                iov.iov_len = cdr.len() as size_t;
844            } else {
845                println!("Serialization error (SHM)!");
846                return std::ptr::null_mut();
847            }
848        }
849    }
850    ddsi_serdata_addref(&serdata.serdata)
851}
852
853fn serialize_type<T: Serialize>(sample: &T, maybe_size: Option<u32>) -> Result<Vec<u8>, ()> {
854    if let Some(size) = maybe_size {
855        // Round up allocation to multiple of four
856        let size = (size + 3) & !3u32;
857        let mut buffer = Vec::<u8>::with_capacity(size as usize);
858        if let Ok(()) = cdr::serialize_into::<_, T, _, CdrBe>(&mut buffer, sample, Infinite) {
859            Ok(buffer)
860        } else {
861            Err(())
862        }
863    } else if let Ok(data) = cdr::serialize::<T, _, CdrBe>(sample, Infinite) {
864        Ok(data)
865    } else {
866        Err(())
867    }
868}
869
870#[allow(dead_code)]
871unsafe extern "C" fn serdata_to_ser_unref<T>(serdata: *mut ddsi_serdata, _iov: *const iovec) {
872    //println!("serdata_to_ser_unref");
873    let serdata = SerData::<T>::mut_ref_from_serdata(serdata);
874    ddsi_serdata_removeref(&mut serdata.serdata)
875}
876
877fn deserialize_type<T>(data:&[u8]) -> Result<Arc<T>,()> 
878    where
879    T: DeserializeOwned {
880        cdr::deserialize::<Box<T>>(data).map(Arc::from).map_err(|_e|())
881    }
882
883#[allow(dead_code)]
884unsafe extern "C" fn serdata_to_sample<T>(
885    serdata_ptr: *const ddsi_serdata,
886    sample: *mut c_void,
887    _bufptr: *mut *mut c_void,
888    _buflim: *mut c_void,
889) -> bool
890where
891    T: DeserializeOwned + TopicType,
892{
893    //println!(
894    //    "serdata to sample serdata:{:?} sample:{:?} bufptr:{:?} buflim:{:?}",
895    //    serdata, sample, _bufptr, _buflim
896    //);
897    let mut serdata = SerData::<T>::mut_ref_from_serdata(serdata_ptr);
898    let mut s = Box::<Sample<T>>::from_raw(sample as *mut Sample<T>);
899    assert!(!sample.is_null());
900
901    //#[cfg(shm)]
902    let ret = if !serdata.serdata.iox_chunk.is_null() {
903        // We got data from Iceoryx, deal with it
904        let hdr = iceoryx_header_from_chunk(serdata.serdata.iox_chunk);
905        if (*hdr).shm_data_state == iox_shm_data_state_t_IOX_CHUNK_CONTAINS_SERIALIZED_DATA {
906            // we have to deserialize the data now
907            let reader = std::slice::from_raw_parts(
908                serdata.serdata.iox_chunk as *const u8,
909                (*hdr).data_size as usize,
910            );
911            if serdata.serdata.kind == ddsi_serdata_kind_SDK_KEY {
912                compute_key_hash(reader, serdata);
913                serdata.sample = SampleData::SDKKey;
914                Ok(())
915            } else if let Ok(decoded) = deserialize_type::<T>(reader) {
916                if T::has_key() {
917                    // compute the 16byte key hash
918                    let key_cdr = decoded.key_cdr();
919                    // skip the four byte header
920                    let key_cdr = &key_cdr[4..];
921                    compute_key_hash(key_cdr, serdata);
922                }
923                //let sample = std::sync::Arc::new(decoded);
924                //store the deserialized sample in the serdata. We don't need to deserialize again
925                s.set(decoded.clone());
926                serdata.sample = SampleData::SDKData(decoded);
927
928                Ok(())
929            } else {
930                println!("Deserialization error!");
931                Err(())
932            }
933        } else {
934            // Not serialized data, we make a sample out of the data and store it in our sample
935            assert_eq!((*hdr).data_size as usize, std::mem::size_of::<T>());
936            if std::mem::size_of::<T>() == (*hdr).data_size as usize {
937                // Pay Attention here
938                //
939                //
940                let p: *mut T = serdata.serdata.iox_chunk as *mut T;
941                serdata.sample = SampleData::SHMData(NonNull::new_unchecked(p));
942                Ok(())
943            } else {
944                Err(())
945            }
946        }
947    } else {
948        Ok(())
949    };
950
951    let ret = if let Ok(()) = ret {
952        match &serdata.sample {
953            SampleData::Uninitialized => true,
954            SampleData::SDKKey => true,
955            SampleData::SDKData(_data) => {
956                s.set_serdata(serdata_ptr as *mut ddsi_serdata);
957                //s.set(data.clone());
958                false
959            }
960            SampleData::SHMData(_data) => {
961                s.set_serdata(serdata_ptr as *mut ddsi_serdata);
962                //s.set_loaned(data.clone());
963                false
964            }
965        }
966    } else {
967        true
968    };
969
970    // leak the sample intentionally so it doesn't get deallocated here
971    let _intentional_leak = Box::into_raw(s);
972    ret
973}
974
975#[allow(dead_code)]
976unsafe extern "C" fn serdata_to_untyped<T>(serdata: *const ddsi_serdata) -> *mut ddsi_serdata {
977    //println!("serdata_to_untyped {:?}", serdata);
978    let serdata = SerData::<T>::mut_ref_from_serdata(serdata);
979
980    //if let SampleData::<T>::SDKData(_d) = &serdata.sample {
981    let mut untyped_serdata = SerData::<T>::new(serdata.serdata.type_, ddsi_serdata_kind_SDK_KEY);
982    // untype it
983    untyped_serdata.serdata.type_ = std::ptr::null_mut();
984    untyped_serdata.sample = SampleData::SDKKey;
985
986    //copy the hashes
987    untyped_serdata.key_hash = serdata.key_hash.clone();
988    untyped_serdata.serdata.hash = serdata.serdata.hash;
989
990    let ptr = Box::into_raw(untyped_serdata);
991
992    ptr as *mut ddsi_serdata
993    //} else {
994    //    println!("Error: Cannot convert from untyped to untyped");
995    //    std::ptr::null_mut()
996    //}
997}
998
999#[allow(dead_code)]
1000unsafe extern "C" fn untyped_to_sample<T>(
1001    _sertype: *const ddsi_sertype,
1002    _serdata: *const ddsi_serdata,
1003    sample: *mut c_void,
1004    _buf: *mut *mut c_void,
1005    _buflim: *mut c_void,
1006) -> bool
1007where
1008    T: TopicType,
1009{
1010    //println!("untyped to sample!");
1011    if !sample.is_null() {
1012        let mut sample = Box::<Sample<T>>::from_raw(sample as *mut Sample<T>);
1013        // hmm. We don't store serialized data in serdata. I'm not really sure how
1014        // to implement this. For now, invalidate the sample.
1015        sample.clear();
1016        // leak this as we don't want to deallocate it.
1017        let _leaked = Box::<Sample<T>>::into_raw(sample);
1018        true
1019    } else {
1020        false
1021    }
1022}
1023
1024#[allow(dead_code)]
1025unsafe extern "C" fn get_keyhash<T>(
1026    serdata: *const ddsi_serdata,
1027    keyhash: *mut ddsi_keyhash,
1028    _force_md5: bool,
1029) {
1030    let serdata = SerData::<T>::mut_ref_from_serdata(serdata);
1031    let keyhash = &mut *keyhash;
1032
1033    let src = match &serdata.key_hash {
1034        KeyHash::None => &[],
1035        KeyHash::CdrKey(k) => &k[4..],
1036        KeyHash::RawKey(k) => &k[..],
1037    };
1038
1039    //let source_key_hash = &serdata.key_hash[4..];
1040    for (i, b) in src.iter().enumerate() {
1041        keyhash.value[i] = *b;
1042    }
1043}
1044
1045#[allow(dead_code)]
1046unsafe extern "C" fn print<T>(
1047    _sertype: *const ddsi_sertype,
1048    _serdata: *const ddsi_serdata,
1049    _buf: *mut std::os::raw::c_char,
1050    _bufsize: size_t,
1051) -> size_t {
1052    0
1053}
1054
1055fn create_sertype_ops<T>() -> Box<ddsi_sertype_ops>
1056where
1057    T: TopicType,
1058{
1059    Box::new(ddsi_sertype_ops {
1060        version: Some(ddsi_sertype_v0),
1061        arg: std::ptr::null_mut(),
1062        free: Some(free_sertype::<T>),
1063        zero_samples: Some(zero_samples::<T>),
1064        realloc_samples: Some(realloc_samples::<T>),
1065        free_samples: Some(free_samples::<T>),
1066        equal: Some(equal::<T>),
1067        hash: Some(hash::<T>),
1068        ..Default::default()
1069    })
1070}
1071
/// `get_sample_size` serdata operation (shared-memory builds only).
///
/// Returns the `iox_size` recorded on the sertype that the serdata refers to.
#[cfg(feature = "shm")]
#[allow(dead_code)]
unsafe extern "C" fn get_sample_size(serdata: *const ddsi_serdata) -> u32 {
    let sertype = (*serdata).type_;
    (*sertype).iox_size
}
1078
/// `from_iox_buffer` serdata operation (shared-memory builds only).
///
/// Wraps an Iceoryx buffer in a new `SerData<T>` without deserializing it.
/// The buffer pointer is always stored as the Iceoryx chunk. When a
/// subscriber is supplied (`sub` is non-null) the subscriber is recorded too
/// and the key hash is copied from the Iceoryx header of the chunk; with a
/// null `sub` (loaned sample) only the pointer is taken.
///
/// Returns a null pointer when `sertype` is null; otherwise a raw pointer to
/// the newly allocated serdata.
#[cfg(feature = "shm")]
#[allow(dead_code)]
unsafe extern "C" fn from_iox_buffer<T>(
    sertype: *const ddsi_sertype,
    kind: ddsi_serdata_kind,
    /*_deserialize_hint : bool,*/
    sub: *mut ::std::os::raw::c_void,
    buffer: *mut ::std::os::raw::c_void,
) -> *mut ddsi_serdata {
    if sertype.is_null() {
        return std::ptr::null_mut();
    }

    let mut d = SerData::<T>::new(sertype, kind);

    // Both the loaned-sample path and the subscriber path keep the pointer.
    d.serdata.iox_chunk = buffer;

    if !sub.is_null() {
        // Data received through an Iceoryx subscriber.
        d.serdata.iox_subscriber = sub;
        let hdr = iceoryx_header_from_chunk(buffer);
        // Copy the key hash (TODO: Check this)
        copy_raw_key_hash(&(*hdr).keyhash.value, &mut d);
    }

    // Deserialization is deferred; for now the sample refers to the buffer.
    d.sample = SampleData::SHMData(NonNull::new_unchecked(buffer as *mut T));

    // Only this module knows this ddsi_serdata is really a `SerData<T>`.
    Box::into_raw(d) as *mut ddsi_serdata
}
1116
1117fn create_serdata_ops<T>() -> Box<ddsi_serdata_ops>
1118where
1119    T: DeserializeOwned + TopicType + Serialize ,
1120{
1121    Box::new(ddsi_serdata_ops {
1122        eqkey: Some(eqkey::<T>),
1123        get_size: Some(get_size::<T>),
1124        from_ser: Some(serdata_from_fragchain::<T>),
1125        from_ser_iov: Some(serdata_from_iov::<T>),
1126        from_keyhash: Some(serdata_from_keyhash::<T>),
1127        from_sample: Some(serdata_from_sample::<T>),
1128        to_ser: Some(serdata_to_ser::<T>),
1129        to_ser_ref: Some(serdata_to_ser_ref::<T>),
1130        to_ser_unref: Some(serdata_to_ser_unref::<T>),
1131        to_sample: Some(serdata_to_sample::<T>),
1132        to_untyped: Some(serdata_to_untyped::<T>),
1133        untyped_to_sample: Some(untyped_to_sample::<T>),
1134        free: Some(free_serdata::<T>),
1135        print: Some(print::<T>),
1136        get_keyhash: Some(get_keyhash::<T>),
1137        #[cfg(feature = "shm")]
1138        get_sample_size: Some(get_sample_size),
1139        #[cfg(feature = "shm")]
1140        from_iox_buffer: Some(from_iox_buffer::<T>),
1141        ..Default::default()
1142    })
1143}
1144
// It was initially unclear what this callback needs to do; the C++ implementation at
// https://github.com/eclipse-cyclonedds/cyclonedds-cxx/blob/templated-streaming/src/ddscxx/include/org/eclipse/cyclonedds/topic/datatopic.hpp
// just returns 0.
// Update: this became clear after debugging crashes that occurred when stress
// testing with a large number of types being published. This hash is used for the
// hash lookup in hopscotch.c, which states:
1151// /*
1152//  * The hopscotch hash table is dependent on a proper functioning hash.
1153//  * If the hash function generates a lot of hash collisions, then it will
1154//  * not be able to handle that by design.
1155//  * It is capable of handling some collisions, but not more than 32 per
1156//  * bucket (less, when other hash values are clustered around the
1157//  * collision value).
1158//  * When proper distributed hash values are generated, then hopscotch
1159//  * works nice and quickly.
1160//  */
1161
1162unsafe extern "C" fn hash<T: TopicType>(tp: *const ddsi_sertype) -> u32  
1163{
1164    if let Some(ser_type) = SerType::<T>::try_from_sertype(tp) {
1165        let type_name =  CStr::from_ptr(ser_type.sertype.type_name);
1166        let type_name_bytes = type_name.to_bytes();
1167        let type_size = core::mem::size_of::<T>().to_ne_bytes();
1168        let sg_list = [type_name_bytes,&type_size];
1169        let mut sg_buffer = SGReader::new(&sg_list);
1170
1171        let hash = murmur3_32(&mut sg_buffer, 0);
1172        
1173        let _intentional_leak = SerType::<T>::into_sertype(ser_type);
1174        hash.unwrap_or(0)
1175
1176    } else {
1177        0
1178    }
1179}
1180
1181unsafe extern "C" fn equal<T>(acmn: *const ddsi_sertype, bcmn: *const ddsi_sertype) -> bool {
1182    let acmn = CStr::from_ptr((*acmn).type_name as *mut std::os::raw::c_char);
1183    let bcmn = CStr::from_ptr((*bcmn).type_name as *mut std::os::raw::c_char);
1184    acmn == bcmn
1185}
1186
/// What a serdata currently holds in terms of sample data.
#[derive(Clone)]
enum SampleData<T> {
    /// Nothing has been stored yet; this is the default state.
    Uninitialized,
    /// Marker for a key-only serdata; no sample value is stored.
    SDKKey,
    /// A deserialized sample shared through reference counting.
    SDKData(Arc<T>),
    /// A non-null raw pointer to a sample (set from an Iceoryx chunk).
    SHMData(NonNull<T>),
}

impl<T> Default for SampleData<T> {
    /// A fresh `SampleData` holds nothing.
    fn default() -> Self {
        SampleData::Uninitialized
    }
}
1200
1201
/// Key hash stored alongside a serdata.
#[derive(PartialEq, Clone)]
enum KeyHash {
    /// No key hash is available; this is the default.
    None,
    /// 20 bytes of key hash: consumers skip the first four bytes when
    /// extracting the 16 byte hash.
    CdrKey([u8; 20]),
    /// 16 bytes of raw key hash.
    RawKey([u8; 16]),
}

impl Default for KeyHash {
    /// A fresh `KeyHash` carries no hash.
    fn default() -> Self {
        KeyHash::None
    }
}

impl KeyHash {
    /// Returns all stored bytes: an empty slice for `None`, 20 bytes for
    /// `CdrKey` and 16 bytes for `RawKey`.
    fn get_key_hash(&self) -> &[u8] {
        match self {
            KeyHash::None => &[],
            KeyHash::CdrKey(bytes) => &bytes[..],
            KeyHash::RawKey(bytes) => &bytes[..],
        }
    }

    /// Number of stored bytes (0, 20 or 16).
    fn key_length(&self) -> usize {
        self.get_key_hash().len()
    }
}
1231
1232/// A representation for the serialized data.
1233#[repr(C)]
1234pub (crate)struct SerData<T> {
1235    serdata: ddsi_serdata,
1236    sample: SampleData<T>,
1237    //data in CDR format. This is put into an option as we only create
1238    //the serialized version when we need it
1239    cdr: Option<Vec<u8>>,
1240    //key_hash: ddsi_keyhash,
1241    // include 4 bytes of CDR encapsulation header
1242    //key_hash: [u8; 20],
1243    key_hash: KeyHash,
1244    // We store the serialized size here if available
1245    serialized_size: Option<u32>,
1246}
1247
1248impl<'a, T> SerData<T> {
1249    fn new(sertype: *const ddsi_sertype, kind: u32) -> Box<SerData<T>> {
1250        Box::<SerData<T>>::new(SerData {
1251            serdata: {
1252                let mut data = std::mem::MaybeUninit::uninit();
1253                unsafe {
1254                    ddsi_serdata_init(data.as_mut_ptr(), sertype, kind);
1255                    data.assume_init()
1256                }
1257            },
1258            sample: SampleData::default(),
1259            cdr: None,
1260            key_hash: KeyHash::default(),
1261            serialized_size: None,
1262        })
1263    }
1264
1265    fn const_ref_from_serdata(serdata: *const ddsi_serdata) -> &'a Self {
1266        let ptr = serdata as *const SerData<T>;
1267        unsafe { &*ptr }
1268    }
1269
1270    fn mut_ref_from_serdata(serdata: *const ddsi_serdata) -> &'a mut Self {
1271        let ptr = serdata as *mut SerData<T>;
1272        unsafe { &mut *ptr }
1273    }
1274}
1275
1276impl <T>Clone for SerData<T> {
1277    fn clone(&self) -> Self {
1278        Self { 
1279                serdata: {
1280                    let mut newdata = self.serdata;
1281                    unsafe {ddsi_serdata_addref(&mut newdata)};
1282                    newdata
1283                }, sample:  match &self.sample {
1284                        SampleData::Uninitialized => SampleData::Uninitialized,
1285                        SampleData::SDKKey => SampleData::SDKKey,
1286                        SampleData::SDKData(d) => SampleData::SDKData(d.clone()),
1287                        SampleData::SHMData(d) => SampleData::SHMData(*d),
1288                    }, cdr: self.cdr.clone(), key_hash: self.key_hash.clone(), serialized_size: self.serialized_size }
1289    }
1290} 
1291
1292
1293
1294/*  These functions are created from the macros in
1295    https://github.com/eclipse-cyclonedds/cyclonedds/blob/f879dc0ef56eb00857c0cbb66ee87c577ff527e8/src/core/ddsi/include/dds/ddsi/q_radmin.h#L108
1296    Bad things will happen if these macros change.
1297    Some discussions here: https://github.com/eclipse-cyclonedds/cyclonedds/issues/830
1298*/
1299fn nn_rdata_payload_offset(rdata: *const nn_rdata) -> usize {
1300    unsafe { (*rdata).payload_zoff as usize }
1301}
1302
1303fn nn_rmsg_payload(rmsg: *const nn_rmsg) -> *const u8 {
1304    unsafe { rmsg.add(1) as *const u8 }
1305}
1306
1307fn nn_rmsg_payload_offset(rmsg: *const nn_rmsg, offset: usize) -> *const u8 {
1308    unsafe { nn_rmsg_payload(rmsg).add(offset) }
1309}
1310
/// A reader for a list of scatter gather buffers.
///
/// The buffers are presented as one continuous byte stream. A single call to
/// `read` never crosses a buffer boundary, so it may return fewer bytes than
/// requested even though more data follows in the next buffer.
struct SGReader<'a> {
    /// The buffers to read from; `None` once everything has been consumed.
    sc_list: Option<&'a [&'a [u8]]>,
    /// Index of the buffer that is currently being read.
    slice_cursor: usize,
    /// Offset of the next unread byte within the current buffer.
    slice_offset: usize,
}

impl<'a> SGReader<'a> {
    /// Creates a reader positioned at the first byte of the first buffer.
    /// An empty list is allowed and yields an immediately exhausted reader.
    pub fn new(sc_list: &'a [&'a [u8]]) -> Self {
        SGReader {
            sc_list: Some(sc_list),
            slice_cursor: 0,
            slice_offset: 0,
        }
    }
}

impl<'a> Read for SGReader<'a> {
    /// Copies bytes from the current buffer into `buf`.
    ///
    /// Returns the number of bytes copied, which is at most the number of
    /// bytes left in the current buffer. `Ok(0)` is only returned when `buf`
    /// is empty or when all buffers have been consumed; empty buffers in the
    /// list are skipped instead of being reported as end of data.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let slices = match self.sc_list {
            Some(slices) => slices,
            // Already exhausted.
            None => return Ok(0),
        };

        // `get` (rather than indexing) makes an empty list, or a cursor that
        // ran off the end, fall through to the end-of-data path below.
        while let Some(current) = slices.get(self.slice_cursor) {
            let remaining = &current[self.slice_offset..];
            if remaining.is_empty() {
                // Nothing (left) in this buffer: move on without returning,
                // because `Ok(0)` would wrongly signal end of data.
                self.slice_cursor += 1;
                self.slice_offset = 0;
                continue;
            }

            let copy_length = std::cmp::min(remaining.len(), buf.len());
            buf[..copy_length].copy_from_slice(&remaining[..copy_length]);
            self.slice_offset += copy_length;

            if self.slice_offset == current.len() {
                // This buffer is complete; continue with the next one.
                self.slice_cursor += 1;
                self.slice_offset = 0;
                if self.slice_cursor >= slices.len() {
                    // No more buffers, invalidate the list.
                    self.sc_list = None;
                }
            }

            return Ok(copy_length);
        }

        // No more data.
        self.sc_list = None;
        Ok(0)
    }
}
1365
#[cfg(test)]
mod test {
    use super::*;
    use crate::{DdsListener, DdsParticipant, DdsQos, DdsTopic};
    use cdds_derive::Topic;
    use serde_derive::{Deserialize, Serialize};
    use std::ffi::CString;

    // Reading through SGReader stops at the end of each buffer: the first
    // read fills the 5 byte destination, the second only yields the single
    // byte left in the first buffer.
    #[test]
    fn scatter_gather() {
        let a: Vec<u8> = vec![1, 2, 3, 4, 5, 6];
        let b: Vec<u8> = vec![7, 8, 9, 10, 11];
        let c: Vec<u8> = vec![12, 13, 14, 15];
        let d: Vec<u8> = vec![16, 17, 18, 19, 20, 21];

        let sc_list = vec![a.as_slice(), b.as_slice(), c.as_slice(), d.as_slice()];
        let mut reader = SGReader::new(&sc_list);

        let mut buf = vec![0u8; 5];
        match reader.read(&mut buf) {
            Ok(n) => assert_eq!(&buf[..n], vec![1, 2, 3, 4, 5]),
            Err(_) => panic!("should not panic"),
        }
        match reader.read(&mut buf) {
            Ok(n) => assert_eq!(&buf[..n], vec![6]),
            Err(_) => panic!("should not panic"),
        }
    }

    // A single integer key: 4 byte header followed by the big-endian id.
    #[test]
    fn keyhash_basic() {
        #[derive(Serialize, Deserialize, Topic, Default)]
        struct Foo {
            #[topic_key]
            id: i32,
            x: u32,
            y: u32,
        }

        let sample = Foo {
            id: 0x12345678,
            x: 10,
            y: 20,
        };
        let expected = vec![0, 0, 0, 0, 0x12u8, 0x34u8, 0x56u8, 0x78u8];
        assert_eq!(sample.key_cdr(), expected);
    }

    // Integer and string keys are serialized in declaration order.
    #[test]
    fn keyhash_simple() {
        #[derive(Serialize, Deserialize, Topic, Default)]
        struct Foo {
            #[topic_key]
            id: i32,
            x: u32,
            #[topic_key]
            s: String,
            y: u32,
        }

        let sample = Foo {
            id: 0x12345678,
            x: 10,
            s: String::from("boo"),
            y: 20,
        };
        let expected = vec![0, 0, 0, 0, 18, 52, 86, 120, 0, 0, 0, 4, 98, 111, 111, 0];
        assert_eq!(sample.key_cdr(), expected);
    }

    // A keyed nested struct contributes only its own key fields.
    #[test]
    fn keyhash_nested() {
        #[derive(Serialize, Deserialize, Topic, Default)]
        struct NestedFoo {
            name: String,
            val: u64,
            #[topic_key]
            instance: u32,
        }

        impl NestedFoo {
            fn new() -> Self {
                Self {
                    name: "my name".to_owned(),
                    val: 42,
                    instance: 25,
                }
            }
        }

        #[derive(Serialize, Deserialize, Topic, Default)]
        struct Foo {
            #[topic_key]
            id: i32,
            x: u32,
            #[topic_key]
            s: String,
            y: u32,
            #[topic_key]
            inner: NestedFoo,
        }

        let expected_name = CString::new("serdes::test::keyhash_nested::NestedFoo").unwrap();
        assert_eq!(NestedFoo::typename(), expected_name);

        let sample = Foo {
            id: 0x12345678,
            x: 10,
            s: String::from("boo"),
            y: 20,
            inner: NestedFoo::new(),
        };
        let expected = vec![
            0, 0, 0, 0, 18, 52, 86, 120, 0, 0, 0, 4, 98, 111, 111, 0, 0, 0, 0, 25,
        ];
        assert_eq!(sample.key_cdr(), expected);
    }

    // A fixed-size byte array key does not force an MD5 key hash.
    #[test]
    fn primitive_array_as_key() {
        #[derive(Serialize, Deserialize, Topic, Default)]
        struct Foo {
            #[topic_key]
            a: [u8; 8],
            b: u32,
            c: String,
        }

        let sample = Foo {
            a: [0; 8],
            b: 42,
            c: "foo".to_owned(),
        };

        let expected = vec![0u8; 12];
        assert_eq!(sample.key_cdr(), expected);
        assert!(!Foo::force_md5_keyhash());
    }

    // Adding a string key next to the array forces an MD5 key hash.
    #[test]
    fn primitive_array_and_string_as_key() {
        #[derive(Serialize, Deserialize, Topic, Default)]
        struct Foo {
            #[topic_key]
            a: [u8; 8],
            b: u32,
            #[topic_key]
            c: String,
        }

        let sample = Foo {
            a: [0; 8],
            b: 42,
            c: "foo".to_owned(),
        };

        let expected = vec![
            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 102, 111, 111, 0,
        ];
        assert_eq!(sample.key_cdr(), expected);
        assert!(Foo::force_md5_keyhash());
    }

    // Smoke test: a topic can be created from a SerType and deleted again.
    #[test]
    fn basic() {
        #[derive(Serialize, Deserialize, Topic, Default)]
        struct NestedFoo {
            name: String,
            val: u64,
            #[topic_key]
            instance: u32,
        }

        impl NestedFoo {
            fn new() -> Self {
                Self {
                    name: "my name".to_owned(),
                    val: 42,
                    instance: 25,
                }
            }
        }

        #[derive(Serialize, Deserialize, Topic, Default)]
        struct Foo {
            #[topic_key]
            id: i32,
            x: u32,
            #[topic_key]
            s: String,
            y: u32,
            #[topic_key]
            inner: NestedFoo,
        }

        let _foo = Foo {
            id: 0x12345678,
            x: 10,
            s: String::from("boo"),
            y: 20,
            inner: NestedFoo::new(),
        };

        let mut sertype_ptr = SerType::into_sertype(SerType::<Foo>::new());
        let sertype_ptr_ptr = &mut sertype_ptr as *mut *mut ddsi_sertype;

        unsafe {
            let participant =
                dds_create_participant(0, std::ptr::null_mut(), std::ptr::null_mut());
            let topic_name = CString::new("topic_name").unwrap();
            let topic = dds_create_topic_sertype(
                participant,
                topic_name.as_ptr(),
                sertype_ptr_ptr,
                std::ptr::null_mut(),
                std::ptr::null_mut(),
                std::ptr::null_mut(),
            );

            dds_delete(topic);
        }
    }
}