mmap_sync/
synchronizer.rs

1//! The `synchronizer` module is the core component of the `mmap-sync` library, providing a `Synchronizer` struct for concurrent data access.
2//!
//! The `Synchronizer` offers a simple interface for reading and writing data from/to shared memory. It uses memory-mapped files and wait-free synchronization to provide highly concurrent, wait-free reads alongside a single writer instance. This design is inspired by the [Left-Right concurrency control technique](https://github.com/pramalhe/ConcurrencyFreaks/blob/master/papers/left-right-2014.pdf), allowing for efficient and flexible inter-process communication.
4//!
5//! Furthermore, with the aid of the [rkyv](https://rkyv.org/) library, `Synchronizer` can perform zero-copy deserialization, reducing time and memory usage when accessing data.
6use std::ffi::OsStr;
7use std::hash::{BuildHasher, BuildHasherDefault, Hasher};
8use std::time::Duration;
9
10use bytecheck::CheckBytes;
11use rkyv::ser::serializers::{AlignedSerializer, AllocSerializer};
12use rkyv::ser::Serializer;
13use rkyv::validation::validators::DefaultValidator;
14use rkyv::{archived_root, check_archived_root, AlignedVec, Archive, Serialize};
15use thiserror::Error;
16use wyhash::WyHash;
17
18use crate::data::DataContainer;
19use crate::guard::{ReadGuard, ReadResult};
20use crate::instance::InstanceVersion;
21use crate::locks::{LockDisabled, WriteLockStrategy};
22use crate::state::StateContainer;
23use crate::synchronizer::SynchronizerError::*;
24
/// `Synchronizer` is a concurrency primitive that manages data access between a single writer process and multiple reader processes.
///
/// It coordinates the access to two data files that store the shared data. A state file, also memory-mapped, stores the index of the current data file and the number of active readers for each index, updated via atomic instructions.
///
/// Generic parameters:
///   - `H` - hasher used for checksum calculation (default [`WyHash`])
///   - `WL` - optional write locking to prevent multiple writers (default [`LockDisabled`])
///   - `N` - serializer scratch space size, forwarded to rkyv's `AllocSerializer<N>` (default 1024)
///   - `SD` - sleep duration in nanoseconds handed to the state when the writer acquires the
///     next data file index (default 1s)
pub struct Synchronizer<
    H: Hasher + Default = WyHash,
    WL = LockDisabled,
    const N: usize = 1024,
    const SD: u64 = 1_000_000_000,
