use crate::errors::{MmapIoError, Result};
use crate::mmap::MemoryMappedFile;
use std::sync::atomic::{AtomicU32, AtomicU64};

impl MemoryMappedFile {
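    /// Returns a reference to an [`AtomicU64`] located `offset` bytes into the mapping.
    ///
    /// # Errors
    ///
    /// Returns [`MmapIoError::Misaligned`] if `offset` is not 8-byte aligned, and
    /// [`MmapIoError::OutOfBounds`] if the eight bytes at `offset` do not fit within
    /// the current mapping length.
    ///
    /// Loads work in every mapping mode. Stores require a writable mapping: a store
    /// through a read-only mapping will fault, and a store through a copy-on-write
    /// mapping only affects the private copy.
    ///
    /// # Examples
    ///
    /// A minimal usage sketch. The crate-root name `mmap_io` and the `create_mmap`
    /// helper are assumed from the test module below, so the snippet is marked
    /// `ignore` rather than compiled as a doctest:
    ///
    /// ```ignore
    /// use std::sync::atomic::Ordering;
    /// use mmap_io::create_mmap;
    ///
    /// let mmap = create_mmap("counter.bin", 64).expect("create mapping");
    /// let counter = mmap.atomic_u64(0).expect("offset 0 is aligned and in bounds");
    /// counter.fetch_add(1, Ordering::SeqCst); // lock-free shared increment
    /// assert_eq!(counter.load(Ordering::SeqCst), 1);
    /// ```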
    #[cfg(feature = "atomic")]
    pub fn atomic_u64(&self, offset: u64) -> Result<&AtomicU64> {
        const ALIGN: u64 = std::mem::align_of::<AtomicU64>() as u64;
        const SIZE: u64 = std::mem::size_of::<AtomicU64>() as u64;

        if offset % ALIGN != 0 {
            return Err(MmapIoError::Misaligned {
                required: ALIGN,
                offset,
            });
        }

        let total = self.current_len()?;
        // Checked add so a huge `offset` cannot wrap around the bounds check in release builds.
        if offset.checked_add(SIZE).map_or(true, |end| end > total) {
            return Err(MmapIoError::OutOfBounds {
                offset,
                len: SIZE,
                total,
            });
        }

        let ptr = match &self.inner.map {
            crate::mmap::MapVariant::Ro(m) => m.as_ptr(),
            crate::mmap::MapVariant::Rw(lock) => {
                let guard = lock.read();
                guard.as_ptr()
            }
            crate::mmap::MapVariant::Cow(m) => m.as_ptr(),
        };

        unsafe {
            // SAFETY: the checks above guarantee `offset` is ALIGN-aligned and the
            // eight bytes at `offset` lie inside the mapping.
            let addr = ptr.add(offset as usize);
            let atomic_ptr = addr as *const AtomicU64;
            Ok(&*atomic_ptr)
        }
    }

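    /// Returns a reference to an [`AtomicU32`] located `offset` bytes into the mapping.
    ///
    /// # Errors
    ///
    /// Returns [`MmapIoError::Misaligned`] if `offset` is not 4-byte aligned, and
    /// [`MmapIoError::OutOfBounds`] if the four bytes at `offset` do not fit within
    /// the current mapping length.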
    #[cfg(feature = "atomic")]
    pub fn atomic_u32(&self, offset: u64) -> Result<&AtomicU32> {
        const ALIGN: u64 = std::mem::align_of::<AtomicU32>() as u64;
        const SIZE: u64 = std::mem::size_of::<AtomicU32>() as u64;

        if offset % ALIGN != 0 {
            return Err(MmapIoError::Misaligned {
                required: ALIGN,
                offset,
            });
        }

        let total = self.current_len()?;
        // Checked add so a huge `offset` cannot wrap around the bounds check in release builds.
        if offset.checked_add(SIZE).map_or(true, |end| end > total) {
            return Err(MmapIoError::OutOfBounds {
                offset,
                len: SIZE,
                total,
            });
        }

        let ptr = match &self.inner.map {
            crate::mmap::MapVariant::Ro(m) => m.as_ptr(),
            crate::mmap::MapVariant::Rw(lock) => {
                let guard = lock.read();
                guard.as_ptr()
            }
            crate::mmap::MapVariant::Cow(m) => m.as_ptr(),
        };

        unsafe {
            // SAFETY: the checks above guarantee `offset` is ALIGN-aligned and the
            // four bytes at `offset` lie inside the mapping.
            let addr = ptr.add(offset as usize);
            let atomic_ptr = addr as *const AtomicU32;
            Ok(&*atomic_ptr)
        }
    }

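    /// Returns a slice of `count` [`AtomicU64`] values starting `offset` bytes into
    /// the mapping.
    ///
    /// # Errors
    ///
    /// Returns [`MmapIoError::Misaligned`] if `offset` is not 8-byte aligned, and
    /// [`MmapIoError::OutOfBounds`] if `count * 8` bytes starting at `offset` do not
    /// fit within the current mapping length.
    ///
    /// A minimal sketch, assuming an already-created mapping named `mmap` (see the
    /// tests below for full setup):
    ///
    /// ```ignore
    /// use std::sync::atomic::Ordering;
    ///
    /// // View the first 32 bytes of the mapping as four shared u64 counters.
    /// let counters = mmap.atomic_u64_slice(0, 4).expect("aligned and in bounds");
    /// counters[2].store(7, Ordering::SeqCst);
    /// assert_eq!(counters[2].load(Ordering::SeqCst), 7);
    /// ```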
    #[cfg(feature = "atomic")]
    pub fn atomic_u64_slice(&self, offset: u64, count: usize) -> Result<&[AtomicU64]> {
        const ALIGN: u64 = std::mem::align_of::<AtomicU64>() as u64;
        const SIZE: u64 = std::mem::size_of::<AtomicU64>() as u64;

        if offset % ALIGN != 0 {
            return Err(MmapIoError::Misaligned {
                required: ALIGN,
                offset,
            });
        }

        let total = self.current_len()?;
        // Checked arithmetic so very large counts or offsets cannot wrap around the bounds check.
        let total_size = SIZE.checked_mul(count as u64).unwrap_or(u64::MAX);
        if offset.checked_add(total_size).map_or(true, |end| end > total) {
            return Err(MmapIoError::OutOfBounds {
                offset,
                len: total_size,
                total,
            });
        }

        let ptr = match &self.inner.map {
            crate::mmap::MapVariant::Ro(m) => m.as_ptr(),
            crate::mmap::MapVariant::Rw(lock) => {
                let guard = lock.read();
                guard.as_ptr()
            }
            crate::mmap::MapVariant::Cow(m) => m.as_ptr(),
        };

        unsafe {
            // SAFETY: the checks above guarantee the range is ALIGN-aligned and lies
            // entirely inside the mapping.
            let addr = ptr.add(offset as usize);
            let atomic_ptr = addr as *const AtomicU64;
            Ok(std::slice::from_raw_parts(atomic_ptr, count))
        }
    }

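    /// Returns a slice of `count` [`AtomicU32`] values starting `offset` bytes into
    /// the mapping.
    ///
    /// # Errors
    ///
    /// Returns [`MmapIoError::Misaligned`] if `offset` is not 4-byte aligned, and
    /// [`MmapIoError::OutOfBounds`] if `count * 4` bytes starting at `offset` do not
    /// fit within the current mapping length.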
    #[cfg(feature = "atomic")]
    pub fn atomic_u32_slice(&self, offset: u64, count: usize) -> Result<&[AtomicU32]> {
        const ALIGN: u64 = std::mem::align_of::<AtomicU32>() as u64;
        const SIZE: u64 = std::mem::size_of::<AtomicU32>() as u64;

        if offset % ALIGN != 0 {
            return Err(MmapIoError::Misaligned {
                required: ALIGN,
                offset,
            });
        }

        let total = self.current_len()?;
        // Checked arithmetic so very large counts or offsets cannot wrap around the bounds check.
        let total_size = SIZE.checked_mul(count as u64).unwrap_or(u64::MAX);
        if offset.checked_add(total_size).map_or(true, |end| end > total) {
            return Err(MmapIoError::OutOfBounds {
                offset,
                len: total_size,
                total,
            });
        }

        let ptr = match &self.inner.map {
            crate::mmap::MapVariant::Ro(m) => m.as_ptr(),
            crate::mmap::MapVariant::Rw(lock) => {
                let guard = lock.read();
                guard.as_ptr()
            }
            crate::mmap::MapVariant::Cow(m) => m.as_ptr(),
        };

        unsafe {
            // SAFETY: the checks above guarantee the range is ALIGN-aligned and lies
            // entirely inside the mapping.
            let addr = ptr.add(offset as usize);
            let atomic_ptr = addr as *const AtomicU32;
            Ok(std::slice::from_raw_parts(atomic_ptr, count))
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::create_mmap;
    use std::fs;
    use std::path::PathBuf;
    use std::sync::atomic::Ordering;

    fn tmp_path(name: &str) -> PathBuf {
        let mut p = std::env::temp_dir();
        p.push(format!("mmap_io_atomic_test_{}_{}", name, std::process::id()));
        p
    }

    #[test]
    #[cfg(feature = "atomic")]
    fn test_atomic_u64_operations() {
        let path = tmp_path("atomic_u64");
        let _ = fs::remove_file(&path);

        let mmap = create_mmap(&path, 64).expect("create");

        let atomic = mmap.atomic_u64(0).expect("atomic at 0");
        atomic.store(0x1234567890ABCDEF, Ordering::SeqCst);
        assert_eq!(atomic.load(Ordering::SeqCst), 0x1234567890ABCDEF);

        let atomic2 = mmap.atomic_u64(8).expect("atomic at 8");
        atomic2.store(0xFEDCBA0987654321, Ordering::SeqCst);
        assert_eq!(atomic2.load(Ordering::SeqCst), 0xFEDCBA0987654321);

        assert!(matches!(
            mmap.atomic_u64(1),
            Err(MmapIoError::Misaligned { required: 8, offset: 1 })
        ));
        assert!(matches!(
            mmap.atomic_u64(7),
            Err(MmapIoError::Misaligned { required: 8, offset: 7 })
        ));

        assert!(mmap.atomic_u64(64).is_err());
        assert!(mmap.atomic_u64(57).is_err());

        fs::remove_file(&path).expect("cleanup");
    }

    #[test]
    #[cfg(feature = "atomic")]
    fn test_atomic_u32_operations() {
        let path = tmp_path("atomic_u32");
        let _ = fs::remove_file(&path);

        let mmap = create_mmap(&path, 32).expect("create");

        let atomic = mmap.atomic_u32(0).expect("atomic at 0");
        atomic.store(0x12345678, Ordering::SeqCst);
        assert_eq!(atomic.load(Ordering::SeqCst), 0x12345678);

        let atomic2 = mmap.atomic_u32(4).expect("atomic at 4");
        atomic2.store(0x87654321, Ordering::SeqCst);
        assert_eq!(atomic2.load(Ordering::SeqCst), 0x87654321);

        assert!(matches!(
            mmap.atomic_u32(1),
            Err(MmapIoError::Misaligned { required: 4, offset: 1 })
        ));
        assert!(matches!(
            mmap.atomic_u32(3),
            Err(MmapIoError::Misaligned { required: 4, offset: 3 })
        ));

        assert!(mmap.atomic_u32(32).is_err());
        assert!(mmap.atomic_u32(29).is_err());

        fs::remove_file(&path).expect("cleanup");
    }

    #[test]
    #[cfg(feature = "atomic")]
    fn test_atomic_slices() {
        let path = tmp_path("atomic_slices");
        let _ = fs::remove_file(&path);

        let mmap = create_mmap(&path, 128).expect("create");

        let slice = mmap.atomic_u64_slice(0, 4).expect("u64 slice");
        assert_eq!(slice.len(), 4);
        for (i, atomic) in slice.iter().enumerate() {
            atomic.store(i as u64 * 100, Ordering::SeqCst);
        }
        for (i, atomic) in slice.iter().enumerate() {
            assert_eq!(atomic.load(Ordering::SeqCst), i as u64 * 100);
        }

        let slice = mmap.atomic_u32_slice(64, 8).expect("u32 slice");
        assert_eq!(slice.len(), 8);
        for (i, atomic) in slice.iter().enumerate() {
            atomic.store(i as u32 * 10, Ordering::SeqCst);
        }
        for (i, atomic) in slice.iter().enumerate() {
            assert_eq!(atomic.load(Ordering::SeqCst), i as u32 * 10);
        }

        assert!(mmap.atomic_u64_slice(1, 2).is_err());
        assert!(mmap.atomic_u32_slice(2, 2).is_err());

        assert!(mmap.atomic_u64_slice(120, 2).is_err());
        assert!(mmap.atomic_u32_slice(124, 2).is_err());

        fs::remove_file(&path).expect("cleanup");
    }

    #[test]
    #[cfg(feature = "atomic")]
    fn test_atomic_with_different_modes() {
        let path = tmp_path("atomic_modes");
        let _ = fs::remove_file(&path);

        let mmap = create_mmap(&path, 16).expect("create");
        let atomic = mmap.atomic_u64(0).expect("atomic");
        atomic.store(42, Ordering::SeqCst);
        mmap.flush().expect("flush");
        drop(mmap);

        let mmap = MemoryMappedFile::open_ro(&path).expect("open ro");
        let atomic = mmap.atomic_u64(0).expect("atomic ro");
        assert_eq!(atomic.load(Ordering::SeqCst), 42);

        #[cfg(feature = "cow")]
        {
            let mmap = MemoryMappedFile::open_cow(&path).expect("open cow");
            let atomic = mmap.atomic_u64(0).expect("atomic cow");
            assert_eq!(atomic.load(Ordering::SeqCst), 42);
        }

        fs::remove_file(&path).expect("cleanup");
    }

    #[test]
    #[cfg(feature = "atomic")]
    fn test_concurrent_atomic_access() {
        use std::sync::Arc;
        use std::thread;

        let path = tmp_path("concurrent_atomic");
        let _ = fs::remove_file(&path);

        let mmap = Arc::new(create_mmap(&path, 8).expect("create"));
        let atomic = mmap.atomic_u64(0).expect("atomic");
        atomic.store(0, Ordering::SeqCst);

        let handles: Vec<_> = (0..4)
            .map(|_| {
                let mmap = Arc::clone(&mmap);
                thread::spawn(move || {
                    let atomic = mmap.atomic_u64(0).expect("atomic in thread");
                    for _ in 0..1000 {
                        atomic.fetch_add(1, Ordering::SeqCst);
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().expect("thread join");
        }

        assert_eq!(atomic.load(Ordering::SeqCst), 4000);

        fs::remove_file(&path).expect("cleanup");
    }
}