roundabout/message/
vec.rs

1use crate::message::buffer::{MessageBuffer, MessageHeader};
2use crate::message::MessageRegistry;
3use std::mem::ManuallyDrop;
4use std::ops::Deref;
5
6pub struct MessageVec {
7    // Optimization: clone vs arc vs reference
8    registry: MessageRegistry,
9    buffer: MessageBuffer,
10    len: usize,
11}
12
13impl MessageVec {
14    #[inline]
15    pub fn new(registry: MessageRegistry) -> Self {
16        unsafe {
17            let buffer = MessageBuffer::new(registry.message_size().inner());
18            Self {
19                registry,
20                buffer,
21                len: 0,
22            }
23        }
24    }
25
26    #[inline]
27    pub fn with_capacity(registry: MessageRegistry, cap: usize) -> Self {
28        unsafe {
29            let buffer = MessageBuffer::with_capacity(registry.message_size().inner(), cap);
30            Self {
31                registry,
32                buffer,
33                len: 0,
34            }
35        }
36    }
37
38    #[inline]
39    pub fn is_empty(&self) -> bool {
40        self.len == 0
41    }
42
43    #[inline]
44    pub fn len(&self) -> usize {
45        self.len
46    }
47
48    #[inline]
49    pub fn iter(&self) -> MessageVecIter {
50        MessageVecIter { vec: self, i: 0 }
51    }
52
53    #[inline]
54    pub fn push<T: 'static + Send + Sync>(&mut self, message: T) -> bool {
55        unsafe {
56            // Optimization: static resolution of e_idx
57            match self.registry.get_index_of::<T>() {
58                Some(e_idx) => {
59                    let message = ManuallyDrop::new(message);
60                    let data = message.deref() as *const T as *const u8;
61                    let drop_fn: Option<fn(*mut u8)> = if std::mem::needs_drop::<T>() {
62                        Some(|ptr| (ptr as *mut T).drop_in_place())
63                    } else {
64                        None
65                    };
66
67                    self.push_untyped(e_idx, data, std::mem::size_of::<T>(), drop_fn);
68                    true
69                }
70                None => {
71                    log::debug!(
72                        "skipping storing of unhandled message type: {}",
73                        std::any::type_name::<T>()
74                    );
75                    false
76                }
77            }
78        }
79    }
80
81    #[inline]
82    pub fn extend<I: IntoIterator<Item = T>, T: 'static + Send + Sync>(
83        &mut self,
84        messages: I,
85    ) -> bool {
86        unsafe {
87            match self.registry.get_index_of::<T>() {
88                Some(e_idx) => {
89                    for message in messages.into_iter() {
90                        let message: ManuallyDrop<T> = ManuallyDrop::new(message);
91                        let data = message.deref() as *const T as *const u8;
92                        let drop_fn: Option<fn(*mut u8)> = if std::mem::needs_drop::<T>() {
93                            Some(|ptr| (ptr as *mut T).drop_in_place())
94                        } else {
95                            None
96                        };
97
98                        self.push_untyped(e_idx, data, std::mem::size_of::<T>(), drop_fn);
99                    }
100
101                    true
102                }
103                None => {
104                    log::debug!(
105                        "skipping storing of unhandled message type: {}",
106                        std::any::type_name::<T>()
107                    );
108                    false
109                }
110            }
111        }
112    }
113
114    #[inline]
115    pub fn extend_vec(&mut self, other: &mut Self) {
116        unsafe {
117            assert_eq!(self.registry, other.registry);
118            self.extend_vec_unchecked(other);
119        }
120    }
121
122    #[inline]
123    pub unsafe fn extend_vec_unchecked(&mut self, other: &mut Self) {
124        debug_assert_eq!(self.registry, other.registry);
125        if other.is_empty() {
126            return;
127        }
128
129        let remaining = self.buffer.cap() - self.len;
130        if remaining < other.len {
131            self.buffer.grow(other.len - remaining);
132        }
133
134        let headers = other.buffer.get_header(0);
135        let messages = other.buffer.get_message(0);
136        self.buffer
137            .copy_nonoverlapping_all(self.len, headers, messages, other.len);
138
139        self.len += other.len;
140        other.len = 0;
141    }
142
143    pub(crate) fn get_registry(&self) -> &MessageRegistry {
144        &self.registry
145    }
146
147    pub(crate) unsafe fn get_buffer(&self) -> &MessageBuffer {
148        &self.buffer
149    }
150
151    pub(crate) unsafe fn set_len(&mut self, len: usize) {
152        self.len = len;
153    }
154
155    pub(crate) unsafe fn push_untyped(
156        &mut self,
157        e_idx: usize,
158        data: *const u8,
159        data_size: usize,
160        drop_fn: Option<fn(*mut u8)>,
161    ) {
162        if self.buffer.cap() - self.len == 0 {
163            self.buffer.grow(1);
164        }
165
166        let header = MessageHeader { e_idx, drop_fn };
167        self.buffer
168            .copy_nonoverlapping(self.len, header, data, data_size);
169        self.len += 1;
170    }
171}
172
173impl Drop for MessageVec {
174    fn drop(&mut self) {
175        unsafe {
176            if self.buffer.cap() == 0 {
177                return;
178            }
179
180            for i in 0..self.len {
181                self.buffer.drop_message(i);
182            }
183
184            self.buffer.dealloc();
185        }
186    }
187}
188
189unsafe impl Send for MessageVec {}
190unsafe impl Sync for MessageVec {}
191
192pub struct MessageVecIter<'a> {
193    vec: &'a MessageVec,
194    i: usize,
195}
196
197impl<'a> Iterator for MessageVecIter<'a> {
198    type Item = MessageVecView<'a>;
199
200    fn next(&mut self) -> Option<Self::Item> {
201        unsafe {
202            if self.i >= self.vec.len {
203                return None;
204            }
205
206            let header = &*self.vec.buffer.get_header(self.i);
207            let data = self.vec.buffer.get_message(self.i);
208            self.i += 1;
209            Some(MessageVecView { header, data })
210        }
211    }
212}
213
214pub struct MessageVecView<'a> {
215    header: &'a MessageHeader,
216    data: *const u8,
217}
218
219impl<'a> MessageVecView<'a> {
220    #[inline]
221    pub fn message_idx(&self) -> usize {
222        self.header.e_idx
223    }
224
225    #[inline]
226    pub unsafe fn data(&self) -> *const u8 {
227        self.data
228    }
229}