clock_bound_shm/reader.rs
1use errno::{errno, Errno};
2use std::ffi::{c_void, CStr};
3use std::mem::size_of;
4use std::ptr;
5use std::sync::atomic;
6
7use crate::shm_header::{ShmHeader, CLOCKBOUND_SHM_SUPPORTED_VERSION};
8use crate::{syserror, ClockErrorBound, ShmError};
9
10/// A guard tracking an open file descriptor.
11///
12/// Creating the FdGuard opens the file with read-only permission.
13/// The file descriptor is closed when the guard is dropped.
14struct FdGuard(i32);
15
16impl FdGuard {
17 /// Create a new FdGuard.
18 ///
19 /// Open a file at `path` and store the open file descriptor
20 fn new(path: &CStr) -> Result<Self, ShmError> {
21 // SAFETY: `path` is a valid C string.
22 let fd = unsafe { libc::open(path.as_ptr(), libc::O_RDONLY) };
23 if fd < 0 {
24 return syserror!(concat!("open"));
25 }
26
27 Ok(FdGuard(fd))
28 }
29}
30
31impl Drop for FdGuard {
32 /// Drop the FdGuard and close the file descriptor it holds.
33 fn drop(&mut self) {
34 // SAFETY: Unsafe because this is a call into a C API, but this particular
35 // call is always safe.
36 unsafe {
37 let ret = libc::close(self.0);
38 assert!(ret == 0 || errno() == Errno(libc::EINTR));
39 }
40 }
41}
42
43/// A guard tracking an memory mapped file.
44///
45/// Creating the MmapGuard maps an open file descriptor.
46/// The file is unmap'ed when the guard is dropped.
47#[derive(Debug)]
48struct MmapGuard {
49 /// A pointer to the head of the segment
50 segment: *mut c_void,
51
52 /// The size of the segment mapped into memory
53 segsize: usize,
54}
55
56impl MmapGuard {
57 /// Create a new MmapGuard.
58 ///
59 /// Map the open file descriptor held in the FdGuard.
60 fn new(fdguard: &FdGuard) -> Result<Self, ShmError> {
61 // Read the header so we know how much to map in memory.
62 let header = ShmHeader::read(fdguard.0)?;
63
64 // This consumes the segsize, but we only needed the header for validation and extracting
65 // the segment size. So the move is fine here.
66 let segsize = header.segsize.into_inner() as usize;
67
68 // SAFETY: We're calling into a C function, but this particular call is always safe.
69 let segment: *mut c_void = unsafe {
70 libc::mmap(
71 ptr::null_mut(),
72 segsize,
73 libc::PROT_READ,
74 libc::MAP_SHARED,
75 fdguard.0,
76 0,
77 )
78 };
79
80 if segment == libc::MAP_FAILED {
81 return syserror!("mmap SHM segment");
82 }
83
84 Ok(MmapGuard { segment, segsize })
85 }
86}
87
88impl Drop for MmapGuard {
89 /// Drop the MmapGuard and unmap the file it tracks.
90 fn drop(&mut self) {
91 // SAFETY: `segment` was previously returned from `mmap`, and therefore
92 // when this destructor runs there are no more live references into
93 // it.
94 unsafe {
95 let ret = libc::munmap(self.segment, self.segsize);
96 assert!(ret == 0);
97 }
98 }
99}
100
101/// Reader for ClockBound daemon shared memory segment.
102///
103/// The Clockbound daemon shared memory segment consists of a ShmHeader followed by a
104/// ClockBoundError struct. The segment is updated by a single producer (the clockbound daemon),
105/// but may be read by many clients. The shared memory segment does not implement a semaphore or
106/// equivalent to synchronize the single-producer / many-consumers processes. Instead, the
107/// mechanism is lock-free and relies on a `generation` number to ensure consistent reads (over
108/// retries).
109///
110/// The writer increments the generation field from even to odd before each update. It also
111/// increment it again, from odd to even, after finishing the update. Readers must check the
112/// `generation` field before and after each read, and verify that they obtain the same, even,
113/// value. Otherwise, the read was dirty and must be retried.
114#[derive(Debug)]
115pub struct ShmReader {
116 // Explicitly make the ShmReader be !Send and !Sync, since it is not thread safe. A bit ugly to
117 // use a phantom raw pointer, but effective and free at runtime.
118 _marker: std::marker::PhantomData<*const ()>,
119
120 // Drop guard to unmap the shared memory segment
121 _guard: MmapGuard,
122
123 // A raw pointer into the shared memory segment, pointing to the version member of the ShmHeader
124 // section. The version number defines the shared memory segment content and layout. This is a
125 // bit less flexible than a series of TLV but simpler (and not mutually exclusive).
126 version: *const atomic::AtomicU16,
127
128 // A raw pointer into the shared memory segment, pointing to the generation member of the
129 // ShmHeader section. The generation number is used to read consistent snapshots of the shared
130 // memory segment (that is outside of an update event by the writer). This is expected to roll
131 // over as a function of the rate of update from the writer (eg. every ~9 hours if updating
132 // once a second). A generation number equals to 0 signals the shared memory segment has not
133 // been initialized.
134 generation: *const atomic::AtomicU16,
135
136 // A raw pointer into the shared memory segment, pointing to the ClockErrorBound section. Note
137 // that the structured reference by this pointer may not be consistent, and reading it requires
138 // to assert the generation value.
139 ceb_shm: *const ClockErrorBound,
140
141 // The last snapshot of ClockErrorBound taken. This acts as a cache to avoid waiting for the
142 // writer to complete an update and allow to share a reference to this memory location
143 // (avoiding some memory copy). Keeping a state here and sharing it with the caller makes the
144 // ShmReader not thread safe.
145 snapshot_ceb: ClockErrorBound,
146
147 // The value of the writer generation when the ceb snapshot was taken.
148 snapshot_gen: u16,
149}
150
151impl ShmReader {
152 /// Open a ClockBound shared memory segment for reading.
153 ///
154 /// On error, returns an appropriate `Errno`. If the content of the segment
155 /// is uninitialized, unparseable, or otherwise malformed, EPROTO will be
156 /// returned.
157 pub fn new(path: &CStr) -> Result<ShmReader, ShmError> {
158 let fdguard = FdGuard::new(path)?;
159 let mmap_guard = MmapGuard::new(&fdguard)?;
160
161 // Create a cursor to pick the addresses of the various elements of interest in the shared
162 // memory segment.
163 let mut cursor: *const u8 = mmap_guard.segment.cast();
164
165 // Pick fields from the ShmHeader
166 // SAFETY: `cursor` is aligned to the start of the memory segment and the MmapGuard has
167 // validated the memory segment is large enough to contain the header.
168 let version = unsafe { ptr::addr_of!((*cursor.cast::<ShmHeader>()).version) };
169 let generation = unsafe { ptr::addr_of!((*cursor.cast::<ShmHeader>()).generation) };
170
171 // Move to the end of the header and map the ClockErrorBound data, but only if the segment
172 // size allows it and matches our expectation.
173 if mmap_guard.segsize < size_of::<ShmHeader>() + size_of::<ClockErrorBound>() {
174 return Err(ShmError::SegmentMalformed);
175 }
176
177 // SAFETY: segment size has been checked to ensure `cursor` move leads to a valid cast
178 cursor = unsafe { cursor.add(size_of::<ShmHeader>()) };
179 let ceb_shm = unsafe { ptr::addr_of!(*cursor.cast::<ClockErrorBound>()) };
180
181 Ok(ShmReader {
182 _marker: std::marker::PhantomData,
183 _guard: mmap_guard,
184 version,
185 generation,
186 ceb_shm,
187 snapshot_ceb: ClockErrorBound::default(),
188 snapshot_gen: 0,
189 })
190 }
191
192 /// Return a consistent snapshot of the shared memory segment.
193 ///
194 /// Taking a snapshot consists in reading the memory segment while confirming the generation
195 /// number in the header has not changed (which would indicate an update from the writer
196 /// occurred while reading). If an update is detected, the read is retried.
197 ///
198 /// This function returns a reference to the ClockErrorBound snapshot stored by the reader, and
199 /// not an owned value. This make the ShmReader NOT thread-safe: the data pointed to could be
200 /// updated without one of the thread knowing, leading to a incorrect clock error bond. The
201 /// advantage are in terms of performance: less data copied, but also no locking, yielding or
202 /// excessive retries.
203 pub fn snapshot(&mut self) -> Result<&ClockErrorBound, ShmError> {
204 // Atomically read the current version in the shared memory segment
205 // SAFETY: `self.version` has been validated when creating the reader
206 let version = unsafe { &*self.version };
207 let version = version.load(atomic::Ordering::Acquire);
208
209 // The version number is checked when the reader is created to not be 0. If we now see a
210 // version equal to 0, the writer has restarted, wiped the segment clean, but has not
211 // defined the layout yet. Choose to return the last snapshot. If the writer died in the
212 // middle of restarting, the snapshot will eventually be stale. Enough information is
213 // returned to the caller to take appropriate action (e.g. assert clock status).
214 if version == 0 {
215 return Ok(&self.snapshot_ceb);
216 } else if version != CLOCKBOUND_SHM_SUPPORTED_VERSION {
217 eprintln!("ClockBound shared memory segment has version {:?} which is not supported by this software.", version);
218 return Err(ShmError::SegmentVersionNotSupported);
219 }
220
221 // Atomically read the current generation in the shared memory segment
222 // SAFETY: `self.generation` has been validated when creating the reader
223 let generation = unsafe { &*self.generation };
224 let mut first_gen = generation.load(atomic::Ordering::Acquire);
225
226 // The generation number is checked when the reader is created to not be 0. If we now see a
227 // generation equals to 0, the writer has restarted, wiped the segment clean, but has not
228 // initialized it with valid data yet. Choose to return the last snapshot. If the writer
229 // died in the middle of restarting, the snapshot will eventually be stale. Enough
230 // information is returned to the caller to take appropriate action (e.g. assert clock
231 // status).
232 if first_gen == 0 {
233 return Ok(&self.snapshot_ceb);
234 }
235
236 // Quick optimization, if the generation number matches the last one recorded, the shared
237 // memory segment has not been updated since last read. No need to read more of the memory
238 // segment, instead return the reference to the snapshot. This is useful in cases where the
239 // rate of clockbound read is much higher than the rate of write to the shared memory
240 // segment.
241 //
242 // Note that the generation number DOES roll over, but never take a value of 0 once the
243 // segment is initialized. It is still possible that the generation number matches although
244 // the counter has rolled over. Assuming one update per sec, this leaves a collision
245 // probability of 1 / 2^16, and a rollover once every 18 hours. Although the risk is very
246 // small it exists, but the `void_after` member on the ClockErrorBound struct can be used
247 // to provide an additional layer of protection.
248 if first_gen == self.snapshot_gen {
249 return Ok(&self.snapshot_ceb);
250 }
251
252 // If the generation is an odd number, the shared memory segment is in the process of being
253 // updated by the writer. Instead of waiting, yielding or busy looping, simply return the
254 // last snapshot taken. It is fine for the reader to return a bound on clock error based on
255 // the previously updated shared memory segment. The bound on clock error returned would be
256 // larger than it could have been, but still correct. If the writer died in the middle of
257 // an update, the snapshot will eventually be stale. The caller is returned enough
258 // information to act accordingly.
259 if first_gen & 0x0001 == 1 {
260 return Ok(&self.snapshot_ceb);
261 }
262
263 // The generation number is an even number, and has changed since the last snapshot. Loop
264 // until we obtain a consistent read of the clock error bound data. This relies on reading
265 // the generation value twice, making sure they are identical and an even number.
266 //
267 // The writer could die in the middle of the update. This could lead to not making any
268 // progress hence capping the number of retries.
269 let mut retries = 1_000_000;
270 while retries > 0 {
271 // Read the ClockErrorBound data from the shared memory
272 // SAFETY: `ceb_at` has been checked to be valid while creating the ShmReader
273 let snapshot = unsafe { self.ceb_shm.read_volatile() };
274
275 // Confirm no update occurred during the read
276 let second_gen = generation.load(atomic::Ordering::Acquire);
277 if first_gen == second_gen {
278 self.snapshot_gen = first_gen;
279 self.snapshot_ceb = snapshot;
280 return Ok(&self.snapshot_ceb);
281 } else {
282 // Only track complete updates indicated by an even generation number.
283 if second_gen & 0x0001 == 0 {
284 first_gen = second_gen;
285 }
286 }
287 retries -= 1;
288 }
289
290 // Attempts to read the snapshot have failed.
291 Err(ShmError::SegmentNotInitialized)
292 }
293}
294
295#[cfg(test)]
296mod t_reader {
297 use super::*;
298 use crate::ClockStatus;
299 use byteorder::{NativeEndian, WriteBytesExt};
300 use nix::sys::time::TimeSpec;
301 use std::ffi::CString;
302 use std::fs::OpenOptions;
303 use std::io::Seek;
304 use std::io::Write;
305 /// We make use of tempfile::NamedTempFile to ensure that
306 /// local files that are created during a test get removed
307 /// afterwards.
308 use tempfile::NamedTempFile;
309
310 macro_rules! write_memory_segment {
311 ($file:ident,
312 $magic_0:literal,
313 $magic_1:literal,
314 $segsize:literal,
315 $version:literal,
316 $generation:literal,
317 ($as_of_sec:literal, $as_of_nsec:literal),
318 ($void_after_sec:literal, $void_after_nsec:literal),
319 $bound_nsec:literal,
320 $max_drift: literal) => {
321 // Build the bound on clock error data
322 let ceb = ClockErrorBound::new(
323 TimeSpec::new($as_of_sec, $as_of_nsec), // as_of
324 TimeSpec::new($void_after_sec, $void_after_nsec), // void_after
325 $bound_nsec, // bound_nsec
326 0, // disruption_marker
327 $max_drift, // max_drift_ppb
328 ClockStatus::Synchronized, // clock_status
329 true, // clock_disruption_support_enabled
330 );
331
332 // Convert the ceb struct into a slice so we can write it all out, fairly magic.
333 // Definitely needs the #[repr(C)] layout.
334 let slice = unsafe {
335 ::core::slice::from_raw_parts(
336 (&ceb as *const ClockErrorBound) as *const u8,
337 ::core::mem::size_of::<ClockErrorBound>(),
338 )
339 };
340
341 $file
342 .write_u32::<NativeEndian>($magic_0)
343 .expect("Write failed magic_0");
344 $file
345 .write_u32::<NativeEndian>($magic_1)
346 .expect("Write failed magic_1");
347 $file
348 .write_u32::<NativeEndian>($segsize)
349 .expect("Write failed segsize");
350 $file
351 .write_u16::<NativeEndian>($version)
352 .expect("Write failed version");
353 $file
354 .write_u16::<NativeEndian>($generation)
355 .expect("Write failed generation");
356 $file
357 .write_all(slice)
358 .expect("Write failed ClockErrorBound");
359 $file.sync_all().expect("Sync to disk failed");
360 };
361 }
362
363 /// Assert that the reader can map a file.
364 #[test]
365 fn test_reader_new() {
366 let clockbound_shm_tempfile = NamedTempFile::new().expect("create clockbound file failed");
367 let clockbound_shm_temppath = clockbound_shm_tempfile.into_temp_path();
368 let clockbound_shm_path = clockbound_shm_temppath.to_str().unwrap();
369 let mut clockbound_shm_file = OpenOptions::new()
370 .write(true)
371 .open(clockbound_shm_path)
372 .expect("open clockbound file failed");
373 write_memory_segment!(
374 clockbound_shm_file,
375 0x414D5A4E,
376 0x43420200,
377 400,
378 2,
379 10,
380 (0, 0),
381 (0, 0),
382 123,
383 0
384 );
385
386 let path = CString::new(clockbound_shm_path).expect("CString failed");
387 let reader = ShmReader::new(&path).expect("Failed to create ShmReader");
388
389 let version = unsafe { &*reader.version };
390 let generation = unsafe { &*reader.generation };
391 let ceb = unsafe { *reader.ceb_shm };
392
393 assert_eq!(version.load(atomic::Ordering::Relaxed), 2);
394 assert_eq!(generation.load(atomic::Ordering::Relaxed), 10);
395 assert_eq!(ceb.bound_nsec, 123);
396 }
397
398 /// Assert that creating a reader when the
399 /// shared memory segment has an unsupported version causes an Err result.
400 #[test]
401 fn test_reader_new_of_unsupported_shm_version() {
402 let clockbound_shm_tempfile = NamedTempFile::new().expect("create clockbound file failed");
403 let clockbound_shm_temppath = clockbound_shm_tempfile.into_temp_path();
404 let clockbound_shm_path = clockbound_shm_temppath.to_str().unwrap();
405 let mut clockbound_shm_file = OpenOptions::new()
406 .write(true)
407 .open(clockbound_shm_path)
408 .expect("open clockbound file failed");
409 write_memory_segment!(
410 clockbound_shm_file,
411 0x414D5A4E,
412 0x43420200,
413 400,
414 9999,
415 10,
416 (0, 0),
417 (0, 0),
418 123,
419 0
420 );
421
422 let path = CString::new(clockbound_shm_path).expect("CString failed");
423 let result = ShmReader::new(&path);
424
425 // Assert that creating a reader on an unsupported shared memory segment version
426 // returns Err(ShmError::SegmentVersionNotSupported).
427 assert!(result.is_err());
428 assert_eq!(result.unwrap_err(), ShmError::SegmentVersionNotSupported);
429 }
430
431 /// Assert that creating a reader and taking a snapshot when the
432 /// shared memory segment has an unsupported version causes an Err result.
433 #[test]
434 fn test_reader_snapshot_of_unsupported_shm_version() {
435 let clockbound_shm_tempfile = NamedTempFile::new().expect("create clockbound file failed");
436 let clockbound_shm_temppath = clockbound_shm_tempfile.into_temp_path();
437 let clockbound_shm_path = clockbound_shm_temppath.to_str().unwrap();
438 let mut clockbound_shm_file = OpenOptions::new()
439 .write(true)
440 .open(clockbound_shm_path)
441 .expect("open clockbound file failed");
442 // Initially, write the current supported version.
443 write_memory_segment!(
444 clockbound_shm_file,
445 0x414D5A4E,
446 0x43420200,
447 400,
448 2,
449 10,
450 (0, 0),
451 (0, 0),
452 123,
453 0
454 );
455
456 let path = CString::new(clockbound_shm_path).expect("CString failed");
457 let mut reader = ShmReader::new(&path).expect("Failed to create ShmReader");
458 let version = unsafe { &*reader.version };
459 assert_eq!(version.load(atomic::Ordering::Relaxed), 2);
460
461 // Assert that snapshot works without an error with this supported version.
462 let result = reader.snapshot();
463 assert!(result.is_ok());
464
465 // Update the shared memory segment so that the version number is an
466 // unsupported number (e.g. 9999).
467 let _ = clockbound_shm_file.rewind();
468 write_memory_segment!(
469 clockbound_shm_file,
470 0x414D5A4E,
471 0x43420200,
472 400,
473 9999,
474 10,
475 (0, 0),
476 (0, 0),
477 123,
478 0
479 );
480
481 let version = unsafe { &*reader.version };
482 assert_eq!(version.load(atomic::Ordering::Relaxed), 9999);
483
484 // Assert that taking a snapshot of an unsupported shared memory segment version
485 // returns Err(ShmError::SegmentVersionNotSupported).
486 let result = reader.snapshot();
487 assert!(result.is_err());
488 assert_eq!(result.unwrap_err(), ShmError::SegmentVersionNotSupported);
489 }
490}