clock_bound_vmclock/shm_writer.rs
1use byteorder::{LittleEndian, WriteBytesExt};
2use std::ffi::c_void;
3use std::io::Seek;
4use std::io::Write;
5use std::io::{Error, ErrorKind};
6use std::mem::size_of;
7use std::path::Path;
8use std::sync::atomic;
9use std::{fs, ptr};
10
11use tracing::debug;
12
13use crate::shm::{VMClockShmBody, VMClockShmHeader, VMCLOCK_SHM_MAGIC};
14use crate::shm_reader::VMClockShmReader;
15use clock_bound_shm::ShmError;
16
17/// Trait that a writer to the shared memory segment has to implement.
18pub trait VMClockShmWrite {
19 /// Update the shared memory segment with updated clock error bound data.
20 fn write(&mut self, vmclock_shm_body: &VMClockShmBody);
21}
22
23/// Writer to the VMClock shared memory segment.
24///
25/// This writer is expected to be used by a single process writing to a given path. The file
26/// written to is memory mapped by the writer and many (read-only) readers. Updates to the memory
27/// segment are applied in a lock-free manner, using a rolling seq_count number to protect the
28/// update section.
29#[derive(Debug)]
30pub struct VMClockShmWriter {
31 /// The size of the segment mapped in memory
32 segsize: usize,
33
34 /// A raw pointer keeping the address of the segment mapped in memory
35 addr: *mut c_void,
36
37 /// A raw pointer to the version member of the VMClockShmHeader mapped in memory. The version number
38 /// identifies the layout of the rest of the segment. A value of 0 indicates the memory segment
39 /// is not initialized / not usable.
40 version_ptr: *mut atomic::AtomicU16,
41
42 /// A raw pointer to the sequence count member of the
43 /// VMClockShmHeader mapped in memory. The sequence count number is updated by the writer
44 /// before and after updating the content mapped in memory.
45 seq_count_ptr: *mut atomic::AtomicU32,
46
47 /// A raw pointer to the VMClockShmBody data mapped in memory. This structure follows the
48 /// VMClockShmHeader and contains the information required to compute a bound on clock error.
49 vmclock_shm_body: *mut VMClockShmBody,
50}
51
52impl VMClockShmWriter {
53 /// Create a new VMClockShmWriter referencing the memory segment to write VMClockShmBody data to.
54 ///
55 /// There are several cases to consider:
56 /// 1. The file backing the memory segment does not exist, or the content is corrupted/wrong.
57 /// This is a cold start-like scenario, creating a fresh memory mapped file.
58 /// 2. The file backing the memory segment already exist and is valid. This may be that the
59 /// process using this writer has restarted, but clients may still be using the existing
60 /// values. Here, we want to load the existing memory segment, and continue as if nothing
61 /// happened. That's a warm reboot-like scenario.
62 /// 3. A variation of 2., but where the layout is being changed (a version bump). This is
63 /// analog to a cold boot.
64 ///
65 /// TODO: implement scenario 3 once the readers support a version bump.
66 pub fn new(path: &Path) -> std::io::Result<VMClockShmWriter> {
67 // Determine the size of the segment.
68 let segsize = VMClockShmWriter::segment_size();
69
70 // Use the VMClockShmReader to assert the state of the segment. If the segment does not exist or
71 // cannot be read correctly, wipe it clean. Note that there is a strong assumption here
72 // that there is only one writer running on the system and writing to `path`. Consequently,
73 // it is safe to wipe clean and then update. No-one will attempt to write to the segment
74 // even if this process is scheduled out.
75 if VMClockShmWriter::is_usable_segment(path).is_err() {
76 // Note that wiping the file sets the version to 0, which is used to indicate the
77 // readers that the memory segment is not usable yet.
78 VMClockShmWriter::wipe(path, segsize)?
79 }
80
81 // Memory map the file.
82 let addr = VMClockShmWriter::mmap_segment_at(path, segsize)?;
83
84 // Obtain raw pointers to relevant members in the memory map segment and create a new
85 // writer.
86 // SAFETY: segment has been validated to be usable, can map pointers.
87 //
88 let (version_ptr, seq_count_ptr, vmclock_shm_body) = unsafe {
89 let cursor: *mut u8 = addr.cast();
90 let version_ptr = ptr::addr_of_mut!((*cursor.cast::<VMClockShmHeader>()).version);
91 let seq_count_ptr = ptr::addr_of_mut!((*cursor.cast::<VMClockShmHeader>()).seq_count);
92 let vmclock_shm_body: *mut VMClockShmBody =
93 addr.add(size_of::<VMClockShmHeader>()).cast();
94 (version_ptr, seq_count_ptr, vmclock_shm_body)
95 };
96
97 let writer = VMClockShmWriter {
98 segsize,
99 addr,
100 version_ptr,
101 seq_count_ptr,
102 vmclock_shm_body,
103 };
104
105 // Update the memory segment with bound on clock error data and write the layout version.
106 // - If the segment was wiped clean, this defines the memory layout. It is still not useable
107 // by readers, until the next `update()` is successful.
108 // - If the segment existed and was valid, the version is over-written, and with a single
109 // version defined today, this overwrites the same value and the segment is readily
110 // available to the existing readers.
111 //
112 // TODO: remove the hard coded version 1 below, manage a change of version, and update the
113 // comment above since the no-op assumption won't hold true anymore with more than one
114 // version.
115 // SAFETY: segment has been validated to be usable, can use pointers.
116 unsafe {
117 let version_number = 1_u16;
118 let version = &*writer.version_ptr;
119 version.store(version_number, atomic::Ordering::Relaxed);
120 }
121
122 Ok(writer)
123 }
124
125 /// Check whether the memory segment already exist and is usable.
126 ///
127 /// The segment is usable if it can be opened at `path` and it can be read by a VMClockShmReader.
128 fn is_usable_segment(path: &Path) -> Result<(), ShmError> {
129 if let Some(path_str) = path.to_str() {
130 match VMClockShmReader::new(path_str) {
131 Ok(_reader) => Ok(()),
132 Err(err) => Err(err),
133 }
134 } else {
135 Err(ShmError::SegmentNotInitialized)
136 }
137 }
138
139 /// Return a segment size which is large enough to store everything we need.
140 fn segment_size() -> usize {
141 // Need to hold the header and the bound on clock error data.
142 let size = size_of::<VMClockShmHeader>() + size_of::<VMClockShmBody>();
143 debug!("Segment size (header + body) is {:?}", size);
144 size
145 }
146
147 /// Initialize the file backing the memory segment.
148 ///
149 /// Zero out the file up to segsize, but write out header information such the readers can
150 /// access it. Note that both the layout version number and the seq_count number are set to 0,
151 /// which makes this file not usable to retrieve clock error bound data yet.
152 fn wipe(path: &Path, segsize: usize) -> std::io::Result<()> {
153 // Attempt at creating intermediate directories, but do expect that the base permissions
154 // are set correctly.
155 if let Some(parent) = path.parent() {
156 match parent.to_str() {
157 Some("") => (), // This would be a relative path without parent
158 Some(_) => fs::create_dir_all(parent)?,
159 None => {
160 return Err(Error::new(
161 ErrorKind::Other,
162 "Failed to extract parent dir name",
163 ))
164 }
165 }
166 }
167
168 // Opens the file in write-only mode. Create a file if it does not exist, and truncate it
169 // if it does.
170 let mut file = std::fs::File::create(path)?;
171
172 // In theory, usize may not fit within a u32. In practice, we
173 let size: u32 = match segsize.try_into() {
174 Ok(size) => size, // it did fit
175 Err(e) => {
176 return Err(std::io::Error::new(
177 ErrorKind::Other,
178 format!(
179 "Failed to convert segment size {:?} into u32 {:?}",
180 segsize, e
181 ),
182 ))
183 }
184 };
185
186 // Write the VMClockShmHeader
187
188 // Magic number.
189 file.write_u32::<LittleEndian>(VMCLOCK_SHM_MAGIC)?;
190 // Segsize.
191 file.write_u32::<LittleEndian>(size)?;
192 // Version.
193 file.write_u16::<LittleEndian>(0_u16)?;
194 // Counter ID.
195 file.write_u8(0_u8)?;
196 // Time type.
197 file.write_u8(0_u8)?;
198 // Sequence count.
199 file.write_u32::<LittleEndian>(0_u32)?;
200
201 // Zero the rest of the segment
202 let remaining = segsize - size_of::<VMClockShmHeader>();
203 let buf = vec![0; remaining];
204 file.write_all(&buf)?;
205
206 // Make sure the amount of bytes written matches the segment size
207 let pos = file.stream_position()?;
208 if pos > size.into() {
209 return Err(std::io::Error::new(
210 ErrorKind::Other,
211 format!(
212 "SHM Writer implementation error: wrote {:?} bytes but segsize is {:?} bytes",
213 pos, size
214 ),
215 ));
216 }
217
218 // Sync all and drop (close) the descriptor
219 file.sync_all()?;
220
221 Ok(())
222 }
223
224 /// Open and map the file at the given path to memory.
225 ///
226 /// TODO: implementation is using the nix crate, but may want to revisit and see if it would be
227 /// worth refactoring to align with the reader implementation, which is similar (but
228 /// read-only).
229 fn mmap_segment_at(path: &Path, segsize: usize) -> std::io::Result<*mut c_void> {
230 let fd = nix::fcntl::open(
231 path,
232 nix::fcntl::OFlag::O_RDWR,
233 nix::sys::stat::Mode::from_bits_truncate(0o644),
234 )?;
235
236 // SAFETY: always safe when addr is None.
237 unsafe {
238 nix::sys::mman::mmap(
239 None,
240 std::num::NonZeroUsize::new(segsize).unwrap(),
241 nix::sys::mman::ProtFlags::PROT_READ | nix::sys::mman::ProtFlags::PROT_WRITE,
242 nix::sys::mman::MapFlags::MAP_SHARED,
243 fd,
244 0,
245 )
246 .map_err(|e| {
247 let _ = nix::unistd::close(fd);
248 e.into()
249 })
250 }
251 }
252}
253
254impl VMClockShmWrite for VMClockShmWriter {
255 /// Update the clock error bound data in the memory segment.
256 ///
257 /// This function implements the lock-free mechanism that lets the writer update the memory
258 /// segment shared with many readers. The seq_count number is set to an odd number before the
259 /// update and an even number when successfully completed.
260 ///
261 fn write(&mut self, vmclock_shm_body: &VMClockShmBody) {
262 // SAFETY: pointers to fields in the memory segment have been validated on init.
263 unsafe {
264 // Start by reading the seq_count value stored in the memory segment.
265 let seq_count = &*self.seq_count_ptr;
266 let seq_count_value = seq_count.load(atomic::Ordering::Acquire);
267
268 // Mark the beginning of the update into the memory segment.
269 // The producer process may have error'ed or died in the middle of a previous update
270 // and left things hanging with an odd seq_count number. Being a bit fancy, this is
271 // our data anti-entropy protection, and make sure we enter the updating section with
272 // an odd number.
273 let seq_count_value = if seq_count_value & 0x0001 == 0 {
274 // This should be the most common case
275 seq_count_value.wrapping_add(1)
276 } else {
277 seq_count_value
278 };
279 seq_count.store(seq_count_value, atomic::Ordering::Release);
280
281 self.vmclock_shm_body.write(*vmclock_shm_body);
282 let seq_count_value = seq_count_value.wrapping_add(1);
283 seq_count.store(seq_count_value, atomic::Ordering::Release);
284 }
285 }
286}
287
288impl Drop for VMClockShmWriter {
289 /// Unmap the memory segment
290 ///
291 /// TODO: revisit to see if this can be refactored into the MmapGuard logic implemented on the
292 /// VMClockShmReader.
293 fn drop(&mut self) {
294 unsafe {
295 nix::sys::mman::munmap(self.addr, self.segsize).expect("munmap");
296 }
297 }
298}
299
300#[cfg(test)]
301mod t_writer {
302 use super::*;
303 use byteorder::{LittleEndian, ReadBytesExt};
304 use std::path::Path;
305 /// We make use of tempfile::NamedTempFile to ensure that
306 /// local files that are created during a test get removed
307 /// afterwards.
308 use tempfile::NamedTempFile;
309
310 use crate::shm::VMClockClockStatus;
311
/// Expands to a `VMClockShmBody` literal with every numeric field set to zero and the clock
/// status set to `Unknown`.
macro_rules! vmclockshmbody {
    () => {
        VMClockShmBody {
            // Status and hints
            disruption_marker: 0,
            flags: 0,
            _padding: [0x00; 2],
            clock_status: VMClockClockStatus::Unknown,
            leap_second_smearing_hint: 0,
            tai_offset_sec: 0,
            leap_indicator: 0,
            // Counter
            counter_period_shift: 0,
            counter_value: 0,
            counter_period_frac_sec: 0,
            counter_period_esterror_rate_frac_sec: 0,
            counter_period_maxerror_rate_frac_sec: 0,
            // Time
            time_sec: 0,
            time_frac_sec: 0,
            time_esterror_nanosec: 0,
            time_maxerror_nanosec: 0,
        }
    };
}
334
/// Test helper deleting whatever lives at `path`, be it a file or a directory tree.
///
/// Does nothing when `path` does not exist. Panics if the removal fails.
fn remove_file_or_directory(path: &str) {
    let target = Path::new(path);

    // Keep deleting until the path is gone; busy looping is good enough for unit tests.
    loop {
        if !target.exists() {
            break;
        }

        let removal = if target.is_dir() {
            std::fs::remove_dir_all(target)
        } else {
            std::fs::remove_file(target)
        };
        removal.expect("failed to remove file");
    }
}
347
/// Pin the segment size, so that an accidental change of the header or body layout is
/// caught.
#[test]
fn test_segment_size() {
    const EXPECTED_SEGMENT_SIZE: usize = 104;
    assert_eq!(VMClockShmWriter::segment_size(), EXPECTED_SEGMENT_SIZE);
}
354
/// Assert that a new memory mapped segment is created if it does not exist.
#[test]
fn test_writer_create_new_if_not_exist() {
    // Reserve a unique path, then delete the file so the writer starts from nothing. The
    // temp path guard stays alive for the whole test and cleans up at the end.
    let temp_path = NamedTempFile::new()
        .expect("create vmclock file failed")
        .into_temp_path();
    let shm_path = temp_path.to_str().unwrap();
    remove_file_or_directory(shm_path);

    // Creating the writer builds the segment from scratch; then publish one body
    let expected_body = vmclockshmbody!();
    let mut writer = VMClockShmWriter::new(Path::new(shm_path)).expect("Failed to create a writer");
    writer.write(&expected_body);

    // A reader must observe exactly what was published
    let mut reader = VMClockShmReader::new(shm_path).expect("Failed to create VMClockShmReader");
    let snapshot = reader
        .snapshot()
        .expect("Failed to take a VMClockShmBody snapshot");

    assert_eq!(*snapshot, expected_body);
}
378
/// Assert that an existing memory mapped segment is wiped clean if dirty.
#[test]
fn test_writer_wipe_clean_on_new() {
    let vmclock_shm_tempfile = NamedTempFile::new().expect("create vmclock file failed");
    let vmclock_shm_temppath = vmclock_shm_tempfile.into_temp_path();
    let vmclock_shm_path = vmclock_shm_temppath.to_str().unwrap();
    remove_file_or_directory(vmclock_shm_path);

    // Let's write some garbage first. The setup must not fail silently, otherwise the test
    // would no longer exercise the "dirty segment" scenario it is named after.
    let mut file = std::fs::File::create(vmclock_shm_path).expect("create file failed");
    file.write_all(b"foobarbaz")
        .expect("failed to write garbage to file");
    file.sync_all().expect("failed to sync garbage to file");

    // Create and wipe the memory segment
    let vmclock_shm_body = vmclockshmbody!();
    let mut writer =
        VMClockShmWriter::new(Path::new(vmclock_shm_path)).expect("Failed to create a writer");
    writer.write(&vmclock_shm_body);

    // Read it back into a snapshot
    let mut reader =
        VMClockShmReader::new(vmclock_shm_path).expect("Failed to create VMClockShmReader");
    let snapshot = reader
        .snapshot()
        .expect("Failed to take a clock error bound snapshot");

    assert_eq!(*snapshot, vmclock_shm_body);
}
407
/// Assert that an existing and valid segment is reused and updated.
#[test]
fn test_writer_update_existing() {
    let vmclock_shm_tempfile = NamedTempFile::new().expect("create vmclock file failed");
    let vmclock_shm_temppath = vmclock_shm_tempfile.into_temp_path();
    let vmclock_shm_path = vmclock_shm_temppath.to_str().unwrap();
    remove_file_or_directory(vmclock_shm_path);

    // Create a clean memory segment
    let vmclock_shm_body = vmclockshmbody!();
    let mut writer =
        VMClockShmWriter::new(Path::new(vmclock_shm_path)).expect("Failed to create a writer");

    // Push two updates to the shared memory segment, the seq_count moves from 0, to 2, to 4
    writer.write(&vmclock_shm_body);
    writer.write(&vmclock_shm_body);

    // Check what the writer says. The value is copied out before the writer is dropped, so
    // that no reference into the mapping outlives it.
    // SAFETY: the writer, hence the mapping `seq_count_ptr` points into, is still alive.
    let seq_count_value = unsafe { (&*writer.seq_count_ptr).load(atomic::Ordering::Acquire) };
    std::mem::drop(writer);
    assert_eq!(seq_count_value, 4);

    // Raw validation in the file
    // A bit brittle, would be more robust not to hardcode the seek to the seq_count field.
    // The offset is the sum of the header fields preceding seq_count: magic (4) + size (4) +
    // version (2) + counter id (1) + time type (1).
    let mut file = std::fs::File::open(vmclock_shm_path).expect("open file failed");
    file.seek(std::io::SeekFrom::Start(12))
        .expect("Failed to seek to seq_count offset");

    // seq_count is a 32-bit field. Reading 64 bits would also swallow the first bytes of the
    // body and only yield the right value while those bytes happen to be zero.
    let seq_count_value = file
        .read_u32::<LittleEndian>()
        .expect("Failed to read seq_count from file");
    assert_eq!(seq_count_value, 4);
}
441}