1use std::ffi::c_void;
2use std::fs::File;
3use std::io::Read;
4use std::mem::size_of;
5use std::os::fd::AsRawFd;
6use std::ptr;
7use std::sync::atomic;
8use tracing::{debug, error};
9
10use crate::shm::{VMClockShmBody, VMClockShmHeader};
11use clock_bound_shm::{syserror, ShmError};
12
13const VMCLOCK_SUPPORTED_VERSION: u16 = 1;
14
15struct MmapGuard {
20 segment: *mut c_void,
22
23 segsize: usize,
25
26 _file: File,
28}
29
30impl MmapGuard {
31 fn new(mut file: File) -> Result<Self, ShmError> {
35 let mut buffer = vec![];
36
37 let bytes_read = match file.read_to_end(&mut buffer) {
38 Ok(bytes_read) => bytes_read,
39 Err(_) => return syserror!("Failed to read SHM segment"),
40 };
41
42 if bytes_read == 0_usize {
43 error!("MmapGuard: Read zero bytes.");
44 return Err(ShmError::SegmentNotInitialized);
45 } else if bytes_read < size_of::<VMClockShmHeader>() {
46 error!("MmapGuard: Number of bytes read ({:?}) is less than the size of VMClockShmHeader ({:?}).", bytes_read, size_of::<VMClockShmHeader>());
47 return Err(ShmError::SegmentMalformed);
48 }
49
50 debug!("MMapGuard: Reading the VMClockShmHeader ...");
51
52 let header = VMClockShmHeader::read(&buffer)?;
54
55 let segsize = header.size.into_inner() as usize;
58
59 debug!("MMapGuard: Read a segment size of: {:?}", segsize);
60
61 let segment: *mut c_void = unsafe {
63 libc::mmap(
64 ptr::null_mut(),
65 segsize,
66 libc::PROT_READ,
67 libc::MAP_SHARED,
68 file.as_raw_fd(),
69 0,
70 )
71 };
72
73 if segment == libc::MAP_FAILED {
74 return syserror!("mmap SHM segment");
75 }
76
77 Ok(MmapGuard {
78 segment,
79 segsize,
80 _file: file,
81 })
82 }
83}
84
85impl Drop for MmapGuard {
86 fn drop(&mut self) {
88 unsafe {
92 let ret = libc::munmap(self.segment, self.segsize);
93 assert!(ret == 0);
94 }
95 }
96}
97
98pub struct VMClockShmReader {
112 _marker: std::marker::PhantomData<*const ()>,
115
116 _guard: MmapGuard,
118
119 version_ptr: *const atomic::AtomicU16,
123
124 seq_count_ptr: *const atomic::AtomicU32,
130
131 vmclock_shm_body_ptr: *const VMClockShmBody,
135
136 vmclock_shm_body_snapshot: VMClockShmBody,
141
142 seq_count_snapshot: u32,
144}
145
146impl VMClockShmReader {
147 pub fn new(path: &str) -> Result<VMClockShmReader, ShmError> {
153 debug!("VMClockShmReader::new(): path is: {:?}", path);
154 let file = match File::open(path) {
155 Ok(f) => f,
156 Err(e) => {
157 error!("VMClockShmReader::new(): {:?}", e);
158 return Err(ShmError::SegmentNotInitialized);
159 }
160 };
161
162 debug!("VMClockShmReader::new(): Creating a MmapGuard ...");
163 let mmap_guard = MmapGuard::new(file)?;
164
165 let mut cursor: *const u8 = mmap_guard.segment.cast();
168 debug!("VMClockShmReader::new(): Created the cursor.");
169
170 let version_ptr = unsafe { ptr::addr_of!((*cursor.cast::<VMClockShmHeader>()).version) };
175 let counter_id_ptr =
176 unsafe { ptr::addr_of!((*cursor.cast::<VMClockShmHeader>()).counter_id) };
177 let time_type_ptr =
178 unsafe { ptr::addr_of!((*cursor.cast::<VMClockShmHeader>()).time_type) };
179 let seq_count_ptr =
180 unsafe { ptr::addr_of!((*cursor.cast::<VMClockShmHeader>()).seq_count) };
181
182 let version = unsafe { &*version_ptr };
187 let version_number = version.load(atomic::Ordering::Acquire);
188 if version_number != VMCLOCK_SUPPORTED_VERSION {
189 error!("VMClock shared memory segment has version {:?} which is not supported by this version of the VMClockShmReader.", version_number);
190 return Err(ShmError::SegmentVersionNotSupported);
191 }
192
193 let counter_id = unsafe { &*counter_id_ptr };
197 let counter_id_value = counter_id.load(atomic::Ordering::Acquire);
198 debug!("VMClockShmReader::(): counter_id: {:?}", counter_id_value);
199
200 let time_type = unsafe { &*time_type_ptr };
204 let time_type_value = time_type.load(atomic::Ordering::Acquire);
205 debug!("VMClockShmReader::(): time_type: {:?}", time_type_value);
206
207 if mmap_guard.segsize < (size_of::<VMClockShmHeader>() + size_of::<VMClockShmBody>()) {
210 error!("VMClockShmReader::new(): Segment size is smaller than expected.");
211 return Err(ShmError::SegmentMalformed);
212 }
213 cursor = unsafe { cursor.add(size_of::<VMClockShmHeader>()) };
215 let vmclock_shm_body_ptr = unsafe { ptr::addr_of!(*cursor.cast::<VMClockShmBody>()) };
216
217 Ok(VMClockShmReader {
218 _marker: std::marker::PhantomData,
219 _guard: mmap_guard,
220 version_ptr,
221 seq_count_ptr,
222 vmclock_shm_body_ptr,
223 vmclock_shm_body_snapshot: VMClockShmBody::default(),
224 seq_count_snapshot: 0,
225 })
226 }
227
228 pub fn snapshot(&mut self) -> Result<&VMClockShmBody, ShmError> {
240 let version = unsafe { &*self.version_ptr };
243 let version = version.load(atomic::Ordering::Acquire);
244
245 if version != VMCLOCK_SUPPORTED_VERSION {
250 error!("VMClock shared memory segment has version {:?} which is not supported by this version of the VMClockShmReader.", version);
251 return Err(ShmError::SegmentVersionNotSupported);
252 }
253
254 let seq_count = unsafe { &*self.seq_count_ptr };
257 let mut seq_count_first = seq_count.load(atomic::Ordering::Acquire);
258
259 if seq_count_first == self.seq_count_snapshot {
272 return Ok(&self.vmclock_shm_body_snapshot);
273 }
274
275 let mut retries = u32::MAX;
283 while retries > 0 {
284 let snapshot = unsafe { self.vmclock_shm_body_ptr.read_volatile() };
287
288 let seq_count_second = seq_count.load(atomic::Ordering::Acquire);
290
291 if seq_count_second & 0x0001 == 0 {
293 if seq_count_first == seq_count_second {
294 self.seq_count_snapshot = seq_count_first;
295 self.vmclock_shm_body_snapshot = snapshot;
296 return Ok(&self.vmclock_shm_body_snapshot);
297 } else {
298 seq_count_first = seq_count_second;
299 }
300 }
301 retries -= 1;
302 }
303
304 Err(ShmError::SegmentNotInitialized)
306 }
307}
308
309#[cfg(test)]
310mod t_reader {
311 use super::*;
312 use crate::shm::VMClockClockStatus;
313 use std::fs::{File, OpenOptions};
314 use std::io::Write;
315 use std::path::Path;
316 use tempfile::NamedTempFile;
320
321 #[repr(C)]
323 #[derive(Debug, Copy, Clone, PartialEq)]
324 struct VMClockContent {
325 magic: u32,
326 size: u32,
327 version: u16,
328 counter_id: u8,
329 time_type: u8,
330 seq_count: u32,
331 disruption_marker: u64,
332 flags: u64,
333 _padding: [u8; 2],
334 clock_status: VMClockClockStatus,
335 leap_second_smearing_hint: u8,
336 tai_offset_sec: i16,
337 leap_indicator: u8,
338 counter_period_shift: u8,
339 counter_value: u64,
340 counter_period_frac_sec: u64,
341 counter_period_esterror_rate_frac_sec: u64,
342 counter_period_maxerror_rate_frac_sec: u64,
343 time_sec: u64,
344 time_frac_sec: u64,
345 time_esterror_nanosec: u64,
346 time_maxerror_nanosec: u64,
347 }
348
349 fn write_vmclock_content(file: &mut File, vmclock_content: &VMClockContent) {
350 let slice = unsafe {
353 ::core::slice::from_raw_parts(
354 (vmclock_content as *const VMClockContent) as *const u8,
355 ::core::mem::size_of::<VMClockContent>(),
356 )
357 };
358
359 file.write_all(slice).expect("Write failed VMClockContent");
360 file.sync_all().expect("Sync to disk failed");
361 }
362
363 fn remove_path_if_exists(path_shm: &str) {
364 let path = Path::new(path_shm);
365 if path.exists() {
366 if path.is_dir() {
367 std::fs::remove_dir_all(path_shm).expect("failed to remove file");
368 } else {
369 std::fs::remove_file(path_shm).expect("failed to remove file");
370 }
371 }
372 }
373
374 #[test]
376 fn test_reader_new() {
377 let vmclock_shm_tempfile = NamedTempFile::new().expect("create vmclock file failed");
378 let vmclock_shm_temppath = vmclock_shm_tempfile.into_temp_path();
379 let vmclock_shm_path = vmclock_shm_temppath.to_str().unwrap();
380 let mut vmclock_shm_file = OpenOptions::new()
381 .write(true)
382 .open(vmclock_shm_path)
383 .expect("open vmclock file failed");
384 let vmclock_content = VMClockContent {
385 magic: 0x4B4C4356,
386 size: 104_u32,
387 version: 1_u16,
388 counter_id: 1_u8,
389 time_type: 0_u8,
390 seq_count: 10_u32,
391 disruption_marker: 888888_u64,
392 flags: 0_u64,
393 _padding: [0x00, 0x00],
394 clock_status: VMClockClockStatus::Synchronized,
395 leap_second_smearing_hint: 0_u8,
396 tai_offset_sec: 0_i16,
397 leap_indicator: 0_u8,
398 counter_period_shift: 0_u8,
399 counter_value: 123456_u64,
400 counter_period_frac_sec: 0_u64,
401 counter_period_esterror_rate_frac_sec: 0_u64,
402 counter_period_maxerror_rate_frac_sec: 0_u64,
403 time_sec: 0_u64,
404 time_frac_sec: 0_u64,
405 time_esterror_nanosec: 0_u64,
406 time_maxerror_nanosec: 0_u64,
407 };
408 write_vmclock_content(&mut vmclock_shm_file, &vmclock_content);
409
410 let reader =
411 VMClockShmReader::new(&vmclock_shm_path).expect("Failed to create VMClockShmReader");
412
413 let version = unsafe { &*reader.version_ptr };
414 let seq_count = unsafe { &*reader.seq_count_ptr };
415 let vmclock_shm_body = unsafe { *reader.vmclock_shm_body_ptr };
416
417 assert_eq!(version.load(atomic::Ordering::Relaxed), 1_u16);
418 assert_eq!(seq_count.load(atomic::Ordering::Relaxed), 10_u32);
419 assert_eq!(vmclock_shm_body.counter_value, 123456_u64);
420 assert_eq!(
421 vmclock_shm_body.clock_status,
422 VMClockClockStatus::Synchronized
423 );
424 assert_eq!(vmclock_shm_body.disruption_marker, 888888_u64);
425 }
426
427 #[test]
429 fn test_reader_file_does_not_exist() {
430 let vmclock_shm_tempfile = NamedTempFile::new().expect("create vmclock file failed");
431 let vmclock_shm_temppath = vmclock_shm_tempfile.into_temp_path();
432 let vmclock_shm_path = vmclock_shm_temppath.to_str().unwrap();
433 remove_path_if_exists(vmclock_shm_path);
434
435 let expected = ShmError::SegmentNotInitialized;
436 match VMClockShmReader::new(&vmclock_shm_path) {
437 Err(actual) => assert_eq!(expected, actual),
438 _ => assert!(false),
439 }
440 }
441
442 #[test]
444 fn test_reader_file_is_empty() {
445 let vmclock_shm_tempfile = NamedTempFile::new().expect("create vmclock file failed");
446 let vmclock_shm_temppath = vmclock_shm_tempfile.into_temp_path();
447 let vmclock_shm_path = vmclock_shm_temppath.to_str().unwrap();
448
449 let expected = ShmError::SegmentNotInitialized;
450 match VMClockShmReader::new(&vmclock_shm_path) {
451 Err(actual) => assert_eq!(expected, actual),
452 _ => assert!(false),
453 }
454 }
455
456 #[test]
459 fn test_reader_version_not_supported() {
460 let vmclock_shm_tempfile = NamedTempFile::new().expect("create vmclock file failed");
461 let vmclock_shm_temppath = vmclock_shm_tempfile.into_temp_path();
462 let vmclock_shm_path = vmclock_shm_temppath.to_str().unwrap();
463 let mut vmclock_shm_file = OpenOptions::new()
464 .write(true)
465 .open(vmclock_shm_path)
466 .expect("open vmclock file failed");
467 let vmclock_content = VMClockContent {
468 magic: 0x4B4C4356,
469 size: 104_u32,
470 version: 999_u16,
471 counter_id: 1_u8,
472 time_type: 0_u8,
473 seq_count: 10_u32,
474 disruption_marker: 888888_u64,
475 flags: 0_u64,
476 _padding: [0x00, 0x00],
477 clock_status: VMClockClockStatus::Synchronized,
478 leap_second_smearing_hint: 0_u8,
479 tai_offset_sec: 0_i16,
480 leap_indicator: 0_u8,
481 counter_period_shift: 0_u8,
482 counter_value: 123456_u64,
483 counter_period_frac_sec: 0_u64,
484 counter_period_esterror_rate_frac_sec: 0_u64,
485 counter_period_maxerror_rate_frac_sec: 0_u64,
486 time_sec: 0_u64,
487 time_frac_sec: 0_u64,
488 time_esterror_nanosec: 0_u64,
489 time_maxerror_nanosec: 0_u64,
490 };
491 write_vmclock_content(&mut vmclock_shm_file, &vmclock_content);
492
493 let expected = ShmError::SegmentVersionNotSupported;
494 match VMClockShmReader::new(&vmclock_shm_path) {
495 Err(actual) => assert_eq!(expected, actual),
496 _ => assert!(false),
497 }
498 }
499
500 #[test]
503 fn test_reader_segment_size_smaller_than_header() {
504 let vmclock_shm_tempfile = NamedTempFile::new().expect("create vmclock file failed");
505 let vmclock_shm_temppath = vmclock_shm_tempfile.into_temp_path();
506 let vmclock_shm_path = vmclock_shm_temppath.to_str().unwrap();
507 let mut vmclock_shm_file = OpenOptions::new()
508 .write(true)
509 .open(vmclock_shm_path)
510 .expect("open vmclock file failed");
511 let vmclock_content = VMClockContent {
512 magic: 0x4B4C4356,
513 size: 8_u32, version: 1_u16,
515 counter_id: 1_u8,
516 time_type: 0_u8,
517 seq_count: 10_u32,
518 disruption_marker: 888888_u64,
519 flags: 0_u64,
520 _padding: [0x00, 0x00],
521 clock_status: VMClockClockStatus::Synchronized,
522 leap_second_smearing_hint: 0_u8,
523 tai_offset_sec: 0_i16,
524 leap_indicator: 0_u8,
525 counter_period_shift: 0_u8,
526 counter_value: 123456_u64,
527 counter_period_frac_sec: 0_u64,
528 counter_period_esterror_rate_frac_sec: 0_u64,
529 counter_period_maxerror_rate_frac_sec: 0_u64,
530 time_sec: 0_u64,
531 time_frac_sec: 0_u64,
532 time_esterror_nanosec: 0_u64,
533 time_maxerror_nanosec: 0_u64,
534 };
535 write_vmclock_content(&mut vmclock_shm_file, &vmclock_content);
536
537 let expected = ShmError::SegmentMalformed;
538 match VMClockShmReader::new(&vmclock_shm_path) {
539 Err(actual) => assert_eq!(expected, actual),
540 _ => assert!(false),
541 }
542 }
543
544 #[test]
548 fn test_reader_segment_size_smaller_than_header_and_body() {
549 let vmclock_shm_tempfile = NamedTempFile::new().expect("create vmclock file failed");
550 let vmclock_shm_temppath = vmclock_shm_tempfile.into_temp_path();
551 let vmclock_shm_path = vmclock_shm_temppath.to_str().unwrap();
552 let mut vmclock_shm_file = OpenOptions::new()
553 .write(true)
554 .open(vmclock_shm_path)
555 .expect("open vmclock file failed");
556 let vmclock_content = VMClockContent {
557 magic: 0x4B4C4356,
558 size: 26_u32, version: 1_u16,
560 counter_id: 1_u8,
561 time_type: 0_u8,
562 seq_count: 10_u32,
563 disruption_marker: 888888_u64,
564 flags: 0_u64,
565 _padding: [0x00, 0x00],
566 clock_status: VMClockClockStatus::Synchronized,
567 leap_second_smearing_hint: 0_u8,
568 tai_offset_sec: 0_i16,
569 leap_indicator: 0_u8,
570 counter_period_shift: 0_u8,
571 counter_value: 123456_u64,
572 counter_period_frac_sec: 0_u64,
573 counter_period_esterror_rate_frac_sec: 0_u64,
574 counter_period_maxerror_rate_frac_sec: 0_u64,
575 time_sec: 0_u64,
576 time_frac_sec: 0_u64,
577 time_esterror_nanosec: 0_u64,
578 time_maxerror_nanosec: 0_u64,
579 };
580 write_vmclock_content(&mut vmclock_shm_file, &vmclock_content);
581
582 let expected = ShmError::SegmentMalformed;
583 match VMClockShmReader::new(&vmclock_shm_path) {
584 Err(actual) => assert_eq!(expected, actual),
585 _ => assert!(false),
586 }
587 }
588}