> {
    /// Container storing state mmap
    state_container: StateContainer<WL>,
    /// Container storing data mmap
    data_container: DataContainer,
    /// Hasher used for checksum calculation
    build_hasher: BuildHasherDefault<H>,
    /// Re-usable buffer for serialization; `write` moves it out of the `Option` while it
    /// serializes an entity and stores it back once the entity has been published
    serialize_buffer: Option<AlignedVec>,
}
49
/// `SynchronizerError` enumerates all possible errors returned by this library.
/// These errors mainly represent the failures that might occur during reading or writing
/// operations in data or state files.
///
/// Variants that wrap a [`std::io::Error`] include the underlying I/O error in their
/// `Display` output; the remaining variants carry no payload.
#[derive(Error, Debug)]
pub enum SynchronizerError {
    /// An error occurred while writing to the data file.
    #[error("error writing data file: {0}")]
    FailedDataWrite(std::io::Error),
    /// An error occurred while reading from the data file.
    #[error("error reading data file: {0}")]
    FailedDataRead(std::io::Error),
    /// An error occurred while reading from the state file.
    #[error("error reading state file: {0}")]
    FailedStateRead(std::io::Error),
    /// An error occurred while writing an entity, e.g. serializing it with rkyv failed.
    #[error("error writing entity")]
    FailedEntityWrite,
    /// An error occurred while reading an entity, i.e. the bytes did not pass
    /// `check_archived_root` validation for the requested type.
    #[error("error reading entity")]
    FailedEntityRead,
    /// The state was not properly initialized.
    #[error("uninitialized state")]
    UninitializedState,
    /// The instance version parameters were invalid.
    // NOTE(review): presumably produced by `InstanceVersion::new` — confirm in `instance.rs`.
    #[error("invalid instance version params")]
    InvalidInstanceVersionParams,
    /// Write locking is enabled and the lock is held by another writer.
    #[error("write blocked by conflicting lock")]
    WriteLockConflict,
}
80
81impl Synchronizer {
82    /// Create new instance of `Synchronizer` using given `path_prefix` and default template parameters
83    pub fn new(path_prefix: &OsStr) -> Self {
84        Self::with_params(path_prefix)
85    }
86}
87
88impl<'a, H, WL, const N: usize, const SD: u64> Synchronizer<H, WL, N, SD>
89where
90    H: Hasher + Default,
91    WL: WriteLockStrategy<'a>,
92{
93    /// Create new instance of `Synchronizer` using given `path_prefix` and template parameters
94    pub fn with_params(path_prefix: &OsStr) -> Self {
95        Synchronizer {
96            state_container: StateContainer::new(path_prefix),
97            data_container: DataContainer::new(path_prefix),
98            build_hasher: BuildHasherDefault::default(),
99            serialize_buffer: Some(AlignedVec::new()),
100        }
101    }
102
103    /// Writes a given `entity` into the next available data file.
104    ///
105    /// Returns the number of bytes written to the data file and a boolean flag, for diagnostic
106    /// purposes, indicating whether the reader counter was reset due to a reader exiting without
107    /// decrementing it.
108    ///
109    /// # Parameters
110    /// - `entity`: The entity to be written to the data file.
111    /// - `grace_duration`: The maximum period to wait for readers to finish before resetting the
112    ///                     reader count to 0. This handles scenarios where a reader process has
113    ///                     crashed or exited abnormally, failing to decrement the reader count.
114    ///                     After the `grace_duration` has elapsed, if there are still active
115    ///                     readers, the reader count is reset to 0 to restore synchronization state.
116    ///
117    /// # Returns
118    /// A result containing a tuple of the number of bytes written and a boolean indicating whether
119    /// the reader count was reset, or a `SynchronizerError` if the operation fails.
120    pub fn write<T>(
121        &'a mut self,
122        entity: &T,
123        grace_duration: Duration,
124    ) -> Result<(usize, bool), SynchronizerError>
125    where
126        T: Serialize<AllocSerializer<N>>,
127        T::Archived: for<'b> CheckBytes<DefaultValidator<'b>>,
128    {
129        let mut buf = self.serialize_buffer.take().ok_or(FailedEntityWrite)?;
130        buf.clear();
131
132        // serialize given entity into bytes
133        let mut serializer = AllocSerializer::new(
134            AlignedSerializer::new(buf),
135            Default::default(),
136            Default::default(),
137        );
138        let _ = serializer
139            .serialize_value(entity)
140            .map_err(|_| FailedEntityWrite)?;
141        let data = serializer.into_serializer().into_inner();
142
143        // ensure that serialized bytes can be deserialized back to `T` struct successfully
144        check_archived_root::<T>(&data).map_err(|_| FailedEntityRead)?;
145
146        // fetch current state from mapped memory
147        let state = self.state_container.state::<true>(true)?;
148
149        // calculate data checksum
150        let mut hasher = self.build_hasher.build_hasher();
151        hasher.write(&data);
152        let checksum = hasher.finish();
153
154        // acquire next available data file idx and write data to it
155        let acquire_sleep_duration = Duration::from_nanos(SD);
156        let (new_idx, reset) = state.acquire_next_idx(grace_duration, acquire_sleep_duration);
157        let new_version = InstanceVersion::new(new_idx, data.len(), checksum)?;
158        let size = self.data_container.write(&data, new_version)?;
159
160        // switch readers to new version
161        state.switch_version(new_version);
162
163        // Restore buffer for potential reuse
164        self.serialize_buffer.replace(data);
165
166        Ok((size, reset))
167    }
168
169    /// Write raw data bytes representing type `T` into the next available data file.
170    /// Returns number of bytes written to data file and a boolean flag, for diagnostic purposes,
171    /// indicating that we have reset our readers counter after a reader died without decrementing it.
172    pub fn write_raw<T>(
173        &'a mut self,
174        data: &[u8],
175        grace_duration: Duration,
176    ) -> Result<(usize, bool), SynchronizerError>
177    where
178        T: Serialize<AllocSerializer<N>>,
179        T::Archived: for<'b> CheckBytes<DefaultValidator<'b>>,
180    {
181        // fetch current state from mapped memory
182        let state = self.state_container.state::<true>(true)?;
183
184        // calculate data checksum
185        let mut hasher = self.build_hasher.build_hasher();
186        hasher.write(data);
187        let checksum = hasher.finish();
188
189        // acquire next available data file idx and write data to it
190        let acquire_sleep_duration = Duration::from_nanos(SD);
191        let (new_idx, reset) = state.acquire_next_idx(grace_duration, acquire_sleep_duration);
192        let new_version = InstanceVersion::new(new_idx, data.len(), checksum)?;
193        let size = self.data_container.write(data, new_version)?;
194
195        // switch readers to new version
196        state.switch_version(new_version);
197
198        Ok((size, reset))
199    }
200
201    /// Reads and returns an `entity` struct from mapped memory wrapped in `ReadGuard`.
202    ///
203    /// # Parameters
204    /// - `check_bytes`: Whether to check that `entity` bytes can be safely read for type `T`,
205    ///                  `false` - bytes check will not be performed (faster, but less safe),
206    ///                  `true` - bytes check will be performed (slower, but safer).
207    ///
208    /// # Safety
209    ///
210    /// This method is marked as unsafe due to the potential for memory corruption if the returned
211    /// result is used beyond the `grace_duration` set in the `write` method. The caller must ensure
212    /// the `ReadGuard` (and any references derived from it) are dropped before this time period
213    /// elapses to ensure safe operation.
214    ///
215    /// Additionally, the use of `unsafe` here is related to the internal use of the
216    /// `rkyv::archived_root` function, which has its own safety considerations. Particularly, it
217    /// assumes the byte slice provided to it accurately represents an archived object, and that the
218    /// root of the object is stored at the end of the slice.
219    pub unsafe fn read<T>(
220        &'a mut self,
221        check_bytes: bool,
222    ) -> Result<ReadResult<T>, SynchronizerError>
223    where
224        T: Archive,
225        T::Archived: for<'b> CheckBytes<DefaultValidator<'b>>,
226    {
227        // fetch current state from mapped memory
228        let state = self.state_container.state::<false>(false)?;
229
230        // fetch current version
231        let version = state.version()?;
232
233        // create and lock state guard for reading
234        let guard = ReadGuard::new(state, version)?;
235
236        // fetch data for current version from mapped memory
237        let (data, switched) = self.data_container.data(version)?;
238
239        // fetch entity from data using zero-copy deserialization
240        let entity = match check_bytes {
241            false => archived_root::<T>(data),
242            true => check_archived_root::<T>(data).map_err(|_| FailedEntityRead)?,
243        };
244
245        Ok(ReadResult::new(guard, entity, switched))
246    }
247
248    /// Returns current `InstanceVersion` stored within the state, useful for detecting
249    /// whether synchronized `entity` has changed.
250    pub fn version(&'a mut self) -> Result<InstanceVersion, SynchronizerError> {
251        // fetch current state from mapped memory
252        let state = self.state_container.state::<false>(false)?;
253
254        // fetch current version
255        state.version()
256    }
257}
258
#[cfg(test)]
mod tests {
    use crate::instance::InstanceVersion;
    use crate::locks::SingleWriter;
    use crate::synchronizer::{Synchronizer, SynchronizerError};
    use bytecheck::CheckBytes;
    use rand::distributions::Uniform;
    use rand::prelude::*;
    use rkyv::{Archive, Deserialize, Serialize};
    use std::collections::HashMap;
    use std::fs;
    use std::path::Path;
    use std::time::Duration;
    use wyhash::WyHash;

