shm_snapshot/
writer.rs

use core::iter::Extend;
use core::sync::atomic::{AtomicU64, Ordering};
use memmap2::MmapRaw;

/// A memory-mapped file into which this writer adds new snapshots.
pub struct Writer {
    pub(crate) head: Head,
}

/// A read view of a file.
///
/// Can be used to recover data, or converted into a `Writer`.
pub struct File {
    pub(crate) head: Head,
}

/// A view onto a memory-mapped file, which has a configured layout.
pub struct FileDiscovery<'lt> {
    pub(crate) file: &'lt File,
    pub(crate) configuration: ConfigureFile,
}

/// Describes the layout of the shared memory in a [`Writer`].
#[derive(Default, Debug)]
pub struct ConfigureFile {
    /// The number of entries in the sequence ring buffer.
    pub entries: u64,
    /// The number of bytes in the data ring buffer.
    pub data: u64,
    /// The offset of the next-to-write entry.
    pub initial_offset: u64,
    /// The indicated layout version in the file, or an explicitly invalid number.
    ///
    /// This can't be public; it is not supposed to be set arbitrarily.
    pub(crate) layout_version: u64,
}

pub struct Head {
    head: WriteHead,
    /// The memory map keeping the write head valid. This is held purely for safety; the field is
    /// never accessed except by `Drop`.
    #[allow(dead_code)]
    file: MmapRaw,
}

/// The descriptor of a singular snapshot.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct Snapshot {
    /// The offset of data in the data ring.
    pub offset: u64,
    /// The length of data in the data ring. A non-zero length marks a valid entry, a zero length
    /// an invalid entry.
    pub length: u64,
}

/// A sink for snapshots yielded during iteration.
///
/// Returning `false` from `insert_one` signals that the value should not be kept; `iter_valid`
/// then invalidates the corresponding entry.
pub(crate) trait Collect<T> {
    fn insert_one(&mut self, _: T) -> bool;
}

impl<T> Collect<T> for Vec<T> {
    fn insert_one(&mut self, val: T) -> bool {
        self.push(val);
        true
    }
}
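
// Illustrative sketch only (not part of the original API): a minimal `Collect` implementor that
// keeps every snapshot it is offered and merely counts them. Returning `true` keeps the entry;
// returning `false` would ask `iter_valid` to invalidate it, as the `Retain` adapter below does.
#[allow(dead_code)]
pub(crate) struct CountingCollector(pub(crate) usize);

impl Collect<Snapshot> for CountingCollector {
    fn insert_one(&mut self, _: Snapshot) -> bool {
        self.0 += 1;
        true
    }
}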

pub(crate) struct Entry<'lt> {
    index: u64,
    offset: u64,
    length: u64,
    head: &'lt mut WriteHead,
}

/// An unfinished entry in a writer's ring, which can be atomically committed.
pub struct PreparedTransaction<'lt> {
    offset: u64,
    length: u64,
    head: &'lt mut WriteHead,
    tail: &'lt [DataPage],
}

/// Resolved pointers _into_ a memory map.
///
/// # Safety
///
/// Do **NOT** under any circumstance return the references with unchanged lifetimes. The correct
/// lifetime is the `'self` of an encompassing `Head`.
///
/// It is vitally important that this struct is always paired with a backing file that keeps the
/// allocation alive. The members' lifetime is a lie; the truth is impossible to represent, as it
/// should be a `'self` lifetime tied to the owner of the memory. The backing file allocation
/// might be leaked to turn these into true representations though (leaking the allocation with
/// it). If the SharedFd is used similarly to an alternative heap, then this is correct.
pub(crate) struct WriteHead {
    /// Our process / thread internal view of the head page mapped in the file.
    ///
    /// This exists solely for internal consistency.
    pub(crate) cache: HeadCache,
    pub(crate) meta: &'static HeadPage,
    pub(crate) sequence: &'static [SequencePage],
    pub(crate) data: &'static [DataPage],
    /// Data pages from the shared memory which we do not touch ourselves, i.e. user reserved.
    pub(crate) tail: &'static [DataPage],
}

struct HeadMapRaw {
    meta: *const HeadPage,
    sequence: *const [SequencePage],
    data: *const [DataPage],
}

impl Head {
    fn fitting_power_of_two(value: u64) -> u64 {
        const HIGHEST_BIT_SET: u64 = !((!0) >> 1);
        // Must be a power of two, use the next lower one. Note that `value` must be non-zero,
        // otherwise the shift would overflow.
        HIGHEST_BIT_SET >> value.leading_zeros()
    }

    pub(crate) fn discover(&self, cfg: &mut ConfigureFile) {
        let entry_mask = self.head.meta.entry_mask.load(Ordering::Relaxed);
        let data_mask = self.head.meta.page_mask.load(Ordering::Relaxed);
        let page_write_offset = self.head.meta.page_write_offset.load(Ordering::Relaxed);

        let layout_version = self.head.meta.version.load(Ordering::Relaxed);
        assert!(entry_mask < usize::MAX as u64);
        assert!(data_mask < usize::MAX as u64);

        let sequence = (entry_mask + 1) as usize;
        // Assume this refers to the whole tail at this point?
        let pages = self.head.data.len();
        let psequence = sequence / SequencePage::DATA_COUNT
            + usize::from(sequence % SequencePage::DATA_COUNT != 0);

        let data_space = (pages - psequence) as u64 * core::mem::size_of::<DataPage>() as u64;
        let available_entries = Self::fitting_power_of_two(entry_mask + 1);
        let available_data = Self::fitting_power_of_two(data_space);

        cfg.entries = available_entries;
        cfg.data = available_data.min(data_mask + 1);
        cfg.initial_offset = page_write_offset;
        cfg.layout_version = layout_version;
    }

    pub(crate) fn configure(&mut self, cfg: &ConfigureFile) {
        Self::configure_head(&mut self.head, cfg)
    }

    fn configure_head(head: &mut WriteHead, cfg: &ConfigureFile) {
        assert!(cfg.entries.next_power_of_two() == cfg.entries);
        assert!(cfg.data.next_power_of_two() == cfg.data);
        assert!(cfg.is_initialized());

        head.pre_configure_entries(cfg.entries);
        head.pre_configure_pages(cfg.data);
        head.pre_configure_write(cfg.initial_offset);
        head.configure_pages();
    }

    #[inline(always)]
    pub(crate) fn valid(&self, into: &mut impl Extend<Snapshot>) {
        Self::valid_in_head(&self.head, into)
    }

    pub(crate) fn valid_at(&self, into: &mut impl Extend<Snapshot>, cfg: &ConfigureFile) {
        let mut alternate_head = WriteHead {
            cache: HeadCache { ..self.head.cache },
            ..self.head
        };

        Self::configure_head(&mut alternate_head, cfg);
        Self::valid_in_head(&alternate_head, into);
    }

    pub(crate) fn retain_at(&self, retain: &dyn super::RetainSnapshot, cfg: &ConfigureFile) {
        let mut alternate_head = WriteHead {
            cache: HeadCache { ..self.head.cache },
            ..self.head
        };

        Self::configure_head(&mut alternate_head, cfg);
        Self::retain_in_head(&alternate_head, retain);
    }

    pub(crate) fn entry_at(&self, idx: super::SnapshotIndex) -> Snapshot {
        let snapshot = self.head.entry_at_relaxed(idx.entry);
        core::sync::atomic::fence(Ordering::Acquire);
        snapshot
    }

    fn valid_in_head(head: &WriteHead, into: &mut impl Extend<Snapshot>) {
        struct Collector<T>(T);

        impl<T, V> Collect<T> for Collector<&'_ mut V>
        where
            V: Extend<T>,
        {
            fn insert_one(&mut self, val: T) -> bool {
                self.0.extend(core::iter::once(val));
                true
            }
        }

        // Relaxed ordering is enough since we're the only reader still.
        head.iter_valid(&mut Collector(into), Ordering::Relaxed);
    }

    fn retain_in_head(head: &WriteHead, into: &dyn super::RetainSnapshot) {
        struct Retain<'lt>(&'lt dyn super::RetainSnapshot);

        impl Collect<Snapshot> for Retain<'_> {
            fn insert_one(&mut self, val: Snapshot) -> bool {
                self.0.contains(&val)
            }
        }

        head.iter_valid(&mut Retain(into), Ordering::Relaxed);
    }

    pub(crate) fn read(&self, snapshot: &Snapshot, into: &mut [u8]) {
        self.head.read(snapshot, into);
    }

    pub(crate) fn read_at(&self, snapshot: &Snapshot, into: &mut [u8], cfg: &ConfigureFile) {
        let mut alternate_head = WriteHead {
            cache: HeadCache { ..self.head.cache },
            ..self.head
        };

        Self::configure_head(&mut alternate_head, cfg);
        alternate_head.read(snapshot, into);
    }
    /// Construct this wrapper from a raw memory map.
    pub(crate) fn from_map(file: MmapRaw) -> Self {
        /// The head page we simulate if the file is too small to contain anything.
        ///
        /// The user will just notice that we can't write, but the construction itself won't fail.
        /// That happens later, when the head is converted to a writer and the caller has selected
        /// some minimum requirements. Here we just fulfill validity.
        static FALLBACK_HEAD: HeadPage = HeadPage {
            version: AtomicU64::new(ConfigureFile::MAGIC_VERSION),
            entry_mask: AtomicU64::new(0),
            page_mask: AtomicU64::new(0),
            page_write_offset: AtomicU64::new(0),
        };

        let ptr = file.as_mut_ptr();
        let len = file.len();

        let head = if let Some(head) = unsafe { Self::map_all_raw(ptr, len) } {
            // Safety: pointers returned are still in-bounds. By keeping `file` we also ensure that
            // the mapping is kept in place. The types themselves are full atomics, meaning we do
            // not have any uniqueness requirements on the pointer.
            //
            // The one scary part is the safety requirement of the pointee being initialized
            // memory. We assume that this is the case for all memory-mapped files, which
            // zero-initialize pages on first (faulting) access.
            unsafe {
                WriteHead {
                    cache: HeadCache::new(),
                    meta: &*head.meta,
                    sequence: &*head.sequence,
                    data: &*head.data,
                    tail: &[],
                }
            }
        } else {
            WriteHead {
                cache: HeadCache::new(),
                meta: &FALLBACK_HEAD,
                data: &[],
                sequence: &[],
                tail: &[],
            }
        };

        Head { head, file }
    }

    pub(crate) fn tail(&self) -> &'_ [AtomicU64] {
        DataPage::as_slice_of_u64(self.head.tail)
    }

    /// # Safety
    ///
    /// The caller promises that `ptr` points to an allocation valid for at least `len` bytes,
    /// that is, adding `len` to the pointer must stay in-bounds.
    unsafe fn map_all_raw(ptr: *mut u8, len: usize) -> Option<HeadMapRaw> {
        let tail_len = len.checked_sub(HeadPage::PAGE_SZ)?;
        let tail = ptr.add(HeadPage::PAGE_SZ);

        let sequence_ptr = tail as *const SequencePage;
        let sequence_len = tail_len / core::mem::size_of::<SequencePage>();
        let data_ptr = tail as *const DataPage;
        let data_len = tail_len / core::mem::size_of::<DataPage>();

        Some(HeadMapRaw {
            meta: ptr as *const HeadPage,
            sequence: core::ptr::slice_from_raw_parts(sequence_ptr, sequence_len),
            data: core::ptr::slice_from_raw_parts(data_ptr, data_len),
        })
    }
}

impl ConfigureFile {
    pub(crate) const MAGIC_VERSION: u64 = 0x96c2_a6f4b68519b3;

    /// Is the configuration data complete?
    pub fn is_initialized(&self) -> bool {
        self.layout_version == Self::MAGIC_VERSION
    }

    /// Complete this configuration, if it is not already.
    pub fn or_insert_with(&mut self, replace: impl FnOnce(&mut Self)) {
        if !self.is_initialized() {
            replace(self);
            self.layout_version = ConfigureFile::MAGIC_VERSION;
        }
    }
}
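
// Illustrative sketch only (not part of the original API): the assumed flow for configuring a
// mapped file. `discover` reads whatever layout the file already declares, and `or_insert_with`
// fills in defaults only if the file was never initialized. The sizes chosen here are arbitrary
// example values; both must be powers of two, and the mapping is assumed to be large enough to
// hold them.
#[allow(dead_code)]
fn example_configure(head: &mut Head) -> ConfigureFile {
    let mut cfg = ConfigureFile::default();
    head.discover(&mut cfg);
    cfg.or_insert_with(|cfg| {
        cfg.entries = 128; // ring of 128 snapshot descriptors
        cfg.data = 1 << 16; // 64 KiB data ring
        cfg.initial_offset = 0;
    });
    head.configure(&cfg);
    cfg
}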

impl Head {
    pub(crate) fn write_with(
        &mut self,
        data: &[u8],
        intermediate: &mut dyn FnMut(PreparedTransaction) -> bool,
    ) -> Result<u64, ()> {
        let mut entry = self.head.entry();
        let Some(end_ptr) = entry.new_write_offset(data.len()) else {
            return Err(());
        };

        entry.invalidate_heads(end_ptr);
        entry.copy_from_slice(data);

        if intermediate(PreparedTransaction {
            offset: entry.offset,
            length: entry.length,
            tail: entry.head.tail,
            head: entry.head,
        }) {
            Ok(entry.commit())
        } else {
            Err(())
        }
    }
}
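
// Illustrative sketch only (not part of the original API): publishing one snapshot through
// `write_with`. The intermediate closure observes the prepared, not yet committed, transaction
// and may veto it by returning `false`, in which case the entry is never published.
#[allow(dead_code)]
fn example_write(head: &mut Head, payload: &[u8]) -> Result<u64, ()> {
    head.write_with(payload, &mut |_prepared| true)
}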

impl WriteHead {
    pub(crate) fn pre_configure_entries(&mut self, num: u64) {
        assert!(num.next_power_of_two() == num);
        self.cache.entry_mask = num - 1;
    }

    pub(crate) fn pre_configure_pages(&mut self, num: u64) {
        assert!(num.next_power_of_two() == num);
        self.cache.page_mask = num - 1;
    }

    pub(crate) fn pre_configure_write(&mut self, offset: u64) {
        self.cache.page_write_offset = offset;
    }

    pub(crate) fn configure_pages(&mut self) {
        assert_eq!(
            core::mem::size_of::<DataPage>(),
            core::mem::size_of::<SequencePage>()
        );

        let sequence: usize = (self.cache.entry_mask + 1)
            .try_into()
            .expect("Invalid configured entry mask");
        let sequence = sequence.next_power_of_two();

        let data: usize = (self.cache.page_mask + 1)
            .try_into()
            .expect("Invalid configured page mask");
        let data = data.next_power_of_two();

        let psequence = sequence / SequencePage::DATA_COUNT
            + usize::from(sequence % SequencePage::DATA_COUNT != 0);
        let pdata = data / core::mem::size_of::<DataPage>()
            + usize::from(data % core::mem::size_of::<DataPage>() != 0);

        self.sequence = &self.sequence[..psequence];
        let (data, tail) = self.data[psequence..].split_at(pdata);
        self.data = data;
        self.tail = tail;

        self.meta
            .entry_mask
            .store(self.cache.entry_mask, Ordering::Relaxed);
        self.meta
            .page_mask
            .store(self.cache.page_mask, Ordering::Relaxed);
        self.meta
            .page_write_offset
            .store(self.cache.page_write_offset, Ordering::Relaxed);

        self.meta
            .version
            .store(ConfigureFile::MAGIC_VERSION, Ordering::Release);
    }

    pub(crate) fn entry(&mut self) -> Entry<'_> {
        let index = self.cache.entry_write_offset;
        let offset = self.cache.page_write_offset;
        Entry {
            head: self,
            length: 0,
            index,
            offset,
        }
    }

    pub(crate) fn iter_valid(&self, extend: &mut dyn Collect<Snapshot>, ordering: Ordering) {
        // Always use the stored one. If we're iterating a pre-loaded file then this is the one
        // stored from the previous run, or zeroed if new. If we're iterating over our current
        // writer then we've previously written it, i.e. the ordering here is always good too, no
        // matter which one is used precisely.
        let max = self.meta.entry_mask.load(ordering);
        let seqs = self.sequence.iter().flat_map(|seq| &seq.data);

        for (idx, seq) in seqs.enumerate() {
            if idx as u64 > max {
                break;
            }

            let length = seq.length.load(ordering);

            if length == 0 {
                continue;
            }

            if !extend.insert_one(Snapshot {
                length,
                offset: seq.offset.load(ordering),
            }) {
                seq.length.store(0, ordering);
            }
        }
    }

    pub(crate) fn new_write_offset(&self, n: usize) -> Option<u64> {
        let len = u64::try_from(n).ok().filter(|&len| len <= self.cache.page_mask)?;
        Some(self.cache.page_write_offset.wrapping_add(len))
    }

    /// Invalidate entries at the read head so that data up to `end` can be written.
    pub(crate) fn invalidate_heads_to(&mut self, end: u64) {
        let mut entry = self.cache.entry_read_offset;
        let mut data = self.cache.page_read_offset;

        loop {
            if data >= end {
                break;
            }

            // The entry read offset has caught up with the write offset; there is nothing left
            // to invalidate, so the data region up to `end` is free.
            if entry == self.cache.entry_write_offset {
                data = end;
                break;
            }

            let length = self.invalidate_at(entry);
            entry = entry.wrapping_add(1);
            data = data.wrapping_add(length);
        }

        self.cache.entry_read_offset = entry;
        self.cache.page_read_offset = data;
    }

    pub(crate) fn copy_from_slice(&mut self, data: &[u8]) -> u64 {
        let mut n = self.cache.page_write_offset;

        for (&b, idx) in data.iter().zip(n..) {
            self.write_at(idx, b);
            n = n.wrapping_add(1);
        }

        let count = n.wrapping_sub(self.cache.page_write_offset);
        self.cache.page_write_offset = n;
        count
    }

    pub(crate) fn read(&self, snapshot: &Snapshot, into: &mut [u8]) {
        for (b, offset) in into.iter_mut().zip(0..snapshot.length) {
            let idx = snapshot.offset.wrapping_add(offset);
            *b = self.read_at(idx);
        }
    }

    fn get_entry_atomic(&self, idx: u64) -> &SequenceEntry {
        let idx = (idx & self.cache.entry_mask) as usize;

        let page = idx / SequencePage::DATA_COUNT;
        let entry = idx % SequencePage::DATA_COUNT;

        &self.sequence[page].data[entry]
    }

    fn invalidate_at(&mut self, idx: u64) -> u64 {
        let entry = self.get_entry_atomic(idx);
        entry.length.swap(0, Ordering::Relaxed)
    }

    fn insert_at(&mut self, idx: u64, snap: Snapshot) {
        let entry = self.get_entry_atomic(idx);

        entry.offset.store(snap.offset, Ordering::Release);
        entry.length.store(snap.length, Ordering::Release);
    }

    fn entry_at_relaxed(&self, idx: u64) -> Snapshot {
        let entry = self.get_entry_atomic(idx);

        Snapshot {
            offset: entry.offset.load(Ordering::Relaxed),
            length: entry.length.load(Ordering::Relaxed),
        }
    }

    /// Locate the 64-bit word and the byte position within it for a stream offset.
    fn idx_at(&self, idx: u64) -> (usize, usize, u32) {
        let idx = idx & self.cache.page_mask;

        // Which byte of the 8-byte word, and which word of the data region.
        let offset = idx % 8;
        let idx = idx / 8;
        let shift = 8 * offset;

        let data_idx = idx as usize % DataPage::DATA_COUNT;
        let page_idx = idx as usize / DataPage::DATA_COUNT;
        (page_idx, data_idx, shift as u32)
    }

    fn write_at(&self, idx: u64, byte: u8) {
        let (page_idx, data_idx, shift) = self.idx_at(idx);
        let word = &self.data[page_idx].data[data_idx];
        let mask = 0xffu64 << shift;

        // Replace one byte of the word. The read-modify-write is not atomic as a whole; this
        // relies on there being a single writer.
        let old = word.load(Ordering::Relaxed) & !mask;
        let new = old | (u64::from(byte) << shift);
        word.store(new, Ordering::Relaxed);
    }

    fn read_at(&self, idx: u64) -> u8 {
        let (page_idx, data_idx, shift) = self.idx_at(idx);

        let word = &self.data[page_idx].data[data_idx];
        let old = word.load(Ordering::Relaxed);

        ((old >> shift) & 0xff) as u8
    }
}
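
// Illustrative sketch only (not part of the original API): a byte written at some stream offset
// lands inside one `AtomicU64` word at a shift of 0, 8, ..., 56 bits and reads back unchanged,
// provided the offset lies within the configured data ring.
#[allow(dead_code)]
fn example_byte_round_trip(head: &WriteHead, stream_offset: u64, byte: u8) -> u8 {
    head.write_at(stream_offset, byte);
    head.read_at(stream_offset)
}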

impl Entry<'_> {
    /// Consume the entry, putting it into the sequence buffer.
    pub(crate) fn commit(self) -> u64 {
        let end = self.head.cache.page_write_offset;
        self.head
            .meta
            .page_write_offset
            .store(end, Ordering::Relaxed);

        debug_assert!(
            end.wrapping_sub(self.offset) >= self.length,
            "Failed to reserve enough space in the data section for the entry, risking corrupted data with following writes"
        );

        self.head.insert_at(
            self.index,
            Snapshot {
                length: self.length,
                offset: self.offset,
            },
        );

        self.index
    }

    pub(crate) fn new_write_offset(&self, n: usize) -> Option<u64> {
        self.head.new_write_offset(n)
    }

    pub(crate) fn invalidate_heads(&mut self, end: u64) {
        self.head.invalidate_heads_to(end);
    }

    pub(crate) fn copy_from_slice(&mut self, data: &[u8]) {
        self.length += self.head.copy_from_slice(data);
    }
}

impl<'lt> PreparedTransaction<'lt> {
    pub fn replace(&mut self, data: &[u8]) {
        assert!(
            data.len() as u64 <= self.length,
            "{} > {}",
            data.len(),
            self.length
        );
        let mut n = self.offset;

        for (&b, idx) in data.iter().zip(n..) {
            self.head.write_at(idx, b);
            n = n.wrapping_add(1);
        }
    }

    pub fn tail(&self) -> &'lt [AtomicU64] {
        DataPage::as_slice_of_u64(self.tail)
    }
}

pub(crate) struct HeadCache {
    entry_mask: u64,
    entry_read_offset: u64,
    entry_write_offset: u64,
    page_mask: u64,
    page_write_offset: u64,
    page_read_offset: u64,
}

impl HeadCache {
    pub(crate) fn new() -> Self {
        HeadCache {
            entry_mask: 0,
            entry_read_offset: 0,
            entry_write_offset: 0,
            page_mask: 0,
            page_write_offset: 0,
            page_read_offset: 0,
        }
    }
}

#[derive(Default)]
pub(crate) struct HeadPage {
    /// Magic 8-byte sequence, denoting the layout of this file and identifying it as shm-snapshot.
    version: AtomicU64,
    /// The mask to translate stream index to a specific descriptor offset.
    entry_mask: AtomicU64,
    /// The mask to translate stream offset to a data page offset.
    page_mask: AtomicU64,
    /// The stream offset of the next byte to write.
    page_write_offset: AtomicU64,
}

impl HeadPage {
    const PAGE_SZ: usize = 4096;
}

pub(crate) struct SequencePage {
    data: [SequenceEntry; Self::DATA_COUNT],
}

struct SequenceEntry {
    offset: AtomicU64,
    length: AtomicU64,
}

impl Default for SequencePage {
    fn default() -> Self {
        SequencePage {
            data: [0; Self::DATA_COUNT].map(|_i| SequenceEntry {
                offset: AtomicU64::new(0),
                length: AtomicU64::new(0),
            }),
        }
    }
}

impl SequencePage {
    // FIXME: I currently don't target platforms with only 32-bit atomics. But if I did, this
    // constant should depend on such a target choice. The code here should then also get another
    // implementation, and `Writer` should only access this through an indirection.
    const DATA_COUNT: usize = 4096 / 16;
}

pub struct DataPage {
    pub data: [AtomicU64; Self::DATA_COUNT],
}

impl DataPage {
    // One `AtomicU64` per 8 bytes, dividing the page into words.
    const DATA_COUNT: usize = 4096 / 8;

    pub fn as_slice_of_u64(this: &[DataPage]) -> &[AtomicU64] {
        let count = Self::DATA_COUNT * this.len();
        unsafe { &*core::ptr::slice_from_raw_parts(this.as_ptr() as *const AtomicU64, count) }
    }
}

impl Default for DataPage {
    fn default() -> Self {
        DataPage {
            data: [0; Self::DATA_COUNT].map(|_i| AtomicU64::new(0)),
        }
    }
}
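
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sketch of tests for the pure helpers in this module; these are additions for
    // illustration and were not part of the original file.

    #[test]
    fn fitting_power_of_two_rounds_down() {
        // `fitting_power_of_two` returns the largest power of two not exceeding the value; the
        // input must be non-zero.
        assert_eq!(Head::fitting_power_of_two(1), 1);
        assert_eq!(Head::fitting_power_of_two(6), 4);
        assert_eq!(Head::fitting_power_of_two(8), 8);
    }

    #[test]
    fn or_insert_with_is_idempotent() {
        let mut cfg = ConfigureFile::default();
        assert!(!cfg.is_initialized());

        cfg.or_insert_with(|cfg| cfg.entries = 64);
        assert!(cfg.is_initialized());
        assert_eq!(cfg.entries, 64);

        // A second call must not overwrite the now-initialized configuration.
        cfg.or_insert_with(|cfg| cfg.entries = 128);
        assert_eq!(cfg.entries, 64);
    }

    #[test]
    fn data_pages_view_as_words() {
        // Two 4 KiB pages expose 2 * 512 words of 8 bytes each.
        let pages = [DataPage::default(), DataPage::default()];
        assert_eq!(DataPage::as_slice_of_u64(&pages).len(), 2 * DataPage::DATA_COUNT);
    }

    #[test]
    fn bytes_round_trip_through_data_words() {
        // Sketch assuming it is acceptable to leak one head page and one data page in a test.
        let meta: &'static HeadPage = Box::leak(Box::new(HeadPage::default()));
        let data: &'static [DataPage] = &*Box::leak(Box::new([DataPage::default()]));

        let mut head = WriteHead {
            cache: HeadCache::new(),
            meta,
            sequence: &[],
            data,
            tail: &[],
        };
        head.pre_configure_pages(4096);

        for (idx, &byte) in b"snapshot".iter().enumerate() {
            head.write_at(idx as u64, byte);
        }

        let mut out = [0u8; 8];
        head.read(&Snapshot { offset: 0, length: 8 }, &mut out);
        assert_eq!(&out, b"snapshot");
    }
}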