1#![allow(unsafe_code)]
11
12use crate::error::{OxiGdalError, Result};
13use std::ops::Deref;
14use std::ptr::NonNull;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicUsize, Ordering};
17
/// Configuration knobs for zero-copy buffer allocation and sharing.
#[derive(Debug, Clone)]
pub struct ZeroCopyConfig {
    // Allocate page-locked memory via `mmap(MAP_LOCKED)` on Linux; other
    // platforms fall back to the global allocator (see `allocate_pinned`).
    pub use_pinned_memory: bool,
    // If `true`, mutating a shared buffer triggers copy-on-write; if `false`,
    // `as_mut_slice` on a shared buffer returns an error.
    pub enable_cow: bool,
    // Byte alignment used for allocations (default: 64).
    pub alignment: usize,
    // NOTE(review): not consulted anywhere in this file — presumably read by
    // statistics code elsewhere; confirm before relying on it.
    pub track_stats: bool,
}
30
31impl Default for ZeroCopyConfig {
32 fn default() -> Self {
33 Self {
34 use_pinned_memory: false,
35 enable_cow: true,
36 alignment: 64,
37 track_stats: true,
38 }
39 }
40}
41
42impl ZeroCopyConfig {
43 #[must_use]
45 pub fn new() -> Self {
46 Self::default()
47 }
48
49 #[must_use]
51 pub fn with_pinned_memory(mut self, enable: bool) -> Self {
52 self.use_pinned_memory = enable;
53 self
54 }
55
56 #[must_use]
58 pub fn with_cow(mut self, enable: bool) -> Self {
59 self.enable_cow = enable;
60 self
61 }
62
63 #[must_use]
65 pub fn with_alignment(mut self, alignment: usize) -> Self {
66 self.alignment = alignment;
67 self
68 }
69
70 #[must_use]
72 pub fn with_stats(mut self, enable: bool) -> Self {
73 self.track_stats = enable;
74 self
75 }
76}
77
/// A reference-counted byte buffer supporting zero-copy sharing and
/// copy-on-write mutation.
pub struct SharedBuffer {
    // Start of the allocation.
    ptr: NonNull<u8>,
    // Number of valid bytes (always equals `capacity` in this implementation).
    len: usize,
    // Size of the allocation in bytes; used for deallocation.
    capacity: usize,
    // Number of handles sharing this allocation; memory is freed when the
    // last handle drops.
    ref_count: Arc<AtomicUsize>,
    // Whether the memory came from the pinned (`mmap`) allocation path.
    is_pinned: bool,
    // Configuration the buffer was created with (alignment, COW policy, ...).
    config: ZeroCopyConfig,
}
93
94impl SharedBuffer {
95 pub fn new(size: usize) -> Result<Self> {
97 Self::with_config(size, ZeroCopyConfig::default())
98 }
99
100 pub fn with_config(size: usize, config: ZeroCopyConfig) -> Result<Self> {
102 if size == 0 {
103 return Err(OxiGdalError::invalid_parameter(
104 "parameter",
105 "Buffer size must be non-zero".to_string(),
106 ));
107 }
108
109 let layout = std::alloc::Layout::from_size_align(size, config.alignment)
110 .map_err(|e| OxiGdalError::allocation_error(e.to_string()))?;
111
112 let ptr = if config.use_pinned_memory {
113 Self::allocate_pinned(layout)?
114 } else {
115 unsafe {
116 let raw_ptr = std::alloc::alloc(layout);
117 if raw_ptr.is_null() {
118 return Err(OxiGdalError::allocation_error(
119 "Failed to allocate buffer".to_string(),
120 ));
121 }
122 NonNull::new_unchecked(raw_ptr)
123 }
124 };
125
126 Ok(Self {
127 ptr,
128 len: size,
129 capacity: size,
130 ref_count: Arc::new(AtomicUsize::new(1)),
131 is_pinned: config.use_pinned_memory,
132 config,
133 })
134 }
135
136 #[allow(unsafe_code)]
142 fn allocate_pinned(layout: std::alloc::Layout) -> Result<NonNull<u8>> {
143 #[cfg(target_os = "linux")]
144 {
145 let ptr = unsafe {
147 libc::mmap(
148 std::ptr::null_mut(),
149 layout.size(),
150 libc::PROT_READ | libc::PROT_WRITE,
151 libc::MAP_PRIVATE | libc::MAP_ANONYMOUS | libc::MAP_LOCKED,
152 -1,
153 0,
154 )
155 };
156
157 if ptr == libc::MAP_FAILED {
158 return Err(OxiGdalError::allocation_error(
159 "Failed to allocate pinned memory".to_string(),
160 ));
161 }
162
163 NonNull::new(ptr as *mut u8)
164 .ok_or_else(|| OxiGdalError::allocation_error("mmap returned null".to_string()))
165 }
166
167 #[cfg(not(target_os = "linux"))]
168 {
169 unsafe {
171 let raw_ptr = std::alloc::alloc(layout);
172 if raw_ptr.is_null() {
173 return Err(OxiGdalError::allocation_error(
174 "Failed to allocate buffer".to_string(),
175 ));
176 }
177 Ok(NonNull::new_unchecked(raw_ptr))
178 }
179 }
180 }
181
182 #[must_use]
184 pub fn share(&self) -> Self {
185 self.ref_count.fetch_add(1, Ordering::Relaxed);
186 Self {
187 ptr: self.ptr,
188 len: self.len,
189 capacity: self.capacity,
190 ref_count: Arc::clone(&self.ref_count),
191 is_pinned: self.is_pinned,
192 config: self.config.clone(),
193 }
194 }
195
196 #[must_use]
198 pub fn ref_count(&self) -> usize {
199 self.ref_count.load(Ordering::Relaxed)
200 }
201
202 #[must_use]
204 pub fn is_unique(&self) -> bool {
205 self.ref_count() == 1
206 }
207
208 #[must_use]
210 pub fn len(&self) -> usize {
211 self.len
212 }
213
214 #[must_use]
216 pub fn is_empty(&self) -> bool {
217 self.len == 0
218 }
219
220 #[must_use]
222 pub fn capacity(&self) -> usize {
223 self.capacity
224 }
225
226 #[must_use]
228 pub fn is_pinned(&self) -> bool {
229 self.is_pinned
230 }
231
232 #[must_use]
234 pub fn as_slice(&self) -> &[u8] {
235 unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
236 }
237
238 pub fn as_mut_slice(&mut self) -> Result<&mut [u8]> {
240 if !self.is_unique() {
241 if self.config.enable_cow {
242 self.make_unique()?;
243 } else {
244 return Err(OxiGdalError::invalid_operation(
245 "Cannot mutate shared buffer without COW".to_string(),
246 ));
247 }
248 }
249
250 Ok(unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) })
251 }
252
253 fn make_unique(&mut self) -> Result<()> {
255 if self.is_unique() {
256 return Ok(());
257 }
258
259 let layout = std::alloc::Layout::from_size_align(self.capacity, self.config.alignment)
261 .map_err(|e| OxiGdalError::allocation_error(e.to_string()))?;
262
263 let new_ptr = if self.is_pinned {
264 Self::allocate_pinned(layout)?
265 } else {
266 unsafe {
267 let raw_ptr = std::alloc::alloc(layout);
268 if raw_ptr.is_null() {
269 return Err(OxiGdalError::allocation_error(
270 "Failed to allocate buffer for COW".to_string(),
271 ));
272 }
273 NonNull::new_unchecked(raw_ptr)
274 }
275 };
276
277 unsafe {
279 std::ptr::copy_nonoverlapping(self.ptr.as_ptr(), new_ptr.as_ptr(), self.len);
280 }
281
282 self.ref_count.fetch_sub(1, Ordering::Relaxed);
284
285 self.ptr = new_ptr;
287 self.ref_count = Arc::new(AtomicUsize::new(1));
288
289 Ok(())
290 }
291
292 pub fn clone_data(&self) -> Result<Self> {
294 let new_buffer = Self::with_config(self.len, self.config.clone())?;
295 unsafe {
296 std::ptr::copy_nonoverlapping(self.ptr.as_ptr(), new_buffer.ptr.as_ptr(), self.len);
297 }
298 Ok(new_buffer)
299 }
300
301 pub fn as_typed_slice<T: bytemuck::Pod>(&self) -> Result<&[T]> {
303 if self.len % std::mem::size_of::<T>() != 0 {
304 return Err(OxiGdalError::invalid_parameter(
305 "parameter",
306 "Buffer size not aligned to type size".to_string(),
307 ));
308 }
309
310 let count = self.len / std::mem::size_of::<T>();
311 Ok(unsafe { std::slice::from_raw_parts(self.ptr.as_ptr() as *const T, count) })
312 }
313
314 pub fn as_typed_mut_slice<T: bytemuck::Pod>(&mut self) -> Result<&mut [T]> {
316 if !self.is_unique() {
317 if self.config.enable_cow {
318 self.make_unique()?;
319 } else {
320 return Err(OxiGdalError::invalid_operation(
321 "Cannot mutate shared buffer without COW".to_string(),
322 ));
323 }
324 }
325
326 if self.len % std::mem::size_of::<T>() != 0 {
327 return Err(OxiGdalError::invalid_parameter(
328 "parameter",
329 "Buffer size not aligned to type size".to_string(),
330 ));
331 }
332
333 let count = self.len / std::mem::size_of::<T>();
334 Ok(unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr().cast::<T>(), count) })
335 }
336}
337
338impl Clone for SharedBuffer {
339 fn clone(&self) -> Self {
340 self.share()
341 }
342}
343
344impl Deref for SharedBuffer {
345 type Target = [u8];
346
347 fn deref(&self) -> &Self::Target {
348 self.as_slice()
349 }
350}
351
352impl AsRef<[u8]> for SharedBuffer {
353 fn as_ref(&self) -> &[u8] {
354 self.as_slice()
355 }
356}
357
358impl Drop for SharedBuffer {
359 fn drop(&mut self) {
360 let count = self.ref_count.fetch_sub(1, Ordering::Relaxed);
361 if count == 1 {
362 unsafe {
364 if self.is_pinned {
365 #[cfg(target_os = "linux")]
366 {
367 libc::munmap(self.ptr.as_ptr() as *mut libc::c_void, self.capacity);
368 }
369 #[cfg(not(target_os = "linux"))]
370 {
371 let layout = std::alloc::Layout::from_size_align_unchecked(
372 self.capacity,
373 self.config.alignment,
374 );
375 std::alloc::dealloc(self.ptr.as_ptr(), layout);
376 }
377 } else {
378 let layout = std::alloc::Layout::from_size_align_unchecked(
379 self.capacity,
380 self.config.alignment,
381 );
382 std::alloc::dealloc(self.ptr.as_ptr(), layout);
383 }
384 }
385 }
386 }
387}
388
// SAFETY: the allocation is jointly owned by the handles sharing `ref_count`,
// which is atomic; mutation is gated behind `&mut self` plus the uniqueness
// check. NOTE(review): a concurrent `share()` on one handle racing
// `as_mut_slice()` on another can bypass the uniqueness check — confirm this
// is an accepted limitation of the design.
unsafe impl Send for SharedBuffer {}
// SAFETY: `&SharedBuffer` exposes only read access and atomic ref-count
// updates; all mutation requires `&mut self`.
unsafe impl Sync for SharedBuffer {}
392
/// A typed, reference-counted view over a [`SharedBuffer`], interpreting the
/// bytes as a slice of `T`.
pub struct ZeroCopyBuffer<T: bytemuck::Pod> {
    // Backing byte buffer; constructors ensure its length is a multiple of
    // `size_of::<T>()`.
    buffer: SharedBuffer,
    // Marks the element type without storing any `T`.
    _phantom: std::marker::PhantomData<T>,
}
400
401impl<T: bytemuck::Pod> ZeroCopyBuffer<T> {
402 pub fn new(count: usize) -> Result<Self> {
404 let size = count * std::mem::size_of::<T>();
405 let buffer = SharedBuffer::new(size)?;
406 Ok(Self {
407 buffer,
408 _phantom: std::marker::PhantomData,
409 })
410 }
411
412 pub fn with_config(count: usize, config: ZeroCopyConfig) -> Result<Self> {
414 let size = count * std::mem::size_of::<T>();
415 let buffer = SharedBuffer::with_config(size, config)?;
416 Ok(Self {
417 buffer,
418 _phantom: std::marker::PhantomData,
419 })
420 }
421
422 pub fn from_buffer(buffer: SharedBuffer) -> Result<Self> {
424 if buffer.len() % std::mem::size_of::<T>() != 0 {
425 return Err(OxiGdalError::invalid_parameter(
426 "parameter",
427 "Buffer size not aligned to type size".to_string(),
428 ));
429 }
430
431 Ok(Self {
432 buffer,
433 _phantom: std::marker::PhantomData,
434 })
435 }
436
437 #[must_use]
439 pub fn len(&self) -> usize {
440 self.buffer.len() / std::mem::size_of::<T>()
441 }
442
443 #[must_use]
445 pub fn is_empty(&self) -> bool {
446 self.len() == 0
447 }
448
449 #[must_use]
451 pub fn as_slice(&self) -> &[T] {
452 let count = self.buffer.len() / std::mem::size_of::<T>();
456 unsafe { std::slice::from_raw_parts(self.buffer.ptr.as_ptr() as *const T, count) }
457 }
458
459 pub fn as_mut_slice(&mut self) -> Result<&mut [T]> {
461 self.buffer.as_typed_mut_slice()
462 }
463
464 #[must_use]
466 pub fn share(&self) -> Self {
467 Self {
468 buffer: self.buffer.share(),
469 _phantom: std::marker::PhantomData,
470 }
471 }
472
473 #[must_use]
475 pub fn is_unique(&self) -> bool {
476 self.buffer.is_unique()
477 }
478
479 #[must_use]
481 pub fn ref_count(&self) -> usize {
482 self.buffer.ref_count()
483 }
484
485 pub fn clone_data(&self) -> Result<Self> {
487 Ok(Self {
488 buffer: self.buffer.clone_data()?,
489 _phantom: std::marker::PhantomData,
490 })
491 }
492}
493
494impl<T: bytemuck::Pod> Clone for ZeroCopyBuffer<T> {
495 fn clone(&self) -> Self {
496 self.share()
497 }
498}
499
500impl<T: bytemuck::Pod> Deref for ZeroCopyBuffer<T> {
501 type Target = [T];
502
503 fn deref(&self) -> &Self::Target {
504 self.as_slice()
505 }
506}
507
508impl<T: bytemuck::Pod> AsRef<[T]> for ZeroCopyBuffer<T> {
509 fn as_ref(&self) -> &[T] {
510 self.as_slice()
511 }
512}
513
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_zero_size_rejected() {
        // Zero-sized allocations are explicitly rejected.
        assert!(SharedBuffer::new(0).is_err());
    }

    #[test]
    fn test_shared_buffer() {
        let buffer = SharedBuffer::new(1024).expect("Failed to create shared buffer");
        assert_eq!(buffer.len(), 1024);
        assert_eq!(buffer.ref_count(), 1);
        assert!(buffer.is_unique());

        let shared = buffer.share();
        assert_eq!(buffer.ref_count(), 2);
        assert_eq!(shared.ref_count(), 2);
        assert!(!buffer.is_unique());
        assert!(!shared.is_unique());
    }

    #[test]
    fn test_copy_on_write() {
        let mut buffer = SharedBuffer::new(1024).expect("Failed to create shared buffer");
        // Write a sentinel while the buffer is still unique so the test does
        // not depend on the allocator handing back zero-filled memory.
        buffer.as_mut_slice().expect("unique buffer is mutable")[0] = 7;

        let shared = buffer.share();
        assert_eq!(buffer.ref_count(), 2);

        let slice = buffer
            .as_mut_slice()
            .expect("Failed to get mutable slice (COW should trigger)");
        slice[0] = 42;

        // COW split the allocation: each handle is now a unique owner with
        // its own private copy of the data.
        assert_eq!(buffer.ref_count(), 1);
        assert_eq!(shared.ref_count(), 1);
        assert_eq!(buffer.as_slice()[0], 42);
        assert_eq!(shared.as_slice()[0], 7);
    }

    #[test]
    fn test_mutation_without_cow_fails_when_shared() {
        let config = ZeroCopyConfig::new().with_cow(false);
        let mut buffer =
            SharedBuffer::with_config(64, config).expect("Failed to create shared buffer");
        let _shared = buffer.share();
        // With COW disabled, mutating a shared buffer must be an error.
        assert!(buffer.as_mut_slice().is_err());
    }

    #[test]
    fn test_zero_copy_buffer() {
        let buffer: ZeroCopyBuffer<u32> =
            ZeroCopyBuffer::new(256).expect("Failed to create zero-copy buffer");
        assert_eq!(buffer.len(), 256);
        assert_eq!(buffer.ref_count(), 1);

        let shared = buffer.share();
        assert_eq!(buffer.ref_count(), 2);
        assert_eq!(shared.ref_count(), 2);
    }

    #[test]
    fn test_typed_slice() {
        let buffer = SharedBuffer::new(1024).expect("Failed to create shared buffer");
        let slice: &[u32] = buffer
            .as_typed_slice()
            .expect("Failed to create typed slice from buffer");
        assert_eq!(slice.len(), 256);
    }

    #[test]
    fn test_clone_data() {
        let buffer = SharedBuffer::new(1024).expect("Failed to create shared buffer");
        let cloned = buffer.clone_data().expect("Failed to clone buffer data");

        // Deep copy: same length, but both handles remain unshared.
        assert_eq!(buffer.len(), cloned.len());
        assert_eq!(buffer.ref_count(), 1);
        assert_eq!(cloned.ref_count(), 1);
    }
}