    /// Payload type that the tests push through the synchronizer.
    #[derive(Archive, Deserialize, Serialize, Debug, PartialEq)]
    #[archive_attr(derive(CheckBytes))]
    struct MockEntity {
        version: u32,
        map: HashMap<u64, Vec<f32>>,
    }

    /// Produces `MockEntity` values from a seeded RNG so that runs are reproducible.
    struct MockEntityGenerator {
        rng: StdRng,
    }

    impl MockEntityGenerator {
        /// Seeds the generator by repeating `seed` across the whole 32-byte RNG seed.
        fn new(seed: u8) -> Self {
            Self {
                rng: StdRng::from_seed([seed; 32]),
            }
        }

        /// Generates an entity whose map receives `n` insertions.
        ///
        /// The order of RNG draws (version, then key / length / values per entry) is part of
        /// the test contract: the expected `InstanceVersion` constants depend on it.
        fn gen(&mut self, n: usize) -> MockEntity {
            let version = self.rng.gen();
            let mut map = HashMap::new();
            let distribution = Uniform::<f32>::from(0.0..100.0);
            for _ in 0..n {
                let key: u64 = self.rng.gen();
                let len = self.rng.gen::<usize>() % 20;
                let vals: Vec<f32> = (0..len).map(|_| self.rng.sample(distribution)).collect();
                map.insert(key, vals);
            }
            MockEntity { version, map }
        }
    }

