1use std::ffi::{CStr, CString};
2use std::fmt::{Debug, Error, Formatter};
3use std::io::Write;
4use std::slice;
5use std::sync::atomic::{fence, AtomicI32, AtomicI64, Ordering};
6
7use crate::utils::misc::{alloc_buffer_aligned, dealloc_buffer_aligned};
8use crate::utils::types::{Index, I32_SIZE, I64_SIZE};
9
10pub struct AlignedBuffer {
12 ptr: *mut u8,
13 len: Index,
14}
15
16impl AlignedBuffer {
17 pub fn with_capacity(len: Index) -> AlignedBuffer {
18 AlignedBuffer {
19 ptr: alloc_buffer_aligned(len),
20 len,
21 }
22 }
23
24 pub fn ptr(&self) -> *mut u8 {
25 self.ptr
26 }
27
28 pub fn len(&self) -> Index {
29 self.len
30 }
31
32 pub fn is_empty(&self) -> bool {
33 self.len == 0
34 }
35}
36
37impl Drop for AlignedBuffer {
38 fn drop(&mut self) {
39 unsafe {
40 dealloc_buffer_aligned(self.ptr, self.len);
41 }
42 }
43}
44
45#[derive(Copy, Clone)]
47pub struct AtomicBuffer {
48 pub(crate) ptr: *mut u8,
49 len: Index,
50}
51
52impl Debug for AtomicBuffer {
53 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
54 let mut slice = self.as_slice();
55 const TAKE_LIMIT: usize = 40;
56 let mut bytes_counter = 0;
57 loop {
58 write!(f, "{}: ", bytes_counter)?;
59 bytes_counter += TAKE_LIMIT;
60
61 let (head, tail) = slice.split_at(TAKE_LIMIT);
62 if tail.len() > TAKE_LIMIT {
63 writeln!(f, "{:?}", head)?;
64 slice = tail;
65 } else {
66 write!(f, "{:?}", tail)?;
67 break;
68 }
69 }
70 Ok(())
71 }
72}
73
74impl Write for AtomicBuffer {
75 fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
76 self.put_bytes(0, buf);
77 Ok(buf.len())
78 }
79
80 fn flush(&mut self) -> Result<(), std::io::Error> {
81 Ok(())
82 }
83}
84
85unsafe impl Send for AtomicBuffer {}
88unsafe impl Sync for AtomicBuffer {}
89
90impl AtomicBuffer {
93 pub fn from_aligned(aligned: &AlignedBuffer) -> AtomicBuffer {
94 AtomicBuffer {
95 ptr: aligned.ptr,
96 len: aligned.len as Index,
97 }
98 }
99
100 pub fn wrap(buffer: AtomicBuffer) -> Self {
101 AtomicBuffer {
102 ptr: buffer.ptr,
103 len: buffer.len as Index,
104 }
105 }
106
107 pub fn wrap_slice(slice: &mut [u8]) -> Self {
108 AtomicBuffer {
109 ptr: slice.as_mut_ptr(),
110 len: slice.len() as Index,
111 }
112 }
113
114 pub fn wrap_raw_slice(slice: *mut [u8]) -> Self {
115 AtomicBuffer {
116 ptr: slice as *mut _,
117 len: slice.len() as Index,
118 }
119 }
120
121 pub(crate) fn new(ptr: *mut u8, len: Index) -> AtomicBuffer {
123 AtomicBuffer { ptr, len }
124 }
125
126 #[inline]
127 unsafe fn at(&self, offset: Index) -> *mut u8 {
128 self.ptr.offset(offset as isize)
129 }
130
131 #[inline]
134 pub fn view(&self, offset: Index, len: Index) -> Self {
135 self.bounds_check(offset, len);
136
137 AtomicBuffer {
138 ptr: unsafe { self.at(offset) },
139 len,
140 }
141 }
142
143 pub const fn capacity(&self) -> Index {
144 self.len
145 }
146
147 #[inline]
148 pub fn bounds_check(&self, idx: Index, len: Index) {
149 assert!((idx + len) <= self.len)
150 }
151
152 #[inline]
153 pub fn get<T: Copy>(&self, position: Index) -> T {
154 self.bounds_check(position, std::mem::size_of::<T>() as Index);
155 unsafe { (self.at(position) as *mut T).read_unaligned() }
156 }
157
158 #[inline]
159 pub fn overlay_struct<T>(&self, position: Index) -> *mut T {
160 self.bounds_check(position, std::mem::size_of::<T>() as Index);
161 unsafe { self.at(position) as *mut T }
162 }
163
164 #[inline]
165 pub fn as_ref<T: Copy>(&self, position: Index) -> &T {
166 self.bounds_check(position, std::mem::size_of::<T>() as Index);
167 unsafe { &*(self.at(position) as *const T) }
168 }
169
170 #[inline]
171 pub fn buffer(&self) -> *mut u8 {
172 self.ptr
173 }
174
175 #[inline]
176 pub fn set_memory(&self, position: Index, len: Index, value: u8) {
177 self.bounds_check(position, len);
178 let s = unsafe { slice::from_raw_parts_mut(self.ptr.offset(position as isize), len as usize) };
179
180 for i in s {
182 *i = value
183 }
184 }
185
186 #[inline]
187 pub fn get_volatile<T: Copy>(&self, position: Index) -> T {
188 self.bounds_check(position, std::mem::size_of::<T>() as Index);
189 let read = self.get(position);
190 fence(Ordering::Acquire);
191 read
192 }
193
194 #[inline]
195 pub fn put_ordered<T>(&self, position: Index, val: T) {
196 self.bounds_check(position, std::mem::size_of::<T>() as Index);
197 fence(Ordering::Release);
198 self.put(position, val);
199 }
200
201 #[inline]
202 pub fn put<T>(&self, position: Index, val: T) {
203 self.bounds_check(position, std::mem::size_of::<T>() as Index);
204 unsafe { (self.at(position) as *mut T).write_unaligned(val) }
205 }
206
207 #[inline]
208 #[allow(clippy::cast_ptr_alignment)]
209 pub fn put_atomic_i64(&self, offset: Index, val: i64) {
210 self.bounds_check(offset, I64_SIZE);
211 unsafe {
212 let atomic_ptr = self.at(offset) as *const AtomicI64;
213 (*atomic_ptr).store(val, Ordering::SeqCst);
214 }
215 }
216
217 #[inline]
218 #[allow(clippy::cast_ptr_alignment)]
219 pub fn compare_and_set_i32(&self, position: Index, expected: i32, update: i32) -> bool {
220 self.bounds_check(position, I32_SIZE);
221 unsafe {
222 let ptr = self.at(position) as *const AtomicI32;
223 (*ptr)
224 .compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst)
225 .is_ok()
226 }
227 }
228
229 #[inline]
230 #[allow(clippy::cast_ptr_alignment)]
231 pub fn compare_and_set_i64(&self, position: Index, expected: i64, update: i64) -> bool {
232 self.bounds_check(position, I64_SIZE);
233 unsafe {
234 let ptr = self.at(position) as *const AtomicI64;
235 (*ptr)
236 .compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst)
237 .is_ok()
238 }
239 }
240
241 pub fn add_i64_ordered(&self, offset: Index, delta: i64) {
248 self.bounds_check(offset, I64_SIZE);
249
250 let value = self.get::<i64>(offset);
251 self.put_ordered::<i64>(offset, value + delta);
252 }
253
254 #[inline]
256 pub fn put_bytes(&self, offset: Index, src: &[u8]) {
257 self.bounds_check(offset, src.len() as Index);
258
259 unsafe {
260 let ptr = self.ptr.offset(offset as isize);
261 std::ptr::copy(src.as_ptr(), ptr, src.len());
262 }
263 }
264
265 #[inline]
266 pub fn get_bytes<T>(&self, offset: Index, dest: &mut T) {
267 let length = std::mem::size_of::<T>();
268 self.bounds_check(offset, length as Index);
269
270 unsafe {
271 let ptr = self.at(offset);
272 std::ptr::copy(ptr, dest as *mut T as *mut _, length);
273 }
274 }
275
276 #[inline]
282 pub fn copy_from(&self, offset: Index, src_buffer: &AtomicBuffer, src_offset: Index, length: Index) {
283 self.bounds_check(offset, length);
284 src_buffer.bounds_check(src_offset, length);
285 unsafe {
286 let src_ptr = src_buffer.at(src_offset);
287 let dest_ptr = self.at(offset);
288 std::ptr::copy_nonoverlapping(src_ptr, dest_ptr, length as usize);
290 }
291 }
292
293 pub fn as_mutable_slice(&mut self) -> &mut [u8] {
294 unsafe { slice::from_raw_parts_mut(self.ptr, self.len as usize) }
295 }
296
297 pub fn as_slice(&self) -> &[u8] {
298 unsafe { slice::from_raw_parts(self.ptr, self.len as usize) }
299 }
300
301 pub fn as_sub_slice(&self, index: Index, len: Index) -> &[u8] {
302 self.bounds_check(index, len);
303 unsafe { slice::from_raw_parts(self.at(index), len as usize) }
304 }
305
306 #[inline]
307 pub fn get_string(&self, offset: Index) -> CString {
308 self.bounds_check(offset, 4);
309
310 let length: i32 = self.get::<i32>(offset);
312 self.get_string_without_length(offset + I32_SIZE, length)
313 }
314
315 #[inline]
316 pub fn get_string_without_length(&self, offset: Index, length: Index) -> CString {
317 self.bounds_check(offset, length);
318
319 unsafe {
320 let str_slice = std::slice::from_raw_parts(self.at(offset) as *const u8, length as usize);
322 let mut zero_terminated: Vec<u8> = Vec::with_capacity(length as usize + 1);
323 zero_terminated.extend_from_slice(str_slice);
324 zero_terminated.push(0);
325
326 CString::from(CStr::from_bytes_with_nul_unchecked(&zero_terminated))
327 }
328 }
329
330 #[inline]
331 pub fn get_string_length(&self, offset: Index) -> Index {
332 self.bounds_check(offset, 4);
333
334 self.get::<i32>(offset) as Index
335 }
336
337 #[inline]
339 pub fn put_string(&self, offset: Index, string: &[u8]) {
340 self.bounds_check(offset, string.len() as Index + I32_SIZE);
341
342 self.put::<i32>(offset, string.len() as i32);
344
345 self.put_bytes(offset + I32_SIZE, string);
346 }
347
348 #[inline]
349 pub fn put_string_without_length(&self, offset: Index, string: &[u8]) -> Index {
350 self.bounds_check(offset, string.len() as Index);
351
352 self.put_bytes(offset + I32_SIZE, string);
353
354 string.len() as Index
355 }
356
357 #[allow(clippy::cast_ptr_alignment)]
365 pub fn get_and_add_i64(&self, offset: Index, delta: i64) -> i64 {
366 self.bounds_check(offset, I64_SIZE);
367 unsafe {
368 let atomic_ptr = self.at(offset) as *const AtomicI64;
369 (*atomic_ptr).fetch_add(delta, Ordering::SeqCst)
370 }
371 }
372}
373
374#[cfg(test)]
375mod tests {
376 use std::io::Write;
377
378 use crate::concurrent::atomic_buffer::{AlignedBuffer, AtomicBuffer};
379 use crate::utils::types::Index;
380
381 #[test]
382 fn atomic_buffer_can_be_created() {
383 let capacity = 1024 << 2;
384 let mut data = Vec::with_capacity(capacity);
385 let _buffer = AtomicBuffer::new(data.as_mut_ptr(), capacity as Index);
386 }
387
388 #[test]
389 fn atomic_buffer_aligned_buffer_create() {
390 let src = AlignedBuffer::with_capacity(16);
391 let atomic_buffer = AtomicBuffer::from_aligned(&src);
392
393 assert_eq!(atomic_buffer.as_slice(), &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,])
395 }
396
397 #[test]
398 fn atomic_buffer_write_read() {
399 let src = AlignedBuffer::with_capacity(1024 << 2);
400 let buffer = AtomicBuffer::from_aligned(&src);
401 let to_write = 1;
402 buffer.put(0, to_write);
403 let read: i32 = buffer.get(0);
404
405 assert_eq!(read, to_write)
406 }
407
408 #[test]
409 fn atomic_buffer_preserves_from_aligned() {
410 let buffer = AlignedBuffer::with_capacity(8);
411 let _atomic_buffer = AtomicBuffer::from_aligned(&buffer);
412 }
414
415 #[test]
416 fn atomic_buffer_put_bytes() {
417 let mut data: Vec<u8> = (0u8..=7).collect();
418 assert_eq!(data.len(), 8);
419
420 let buffer = AtomicBuffer::new(data.as_mut_ptr(), 8);
421
422 buffer.put_bytes(4, &[0, 1, 2, 3]);
423
424 assert_eq!(buffer.as_slice(), &[0, 1, 2, 3, 0, 1, 2, 3])
425 }
426
427 #[test]
428 fn atomic_buffer_put_bytes_with_write_trait() {
429 let mut data: Vec<u8> = (0u8..=7).collect();
430 assert_eq!(data.len(), 8);
431
432 let mut buffer = AtomicBuffer::new(data.as_mut_ptr(), 8);
433
434 buffer.write_all(&[4, 5, 6, 7]).unwrap();
435
436 assert_eq!(buffer.as_slice(), &[4, 5, 6, 7, 4, 5, 6, 7]);
437 }
438
439 #[test]
440 fn atomic_buffer_get_as_slice() {
441 let mut data: Vec<u8> = (0u8..=7).collect();
442 assert_eq!(data.len(), 8);
443
444 let buffer = AtomicBuffer::new(data.as_mut_ptr(), 8);
445 let sub_slice = buffer.as_slice();
446
447 assert_eq!(sub_slice, &[0, 1, 2, 3, 4, 5, 6, 7])
448 }
449
450 #[test]
451 fn atomic_buffer_get_as_mut_slice() {
452 let mut data: Vec<u8> = (0u8..=7).collect();
453 assert_eq!(data.len(), 8);
454
455 let mut buffer = AtomicBuffer::new(data.as_mut_ptr(), 8);
456 let sub_slice = buffer.as_mutable_slice();
457
458 assert_eq!(sub_slice, &[0, 1, 2, 3, 4, 5, 6, 7])
459 }
460
461 #[test]
462 fn atomic_buffer_get_sub_slice() {
463 let mut data: Vec<u8> = (0u8..=7).collect();
464 assert_eq!(data.len(), 8);
465
466 let buffer = AtomicBuffer::new(data.as_mut_ptr(), 8);
467 let sub_slice = buffer.as_sub_slice(3, 2);
468
469 assert_eq!(sub_slice, &[3, 4])
470 }
471
472 #[test]
473 #[should_panic]
474 fn atomic_buffer_get_sub_slice_out_of_bounds() {
475 let mut data: Vec<u8> = (0u8..=7).collect();
476 assert_eq!(data.len(), 8);
477
478 let x = AtomicBuffer::new(data.as_mut_ptr(), 8);
479 let _sub_slice = x.as_sub_slice(7, 2);
480 }
481
482 #[test]
483 fn atomic_buffer_put_and_get_string() {
484 let src = AlignedBuffer::with_capacity(16);
485 let atomic_buffer = AtomicBuffer::from_aligned(&src);
486
487 let test_string = [1, 2, 3, 4, 5, 6, 7, 8, 9]; atomic_buffer.put_string(2, &test_string);
490 let read_str = atomic_buffer.get_string(2); assert_eq!(read_str.as_bytes().len(), 9);
493 assert_eq!(read_str.as_bytes(), test_string); }
495}