clock_bound_vmclock/
shm_writer.rs

1use byteorder::{LittleEndian, WriteBytesExt};
2use std::ffi::c_void;
3use std::io::Seek;
4use std::io::Write;
5use std::io::{Error, ErrorKind};
6use std::mem::size_of;
7use std::path::Path;
8use std::sync::atomic;
9use std::{fs, ptr};
10
11use tracing::debug;
12
13use crate::shm::{VMClockShmBody, VMClockShmHeader, VMCLOCK_SHM_MAGIC};
14use crate::shm_reader::VMClockShmReader;
15use clock_bound_shm::ShmError;
16
17/// Trait that a writer to the shared memory segment has to implement.
18pub trait VMClockShmWrite {
19    /// Update the shared memory segment with updated clock error bound data.
20    fn write(&mut self, vmclock_shm_body: &VMClockShmBody);
21}
22
23/// Writer to the VMClock shared memory segment.
24///
25/// This writer is expected to be used by a single process writing to a given path. The file
26/// written to is memory mapped by the writer and many (read-only) readers. Updates to the memory
27/// segment are applied in a lock-free manner, using a rolling seq_count number to protect the
28/// update section.
29#[derive(Debug)]
30pub struct VMClockShmWriter {
31    /// The size of the segment mapped in memory
32    segsize: usize,
33
34    /// A raw pointer keeping the address of the segment mapped in memory
35    addr: *mut c_void,
36
37    /// A raw pointer to the version member of the VMClockShmHeader mapped in memory. The version number
38    /// identifies the layout of the rest of the segment. A value of 0 indicates the memory segment
39    /// is not initialized / not usable.
40    version_ptr: *mut atomic::AtomicU16,
41
42    /// A raw pointer to the sequence count member of the
43    /// VMClockShmHeader mapped in memory. The sequence count number is updated by the writer
44    /// before and after updating the content mapped in memory.
45    seq_count_ptr: *mut atomic::AtomicU32,
46
47    /// A raw pointer to the VMClockShmBody data mapped in memory. This structure follows the
48    /// VMClockShmHeader and contains the information required to compute a bound on clock error.
49    vmclock_shm_body: *mut VMClockShmBody,
50}
51
52impl VMClockShmWriter {
53    /// Create a new VMClockShmWriter referencing the memory segment to write VMClockShmBody data to.
54    ///
55    /// There are several cases to consider:
56    /// 1. The file backing the memory segment does not exist, or the content is corrupted/wrong.
57    ///    This is a cold start-like scenario, creating a fresh memory mapped file.
58    /// 2. The file backing the memory segment already exist and is valid. This may be that the
59    ///    process using this writer has restarted, but clients may still be using the existing
60    ///    values. Here, we want to load the existing memory segment, and continue as if nothing
61    ///    happened. That's a warm reboot-like scenario.
62    /// 3. A variation of 2., but where the layout is being changed (a version bump). This is
63    ///    analog to a cold boot.
64    ///
65    /// TODO: implement scenario 3 once the readers support a version bump.
66    pub fn new(path: &Path) -> std::io::Result<VMClockShmWriter> {
67        // Determine the size of the segment.
68        let segsize = VMClockShmWriter::segment_size();
69
70        // Use the VMClockShmReader to assert the state of the segment. If the segment does not exist or
71        // cannot be read correctly, wipe it clean. Note that there is a strong assumption here
72        // that there is only one writer running on the system and writing to `path`. Consequently,
73        // it is safe to wipe clean and then update. No-one will attempt to write to the segment
74        // even if this process is scheduled out.
75        if VMClockShmWriter::is_usable_segment(path).is_err() {
76            // Note that wiping the file sets the version to 0, which is used to indicate the
77            // readers that the memory segment is not usable yet.
78            VMClockShmWriter::wipe(path, segsize)?
79        }
80
81        // Memory map the file.
82        let addr = VMClockShmWriter::mmap_segment_at(path, segsize)?;
83
84        // Obtain raw pointers to relevant members in the memory map segment and create a new
85        // writer.
86        // SAFETY: segment has been validated to be usable, can map pointers.
87        //
88        let (version_ptr, seq_count_ptr, vmclock_shm_body) = unsafe {
89            let cursor: *mut u8 = addr.cast();
90            let version_ptr = ptr::addr_of_mut!((*cursor.cast::<VMClockShmHeader>()).version);
91            let seq_count_ptr = ptr::addr_of_mut!((*cursor.cast::<VMClockShmHeader>()).seq_count);
92            let vmclock_shm_body: *mut VMClockShmBody =
93                addr.add(size_of::<VMClockShmHeader>()).cast();
94            (version_ptr, seq_count_ptr, vmclock_shm_body)
95        };
96
97        let writer = VMClockShmWriter {
98            segsize,
99            addr,
100            version_ptr,
101            seq_count_ptr,
102            vmclock_shm_body,
103        };
104
105        // Update the memory segment with bound on clock error data and write the layout version.
106        // - If the segment was wiped clean, this defines the memory layout. It is still not useable
107        //   by readers, until the next `update()` is successful.
108        // - If the segment existed and was valid, the version is over-written, and with a single
109        //   version defined today, this overwrites the same value and the segment is readily
110        //   available to the existing readers.
111        //
112        // TODO: remove the hard coded version 1 below, manage a change of version, and update the
113        // comment above since the no-op assumption won't hold true anymore with more than one
114        // version.
115        // SAFETY: segment has been validated to be usable, can use pointers.
116        unsafe {
117            let version_number = 1_u16;
118            let version = &*writer.version_ptr;
119            version.store(version_number, atomic::Ordering::Relaxed);
120        }
121
122        Ok(writer)
123    }
124
125    /// Check whether the memory segment already exist and is usable.
126    ///
127    /// The segment is usable if it can be opened at `path` and it can be read by a VMClockShmReader.
128    fn is_usable_segment(path: &Path) -> Result<(), ShmError> {
129        if let Some(path_str) = path.to_str() {
130            match VMClockShmReader::new(path_str) {
131                Ok(_reader) => Ok(()),
132                Err(err) => Err(err),
133            }
134        } else {
135            Err(ShmError::SegmentNotInitialized)
136        }
137    }
138
139    /// Return a segment size which is large enough to store everything we need.
140    fn segment_size() -> usize {
141        // Need to hold the header and the bound on clock error data.
142        let size = size_of::<VMClockShmHeader>() + size_of::<VMClockShmBody>();
143        debug!("Segment size (header + body) is {:?}", size);
144        size
145    }
146
147    /// Initialize the file backing the memory segment.
148    ///
149    /// Zero out the file up to segsize, but write out header information such the readers can
150    /// access it. Note that both the layout version number and the seq_count number are set to 0,
151    /// which makes this file not usable to retrieve clock error bound data yet.
152    fn wipe(path: &Path, segsize: usize) -> std::io::Result<()> {
153        // Attempt at creating intermediate directories, but do expect that the base permissions
154        // are set correctly.
155        if let Some(parent) = path.parent() {
156            match parent.to_str() {
157                Some("") => (), // This would be a relative path without parent
158                Some(_) => fs::create_dir_all(parent)?,
159                None => {
160                    return Err(Error::new(
161                        ErrorKind::Other,
162                        "Failed to extract parent dir name",
163                    ))
164                }
165            }
166        }
167
168        // Opens the file in write-only mode. Create a file if it does not exist, and truncate it
169        // if it does.
170        let mut file = std::fs::File::create(path)?;
171
172        // In theory, usize may not fit within a u32. In practice, we
173        let size: u32 = match segsize.try_into() {
174            Ok(size) => size, // it did fit
175            Err(e) => {
176                return Err(std::io::Error::new(
177                    ErrorKind::Other,
178                    format!(
179                        "Failed to convert segment size {:?} into u32 {:?}",
180                        segsize, e
181                    ),
182                ))
183            }
184        };
185
186        // Write the VMClockShmHeader
187
188        // Magic number.
189        file.write_u32::<LittleEndian>(VMCLOCK_SHM_MAGIC)?;
190        // Segsize.
191        file.write_u32::<LittleEndian>(size)?;
192        // Version.
193        file.write_u16::<LittleEndian>(0_u16)?;
194        // Counter ID.
195        file.write_u8(0_u8)?;
196        // Time type.
197        file.write_u8(0_u8)?;
198        // Sequence count.
199        file.write_u32::<LittleEndian>(0_u32)?;
200
201        // Zero the rest of the segment
202        let remaining = segsize - size_of::<VMClockShmHeader>();
203        let buf = vec![0; remaining];
204        file.write_all(&buf)?;
205
206        // Make sure the amount of bytes written matches the segment size
207        let pos = file.stream_position()?;
208        if pos > size.into() {
209            return Err(std::io::Error::new(
210                ErrorKind::Other,
211                format!(
212                    "SHM Writer implementation error: wrote {:?} bytes but segsize is {:?} bytes",
213                    pos, size
214                ),
215            ));
216        }
217
218        // Sync all and drop (close) the descriptor
219        file.sync_all()?;
220
221        Ok(())
222    }
223
224    /// Open and map the file at the given path to memory.
225    ///
226    /// TODO: implementation is using the nix crate, but may want to revisit and see if it would be
227    /// worth refactoring to align with the reader implementation, which is similar (but
228    /// read-only).
229    fn mmap_segment_at(path: &Path, segsize: usize) -> std::io::Result<*mut c_void> {
230        let fd = nix::fcntl::open(
231            path,
232            nix::fcntl::OFlag::O_RDWR,
233            nix::sys::stat::Mode::from_bits_truncate(0o644),
234        )?;
235
236        // SAFETY: always safe when addr is None.
237        unsafe {
238            nix::sys::mman::mmap(
239                None,
240                std::num::NonZeroUsize::new(segsize).unwrap(),
241                nix::sys::mman::ProtFlags::PROT_READ | nix::sys::mman::ProtFlags::PROT_WRITE,
242                nix::sys::mman::MapFlags::MAP_SHARED,
243                fd,
244                0,
245            )
246            .map_err(|e| {
247                let _ = nix::unistd::close(fd);
248                e.into()
249            })
250        }
251    }
252}
253
254impl VMClockShmWrite for VMClockShmWriter {
255    /// Update the clock error bound data in the memory segment.
256    ///
257    /// This function implements the lock-free mechanism that lets the writer update the memory
258    /// segment shared with many readers. The seq_count number is set to an odd number before the
259    /// update and an even number when successfully completed.
260    ///
261    fn write(&mut self, vmclock_shm_body: &VMClockShmBody) {
262        // SAFETY: pointers to fields in the memory segment have been validated on init.
263        unsafe {
264            // Start by reading the seq_count value stored in the memory segment.
265            let seq_count = &*self.seq_count_ptr;
266            let seq_count_value = seq_count.load(atomic::Ordering::Acquire);
267
268            // Mark the beginning of the update into the memory segment.
269            // The producer process may have error'ed or died in the middle of a previous update
270            // and left things hanging with an odd seq_count number. Being a bit fancy, this is
271            // our data anti-entropy protection, and make sure we enter the updating section with
272            // an odd number.
273            let seq_count_value = if seq_count_value & 0x0001 == 0 {
274                // This should be the most common case
275                seq_count_value.wrapping_add(1)
276            } else {
277                seq_count_value
278            };
279            seq_count.store(seq_count_value, atomic::Ordering::Release);
280
281            self.vmclock_shm_body.write(*vmclock_shm_body);
282            let seq_count_value = seq_count_value.wrapping_add(1);
283            seq_count.store(seq_count_value, atomic::Ordering::Release);
284        }
285    }
286}
287
288impl Drop for VMClockShmWriter {
289    /// Unmap the memory segment
290    ///
291    /// TODO: revisit to see if this can be refactored into the MmapGuard logic implemented on the
292    /// VMClockShmReader.
293    fn drop(&mut self) {
294        unsafe {
295            nix::sys::mman::munmap(self.addr, self.segsize).expect("munmap");
296        }
297    }
298}
299
300#[cfg(test)]
301mod t_writer {
302    use super::*;
303    use byteorder::{LittleEndian, ReadBytesExt};
304    use std::path::Path;
305    /// We make use of tempfile::NamedTempFile to ensure that
306    /// local files that are created during a test get removed
307    /// afterwards.
308    use tempfile::NamedTempFile;
309
310    use crate::shm::VMClockClockStatus;
311
312    macro_rules! vmclockshmbody {
313        () => {
314            VMClockShmBody {
315                disruption_marker: 0,
316                flags: 0,
317                _padding: [0x00, 0x00],
318                clock_status: VMClockClockStatus::Unknown,
319                leap_second_smearing_hint: 0,
320                tai_offset_sec: 0,
321                leap_indicator: 0,
322                counter_period_shift: 0,
323                counter_value: 0,
324                counter_period_frac_sec: 0,
325                counter_period_esterror_rate_frac_sec: 0,
326                counter_period_maxerror_rate_frac_sec: 0,
327                time_sec: 0,
328                time_frac_sec: 0,
329                time_esterror_nanosec: 0,
330                time_maxerror_nanosec: 0,
331            }
332        };
333    }
334
335    /// Helper function to remove files created during unit tests.
336    fn remove_file_or_directory(path: &str) {
337        // Busy looping on deleting the previous file, good enough for unit test
338        let p = Path::new(&path);
339        while p.exists() {
340            if p.is_dir() {
341                std::fs::remove_dir_all(&path).expect("failed to remove file");
342            } else {
343                std::fs::remove_file(&path).expect("failed to remove file");
344            }
345        }
346    }
347
348    #[test]
349    fn test_segment_size() {
350        let actual_segment_size = VMClockShmWriter::segment_size();
351        let expected_segment_size = 104_usize;
352        assert_eq!(actual_segment_size, expected_segment_size);
353    }
354
355    /// Assert that a new memory mapped segment is created it does not exist.
356    #[test]
357    fn test_writer_create_new_if_not_exist() {
358        let vmclock_shm_tempfile = NamedTempFile::new().expect("create vmclock file failed");
359        let vmclock_shm_temppath = vmclock_shm_tempfile.into_temp_path();
360        let vmclock_shm_path = vmclock_shm_temppath.to_str().unwrap();
361        remove_file_or_directory(&vmclock_shm_path);
362
363        // Create and wipe the memory segment
364        let vmclock_shm_body = vmclockshmbody!();
365        let mut writer =
366            VMClockShmWriter::new(Path::new(&vmclock_shm_path)).expect("Failed to create a writer");
367        writer.write(&vmclock_shm_body);
368
369        // Read it back into a snapshot
370        let mut reader =
371            VMClockShmReader::new(&vmclock_shm_path).expect("Failed to create VMClockShmReader");
372        let snapshot = reader
373            .snapshot()
374            .expect("Failed to take a VMClockShmBody snapshot");
375
376        assert_eq!(*snapshot, vmclock_shm_body);
377    }
378
379    /// Assert that an existing memory mapped segment is wiped clean if dirty.
380    #[test]
381    fn test_writer_wipe_clean_on_new() {
382        let vmclock_shm_tempfile = NamedTempFile::new().expect("create vmclock file failed");
383        let vmclock_shm_temppath = vmclock_shm_tempfile.into_temp_path();
384        let vmclock_shm_path = vmclock_shm_temppath.to_str().unwrap();
385        remove_file_or_directory(&vmclock_shm_path);
386
387        // Let's write some garbage first
388        let mut file = std::fs::File::create(&vmclock_shm_path).expect("create file failed");
389        let _ = file.write(b"foobarbaz");
390        let _ = file.sync_all();
391
392        // Create and wipe the memory segment
393        let vmclock_shm_body = vmclockshmbody!();
394        let mut writer =
395            VMClockShmWriter::new(Path::new(&vmclock_shm_path)).expect("Failed to create a writer");
396        writer.write(&vmclock_shm_body);
397
398        // Read it back into a snapshot
399        let mut reader =
400            VMClockShmReader::new(&vmclock_shm_path).expect("Failed to create VMClockShmReader");
401        let snapshot = reader
402            .snapshot()
403            .expect("Failed to take a clock error bound snapshot");
404
405        assert_eq!(*snapshot, vmclock_shm_body);
406    }
407
408    /// Assert that an existing and valid segment is reused and updated.
409    #[test]
410    fn test_writer_update_existing() {
411        let vmclock_shm_tempfile = NamedTempFile::new().expect("create vmclock file failed");
412        let vmclock_shm_temppath = vmclock_shm_tempfile.into_temp_path();
413        let vmclock_shm_path = vmclock_shm_temppath.to_str().unwrap();
414        remove_file_or_directory(&vmclock_shm_path);
415
416        // Create a clean memory segment
417        let vmclock_shm_body = vmclockshmbody!();
418        let mut writer =
419            VMClockShmWriter::new(Path::new(&vmclock_shm_path)).expect("Failed to create a writer");
420
421        // Push two updates to the shared memory segment, the seq_count moves from 0, to 2, to 4
422        writer.write(&vmclock_shm_body);
423        writer.write(&vmclock_shm_body);
424
425        // Check what the writer says
426        let seq_count = unsafe { &*writer.seq_count_ptr };
427        let seq_count_value = seq_count.load(atomic::Ordering::Acquire);
428        std::mem::drop(writer);
429        assert_eq!(seq_count_value, 4);
430
431        // Raw validation in the file
432        // A bit brittle, would be more robust not to hardcode the seek to the seq_count field
433        let mut file = std::fs::File::open(&vmclock_shm_path).expect("create file failed");
434        file.seek(std::io::SeekFrom::Start(12))
435            .expect("Failed to seek to seq_count offset");
436        let seq_count_value = file
437            .read_u64::<LittleEndian>()
438            .expect("Failed to read seq_count from file");
439        assert_eq!(seq_count_value, 4);
440    }
441}