// oxigdal_core/memory/zero_copy.rs
1//! Zero-Copy Data Transfers
2//!
3//! This module provides zero-copy buffer sharing and transfers:
4//! - Buffer sharing between operations
5//! - GPU-CPU zero-copy transfers (pinned memory)
6//! - Reference-counted buffers
7//! - Copy-on-write semantics
8
9// Unsafe code is necessary for zero-copy operations
10#![allow(unsafe_code)]
11
12use crate::error::{OxiGdalError, Result};
13use std::ops::Deref;
14use std::ptr::NonNull;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicUsize, Ordering};
17
/// Tunable options controlling how zero-copy buffers are allocated and shared.
#[derive(Debug, Clone)]
pub struct ZeroCopyConfig {
    /// Allocate pinned (page-locked) memory intended for GPU transfers
    pub use_pinned_memory: bool,
    /// Allow mutation of shared buffers via copy-on-write
    pub enable_cow: bool,
    /// Byte alignment requested for the allocation
    pub alignment: usize,
    /// Record usage statistics
    pub track_stats: bool,
}
30
31impl Default for ZeroCopyConfig {
32    fn default() -> Self {
33        Self {
34            use_pinned_memory: false,
35            enable_cow: true,
36            alignment: 64,
37            track_stats: true,
38        }
39    }
40}
41
42impl ZeroCopyConfig {
43    /// Create new configuration
44    #[must_use]
45    pub fn new() -> Self {
46        Self::default()
47    }
48
49    /// Enable pinned memory
50    #[must_use]
51    pub fn with_pinned_memory(mut self, enable: bool) -> Self {
52        self.use_pinned_memory = enable;
53        self
54    }
55
56    /// Enable copy-on-write
57    #[must_use]
58    pub fn with_cow(mut self, enable: bool) -> Self {
59        self.enable_cow = enable;
60        self
61    }
62
63    /// Set alignment
64    #[must_use]
65    pub fn with_alignment(mut self, alignment: usize) -> Self {
66        self.alignment = alignment;
67        self
68    }
69
70    /// Enable statistics
71    #[must_use]
72    pub fn with_stats(mut self, enable: bool) -> Self {
73        self.track_stats = enable;
74        self
75    }
76}
77
78/// Reference-counted buffer with zero-copy semantics
79pub struct SharedBuffer {
80    /// Pointer to data
81    ptr: NonNull<u8>,
82    /// Length in bytes
83    len: usize,
84    /// Capacity in bytes
85    capacity: usize,
86    /// Reference count
87    ref_count: Arc<AtomicUsize>,
88    /// Whether buffer is pinned
89    is_pinned: bool,
90    /// Configuration
91    config: ZeroCopyConfig,
92}
93
94impl SharedBuffer {
95    /// Create a new shared buffer
96    pub fn new(size: usize) -> Result<Self> {
97        Self::with_config(size, ZeroCopyConfig::default())
98    }
99
100    /// Create a new shared buffer with configuration
101    pub fn with_config(size: usize, config: ZeroCopyConfig) -> Result<Self> {
102        if size == 0 {
103            return Err(OxiGdalError::invalid_parameter(
104                "parameter",
105                "Buffer size must be non-zero".to_string(),
106            ));
107        }
108
109        let layout = std::alloc::Layout::from_size_align(size, config.alignment)
110            .map_err(|e| OxiGdalError::allocation_error(e.to_string()))?;
111
112        let ptr = if config.use_pinned_memory {
113            Self::allocate_pinned(layout)?
114        } else {
115            unsafe {
116                let raw_ptr = std::alloc::alloc(layout);
117                if raw_ptr.is_null() {
118                    return Err(OxiGdalError::allocation_error(
119                        "Failed to allocate buffer".to_string(),
120                    ));
121                }
122                NonNull::new_unchecked(raw_ptr)
123            }
124        };
125
126        Ok(Self {
127            ptr,
128            len: size,
129            capacity: size,
130            ref_count: Arc::new(AtomicUsize::new(1)),
131            is_pinned: config.use_pinned_memory,
132            config,
133        })
134    }
135
136    /// Allocate pinned memory for GPU transfers
137    ///
138    /// # Safety
139    ///
140    /// Uses unsafe allocation. Pinned memory is allocated differently on different platforms.
141    #[allow(unsafe_code)]
142    fn allocate_pinned(layout: std::alloc::Layout) -> Result<NonNull<u8>> {
143        #[cfg(target_os = "linux")]
144        {
145            // On Linux, use mmap with MAP_LOCKED
146            let ptr = unsafe {
147                libc::mmap(
148                    std::ptr::null_mut(),
149                    layout.size(),
150                    libc::PROT_READ | libc::PROT_WRITE,
151                    libc::MAP_PRIVATE | libc::MAP_ANONYMOUS | libc::MAP_LOCKED,
152                    -1,
153                    0,
154                )
155            };
156
157            if ptr == libc::MAP_FAILED {
158                return Err(OxiGdalError::allocation_error(
159                    "Failed to allocate pinned memory".to_string(),
160                ));
161            }
162
163            NonNull::new(ptr as *mut u8)
164                .ok_or_else(|| OxiGdalError::allocation_error("mmap returned null".to_string()))
165        }
166
167        #[cfg(not(target_os = "linux"))]
168        {
169            // Fallback to regular allocation
170            unsafe {
171                let raw_ptr = std::alloc::alloc(layout);
172                if raw_ptr.is_null() {
173                    return Err(OxiGdalError::allocation_error(
174                        "Failed to allocate buffer".to_string(),
175                    ));
176                }
177                Ok(NonNull::new_unchecked(raw_ptr))
178            }
179        }
180    }
181
182    /// Create a shared reference to this buffer
183    #[must_use]
184    pub fn share(&self) -> Self {
185        self.ref_count.fetch_add(1, Ordering::Relaxed);
186        Self {
187            ptr: self.ptr,
188            len: self.len,
189            capacity: self.capacity,
190            ref_count: Arc::clone(&self.ref_count),
191            is_pinned: self.is_pinned,
192            config: self.config.clone(),
193        }
194    }
195
196    /// Get current reference count
197    #[must_use]
198    pub fn ref_count(&self) -> usize {
199        self.ref_count.load(Ordering::Relaxed)
200    }
201
202    /// Check if this is the only reference
203    #[must_use]
204    pub fn is_unique(&self) -> bool {
205        self.ref_count() == 1
206    }
207
208    /// Get length
209    #[must_use]
210    pub fn len(&self) -> usize {
211        self.len
212    }
213
214    /// Check if empty
215    #[must_use]
216    pub fn is_empty(&self) -> bool {
217        self.len == 0
218    }
219
220    /// Get capacity
221    #[must_use]
222    pub fn capacity(&self) -> usize {
223        self.capacity
224    }
225
226    /// Check if buffer is pinned
227    #[must_use]
228    pub fn is_pinned(&self) -> bool {
229        self.is_pinned
230    }
231
232    /// Get a slice of the buffer
233    #[must_use]
234    pub fn as_slice(&self) -> &[u8] {
235        unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
236    }
237
238    /// Get a mutable slice (requires unique ownership or COW)
239    pub fn as_mut_slice(&mut self) -> Result<&mut [u8]> {
240        if !self.is_unique() {
241            if self.config.enable_cow {
242                self.make_unique()?;
243            } else {
244                return Err(OxiGdalError::invalid_operation(
245                    "Cannot mutate shared buffer without COW".to_string(),
246                ));
247            }
248        }
249
250        Ok(unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) })
251    }
252
253    /// Make this buffer unique by copying if necessary (COW)
254    fn make_unique(&mut self) -> Result<()> {
255        if self.is_unique() {
256            return Ok(());
257        }
258
259        // Allocate new buffer
260        let layout = std::alloc::Layout::from_size_align(self.capacity, self.config.alignment)
261            .map_err(|e| OxiGdalError::allocation_error(e.to_string()))?;
262
263        let new_ptr = if self.is_pinned {
264            Self::allocate_pinned(layout)?
265        } else {
266            unsafe {
267                let raw_ptr = std::alloc::alloc(layout);
268                if raw_ptr.is_null() {
269                    return Err(OxiGdalError::allocation_error(
270                        "Failed to allocate buffer for COW".to_string(),
271                    ));
272                }
273                NonNull::new_unchecked(raw_ptr)
274            }
275        };
276
277        // Copy data
278        unsafe {
279            std::ptr::copy_nonoverlapping(self.ptr.as_ptr(), new_ptr.as_ptr(), self.len);
280        }
281
282        // Update reference count
283        self.ref_count.fetch_sub(1, Ordering::Relaxed);
284
285        // Update self to use new buffer
286        self.ptr = new_ptr;
287        self.ref_count = Arc::new(AtomicUsize::new(1));
288
289        Ok(())
290    }
291
292    /// Clone the buffer data (explicit copy)
293    pub fn clone_data(&self) -> Result<Self> {
294        let new_buffer = Self::with_config(self.len, self.config.clone())?;
295        unsafe {
296            std::ptr::copy_nonoverlapping(self.ptr.as_ptr(), new_buffer.ptr.as_ptr(), self.len);
297        }
298        Ok(new_buffer)
299    }
300
301    /// Get a typed slice view
302    pub fn as_typed_slice<T: bytemuck::Pod>(&self) -> Result<&[T]> {
303        if self.len % std::mem::size_of::<T>() != 0 {
304            return Err(OxiGdalError::invalid_parameter(
305                "parameter",
306                "Buffer size not aligned to type size".to_string(),
307            ));
308        }
309
310        let count = self.len / std::mem::size_of::<T>();
311        Ok(unsafe { std::slice::from_raw_parts(self.ptr.as_ptr() as *const T, count) })
312    }
313
314    /// Get a typed mutable slice view
315    pub fn as_typed_mut_slice<T: bytemuck::Pod>(&mut self) -> Result<&mut [T]> {
316        if !self.is_unique() {
317            if self.config.enable_cow {
318                self.make_unique()?;
319            } else {
320                return Err(OxiGdalError::invalid_operation(
321                    "Cannot mutate shared buffer without COW".to_string(),
322                ));
323            }
324        }
325
326        if self.len % std::mem::size_of::<T>() != 0 {
327            return Err(OxiGdalError::invalid_parameter(
328                "parameter",
329                "Buffer size not aligned to type size".to_string(),
330            ));
331        }
332
333        let count = self.len / std::mem::size_of::<T>();
334        Ok(unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr().cast::<T>(), count) })
335    }
336}
337
338impl Clone for SharedBuffer {
339    fn clone(&self) -> Self {
340        self.share()
341    }
342}
343
344impl Deref for SharedBuffer {
345    type Target = [u8];
346
347    fn deref(&self) -> &Self::Target {
348        self.as_slice()
349    }
350}
351
352impl AsRef<[u8]> for SharedBuffer {
353    fn as_ref(&self) -> &[u8] {
354        self.as_slice()
355    }
356}
357
358impl Drop for SharedBuffer {
359    fn drop(&mut self) {
360        let count = self.ref_count.fetch_sub(1, Ordering::Relaxed);
361        if count == 1 {
362            // Last reference, deallocate
363            unsafe {
364                if self.is_pinned {
365                    #[cfg(target_os = "linux")]
366                    {
367                        libc::munmap(self.ptr.as_ptr() as *mut libc::c_void, self.capacity);
368                    }
369                    #[cfg(not(target_os = "linux"))]
370                    {
371                        let layout = std::alloc::Layout::from_size_align_unchecked(
372                            self.capacity,
373                            self.config.alignment,
374                        );
375                        std::alloc::dealloc(self.ptr.as_ptr(), layout);
376                    }
377                } else {
378                    let layout = std::alloc::Layout::from_size_align_unchecked(
379                        self.capacity,
380                        self.config.alignment,
381                    );
382                    std::alloc::dealloc(self.ptr.as_ptr(), layout);
383                }
384            }
385        }
386    }
387}
388
389// Safety: SharedBuffer can be sent between threads
390unsafe impl Send for SharedBuffer {}
391unsafe impl Sync for SharedBuffer {}
392
393/// Zero-copy buffer wrapper
394pub struct ZeroCopyBuffer<T: bytemuck::Pod> {
395    /// Underlying shared buffer
396    buffer: SharedBuffer,
397    /// Phantom data for type
398    _phantom: std::marker::PhantomData<T>,
399}
400
401impl<T: bytemuck::Pod> ZeroCopyBuffer<T> {
402    /// Create a new zero-copy buffer
403    pub fn new(count: usize) -> Result<Self> {
404        let size = count * std::mem::size_of::<T>();
405        let buffer = SharedBuffer::new(size)?;
406        Ok(Self {
407            buffer,
408            _phantom: std::marker::PhantomData,
409        })
410    }
411
412    /// Create with configuration
413    pub fn with_config(count: usize, config: ZeroCopyConfig) -> Result<Self> {
414        let size = count * std::mem::size_of::<T>();
415        let buffer = SharedBuffer::with_config(size, config)?;
416        Ok(Self {
417            buffer,
418            _phantom: std::marker::PhantomData,
419        })
420    }
421
422    /// Create from existing buffer
423    pub fn from_buffer(buffer: SharedBuffer) -> Result<Self> {
424        if buffer.len() % std::mem::size_of::<T>() != 0 {
425            return Err(OxiGdalError::invalid_parameter(
426                "parameter",
427                "Buffer size not aligned to type size".to_string(),
428            ));
429        }
430
431        Ok(Self {
432            buffer,
433            _phantom: std::marker::PhantomData,
434        })
435    }
436
437    /// Get length in elements
438    #[must_use]
439    pub fn len(&self) -> usize {
440        self.buffer.len() / std::mem::size_of::<T>()
441    }
442
443    /// Check if empty
444    #[must_use]
445    pub fn is_empty(&self) -> bool {
446        self.len() == 0
447    }
448
449    /// Get as slice
450    #[must_use]
451    pub fn as_slice(&self) -> &[T] {
452        // SAFETY: Type alignment is verified in constructor (new, with_config, from_buffer).
453        // The buffer size is always a multiple of std::mem::size_of::<T>(), so this
454        // conversion is safe. bytemuck::Pod ensures T is safe to read from raw bytes.
455        let count = self.buffer.len() / std::mem::size_of::<T>();
456        unsafe { std::slice::from_raw_parts(self.buffer.ptr.as_ptr() as *const T, count) }
457    }
458
459    /// Get as mutable slice
460    pub fn as_mut_slice(&mut self) -> Result<&mut [T]> {
461        self.buffer.as_typed_mut_slice()
462    }
463
464    /// Share the buffer
465    #[must_use]
466    pub fn share(&self) -> Self {
467        Self {
468            buffer: self.buffer.share(),
469            _phantom: std::marker::PhantomData,
470        }
471    }
472
473    /// Check if unique
474    #[must_use]
475    pub fn is_unique(&self) -> bool {
476        self.buffer.is_unique()
477    }
478
479    /// Get reference count
480    #[must_use]
481    pub fn ref_count(&self) -> usize {
482        self.buffer.ref_count()
483    }
484
485    /// Clone the buffer data
486    pub fn clone_data(&self) -> Result<Self> {
487        Ok(Self {
488            buffer: self.buffer.clone_data()?,
489            _phantom: std::marker::PhantomData,
490        })
491    }
492}
493
494impl<T: bytemuck::Pod> Clone for ZeroCopyBuffer<T> {
495    fn clone(&self) -> Self {
496        self.share()
497    }
498}
499
500impl<T: bytemuck::Pod> Deref for ZeroCopyBuffer<T> {
501    type Target = [T];
502
503    fn deref(&self) -> &Self::Target {
504        self.as_slice()
505    }
506}
507
508impl<T: bytemuck::Pod> AsRef<[T]> for ZeroCopyBuffer<T> {
509    fn as_ref(&self) -> &[T] {
510        self.as_slice()
511    }
512}
513
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_shared_buffer() {
        let original = SharedBuffer::new(1024).expect("Failed to create shared buffer");
        assert_eq!(original.len(), 1024);
        assert_eq!(original.ref_count(), 1);
        assert!(original.is_unique());

        let alias = original.share();
        for handle in [&original, &alias] {
            assert_eq!(handle.ref_count(), 2);
            assert!(!handle.is_unique());
        }
    }

    #[test]
    fn test_copy_on_write() {
        let mut writer = SharedBuffer::new(1024).expect("Failed to create shared buffer");
        let reader = writer.share();
        assert_eq!(writer.ref_count(), 2);

        // Mutating through a shared handle must detach it (COW).
        let bytes = writer
            .as_mut_slice()
            .expect("Failed to get mutable slice (COW should trigger)");
        bytes[0] = 42;

        assert_eq!(writer.ref_count(), 1);
        assert_eq!(reader.ref_count(), 1);
        assert_eq!(writer.as_slice()[0], 42);
        assert_eq!(reader.as_slice()[0], 0);
    }

    #[test]
    fn test_zero_copy_buffer() {
        let typed: ZeroCopyBuffer<u32> =
            ZeroCopyBuffer::new(256).expect("Failed to create zero-copy buffer");
        assert_eq!(typed.len(), 256);
        assert_eq!(typed.ref_count(), 1);

        let alias = typed.share();
        assert_eq!(typed.ref_count(), 2);
        assert_eq!(alias.ref_count(), 2);
    }

    #[test]
    fn test_typed_slice() {
        let raw = SharedBuffer::new(1024).expect("Failed to create shared buffer");
        let view: &[u32] = raw
            .as_typed_slice()
            .expect("Failed to create typed slice from buffer");
        // 1024 bytes / 4 bytes per u32
        assert_eq!(view.len(), 256);
    }

    #[test]
    fn test_clone_data() {
        let source = SharedBuffer::new(1024).expect("Failed to create shared buffer");
        let copy = source.clone_data().expect("Failed to clone buffer data");

        // A deep copy has the same length but its own refcount.
        assert_eq!(source.len(), copy.len());
        assert_eq!(source.ref_count(), 1);
        assert_eq!(copy.ref_count(), 1);
    }
}