1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use crate::serialization::{Addr, SerializationSink};
use std::error::Error;
use std::fs;
use std::io::{Write};
use std::path::Path;
use parking_lot::Mutex;

/// A serialization sink that writes its data to a file on disk.
///
/// Writes are staged in an in-memory buffer and only pushed to the file when
/// the buffer runs out of space or when the sink is dropped. All mutable
/// state lives behind a mutex so the sink can be written to through `&self`.
pub struct FileSerializationSink {
    // File handle, staging buffer and position counters, guarded by one lock.
    data: Mutex<Inner>,
}

/// The lock-protected state of a `FileSerializationSink`.
struct Inner {
    /// The output file that buffered bytes are eventually written to.
    file: fs::File,
    /// Fixed-size staging area; its length is the buffer capacity.
    buffer: Vec<u8>,
    /// Number of bytes at the start of `buffer` that have not yet been
    /// written to `file`.
    buf_pos: usize,
    /// Total number of bytes reserved so far, i.e. the address that the next
    /// write will be assigned.
    addr: u32,
}

impl SerializationSink for FileSerializationSink {
    /// Creates a sink that writes to the file at `path`.
    ///
    /// Any missing parent directories are created first; the file itself is
    /// opened with `fs::File::create`.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if the parent directories or the file
    /// cannot be created. A `path` without a parent component (a root or an
    /// empty path) is reported through that error instead of panicking.
    fn from_path(path: &Path) -> Result<Self, Box<dyn Error>> {
        // `parent()` is `None` for a root or empty path. There is nothing to
        // create in that case, so let `File::create` report the real problem.
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent)?;
        }

        let file = fs::File::create(path)?;

        Ok(FileSerializationSink {
            data: Mutex::new(Inner {
                file,
                buffer: vec![0; 1024 * 512],
                buf_pos: 0,
                addr: 0,
            }),
        })
    }

    /// Reserves `num_bytes` in the output and lets `write` fill them in.
    ///
    /// `write` is called exactly once with a mutable slice of exactly
    /// `num_bytes` bytes. The slice's initial contents are unspecified (it may
    /// hold stale bytes from an earlier write), so `write` should overwrite
    /// all of it. The internal lock is held for the whole call, including
    /// while `write` runs.
    ///
    /// Returns the address of the first reserved byte, which is the number of
    /// bytes reserved by all previous calls.
    ///
    /// # Panics
    ///
    /// Panics if writing to the underlying file fails, or if the reservation
    /// would push the total size past what a `u32` address can represent.
    #[inline]
    fn write_atomic<W>(&self, num_bytes: usize, write: W) -> Addr
    where
        W: FnOnce(&mut [u8]),
    {
        let mut data = self.data.lock();
        let Inner {
            ref mut file,
            ref mut buffer,
            ref mut buf_pos,
            ref mut addr,
        } = *data;

        // Addresses are 32 bits wide. A plain `as u32` cast plus `+=` would
        // silently truncate/wrap in release builds and hand out addresses that
        // alias earlier data, so check both steps before touching any state.
        let curr_addr = *addr;
        let len = <u32 as std::convert::TryFrom<usize>>::try_from(num_bytes)
            .expect("write_atomic: num_bytes does not fit into a u32 address");
        *addr = curr_addr
            .checked_add(len)
            .expect("write_atomic: serialized data exceeds the u32 address space");

        let buf_start = *buf_pos;
        let buf_end = buf_start + num_bytes;

        if buf_end <= buffer.len() {
            // We have enough space in the buffer, just write the data to it.
            write(&mut buffer[buf_start..buf_end]);
            *buf_pos = buf_end;
        } else {
            // We don't have enough space in the buffer, so flush to disk
            file.write_all(&buffer[..buf_start])
                .expect("failed to flush serialization buffer to file");

            if num_bytes <= buffer.len() {
                // There's enough space in the buffer, after flushing
                write(&mut buffer[0..num_bytes]);
                *buf_pos = num_bytes;
            } else {
                // Even after flushing the buffer there isn't enough space, so
                // fall back to dynamic allocation and write straight to disk.
                let mut temp_buffer = vec![0; num_bytes];
                write(&mut temp_buffer[..]);
                file.write_all(&temp_buffer[..])
                    .expect("failed to write oversized record to file");
                // Everything buffered has been flushed above.
                *buf_pos = 0;
            }
        }

        Addr(curr_addr)
    }
}

impl Drop for FileSerializationSink {
    fn drop(&mut self) {
        let mut data = self.data.lock();
        let Inner {
            ref mut file,
            ref mut buffer,
            ref mut buf_pos,
            addr: _,
        } = *data;

        if *buf_pos > 0 {
            file.write_all(&buffer[..*buf_pos]).unwrap();
        }
    }
}