aeron_rs/concurrent/atomic_buffer.rs

use std::ffi::{CStr, CString};
use std::fmt::{Debug, Error, Formatter};
use std::io::Write;
use std::slice;
use std::sync::atomic::{fence, AtomicI32, AtomicI64, Ordering};

use crate::utils::misc::{alloc_buffer_aligned, dealloc_buffer_aligned};
use crate::utils::types::{Index, I32_SIZE, I64_SIZE};

/// Buffer allocated on cache-aligned memory boundaries. This struct owns the memory it points to.
pub struct AlignedBuffer {
    ptr: *mut u8,
    len: Index,
}

impl AlignedBuffer {
    pub fn with_capacity(len: Index) -> AlignedBuffer {
        AlignedBuffer {
            ptr: alloc_buffer_aligned(len),
            len,
        }
    }

    pub fn ptr(&self) -> *mut u8 {
        self.ptr
    }

    pub fn len(&self) -> Index {
        self.len
    }

    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
}

impl Drop for AlignedBuffer {
    fn drop(&mut self) {
        unsafe {
            dealloc_buffer_aligned(self.ptr, self.len);
        }
    }
}

/// Wraps but does not own a region of shared memory
#[derive(Copy, Clone)]
pub struct AtomicBuffer {
    pub(crate) ptr: *mut u8,
    len: Index,
}

impl Debug for AtomicBuffer {
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
        // Dump the buffer in rows of CHUNK bytes, each row prefixed with its byte offset.
        // chunks() handles buffers shorter than CHUNK and never drops the final partial row.
        const CHUNK: usize = 40;
        for (row, bytes) in self.as_slice().chunks(CHUNK).enumerate() {
            writeln!(f, "{}: {:?}", row * CHUNK, bytes)?;
        }
        Ok(())
    }
}

impl Write for AtomicBuffer {
    // NOTE: there is no write cursor - each call copies `buf` to the very start of the buffer
    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
        self.put_bytes(0, buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> Result<(), std::io::Error> {
        Ok(())
    }
}

// Where needed, AtomicBuffer is accessed through memory fences or atomic operations,
// so it appears safe to share an AtomicBuffer between threads.
unsafe impl Send for AtomicBuffer {}
unsafe impl Sync for AtomicBuffer {}

// TODO: add bounds checks!!!
// TODO: remove unsafe?
impl AtomicBuffer {
    pub fn from_aligned(aligned: &AlignedBuffer) -> AtomicBuffer {
        AtomicBuffer {
            ptr: aligned.ptr,
            len: aligned.len,
        }
    }

    pub fn wrap(buffer: AtomicBuffer) -> Self {
        AtomicBuffer {
            ptr: buffer.ptr,
            len: buffer.len,
        }
    }

    pub fn wrap_slice(slice: &mut [u8]) -> Self {
        AtomicBuffer {
            ptr: slice.as_mut_ptr(),
            len: slice.len() as Index,
        }
    }

    pub fn wrap_raw_slice(slice: *mut [u8]) -> Self {
        AtomicBuffer {
            ptr: slice as *mut u8,
            len: slice.len() as Index,
        }
    }

    // TODO: check that len is ok and ptr is aligned
    pub(crate) fn new(ptr: *mut u8, len: Index) -> AtomicBuffer {
        AtomicBuffer { ptr, len }
    }

    #[inline]
    unsafe fn at(&self, offset: Index) -> *mut u8 {
        self.ptr.offset(offset as isize)
    }

    /// Create a view on the contents of the buffer starting from offset and spanning len bytes.
    /// Sets the length of the "view" buffer to "len"
    #[inline]
    pub fn view(&self, offset: Index, len: Index) -> Self {
        self.bounds_check(offset, len);

        AtomicBuffer {
            ptr: unsafe { self.at(offset) },
            len,
        }
    }

    pub const fn capacity(&self) -> Index {
        self.len
    }

    #[inline]
    pub fn bounds_check(&self, idx: Index, len: Index) {
        assert!((idx + len) <= self.len)
    }

    #[inline]
    pub fn get<T: Copy>(&self, position: Index) -> T {
        self.bounds_check(position, std::mem::size_of::<T>() as Index);
        unsafe { (self.at(position) as *mut T).read_unaligned() }
    }

    #[inline]
    pub fn overlay_struct<T>(&self, position: Index) -> *mut T {
        self.bounds_check(position, std::mem::size_of::<T>() as Index);
        unsafe { self.at(position) as *mut T }
    }

    #[inline]
    pub fn as_ref<T: Copy>(&self, position: Index) -> &T {
        self.bounds_check(position, std::mem::size_of::<T>() as Index);
        unsafe { &*(self.at(position) as *const T) }
    }

    #[inline]
    pub fn buffer(&self) -> *mut u8 {
        self.ptr
    }

    #[inline]
    pub fn set_memory(&self, position: Index, len: Index, value: u8) {
        self.bounds_check(position, len);
        let s = unsafe { slice::from_raw_parts_mut(self.ptr.offset(position as isize), len as usize) };

        // poor man's memset
        s.fill(value);
    }

    #[inline]
    pub fn get_volatile<T: Copy>(&self, position: Index) -> T {
        self.bounds_check(position, std::mem::size_of::<T>() as Index);
        // Plain load followed by an acquire fence, giving load-acquire semantics
        let read = self.get(position);
        fence(Ordering::Acquire);
        read
    }

    #[inline]
    pub fn put_ordered<T>(&self, position: Index, val: T) {
        self.bounds_check(position, std::mem::size_of::<T>() as Index);
        // Release fence followed by a plain store, giving store-release semantics
        fence(Ordering::Release);
        self.put(position, val);
    }

    #[inline]
    pub fn put<T>(&self, position: Index, val: T) {
        self.bounds_check(position, std::mem::size_of::<T>() as Index);
        unsafe { (self.at(position) as *mut T).write_unaligned(val) }
    }

    #[inline]
    #[allow(clippy::cast_ptr_alignment)]
    pub fn put_atomic_i64(&self, offset: Index, val: i64) {
        self.bounds_check(offset, I64_SIZE);
        unsafe {
            let atomic_ptr = self.at(offset) as *const AtomicI64;
            (*atomic_ptr).store(val, Ordering::SeqCst);
        }
    }

    #[inline]
    #[allow(clippy::cast_ptr_alignment)]
    pub fn compare_and_set_i32(&self, position: Index, expected: i32, update: i32) -> bool {
        self.bounds_check(position, I32_SIZE);
        unsafe {
            let ptr = self.at(position) as *const AtomicI32;
            (*ptr)
                .compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst)
                .is_ok()
        }
    }

    #[inline]
    #[allow(clippy::cast_ptr_alignment)]
    pub fn compare_and_set_i64(&self, position: Index, expected: i64, update: i64) -> bool {
        self.bounds_check(position, I64_SIZE);
        unsafe {
            let ptr = self.at(position) as *const AtomicI64;
            (*ptr)
                .compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst)
                .is_ok()
        }
    }

    /**
     * Single threaded increment with release semantics.
     *
     * @param offset in the buffer of the word.
     * @param delta  to be applied to the value.
     */
    pub fn add_i64_ordered(&self, offset: Index, delta: i64) {
        self.bounds_check(offset, I64_SIZE);

        let value = self.get::<i64>(offset);
        self.put_ordered::<i64>(offset, value + delta);
    }

    /// Put bytes into this buffer at the specified offset
    #[inline]
    pub fn put_bytes(&self, offset: Index, src: &[u8]) {
        self.bounds_check(offset, src.len() as Index);

        unsafe {
            let ptr = self.ptr.offset(offset as isize);
            std::ptr::copy(src.as_ptr(), ptr, src.len());
        }
    }

    #[inline]
    pub fn get_bytes<T>(&self, offset: Index, dest: &mut T) {
        let length = std::mem::size_of::<T>();
        self.bounds_check(offset, length as Index);

        unsafe {
            let ptr = self.at(offset);
            std::ptr::copy(ptr, dest as *mut T as *mut _, length);
        }
    }

    /// Copy "length" bytes from "src_buffer" starting at "src_offset" into this buffer at the given "offset"
    /// offset - offset in this (self) buffer to start copying to
    /// src_buffer - atomic buffer to copy data from
    /// src_offset - offset in src_buffer to start copying from
    /// length - number of bytes to copy
    #[inline]
    pub fn copy_from(&self, offset: Index, src_buffer: &AtomicBuffer, src_offset: Index, length: Index) {
        self.bounds_check(offset, length);
        src_buffer.bounds_check(src_offset, length);
        unsafe {
            let src_ptr = src_buffer.at(src_offset);
            let dest_ptr = self.at(offset);
            // TODO: check that memory regions are actually not overlapping, otherwise UB!
            std::ptr::copy_nonoverlapping(src_ptr, dest_ptr, length as usize);
        }
    }

    pub fn as_mutable_slice(&mut self) -> &mut [u8] {
        unsafe { slice::from_raw_parts_mut(self.ptr, self.len as usize) }
    }

    pub fn as_slice(&self) -> &[u8] {
        unsafe { slice::from_raw_parts(self.ptr, self.len as usize) }
    }

    pub fn as_sub_slice(&self, index: Index, len: Index) -> &[u8] {
        self.bounds_check(index, len);
        unsafe { slice::from_raw_parts(self.at(index), len as usize) }
    }

    #[inline]
    pub fn get_string(&self, offset: Index) -> CString {
        self.bounds_check(offset, I32_SIZE);

        // Strings in Aeron are stored with the first 4 bytes as the length, followed by "length" bytes of ASCII body
        let length: i32 = self.get::<i32>(offset);
        self.get_string_without_length(offset + I32_SIZE, length)
    }

    #[inline]
    pub fn get_string_without_length(&self, offset: Index, length: Index) -> CString {
        self.bounds_check(offset, length);

        unsafe {
            // NOTE: we need to append a trailing zero to the "length" bytes read from the buffer.
            // This assumes the body itself contains no interior NUL bytes, as CStr requires.
            let str_slice = std::slice::from_raw_parts(self.at(offset) as *const u8, length as usize);
            let mut zero_terminated: Vec<u8> = Vec::with_capacity(length as usize + 1);
            zero_terminated.extend_from_slice(str_slice);
            zero_terminated.push(0);

            CString::from(CStr::from_bytes_with_nul_unchecked(&zero_terminated))
        }
    }

    #[inline]
    pub fn get_string_length(&self, offset: Index) -> Index {
        self.bounds_check(offset, I32_SIZE);

        self.get::<i32>(offset) as Index
    }

    /// This function expects an ASCII string WITHOUT a trailing zero as its input.
    #[inline]
    pub fn put_string(&self, offset: Index, string: &[u8]) {
        self.bounds_check(offset, string.len() as Index + I32_SIZE);

        // Strings in Aeron are stored with the first 4 bytes as the length, followed by the string body
        self.put::<i32>(offset, string.len() as i32);

        self.put_bytes(offset + I32_SIZE, string);
    }

    #[inline]
    pub fn put_string_without_length(&self, offset: Index, string: &[u8]) -> Index {
        self.bounds_check(offset, string.len() as Index);

        // No length prefix here: the body is written directly at the given offset
        self.put_bytes(offset, string);

        string.len() as Index
    }

    /**
     * Multi threaded increment.
     *
     * @param offset in the buffer of the word.
     * @param delta  to be applied to the value.
     * @return the value before applying the delta.
     */
    #[allow(clippy::cast_ptr_alignment)]
    pub fn get_and_add_i64(&self, offset: Index, delta: i64) -> i64 {
        self.bounds_check(offset, I64_SIZE);
        unsafe {
            let atomic_ptr = self.at(offset) as *const AtomicI64;
            (*atomic_ptr).fetch_add(delta, Ordering::SeqCst)
        }
    }
}

#[cfg(test)]
mod tests {
    use std::io::Write;

    use crate::concurrent::atomic_buffer::{AlignedBuffer, AtomicBuffer};
    use crate::utils::types::Index;

    #[test]
    fn atomic_buffer_can_be_created() {
        let capacity = 1024 << 2;
        let mut data = Vec::with_capacity(capacity);
        let _buffer = AtomicBuffer::new(data.as_mut_ptr(), capacity as Index);
    }

    #[test]
    fn atomic_buffer_aligned_buffer_create() {
        let src = AlignedBuffer::with_capacity(16);
        let atomic_buffer = AtomicBuffer::from_aligned(&src);

        // assert zeroed
        assert_eq!(atomic_buffer.as_slice(), &[0; 16])
    }
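
    // Illustrative example (not from the original test suite): a `view` re-points at a
    // sub-range of the parent without copying, so writes through the view are visible
    // in the parent buffer.
    #[test]
    fn atomic_buffer_view_shares_memory() {
        let src = AlignedBuffer::with_capacity(16);
        let buffer = AtomicBuffer::from_aligned(&src);

        let view = buffer.view(8, 8);
        assert_eq!(view.capacity(), 8);

        view.put::<u8>(0, 0xFF);
        assert_eq!(buffer.get::<u8>(8), 0xFF);
    }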

    #[test]
    fn atomic_buffer_write_read() {
        let src = AlignedBuffer::with_capacity(1024 << 2);
        let buffer = AtomicBuffer::from_aligned(&src);
        let to_write = 1;
        buffer.put(0, to_write);
        let read: i32 = buffer.get(0);

        assert_eq!(read, to_write)
    }
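
    // Illustrative example (not from the original test suite): `put_ordered` pairs a
    // release fence with a plain store and `get_volatile` pairs a plain load with an
    // acquire fence; only the single-threaded round-trip is exercised here.
    #[test]
    fn atomic_buffer_put_ordered_get_volatile() {
        let src = AlignedBuffer::with_capacity(16);
        let buffer = AtomicBuffer::from_aligned(&src);

        buffer.put_ordered::<i64>(0, 42);
        assert_eq!(buffer.get_volatile::<i64>(0), 42);
    }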

    #[test]
    fn atomic_buffer_preserves_from_aligned() {
        let buffer = AlignedBuffer::with_capacity(8);
        let atomic_buffer = AtomicBuffer::from_aligned(&buffer);
        // The wrapper must point at exactly the memory owned by the source buffer
        let original = unsafe { std::slice::from_raw_parts(buffer.ptr(), buffer.len() as usize) };
        assert_eq!(atomic_buffer.as_slice(), original);
    }
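
    // Illustrative example (not from the original test suite): a compare-and-set
    // succeeds only when the current word matches `expected`, and otherwise leaves
    // the word untouched.
    #[test]
    fn atomic_buffer_compare_and_set_i64() {
        let src = AlignedBuffer::with_capacity(8);
        let buffer = AtomicBuffer::from_aligned(&src);

        buffer.put_atomic_i64(0, 1);
        assert!(buffer.compare_and_set_i64(0, 1, 2)); // expected matches -> swapped
        assert!(!buffer.compare_and_set_i64(0, 1, 3)); // stale expected -> unchanged
        assert_eq!(buffer.get::<i64>(0), 2);
    }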

    #[test]
    fn atomic_buffer_put_bytes() {
        let mut data: Vec<u8> = (0u8..=7).collect();
        assert_eq!(data.len(), 8);

        let buffer = AtomicBuffer::new(data.as_mut_ptr(), 8);

        buffer.put_bytes(4, &[0, 1, 2, 3]);

        assert_eq!(buffer.as_slice(), &[0, 1, 2, 3, 0, 1, 2, 3])
    }
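
    // Illustrative example (not from the original test suite): set_memory fills a
    // bounds-checked range with a single byte value.
    #[test]
    fn atomic_buffer_set_memory() {
        let mut data = vec![0u8; 8];
        let buffer = AtomicBuffer::new(data.as_mut_ptr(), 8);

        buffer.set_memory(2, 4, 0xAA);
        assert_eq!(buffer.as_slice(), &[0, 0, 0xAA, 0xAA, 0xAA, 0xAA, 0, 0]);
    }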

    #[test]
    fn atomic_buffer_put_bytes_with_write_trait() {
        let mut data: Vec<u8> = (0u8..=7).collect();
        assert_eq!(data.len(), 8);

        let mut buffer = AtomicBuffer::new(data.as_mut_ptr(), 8);

        buffer.write_all(&[4, 5, 6, 7]).unwrap();

        assert_eq!(buffer.as_slice(), &[4, 5, 6, 7, 4, 5, 6, 7]);
    }
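
    // Illustrative example (not from the original test suite): wrap_slice borrows an
    // existing byte slice without taking ownership of it.
    #[test]
    fn atomic_buffer_wrap_slice() {
        let mut data = vec![1u8, 2, 3, 4];
        let buffer = AtomicBuffer::wrap_slice(&mut data);

        assert_eq!(buffer.capacity(), 4);
        assert_eq!(buffer.as_slice(), &[1, 2, 3, 4]);
    }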

    #[test]
    fn atomic_buffer_get_as_slice() {
        let mut data: Vec<u8> = (0u8..=7).collect();
        assert_eq!(data.len(), 8);

        let buffer = AtomicBuffer::new(data.as_mut_ptr(), 8);
        let sub_slice = buffer.as_slice();

        assert_eq!(sub_slice, &[0, 1, 2, 3, 4, 5, 6, 7])
    }
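
    // Illustrative example (not from the original test suite): `as_ref` overlays a
    // typed shared reference on the underlying bytes, so it observes the same value
    // that `get` reads.
    #[test]
    fn atomic_buffer_as_ref() {
        let src = AlignedBuffer::with_capacity(8);
        let buffer = AtomicBuffer::from_aligned(&src);

        buffer.put::<i32>(0, 77);
        assert_eq!(*buffer.as_ref::<i32>(0), 77);
        assert_eq!(buffer.get::<i32>(0), 77);
    }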

    #[test]
    fn atomic_buffer_get_as_mut_slice() {
        let mut data: Vec<u8> = (0u8..=7).collect();
        assert_eq!(data.len(), 8);

        let mut buffer = AtomicBuffer::new(data.as_mut_ptr(), 8);
        let sub_slice = buffer.as_mutable_slice();

        assert_eq!(sub_slice, &[0, 1, 2, 3, 4, 5, 6, 7])
    }
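
    // Illustrative example (not from the original test suite): copy_from copies a
    // bounds-checked range between two non-overlapping buffers.
    #[test]
    fn atomic_buffer_copy_from() {
        let mut src_data: Vec<u8> = (0u8..=7).collect();
        let src = AtomicBuffer::new(src_data.as_mut_ptr(), 8);

        let mut dst_data = vec![0u8; 8];
        let dst = AtomicBuffer::new(dst_data.as_mut_ptr(), 8);

        dst.copy_from(4, &src, 0, 4);
        assert_eq!(dst.as_slice(), &[0, 0, 0, 0, 0, 1, 2, 3]);
    }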

    #[test]
    fn atomic_buffer_get_sub_slice() {
        let mut data: Vec<u8> = (0u8..=7).collect();
        assert_eq!(data.len(), 8);

        let buffer = AtomicBuffer::new(data.as_mut_ptr(), 8);
        let sub_slice = buffer.as_sub_slice(3, 2);

        assert_eq!(sub_slice, &[3, 4])
    }
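
    // Illustrative example (not from the original test suite): get_and_add_i64 is the
    // multi-threaded fetch-add returning the previous value, while add_i64_ordered is
    // its cheaper single-writer counterpart.
    #[test]
    fn atomic_buffer_counter_increments() {
        let src = AlignedBuffer::with_capacity(8);
        let buffer = AtomicBuffer::from_aligned(&src);

        buffer.put::<i64>(0, 5);
        assert_eq!(buffer.get_and_add_i64(0, 3), 5); // returns the value before the add
        assert_eq!(buffer.get::<i64>(0), 8);

        buffer.add_i64_ordered(0, 2);
        assert_eq!(buffer.get::<i64>(0), 10);
    }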

    #[test]
    #[should_panic]
    fn atomic_buffer_get_sub_slice_out_of_bounds() {
        let mut data: Vec<u8> = (0u8..=7).collect();
        assert_eq!(data.len(), 8);

        let buffer = AtomicBuffer::new(data.as_mut_ptr(), 8);
        let _sub_slice = buffer.as_sub_slice(7, 2);
    }
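
    // Illustrative example (not from the original test suite): every accessor goes
    // through bounds_check, so an access past the end panics instead of corrupting
    // adjacent memory.
    #[test]
    #[should_panic]
    fn atomic_buffer_put_out_of_bounds() {
        let src = AlignedBuffer::with_capacity(8);
        let buffer = AtomicBuffer::from_aligned(&src);

        buffer.put::<i64>(4, 1); // needs 8 bytes at offset 4, but only 4 remain
    }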

    #[test]
    fn atomic_buffer_put_and_get_string() {
        let src = AlignedBuffer::with_capacity(16);
        let atomic_buffer = AtomicBuffer::from_aligned(&src);

        let test_string = [1, 2, 3, 4, 5, 6, 7, 8, 9]; // without trailing zero

        atomic_buffer.put_string(2, &test_string);
        let read_str = atomic_buffer.get_string(2); // trailing zero added here while reading from AB

        assert_eq!(read_str.as_bytes().len(), 9);
        assert_eq!(read_str.as_bytes(), test_string); // as_bytes() returns string body without trailing zero
    }
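
    // Illustrative example (not from the original test suite) of the string layout
    // documented above: a native-endian i32 length prefix followed by the ASCII body.
    #[test]
    fn atomic_buffer_string_layout() {
        let src = AlignedBuffer::with_capacity(16);
        let buffer = AtomicBuffer::from_aligned(&src);

        buffer.put_string(0, b"abc");
        assert_eq!(buffer.get_string_length(0), 3);
        assert_eq!(buffer.get::<i32>(0), 3); // 4-byte length prefix
        assert_eq!(buffer.as_sub_slice(4, 3), "abc".as_bytes());
    }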
}