    /// End-to-end scenario: a reader fails before the first write, then observes each
    /// subsequent write together with the expected version and "switched" flag.
    #[test]
    fn test_synchronizer() {
        let path = "/tmp/synchro_test";
        let state_path = [path, "_state"].concat();
        let data_path_0 = [path, "_data_0"].concat();
        let data_path_1 = [path, "_data_1"].concat();

        // start from a clean slate; files left over from earlier runs may or may not exist
        for stale in &[&state_path, &data_path_0, &data_path_1] {
            fs::remove_file(stale).unwrap_or_default();
        }

        // one synchronizer acts as the writer, the other as the reader
        let mut writer = Synchronizer::new(path.as_ref());
        let mut reader = Synchronizer::new(path.as_ref());

        // deterministic generator keeps the expected versions below stable
        let mut entity_generator = MockEntityGenerator::new(3);
        let grace = Duration::from_secs(1);

        // reading before anything was written must fail and must not create the state file
        let read_error = unsafe { reader.read::<MockEntity>(false) }.err();
        assert!(read_error.is_some());
        assert_eq!(
            read_error.unwrap().to_string(),
            "error reading state file: No such file or directory (os error 2)"
        );
        assert!(!Path::new(&state_path).exists());

        // first write: creates the state file, leaves the second data file untouched
        let entity = entity_generator.gen(100);
        let (written, was_reset) = writer.write(&entity, grace).unwrap();
        assert!(written > 0);
        assert!(!was_reset);
        assert!(Path::new(&state_path).exists());
        assert!(!Path::new(&data_path_1).exists());
        assert_eq!(
            reader.version().unwrap(),
            InstanceVersion(8817430144856633152)
        );

        // the first read after a write reports a switch ...
        fetch_and_assert_entity(&mut reader, &entity, true);

        // ... while a repeated read of the same version does not
        fetch_and_assert_entity(&mut reader, &entity, false);

        // second write: both data files exist afterwards
        let entity = entity_generator.gen(200);
        let (written, was_reset) = writer.write(&entity, grace).unwrap();
        assert!(written > 0);
        assert!(!was_reset);
        assert!(Path::new(&state_path).exists());
        assert!(Path::new(&data_path_0).exists());
        assert!(Path::new(&data_path_1).exists());
        assert_eq!(
            reader.version().unwrap(),
            InstanceVersion(1441050725688826209)
        );

        // reading the new version switches the data again
        fetch_and_assert_entity(&mut reader, &entity, true);

        // two writes in a row, without reads in between, land on the same `idx` again
        let entity = entity_generator.gen(100);
        let (written, was_reset) = writer.write(&entity, grace).unwrap();
        assert!(written > 0);
        assert!(!was_reset);
        assert_eq!(
            reader.version().unwrap(),
            InstanceVersion(14058099486534675680)
        );

        let entity = entity_generator.gen(200);
        let (written, was_reset) = writer.write(&entity, grace).unwrap();
        assert!(written > 0);
        assert!(!was_reset);
        assert_eq!(
            reader.version().unwrap(),
            InstanceVersion(18228729609619266545)
        );

        fetch_and_assert_entity(&mut reader, &entity, true);
    }

    /// Reads the current entity through `synchronizer` (without byte checking) and compares
    /// its fields and "switched" flag against the expectations.
    fn fetch_and_assert_entity(
        synchronizer: &mut Synchronizer,
        expected_entity: &MockEntity,
        expected_is_switched: bool,
    ) {
        let fetched = unsafe { synchronizer.read::<MockEntity>(false).unwrap() };
        assert_eq!(fetched.map, expected_entity.map);
        assert_eq!(fetched.version, expected_entity.version);
        assert_eq!(fetched.is_switched(), expected_is_switched);
    }

    /// With the `SingleWriter` strategy a second writer on the same path must be rejected.
    #[test]
    fn single_writer_lock_prevents_multiple_writers() {
        static PATH: &str = "/tmp/synchronizer_single_writer";
        let grace = Duration::from_secs(1);
        let entity = MockEntityGenerator::new(3).gen(100);

        let mut writer1 = Synchronizer::<WyHash, SingleWriter>::with_params(PATH.as_ref());
        let mut writer2 = Synchronizer::<WyHash, SingleWriter>::with_params(PATH.as_ref());

        writer1.write(&entity, grace).unwrap();

        let second_attempt = writer2.write(&entity, grace);
        assert!(matches!(
            second_attempt,
            Err(SynchronizerError::WriteLockConflict)
        ));
    }
}