mmap_io/
atomic.rs

//! Atomic memory views for lock-free concurrent access to specific data types.

use crate::errors::{MmapIoError, Result};
use crate::mmap::MemoryMappedFile;
use std::sync::atomic::{AtomicU32, AtomicU64};

impl MemoryMappedFile {
    /// Get an atomic view of a u64 value at the specified offset.
    ///
    /// The offset must be properly aligned for atomic operations (8-byte alignment for u64).
    /// This allows lock-free concurrent access to the value.
    ///
    /// # Safety
    ///
    /// The returned reference is valid for the lifetime of the memory mapping.
    /// The caller must ensure that the memory at this offset is not concurrently
    /// modified through non-atomic operations.
    ///
    /// # Errors
    ///
    /// Returns `MmapIoError::Misaligned` if the offset is not 8-byte aligned.
    /// Returns `MmapIoError::OutOfBounds` if the offset + 8 exceeds file bounds.
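    ///
    /// # Examples
    ///
    /// A minimal usage sketch, assuming the crate is built as `mmap_io` with the
    /// `atomic` feature enabled and using the crate-level `create_mmap` helper
    /// exercised in the tests below; the temp-file path is purely illustrative.
    ///
    /// ```no_run
    /// use std::sync::atomic::Ordering;
    ///
    /// // Hypothetical scratch file; any writable path works.
    /// let path = std::env::temp_dir().join("atomic_u64_example.bin");
    /// let mmap = mmap_io::create_mmap(&path, 64).expect("create mapping");
    ///
    /// // Offset 0 is 8-byte aligned and well within the 64-byte file.
    /// let counter = mmap.atomic_u64(0).expect("atomic view");
    /// counter.store(0, Ordering::SeqCst);
    /// counter.fetch_add(1, Ordering::SeqCst);
    /// assert_eq!(counter.load(Ordering::SeqCst), 1);
    /// ```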
    #[cfg(feature = "atomic")]
    pub fn atomic_u64(&self, offset: u64) -> Result<&AtomicU64> {
        const ALIGN: u64 = std::mem::align_of::<AtomicU64>() as u64;
        const SIZE: u64 = std::mem::size_of::<AtomicU64>() as u64;

        // Check alignment
        if offset % ALIGN != 0 {
            return Err(MmapIoError::Misaligned {
                required: ALIGN,
                offset,
            });
        }

        // Check bounds (saturating add avoids overflow for very large offsets)
        let total = self.current_len()?;
        if offset.saturating_add(SIZE) > total {
            return Err(MmapIoError::OutOfBounds {
                offset,
                len: SIZE,
                total,
            });
        }

        // Get the base pointer for the mapping
        let ptr = match &self.inner.map {
            crate::mmap::MapVariant::Ro(m) => m.as_ptr(),
            crate::mmap::MapVariant::Rw(lock) => {
                let guard = lock.read();
                guard.as_ptr()
            }
            crate::mmap::MapVariant::Cow(m) => m.as_ptr(),
        };

        // SAFETY: We've validated alignment and bounds.
        // The AtomicU64 reference is valid for the lifetime of self.
        unsafe {
            let addr = ptr.add(offset as usize);
            let atomic_ptr = addr as *const AtomicU64;
            Ok(&*atomic_ptr)
        }
    }

    /// Get an atomic view of a u32 value at the specified offset.
    ///
    /// The offset must be properly aligned for atomic operations (4-byte alignment for u32).
    /// This allows lock-free concurrent access to the value.
    ///
    /// # Safety
    ///
    /// The returned reference is valid for the lifetime of the memory mapping.
    /// The caller must ensure that the memory at this offset is not concurrently
    /// modified through non-atomic operations.
    ///
    /// # Errors
    ///
    /// Returns `MmapIoError::Misaligned` if the offset is not 4-byte aligned.
    /// Returns `MmapIoError::OutOfBounds` if the offset + 4 exceeds file bounds.
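    ///
    /// # Examples
    ///
    /// A minimal sketch along the same lines as `atomic_u64`, again assuming the
    /// crate is built as `mmap_io` with the `atomic` feature enabled; the path and
    /// flag layout are illustrative only.
    ///
    /// ```no_run
    /// use std::sync::atomic::Ordering;
    ///
    /// let path = std::env::temp_dir().join("atomic_u32_example.bin");
    /// let mmap = mmap_io::create_mmap(&path, 32).expect("create mapping");
    ///
    /// // Offset 4 satisfies the 4-byte alignment requirement.
    /// let flag = mmap.atomic_u32(4).expect("atomic view");
    /// // Publish the flag only if no other writer has set it yet.
    /// let _ = flag.compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst);
    /// ```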
    #[cfg(feature = "atomic")]
    pub fn atomic_u32(&self, offset: u64) -> Result<&AtomicU32> {
        const ALIGN: u64 = std::mem::align_of::<AtomicU32>() as u64;
        const SIZE: u64 = std::mem::size_of::<AtomicU32>() as u64;

        // Check alignment
        if offset % ALIGN != 0 {
            return Err(MmapIoError::Misaligned {
                required: ALIGN,
                offset,
            });
        }

        // Check bounds (saturating add avoids overflow for very large offsets)
        let total = self.current_len()?;
        if offset.saturating_add(SIZE) > total {
            return Err(MmapIoError::OutOfBounds {
                offset,
                len: SIZE,
                total,
            });
        }

        // Get the base pointer for the mapping
        let ptr = match &self.inner.map {
            crate::mmap::MapVariant::Ro(m) => m.as_ptr(),
            crate::mmap::MapVariant::Rw(lock) => {
                let guard = lock.read();
                guard.as_ptr()
            }
            crate::mmap::MapVariant::Cow(m) => m.as_ptr(),
        };

        // SAFETY: We've validated alignment and bounds.
        // The AtomicU32 reference is valid for the lifetime of self.
        unsafe {
            let addr = ptr.add(offset as usize);
            let atomic_ptr = addr as *const AtomicU32;
            Ok(&*atomic_ptr)
        }
    }

    /// Get multiple atomic u64 views starting at the specified offset.
    ///
    /// Returns a slice of atomic values. All values must be within bounds
    /// and the offset must be 8-byte aligned.
    ///
    /// # Errors
    ///
    /// Returns `MmapIoError::Misaligned` if the offset is not 8-byte aligned.
    /// Returns `MmapIoError::OutOfBounds` if the range exceeds file bounds.
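    ///
    /// # Examples
    ///
    /// A minimal sketch treating the start of the file as a small array of shared
    /// counters, assuming the crate is built as `mmap_io` with the `atomic`
    /// feature enabled; the path and layout are illustrative only.
    ///
    /// ```no_run
    /// use std::sync::atomic::Ordering;
    ///
    /// let path = std::env::temp_dir().join("atomic_u64_slice_example.bin");
    /// let mmap = mmap_io::create_mmap(&path, 64).expect("create mapping");
    ///
    /// // Eight u64 counters fill the 64-byte file exactly.
    /// let counters = mmap.atomic_u64_slice(0, 8).expect("atomic slice");
    /// counters[3].fetch_add(1, Ordering::SeqCst);
    /// let sum: u64 = counters.iter().map(|c| c.load(Ordering::SeqCst)).sum();
    /// println!("total across counters: {}", sum);
    /// ```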
    #[cfg(feature = "atomic")]
    pub fn atomic_u64_slice(&self, offset: u64, count: usize) -> Result<&[AtomicU64]> {
        const ALIGN: u64 = std::mem::align_of::<AtomicU64>() as u64;
        const SIZE: u64 = std::mem::size_of::<AtomicU64>() as u64;

        // Check alignment
        if offset % ALIGN != 0 {
            return Err(MmapIoError::Misaligned {
                required: ALIGN,
                offset,
            });
        }

        // Check bounds (saturating arithmetic avoids overflow for large counts or offsets)
        let total_size = SIZE.saturating_mul(count as u64);
        let total = self.current_len()?;
        if offset.saturating_add(total_size) > total {
            return Err(MmapIoError::OutOfBounds {
                offset,
                len: total_size,
                total,
            });
        }

        // Get the base pointer for the mapping
        let ptr = match &self.inner.map {
            crate::mmap::MapVariant::Ro(m) => m.as_ptr(),
            crate::mmap::MapVariant::Rw(lock) => {
                let guard = lock.read();
                guard.as_ptr()
            }
            crate::mmap::MapVariant::Cow(m) => m.as_ptr(),
        };

        // SAFETY: We've validated alignment and bounds.
        // The slice is valid for the lifetime of self.
        unsafe {
            let addr = ptr.add(offset as usize);
            let atomic_ptr = addr as *const AtomicU64;
            Ok(std::slice::from_raw_parts(atomic_ptr, count))
        }
    }

    /// Get multiple atomic u32 views starting at the specified offset.
    ///
    /// Returns a slice of atomic values. All values must be within bounds
    /// and the offset must be 4-byte aligned.
    ///
    /// # Errors
    ///
    /// Returns `MmapIoError::Misaligned` if the offset is not 4-byte aligned.
    /// Returns `MmapIoError::OutOfBounds` if the range exceeds file bounds.
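    ///
    /// # Examples
    ///
    /// A minimal sketch treating a region as per-worker u32 status words, assuming
    /// the crate is built as `mmap_io` with the `atomic` feature enabled; the path
    /// and slot layout are illustrative only.
    ///
    /// ```no_run
    /// use std::sync::atomic::Ordering;
    ///
    /// let path = std::env::temp_dir().join("atomic_u32_slice_example.bin");
    /// let mmap = mmap_io::create_mmap(&path, 64).expect("create mapping");
    ///
    /// // Sixteen u32 slots fill the 64-byte file exactly.
    /// let slots = mmap.atomic_u32_slice(0, 16).expect("atomic slice");
    /// // Worker 5 marks itself ready by setting bit 0 of its slot.
    /// slots[5].fetch_or(1, Ordering::SeqCst);
    /// ```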
    #[cfg(feature = "atomic")]
    pub fn atomic_u32_slice(&self, offset: u64, count: usize) -> Result<&[AtomicU32]> {
        const ALIGN: u64 = std::mem::align_of::<AtomicU32>() as u64;
        const SIZE: u64 = std::mem::size_of::<AtomicU32>() as u64;

        // Check alignment
        if offset % ALIGN != 0 {
            return Err(MmapIoError::Misaligned {
                required: ALIGN,
                offset,
            });
        }

        // Check bounds (saturating arithmetic avoids overflow for large counts or offsets)
        let total_size = SIZE.saturating_mul(count as u64);
        let total = self.current_len()?;
        if offset.saturating_add(total_size) > total {
            return Err(MmapIoError::OutOfBounds {
                offset,
                len: total_size,
                total,
            });
        }

        // Get the base pointer for the mapping
        let ptr = match &self.inner.map {
            crate::mmap::MapVariant::Ro(m) => m.as_ptr(),
            crate::mmap::MapVariant::Rw(lock) => {
                let guard = lock.read();
                guard.as_ptr()
            }
            crate::mmap::MapVariant::Cow(m) => m.as_ptr(),
        };

        // SAFETY: We've validated alignment and bounds.
        // The slice is valid for the lifetime of self.
        unsafe {
            let addr = ptr.add(offset as usize);
            let atomic_ptr = addr as *const AtomicU32;
            Ok(std::slice::from_raw_parts(atomic_ptr, count))
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::create_mmap;
    use std::fs;
    use std::path::PathBuf;
    use std::sync::atomic::Ordering;

    fn tmp_path(name: &str) -> PathBuf {
        let mut p = std::env::temp_dir();
        p.push(format!("mmap_io_atomic_test_{}_{}", name, std::process::id()));
        p
    }

    #[test]
    #[cfg(feature = "atomic")]
    fn test_atomic_u64_operations() {
        let path = tmp_path("atomic_u64");
        let _ = fs::remove_file(&path);

        let mmap = create_mmap(&path, 64).expect("create");

        // Test aligned access
        let atomic = mmap.atomic_u64(0).expect("atomic at 0");
        atomic.store(0x1234567890ABCDEF, Ordering::SeqCst);
        assert_eq!(atomic.load(Ordering::SeqCst), 0x1234567890ABCDEF);

        // Test another aligned offset
        let atomic2 = mmap.atomic_u64(8).expect("atomic at 8");
        atomic2.store(0xFEDCBA0987654321, Ordering::SeqCst);
        assert_eq!(atomic2.load(Ordering::SeqCst), 0xFEDCBA0987654321);

        // Test misaligned access
        assert!(matches!(
            mmap.atomic_u64(1),
            Err(MmapIoError::Misaligned { required: 8, offset: 1 })
        ));
        assert!(matches!(
            mmap.atomic_u64(7),
            Err(MmapIoError::Misaligned { required: 8, offset: 7 })
        ));

        // Test out of bounds
        assert!(mmap.atomic_u64(64).is_err());
        assert!(mmap.atomic_u64(57).is_err()); // Misaligned and would also run past the end

        fs::remove_file(&path).expect("cleanup");
    }

    #[test]
    #[cfg(feature = "atomic")]
    fn test_atomic_u32_operations() {
        let path = tmp_path("atomic_u32");
        let _ = fs::remove_file(&path);

        let mmap = create_mmap(&path, 32).expect("create");

        // Test aligned access
        let atomic = mmap.atomic_u32(0).expect("atomic at 0");
        atomic.store(0x12345678, Ordering::SeqCst);
        assert_eq!(atomic.load(Ordering::SeqCst), 0x12345678);

        // Test another aligned offset
        let atomic2 = mmap.atomic_u32(4).expect("atomic at 4");
        atomic2.store(0x87654321, Ordering::SeqCst);
        assert_eq!(atomic2.load(Ordering::SeqCst), 0x87654321);

        // Test misaligned access
        assert!(matches!(
            mmap.atomic_u32(1),
            Err(MmapIoError::Misaligned { required: 4, offset: 1 })
        ));
        assert!(matches!(
            mmap.atomic_u32(3),
            Err(MmapIoError::Misaligned { required: 4, offset: 3 })
        ));

        // Test out of bounds
        assert!(mmap.atomic_u32(32).is_err());
        assert!(mmap.atomic_u32(29).is_err()); // Misaligned and would also run past the end

        fs::remove_file(&path).expect("cleanup");
    }

    #[test]
    #[cfg(feature = "atomic")]
    fn test_atomic_slices() {
        let path = tmp_path("atomic_slices");
        let _ = fs::remove_file(&path);

        let mmap = create_mmap(&path, 128).expect("create");

        // Test u64 slice
        let slice = mmap.atomic_u64_slice(0, 4).expect("u64 slice");
        assert_eq!(slice.len(), 4);
        for (i, atomic) in slice.iter().enumerate() {
            atomic.store(i as u64 * 100, Ordering::SeqCst);
        }
        for (i, atomic) in slice.iter().enumerate() {
            assert_eq!(atomic.load(Ordering::SeqCst), i as u64 * 100);
        }

        // Test u32 slice
        let slice = mmap.atomic_u32_slice(64, 8).expect("u32 slice");
        assert_eq!(slice.len(), 8);
        for (i, atomic) in slice.iter().enumerate() {
            atomic.store(i as u32 * 10, Ordering::SeqCst);
        }
        for (i, atomic) in slice.iter().enumerate() {
            assert_eq!(atomic.load(Ordering::SeqCst), i as u32 * 10);
        }

        // Test misaligned slice
        assert!(mmap.atomic_u64_slice(1, 2).is_err());
        assert!(mmap.atomic_u32_slice(2, 2).is_err());

        // Test out of bounds slice
        assert!(mmap.atomic_u64_slice(120, 2).is_err()); // Would need 16 bytes
        assert!(mmap.atomic_u32_slice(124, 2).is_err()); // Would need 8 bytes

        fs::remove_file(&path).expect("cleanup");
    }

    #[test]
    #[cfg(feature = "atomic")]
    fn test_atomic_with_different_modes() {
        let path = tmp_path("atomic_modes");
        let _ = fs::remove_file(&path);

        // Create and write initial values
        let mmap = create_mmap(&path, 16).expect("create");
        let atomic = mmap.atomic_u64(0).expect("atomic");
        atomic.store(42, Ordering::SeqCst);
        mmap.flush().expect("flush");
        drop(mmap);

        // Test with RO mode
        let mmap = MemoryMappedFile::open_ro(&path).expect("open ro");
        let atomic = mmap.atomic_u64(0).expect("atomic ro");
        assert_eq!(atomic.load(Ordering::SeqCst), 42);
        // Note: Writing to RO atomic would be UB, so we don't test it

        #[cfg(feature = "cow")]
        {
            // Test with COW mode
            let mmap = MemoryMappedFile::open_cow(&path).expect("open cow");
            let atomic = mmap.atomic_u64(0).expect("atomic cow");
            assert_eq!(atomic.load(Ordering::SeqCst), 42);
            // COW writes would only affect this process
        }

        fs::remove_file(&path).expect("cleanup");
    }

    #[test]
    #[cfg(feature = "atomic")]
    fn test_concurrent_atomic_access() {
        use std::sync::Arc;
        use std::thread;

        let path = tmp_path("concurrent_atomic");
        let _ = fs::remove_file(&path);

        let mmap = Arc::new(create_mmap(&path, 8).expect("create"));
        let atomic = mmap.atomic_u64(0).expect("atomic");
        atomic.store(0, Ordering::SeqCst);

        // Spawn multiple threads incrementing the same atomic
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let mmap = Arc::clone(&mmap);
                thread::spawn(move || {
                    let atomic = mmap.atomic_u64(0).expect("atomic in thread");
                    for _ in 0..1000 {
                        atomic.fetch_add(1, Ordering::SeqCst);
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().expect("thread join");
        }

        // Verify all increments were recorded
        assert_eq!(atomic.load(Ordering::SeqCst), 4000);

        fs::remove_file(&path).expect("cleanup");
    }
}