measureme_mirror/serialization.rs

/// This module implements the "container" file format that `measureme` uses for
/// storing things on disk. The format supports storing three independent
/// streams of data: one for events, one for string data, and one for string
/// index data (in theory it could support an arbitrary number of separate
/// streams but three is all we need). The data of each stream is split into
/// "pages", where each page has a small header designating what kind of
/// data it is (i.e. event, string data, or string index), and the length of
/// the page.
///
/// Pages of different kinds can be arbitrarily interleaved. The headers allow
/// for reconstructing each of the streams later on. An example file might thus
/// look like this:
///
/// ```ignore
/// | file header | page (events) | page (string data) | page (events) | page (string index) |
/// ```
///
/// The exact encoding of a page is:
///
/// | byte slice              | contents                                |
/// |-------------------------|-----------------------------------------|
/// | &[0 .. 1]               | page tag                                |
/// | &[1 .. 5]               | page size as little endian u32          |
/// | &[5 .. (5 + page_size)] | page contents (exactly page_size bytes) |
///
/// A page is immediately followed by the next page, without any padding.
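///
/// For illustration, a single events page holding the payload `abc` could be
/// assembled by hand like this (a minimal sketch of the encoding, not part of
/// the public API):
///
/// ```ignore
/// let payload = b"abc";
/// let mut page = Vec::new();
/// page.push(PageTag::Events as u8); // page tag (1 byte)
/// page.extend_from_slice(&(payload.len() as u32).to_le_bytes()); // page size (4 bytes, LE)
/// page.extend_from_slice(payload); // page contents
/// assert_eq!(page.len(), 5 + payload.len()); // 5 header bytes + payload
/// ```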
use parking_lot::Mutex;
use rustc_hash::FxHashMap;
use std::cmp::min;
use std::convert::TryInto;
use std::error::Error;
use std::fmt::Debug;
use std::fs;
use std::io::Write;
use std::sync::Arc;

const MAX_PAGE_SIZE: usize = 256 * 1024;

/// The number of bytes we consider enough to warrant their own page when
/// deciding whether to flush a partially full buffer. Actual pages may need
/// to be smaller, e.g. when writing the tail of the data stream.
const MIN_PAGE_SIZE: usize = MAX_PAGE_SIZE / 2;

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum PageTag {
    Events = 0,
    StringData = 1,
    StringIndex = 2,
}

impl std::convert::TryFrom<u8> for PageTag {
    type Error = String;

    fn try_from(value: u8) -> Result<Self, Self::Error> {
        match value {
            0 => Ok(PageTag::Events),
            1 => Ok(PageTag::StringData),
            2 => Ok(PageTag::StringIndex),
            _ => Err(format!("Could not convert byte `{}` to PageTag.", value)),
        }
    }
}

/// An address within a data stream. Each data stream has its own address space,
/// i.e. the first piece of data written to the events stream will have
/// `Addr(0)` and the first piece of data written to the string data stream
/// will *also* have `Addr(0)`.
//
// TODO: Evaluate if it makes sense to add a type tag to `Addr` in order to
//       prevent accidental use of `Addr` values with the wrong address space.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub struct Addr(pub u64);

impl Addr {
    pub fn as_usize(self) -> usize {
        self.0 as usize
    }
}

#[derive(Debug)]
pub struct SerializationSink {
    shared_state: SharedState,
    data: Mutex<SerializationSinkInner>,
    page_tag: PageTag,
}

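/// Builds `SerializationSink`s that share a single backing storage, one sink
/// per data stream.
///
/// A minimal usage sketch (the choice of `PageTag` values is just an example):
///
/// ```ignore
/// let builder = SerializationSinkBuilder::new_in_memory();
/// let events_sink = builder.new_sink(PageTag::Events);
/// let string_data_sink = builder.new_sink(PageTag::StringData);
/// ```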
pub struct SerializationSinkBuilder(SharedState);

impl SerializationSinkBuilder {
    pub fn new_from_file(file: fs::File) -> Result<Self, Box<dyn Error + Send + Sync>> {
        Ok(Self(SharedState(Arc::new(Mutex::new(
            BackingStorage::File(file),
        )))))
    }

    pub fn new_in_memory() -> SerializationSinkBuilder {
        Self(SharedState(Arc::new(Mutex::new(BackingStorage::Memory(
            Vec::new(),
        )))))
    }

    pub fn new_sink(&self, page_tag: PageTag) -> SerializationSink {
        SerializationSink {
            data: Mutex::new(SerializationSinkInner {
                buffer: Vec::with_capacity(MAX_PAGE_SIZE),
                addr: 0,
            }),
            shared_state: self.0.clone(),
            page_tag,
        }
    }
}

/// The `BackingStorage` is what the data gets written to. Usually that is a
/// file but for testing purposes it can also be an in-memory vec of bytes.
#[derive(Debug)]
enum BackingStorage {
    File(fs::File),
    Memory(Vec<u8>),
}

impl Write for BackingStorage {
    #[inline]
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        match *self {
            BackingStorage::File(ref mut file) => file.write(buf),
            BackingStorage::Memory(ref mut vec) => vec.write(buf),
        }
    }

    fn flush(&mut self) -> std::io::Result<()> {
        match *self {
            BackingStorage::File(ref mut file) => file.flush(),
            BackingStorage::Memory(_) => {
                // Nothing to do
                Ok(())
            }
        }
    }
}

/// This struct allows a `SerializationSink` to be used as a
/// `std::io::Write` implementation.
pub struct StdWriteAdapter<'a>(&'a SerializationSink);

impl<'a> Write for StdWriteAdapter<'a> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.0.write_bytes_atomic(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        let mut data = self.0.data.lock();
        let SerializationSinkInner {
            ref mut buffer,
            addr: _,
        } = *data;

        // First flush the local buffer.
        self.0.flush(buffer);

        // Then flush the backing store.
        self.0.shared_state.0.lock().flush()?;

        Ok(())
    }
}

#[derive(Debug)]
struct SerializationSinkInner {
    buffer: Vec<u8>,
    addr: u64,
}

/// This state is shared between all `SerializationSink`s writing to the same
/// backing storage (e.g. the same file).
#[derive(Clone, Debug)]
struct SharedState(Arc<Mutex<BackingStorage>>);

impl SharedState {
    /// Copies out the contents of all pages with the given tag and
    /// concatenates them into a single byte vec. This method is only meant to
    /// be used for testing and will panic if the underlying backing storage is
    /// a file instead of in memory.
    fn copy_bytes_with_page_tag(&self, page_tag: PageTag) -> Vec<u8> {
        let data = self.0.lock();
        let data = match *data {
            BackingStorage::File(_) => panic!(),
            BackingStorage::Memory(ref data) => data,
        };

        split_streams(data).remove(&page_tag).unwrap_or_default()
    }
}

/// This function reconstructs the individual data streams from their paged
/// version.
///
/// For example, if `E` denotes the page header of an events page, `S` denotes
/// the header of a string data page, and lower case letters denote page
/// contents then a paged stream could look like:
///
/// ```ignore
/// s = Eabcd_Sopq_Eef_Eghi_Srst
/// ```
///
/// and `split_streams` would result in the following set of streams:
///
/// ```ignore
/// split_streams(s) = {
///     events: [abcdefghi],
///     string_data: [opqrst],
/// }
/// ```
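///
/// A runnable sketch of the byte-level behavior (the payload values are
/// arbitrary examples):
///
/// ```ignore
/// let mut paged = Vec::new();
/// for payload in [&b"abc"[..], &b"def"[..]] {
///     paged.push(PageTag::Events as u8);
///     paged.extend_from_slice(&(payload.len() as u32).to_le_bytes());
///     paged.extend_from_slice(payload);
/// }
/// let streams = split_streams(&paged);
/// assert_eq!(streams[&PageTag::Events], b"abcdef");
/// ```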
pub fn split_streams(paged_data: &[u8]) -> FxHashMap<PageTag, Vec<u8>> {
    let mut result: FxHashMap<PageTag, Vec<u8>> = FxHashMap::default();

    let mut pos = 0;
    while pos < paged_data.len() {
        let tag: PageTag = paged_data[pos].try_into().unwrap();
        let page_size =
            u32::from_le_bytes(paged_data[pos + 1..pos + 5].try_into().unwrap()) as usize;

        assert!(page_size > 0);

        result
            .entry(tag)
            .or_default()
            .extend_from_slice(&paged_data[pos + 5..pos + 5 + page_size]);

        pos += page_size + 5;
    }

    result
}

impl SerializationSink {
    /// Writes `bytes` as a single page to the shared backing storage. The
    /// method will first write the page header (consisting of the page tag and
    /// the number of bytes in the page) and then the page contents
    /// (i.e. `bytes`).
    fn write_page(&self, bytes: &[u8]) {
        if !bytes.is_empty() {
            // We explicitly don't assert `bytes.len() >= MIN_PAGE_SIZE` because
            // `MIN_PAGE_SIZE` is just a recommendation and the last page will
            // often be smaller than that.
            assert!(bytes.len() <= MAX_PAGE_SIZE);

            let mut file = self.shared_state.0.lock();

            file.write_all(&[self.page_tag as u8]).unwrap();

            let page_size: [u8; 4] = (bytes.len() as u32).to_le_bytes();
            file.write_all(&page_size).unwrap();
            file.write_all(bytes).unwrap();
        }
    }

    /// Flushes `buffer` by writing its contents as a new page to the backing
    /// storage and then clearing it.
    fn flush(&self, buffer: &mut Vec<u8>) {
        self.write_page(&buffer[..]);
        buffer.clear();
    }

    /// Consumes the sink and returns a copy of all data written to it so far.
    /// This method is meant to be used for writing unit tests. It will panic
    /// if the underlying `BackingStorage` is a file.
    pub fn into_bytes(mut self) -> Vec<u8> {
        // Swap out the contents of `self` with something that can safely be
        // dropped without side effects.
        let mut data = Mutex::new(SerializationSinkInner {
            buffer: Vec::new(),
            addr: 0,
        });
        std::mem::swap(&mut self.data, &mut data);

        // Extract the data from the mutex.
        let SerializationSinkInner {
            ref mut buffer,
            addr: _,
        } = data.into_inner();

        // Make sure we write the current contents of the buffer to the
        // backing storage before proceeding.
        self.flush(buffer);

        self.shared_state.copy_bytes_with_page_tag(self.page_tag)
    }

    /// Atomically writes `num_bytes` of data to this `SerializationSink`.
    /// Atomic means the data is guaranteed to be written as a contiguous range
    /// of bytes.
    ///
    /// The buffer provided to the `write` callback is guaranteed to be of size
    /// `num_bytes` and `write` is supposed to completely fill it with the data
    /// to be written.
    ///
    /// The return value is the address of the data written and can be used to
    /// refer to the data later on.
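    ///
    /// A minimal usage sketch (the value written is an arbitrary example):
    ///
    /// ```ignore
    /// let addr = sink.write_atomic(4, |bytes| {
    ///     bytes.copy_from_slice(&0x1234_5678u32.to_le_bytes());
    /// });
    /// ```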
    pub fn write_atomic<W>(&self, num_bytes: usize, write: W) -> Addr
    where
        W: FnOnce(&mut [u8]),
    {
        if num_bytes > MAX_PAGE_SIZE {
            let mut bytes = vec![0u8; num_bytes];
            write(&mut bytes[..]);
            return self.write_bytes_atomic(&bytes[..]);
        }

        let mut data = self.data.lock();
        let SerializationSinkInner {
            ref mut buffer,
            ref mut addr,
        } = *data;

        if buffer.len() + num_bytes > MAX_PAGE_SIZE {
            self.flush(buffer);
            assert!(buffer.is_empty());
        }

        let curr_addr = *addr;

        let buf_start = buffer.len();
        let buf_end = buf_start + num_bytes;
        buffer.resize(buf_end, 0u8);
        write(&mut buffer[buf_start..buf_end]);

        *addr += num_bytes as u64;

        Addr(curr_addr)
    }

    /// Atomically writes the data in `bytes` to this `SerializationSink`.
    /// Atomic means the data is guaranteed to be written as a contiguous range
    /// of bytes.
    ///
    /// This method may perform better than `write_atomic` because it may be
    /// able to skip the sink's internal buffer. Use this method if the data to
    /// be written is already available as a `&[u8]`.
    ///
    /// The return value is the address of the data written and can be used to
    /// refer to the data later on.
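    ///
    /// A minimal usage sketch:
    ///
    /// ```ignore
    /// let addr = sink.write_bytes_atomic(b"already serialized bytes");
    /// ```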
    pub fn write_bytes_atomic(&self, bytes: &[u8]) -> Addr {
        // For "small" data we go to the buffered version immediately.
        if bytes.len() <= 128 {
            return self.write_atomic(bytes.len(), |sink| {
                sink.copy_from_slice(bytes);
            });
        }

        let mut data = self.data.lock();
        let SerializationSinkInner {
            ref mut buffer,
            ref mut addr,
        } = *data;

        let curr_addr = Addr(*addr);
        *addr += bytes.len() as u64;

        let mut bytes_left = bytes;

        // Do we have too little data in the buffer? If so, fill up the buffer
        // to the minimum page size.
        if buffer.len() < MIN_PAGE_SIZE {
            let num_bytes_to_take = min(MIN_PAGE_SIZE - buffer.len(), bytes_left.len());
            buffer.extend_from_slice(&bytes_left[..num_bytes_to_take]);
            bytes_left = &bytes_left[num_bytes_to_take..];
        }

        if bytes_left.is_empty() {
            return curr_addr;
        }

        // Make sure we flush the buffer before writing out any other pages.
        self.flush(buffer);

        for chunk in bytes_left.chunks(MAX_PAGE_SIZE) {
            if chunk.len() == MAX_PAGE_SIZE {
                // This chunk has the maximum size. It might or might not be the
                // last one. In either case we want to write it to disk
                // immediately because there is no reason to copy it to the
                // buffer first.
                self.write_page(chunk);
            } else {
                // This chunk is less than the chunk size that we requested, so
                // it must be the last one. If it is big enough to warrant its
                // own page, we write it to disk immediately. Otherwise, we copy
                // it to the buffer.
                if chunk.len() >= MIN_PAGE_SIZE {
                    self.write_page(chunk);
                } else {
                    debug_assert!(buffer.is_empty());
                    buffer.extend_from_slice(chunk);
                }
            }
        }

        curr_addr
    }

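    /// Wraps this sink in a `StdWriteAdapter` so it can be used wherever a
    /// `std::io::Write` implementation is expected.
    ///
    /// A minimal usage sketch:
    ///
    /// ```ignore
    /// use std::io::Write;
    /// let mut writer = sink.as_std_write();
    /// writer.write_all(b"some bytes").unwrap();
    /// writer.flush().unwrap();
    /// ```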
    pub fn as_std_write<'a>(&'a self) -> impl Write + 'a {
        StdWriteAdapter(self)
    }
}

impl Drop for SerializationSink {
    fn drop(&mut self) {
        let mut data = self.data.lock();
        let SerializationSinkInner {
            ref mut buffer,
            addr: _,
        } = *data;

        self.flush(buffer);
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // This function writes `chunk_count` byte-slices of size `chunk_size` to
    // three `SerializationSink`s that all map to the same underlying stream,
    // so we get interleaved pages with different tags.
    // It then extracts the data out again and asserts that it is the same as
    // has been written.
    fn test_roundtrip<W>(chunk_size: usize, chunk_count: usize, write: W)
    where
        W: Fn(&SerializationSink, &[u8]) -> Addr,
    {
        let sink_builder = SerializationSinkBuilder::new_in_memory();
        let tags = [PageTag::Events, PageTag::StringData, PageTag::StringIndex];
        let expected_chunk: Vec<u8> = (0..chunk_size).map(|x| (x % 239) as u8).collect();

        {
            let sinks: Vec<SerializationSink> =
                tags.iter().map(|&tag| sink_builder.new_sink(tag)).collect();

            for chunk_index in 0..chunk_count {
                let expected_addr = Addr((chunk_index * chunk_size) as u64);
                for sink in sinks.iter() {
                    assert_eq!(write(sink, &expected_chunk[..]), expected_addr);
                }
            }
        }

        let streams: Vec<Vec<u8>> = tags
            .iter()
            .map(|&tag| sink_builder.0.copy_bytes_with_page_tag(tag))
            .collect();

        for stream in streams {
            for chunk in stream.chunks(chunk_size) {
                assert_eq!(chunk, expected_chunk);
            }
        }
    }

    fn write_closure(sink: &SerializationSink, bytes: &[u8]) -> Addr {
        sink.write_atomic(bytes.len(), |dest| dest.copy_from_slice(bytes))
    }

    fn write_slice(sink: &SerializationSink, bytes: &[u8]) -> Addr {
        sink.write_bytes_atomic(bytes)
    }

    // Creates two roundtrip tests, one using `SerializationSink::write_atomic`
    // and one using `SerializationSink::write_bytes_atomic`.
    macro_rules! mk_roundtrip_test {
        ($name:ident, $chunk_size:expr, $chunk_count:expr) => {
            mod $name {
                use super::*;

                #[test]
                fn write_atomic() {
                    test_roundtrip($chunk_size, $chunk_count, write_closure);
                }

                #[test]
                fn write_bytes_atomic() {
                    test_roundtrip($chunk_size, $chunk_count, write_slice);
                }
            }
        };
    }

    mk_roundtrip_test!(small_data, 10, (90 * MAX_PAGE_SIZE) / 100);
    mk_roundtrip_test!(huge_data, MAX_PAGE_SIZE * 10, 5);

    mk_roundtrip_test!(exactly_max_page_size, MAX_PAGE_SIZE, 10);
    mk_roundtrip_test!(max_page_size_plus_one, MAX_PAGE_SIZE + 1, 10);
    mk_roundtrip_test!(max_page_size_minus_one, MAX_PAGE_SIZE - 1, 10);

    mk_roundtrip_test!(exactly_min_page_size, MIN_PAGE_SIZE, 10);
    mk_roundtrip_test!(min_page_size_plus_one, MIN_PAGE_SIZE + 1, 10);
    mk_roundtrip_test!(min_page_size_minus_one, MIN_PAGE_SIZE - 1, 10);
}