clock_bound_shm/
reader.rs

1use errno::{errno, Errno};
2use std::ffi::{c_void, CStr};
3use std::mem::size_of;
4use std::ptr;
5use std::sync::atomic;
6
7use crate::shm_header::{ShmHeader, CLOCKBOUND_SHM_SUPPORTED_VERSION};
8use crate::{syserror, ClockErrorBound, ShmError};
9
10/// A guard tracking an open file descriptor.
11///
12/// Creating the FdGuard opens the file with read-only permission.
13/// The file descriptor is closed when the guard is dropped.
14struct FdGuard(i32);
15
16impl FdGuard {
17    /// Create a new FdGuard.
18    ///
19    /// Open a file at `path` and store the open file descriptor
20    fn new(path: &CStr) -> Result<Self, ShmError> {
21        // SAFETY: `path` is a valid C string.
22        let fd = unsafe { libc::open(path.as_ptr(), libc::O_RDONLY) };
23        if fd < 0 {
24            return syserror!(concat!("open"));
25        }
26
27        Ok(FdGuard(fd))
28    }
29}
30
31impl Drop for FdGuard {
32    /// Drop the FdGuard and close the file descriptor it holds.
33    fn drop(&mut self) {
34        // SAFETY: Unsafe because this is a call into a C API, but this particular
35        // call is always safe.
36        unsafe {
37            let ret = libc::close(self.0);
38            assert!(ret == 0 || errno() == Errno(libc::EINTR));
39        }
40    }
41}
42
43/// A guard tracking an memory mapped file.
44///
45/// Creating the MmapGuard maps an open file descriptor.
46/// The file is unmap'ed when the guard is dropped.
47#[derive(Debug)]
48struct MmapGuard {
49    /// A pointer to the head of the segment
50    segment: *mut c_void,
51
52    /// The size of the segment mapped into memory
53    segsize: usize,
54}
55
56impl MmapGuard {
57    /// Create a new MmapGuard.
58    ///
59    /// Map the open file descriptor held in the FdGuard.
60    fn new(fdguard: &FdGuard) -> Result<Self, ShmError> {
61        // Read the header so we know how much to map in memory.
62        let header = ShmHeader::read(fdguard.0)?;
63
64        // This consumes the segsize, but we only needed the header for validation and extracting
65        // the segment size. So the move is fine here.
66        let segsize = header.segsize.into_inner() as usize;
67
68        // SAFETY: We're calling into a C function, but this particular call is always safe.
69        let segment: *mut c_void = unsafe {
70            libc::mmap(
71                ptr::null_mut(),
72                segsize,
73                libc::PROT_READ,
74                libc::MAP_SHARED,
75                fdguard.0,
76                0,
77            )
78        };
79
80        if segment == libc::MAP_FAILED {
81            return syserror!("mmap SHM segment");
82        }
83
84        Ok(MmapGuard { segment, segsize })
85    }
86}
87
88impl Drop for MmapGuard {
89    /// Drop the MmapGuard and unmap the file it tracks.
90    fn drop(&mut self) {
91        // SAFETY: `segment` was previously returned from `mmap`, and therefore
92        // when this destructor runs there are no more live references into
93        // it.
94        unsafe {
95            let ret = libc::munmap(self.segment, self.segsize);
96            assert!(ret == 0);
97        }
98    }
99}
100
101/// Reader for ClockBound daemon shared memory segment.
102///
103/// The Clockbound daemon shared memory segment consists of a ShmHeader followed by a
104/// ClockBoundError struct. The segment is updated by a single producer (the clockbound daemon),
105/// but may be read by many clients.  The shared memory segment does not implement a semaphore or
106/// equivalent to synchronize the single-producer / many-consumers processes. Instead, the
107/// mechanism is lock-free and relies on a `generation` number to ensure consistent reads (over
108/// retries).
109///
110/// The writer increments the generation field from even to odd before each update. It also
111/// increment it again, from odd to even, after finishing the update. Readers must check the
112/// `generation` field before and after each read, and verify that they obtain the same, even,
113/// value. Otherwise, the read was dirty and must be retried.
114#[derive(Debug)]
115pub struct ShmReader {
116    // Explicitly make the ShmReader be !Send and !Sync, since it is not thread safe. A bit ugly to
117    // use a phantom raw pointer, but effective and free at runtime.
118    _marker: std::marker::PhantomData<*const ()>,
119
120    // Drop guard to unmap the shared memory segment
121    _guard: MmapGuard,
122
123    // A raw pointer into the shared memory segment, pointing to the version member of the ShmHeader
124    // section. The version number defines the shared memory segment content and layout. This is a
125    // bit less flexible than a series of TLV but simpler (and not mutually exclusive).
126    version: *const atomic::AtomicU16,
127
128    // A raw pointer into the shared memory segment, pointing to the generation member of the
129    // ShmHeader section. The generation number is used to read consistent snapshots of the shared
130    // memory segment (that is outside of an update event by the writer). This is expected to roll
131    // over as a function of the rate of update from the writer (eg. every ~9 hours if updating
132    // once a second). A generation number equals to 0 signals the shared memory segment has not
133    // been initialized.
134    generation: *const atomic::AtomicU16,
135
136    // A raw pointer into the shared memory segment, pointing to the ClockErrorBound section. Note
137    // that the structured reference by this pointer may not be consistent, and reading it requires
138    // to assert the generation value.
139    ceb_shm: *const ClockErrorBound,
140
141    // The last snapshot of ClockErrorBound taken. This acts as a cache to avoid waiting for the
142    // writer to complete an update and allow to share a reference to this memory location
143    // (avoiding some memory copy). Keeping a state here and sharing it with the caller makes the
144    // ShmReader not thread safe.
145    snapshot_ceb: ClockErrorBound,
146
147    // The value of the writer generation when the ceb snapshot was taken.
148    snapshot_gen: u16,
149}
150
151impl ShmReader {
152    /// Open a ClockBound shared memory segment for reading.
153    ///
154    /// On error, returns an appropriate `Errno`. If the content of the segment
155    /// is uninitialized, unparseable, or otherwise malformed, EPROTO will be
156    /// returned.
157    pub fn new(path: &CStr) -> Result<ShmReader, ShmError> {
158        let fdguard = FdGuard::new(path)?;
159        let mmap_guard = MmapGuard::new(&fdguard)?;
160
161        // Create a cursor to pick the addresses of the various elements of interest in the shared
162        // memory segment.
163        let mut cursor: *const u8 = mmap_guard.segment.cast();
164
165        // Pick fields from the ShmHeader
166        // SAFETY: `cursor` is aligned to the start of the memory segment and the MmapGuard has
167        // validated the memory segment is large enough to contain the header.
168        let version = unsafe { ptr::addr_of!((*cursor.cast::<ShmHeader>()).version) };
169        let generation = unsafe { ptr::addr_of!((*cursor.cast::<ShmHeader>()).generation) };
170
171        // Move to the end of the header and map the ClockErrorBound data, but only if the segment
172        // size allows it and matches our expectation.
173        if mmap_guard.segsize < size_of::<ShmHeader>() + size_of::<ClockErrorBound>() {
174            return Err(ShmError::SegmentMalformed);
175        }
176
177        // SAFETY: segment size has been checked to ensure `cursor` move leads to a valid cast
178        cursor = unsafe { cursor.add(size_of::<ShmHeader>()) };
179        let ceb_shm = unsafe { ptr::addr_of!(*cursor.cast::<ClockErrorBound>()) };
180
181        Ok(ShmReader {
182            _marker: std::marker::PhantomData,
183            _guard: mmap_guard,
184            version,
185            generation,
186            ceb_shm,
187            snapshot_ceb: ClockErrorBound::default(),
188            snapshot_gen: 0,
189        })
190    }
191
192    /// Return a consistent snapshot of the shared memory segment.
193    ///
194    /// Taking a snapshot consists in reading the memory segment while confirming the generation
195    /// number in the header has not changed (which would indicate an update from the writer
196    /// occurred while reading). If an update is detected, the read is retried.
197    ///
198    /// This function returns a reference to the ClockErrorBound snapshot stored by the reader, and
199    /// not an owned value. This make the ShmReader NOT thread-safe: the data pointed to could be
200    /// updated without one of the thread knowing, leading to a incorrect clock error bond. The
201    /// advantage are in terms of performance: less data copied, but also no locking, yielding or
202    /// excessive retries.
203    pub fn snapshot(&mut self) -> Result<&ClockErrorBound, ShmError> {
204        // Atomically read the current version in the shared memory segment
205        // SAFETY: `self.version` has been validated when creating the reader
206        let version = unsafe { &*self.version };
207        let version = version.load(atomic::Ordering::Acquire);
208
209        // The version number is checked when the reader is created to not be 0. If we now see a
210        // version equal to 0, the writer has restarted, wiped the segment clean, but has not
211        // defined the layout yet. Choose to return the last snapshot. If the writer died in the
212        // middle of restarting, the snapshot will eventually be stale. Enough information is
213        // returned to the caller to take appropriate action (e.g. assert clock status).
214        if version == 0 {
215            return Ok(&self.snapshot_ceb);
216        } else if version != CLOCKBOUND_SHM_SUPPORTED_VERSION {
217            eprintln!("ClockBound shared memory segment has version {:?} which is not supported by this software.", version);
218            return Err(ShmError::SegmentVersionNotSupported);
219        }
220
221        // Atomically read the current generation in the shared memory segment
222        // SAFETY: `self.generation` has been validated when creating the reader
223        let generation = unsafe { &*self.generation };
224        let mut first_gen = generation.load(atomic::Ordering::Acquire);
225
226        // The generation number is checked when the reader is created to not be 0. If we now see a
227        // generation equals to 0, the writer has restarted, wiped the segment clean, but has not
228        // initialized it with valid data yet. Choose to return the last snapshot. If the writer
229        // died in the middle of restarting, the snapshot will eventually be stale. Enough
230        // information is returned to the caller to take appropriate action (e.g. assert clock
231        // status).
232        if first_gen == 0 {
233            return Ok(&self.snapshot_ceb);
234        }
235
236        // Quick optimization, if the generation number matches the last one recorded, the shared
237        // memory segment has not been updated since last read. No need to read more of the memory
238        // segment, instead return the reference to the snapshot. This is useful in cases where the
239        // rate of clockbound read is much higher than the rate of write to the shared memory
240        // segment.
241        //
242        // Note that the generation number DOES roll over, but never take a value of 0 once the
243        // segment is initialized. It is still possible that the generation number matches although
244        // the counter has rolled over. Assuming one update per sec, this leaves a collision
245        // probability of 1 / 2^16, and a rollover once every 18 hours. Although the risk is very
246        // small it exists, but the `void_after` member on the ClockErrorBound struct can be used
247        // to provide an additional layer of protection.
248        if first_gen == self.snapshot_gen {
249            return Ok(&self.snapshot_ceb);
250        }
251
252        // If the generation is an odd number, the shared memory segment is in the process of being
253        // updated by the writer. Instead of waiting, yielding or busy looping, simply return the
254        // last snapshot taken. It is fine for the reader to return a bound on clock error based on
255        // the previously updated shared memory segment. The bound on clock error returned would be
256        // larger than it could have been, but still correct. If the writer died in the middle of
257        // an update, the snapshot will eventually be stale. The caller is returned enough
258        // information to act accordingly.
259        if first_gen & 0x0001 == 1 {
260            return Ok(&self.snapshot_ceb);
261        }
262
263        // The generation number is an even number, and has changed since the last snapshot. Loop
264        // until we obtain a consistent read of the clock error bound data. This relies on reading
265        // the generation value twice, making sure they are identical and an even number.
266        //
267        // The writer could die in the middle of the update. This could lead to not making any
268        // progress hence capping the number of retries.
269        let mut retries = 1_000_000;
270        while retries > 0 {
271            // Read the ClockErrorBound data from the shared memory
272            // SAFETY: `ceb_at` has been checked to be valid while creating the ShmReader
273            let snapshot = unsafe { self.ceb_shm.read_volatile() };
274
275            // Confirm no update occurred during the read
276            let second_gen = generation.load(atomic::Ordering::Acquire);
277            if first_gen == second_gen {
278                self.snapshot_gen = first_gen;
279                self.snapshot_ceb = snapshot;
280                return Ok(&self.snapshot_ceb);
281            } else {
282                // Only track complete updates indicated by an even generation number.
283                if second_gen & 0x0001 == 0 {
284                    first_gen = second_gen;
285                }
286            }
287            retries -= 1;
288        }
289
290        // Attempts to read the snapshot have failed.
291        Err(ShmError::SegmentNotInitialized)
292    }
293}
294
295#[cfg(test)]
296mod t_reader {
297    use super::*;
298    use crate::ClockStatus;
299    use byteorder::{NativeEndian, WriteBytesExt};
300    use nix::sys::time::TimeSpec;
301    use std::ffi::CString;
302    use std::fs::OpenOptions;
303    use std::io::Seek;
304    use std::io::Write;
305    /// We make use of tempfile::NamedTempFile to ensure that
306    /// local files that are created during a test get removed
307    /// afterwards.
308    use tempfile::NamedTempFile;
309
310    macro_rules! write_memory_segment {
311        ($file:ident,
312         $magic_0:literal,
313         $magic_1:literal,
314         $segsize:literal,
315         $version:literal,
316         $generation:literal,
317         ($as_of_sec:literal, $as_of_nsec:literal),
318         ($void_after_sec:literal, $void_after_nsec:literal),
319         $bound_nsec:literal,
320         $max_drift: literal) => {
321            // Build the bound on clock error data
322            let ceb = ClockErrorBound::new(
323                TimeSpec::new($as_of_sec, $as_of_nsec), // as_of
324                TimeSpec::new($void_after_sec, $void_after_nsec), // void_after
325                $bound_nsec,                            // bound_nsec
326                0,                                      // disruption_marker
327                $max_drift,                             // max_drift_ppb
328                ClockStatus::Synchronized,              // clock_status
329                true,                                   // clock_disruption_support_enabled
330            );
331
332            // Convert the ceb struct into a slice so we can write it all out, fairly magic.
333            // Definitely needs the #[repr(C)] layout.
334            let slice = unsafe {
335                ::core::slice::from_raw_parts(
336                    (&ceb as *const ClockErrorBound) as *const u8,
337                    ::core::mem::size_of::<ClockErrorBound>(),
338                )
339            };
340
341            $file
342                .write_u32::<NativeEndian>($magic_0)
343                .expect("Write failed magic_0");
344            $file
345                .write_u32::<NativeEndian>($magic_1)
346                .expect("Write failed magic_1");
347            $file
348                .write_u32::<NativeEndian>($segsize)
349                .expect("Write failed segsize");
350            $file
351                .write_u16::<NativeEndian>($version)
352                .expect("Write failed version");
353            $file
354                .write_u16::<NativeEndian>($generation)
355                .expect("Write failed generation");
356            $file
357                .write_all(slice)
358                .expect("Write failed ClockErrorBound");
359            $file.sync_all().expect("Sync to disk failed");
360        };
361    }
362
363    /// Assert that the reader can map a file.
364    #[test]
365    fn test_reader_new() {
366        let clockbound_shm_tempfile = NamedTempFile::new().expect("create clockbound file failed");
367        let clockbound_shm_temppath = clockbound_shm_tempfile.into_temp_path();
368        let clockbound_shm_path = clockbound_shm_temppath.to_str().unwrap();
369        let mut clockbound_shm_file = OpenOptions::new()
370            .write(true)
371            .open(clockbound_shm_path)
372            .expect("open clockbound file failed");
373        write_memory_segment!(
374            clockbound_shm_file,
375            0x414D5A4E,
376            0x43420200,
377            400,
378            2,
379            10,
380            (0, 0),
381            (0, 0),
382            123,
383            0
384        );
385
386        let path = CString::new(clockbound_shm_path).expect("CString failed");
387        let reader = ShmReader::new(&path).expect("Failed to create ShmReader");
388
389        let version = unsafe { &*reader.version };
390        let generation = unsafe { &*reader.generation };
391        let ceb = unsafe { *reader.ceb_shm };
392
393        assert_eq!(version.load(atomic::Ordering::Relaxed), 2);
394        assert_eq!(generation.load(atomic::Ordering::Relaxed), 10);
395        assert_eq!(ceb.bound_nsec, 123);
396    }
397
398    /// Assert that creating a reader when the
399    /// shared memory segment has an unsupported version causes an Err result.
400    #[test]
401    fn test_reader_new_of_unsupported_shm_version() {
402        let clockbound_shm_tempfile = NamedTempFile::new().expect("create clockbound file failed");
403        let clockbound_shm_temppath = clockbound_shm_tempfile.into_temp_path();
404        let clockbound_shm_path = clockbound_shm_temppath.to_str().unwrap();
405        let mut clockbound_shm_file = OpenOptions::new()
406            .write(true)
407            .open(clockbound_shm_path)
408            .expect("open clockbound file failed");
409        write_memory_segment!(
410            clockbound_shm_file,
411            0x414D5A4E,
412            0x43420200,
413            400,
414            9999,
415            10,
416            (0, 0),
417            (0, 0),
418            123,
419            0
420        );
421
422        let path = CString::new(clockbound_shm_path).expect("CString failed");
423        let result = ShmReader::new(&path);
424
425        // Assert that creating a reader on an unsupported shared memory segment version
426        // returns Err(ShmError::SegmentVersionNotSupported).
427        assert!(result.is_err());
428        assert_eq!(result.unwrap_err(), ShmError::SegmentVersionNotSupported);
429    }
430
431    /// Assert that creating a reader and taking a snapshot when the
432    /// shared memory segment has an unsupported version causes an Err result.
433    #[test]
434    fn test_reader_snapshot_of_unsupported_shm_version() {
435        let clockbound_shm_tempfile = NamedTempFile::new().expect("create clockbound file failed");
436        let clockbound_shm_temppath = clockbound_shm_tempfile.into_temp_path();
437        let clockbound_shm_path = clockbound_shm_temppath.to_str().unwrap();
438        let mut clockbound_shm_file = OpenOptions::new()
439            .write(true)
440            .open(clockbound_shm_path)
441            .expect("open clockbound file failed");
442        // Initially, write the current supported version.
443        write_memory_segment!(
444            clockbound_shm_file,
445            0x414D5A4E,
446            0x43420200,
447            400,
448            2,
449            10,
450            (0, 0),
451            (0, 0),
452            123,
453            0
454        );
455
456        let path = CString::new(clockbound_shm_path).expect("CString failed");
457        let mut reader = ShmReader::new(&path).expect("Failed to create ShmReader");
458        let version = unsafe { &*reader.version };
459        assert_eq!(version.load(atomic::Ordering::Relaxed), 2);
460
461        // Assert that snapshot works without an error with this supported version.
462        let result = reader.snapshot();
463        assert!(result.is_ok());
464
465        // Update the shared memory segment so that the version number is an
466        // unsupported number (e.g. 9999).
467        let _ = clockbound_shm_file.rewind();
468        write_memory_segment!(
469            clockbound_shm_file,
470            0x414D5A4E,
471            0x43420200,
472            400,
473            9999,
474            10,
475            (0, 0),
476            (0, 0),
477            123,
478            0
479        );
480
481        let version = unsafe { &*reader.version };
482        assert_eq!(version.load(atomic::Ordering::Relaxed), 9999);
483
484        // Assert that taking a snapshot of an unsupported shared memory segment version
485        // returns Err(ShmError::SegmentVersionNotSupported).
486        let result = reader.snapshot();
487        assert!(result.is_err());
488        assert_eq!(result.unwrap_err(), ShmError::SegmentVersionNotSupported);
489    }
490}