parallel_processor/buckets/writers/
lock_free_binary_writer.rs

1use crate::buckets::writers::{
2    finalize_bucket_file, initialize_bucket_file, BucketHeader, THREADS_BUSY_WRITING,
3};
4use crate::buckets::{CheckpointData, LockFreeBucket};
5use crate::memory_data_size::MemoryDataSize;
6use crate::memory_fs::file::internal::MemoryFileMode;
7use crate::memory_fs::file::reader::FileRangeReference;
8use crate::memory_fs::file::writer::FileWriter;
9use crate::utils::memory_size_to_log2;
10use crate::DEFAULT_BINCODE_CONFIG;
11use bincode::Encode;
12use mt_debug_counters::counter::AtomicCounterGuardSum;
13use parking_lot::Mutex;
14use std::path::{Path, PathBuf};
15use std::sync::atomic::{AtomicU64, Ordering};
16
/// 16-byte magic written into the bucket header on finalize; readers use it to
/// recognize the lock-free bucket file format.
pub const LOCK_FREE_BUCKET_MAGIC: &[u8; 16] = b"PLAIN_INTR_BKT_M";
18
19#[derive(Clone)]
20pub struct LockFreeCheckpointSize(u8);
21impl LockFreeCheckpointSize {
22    pub const fn new_from_size(size: MemoryDataSize) -> Self {
23        Self(memory_size_to_log2(size))
24    }
25    pub const fn new_from_log2(val: u8) -> Self {
26        Self(val)
27    }
28}
29
/// Bucket writer supporting concurrent appends from multiple threads without a
/// global write lock, recording checkpoint offsets so the finished file can be
/// consumed in independent chunks.
pub struct LockFreeBinaryWriter {
    // Underlying file; all appends go through `write_all_parallel`.
    writer: FileWriter,
    // log2 of the maximum checkpoint span: crossing a multiple of
    // 2^checkpoint_max_size_log2 written bytes triggers a new checkpoint.
    checkpoint_max_size_log2: u8,
    // Checkpoints recorded so far (offset + optional encoded payload);
    // sorted/deduped only at finalize time.
    checkpoints: Mutex<Vec<CheckpointData>>,
    // Bincode-encoded payload to attach to checkpoints created from now on
    // (set via `set_checkpoint_data`), or `None` for payload-less checkpoints.
    checkpoint_data: Mutex<Option<Vec<u8>>>,

    // Total bytes appended through `write_data` (does not appear to include
    // passthrough-copied ranges — see note in `set_checkpoint_data`).
    file_size: AtomicU64,
    // Opaque serialized data-format descriptor, written to the header on finalize.
    data_format_info: Vec<u8>,
}
// SAFETY: NOTE(review) — no safety comment in the original. Presumably
// `FileWriter` is the field preventing an automatic `Send` impl; the remaining
// state is `Mutex`-guarded or atomic. Confirm `FileWriter` may be moved across
// threads before relying on this.
unsafe impl Send for LockFreeBinaryWriter {}
40
impl LockFreeBinaryWriter {
    /// Sentinel checkpoint size of 2^62 bytes: the size-based checkpoint split
    /// in `write_data` can never trigger, i.e. checkpoints are unlimited.
    pub const CHECKPOINT_SIZE_UNLIMITED: LockFreeCheckpointSize =
        LockFreeCheckpointSize::new_from_log2(62);
}
45
impl LockFreeBucket for LockFreeBinaryWriter {
    /// File mode plus the maximum checkpoint span for this bucket.
    type InitData = (MemoryFileMode, LockFreeCheckpointSize);

    /// Creates the bucket file at `<path_prefix>.<index>`, writes the initial
    /// header via `initialize_bucket_file`, and registers a first checkpoint
    /// at the header's end offset.
    ///
    /// Panics if `data_format_info` exceeds the header's capacity, or if
    /// `path_prefix` has no parent / no valid UTF-8 file name.
    fn new_serialized_data_format(
        path_prefix: &Path,
        (file_mode, checkpoint_max_size): &(MemoryFileMode, LockFreeCheckpointSize),
        index: usize,
        data_format_info: &[u8],
    ) -> Self {
        assert!(
            data_format_info.len() <= BucketHeader::MAX_DATA_FORMAT_INFO_SIZE,
            "Serialized data format info is too big, this is a bug"
        );

        // Bucket path: "<prefix file name>.<index>" in the prefix's directory.
        let path = path_prefix.parent().unwrap().join(format!(
            "{}.{}",
            path_prefix.file_name().unwrap().to_str().unwrap(),
            index
        ));

        let mut writer = FileWriter::create(path, *file_mode);

        // Returns the offset just past the header, used as the first checkpoint.
        let first_checkpoint = initialize_bucket_file(&mut writer);

        Self {
            writer,
            checkpoint_max_size_log2: checkpoint_max_size.0,
            checkpoints: Mutex::new(vec![CheckpointData {
                offset: first_checkpoint,
                data: None,
            }]),
            // Counts only bytes written via `write_data`, not the header.
            file_size: AtomicU64::new(0),
            data_format_info: data_format_info.to_vec(),
            checkpoint_data: Mutex::new(None),
        }
    }

    /// Replaces the payload attached to future checkpoints and starts a new
    /// checkpoint block at the current file position. If `passtrough_range`
    /// is given, that range is first copied into the file inside its own
    /// checkpoint block, and a fresh checkpoint is opened after it.
    ///
    /// NOTE(review): the copied passthrough bytes are not added to
    /// `file_size`, so `get_bucket_size` will not count them — confirm this
    /// is intended.
    fn set_checkpoint_data<T: Encode>(
        &self,
        data: Option<&T>,
        passtrough_range: Option<FileRangeReference>,
    ) {
        let data = data.map(|data| bincode::encode_to_vec(data, DEFAULT_BINCODE_CONFIG).unwrap());
        *self.checkpoint_data.lock() = data.clone();
        // Always create a new block on checkpoint data change

        if let Some(passtrough_range) = passtrough_range {
            // Empty write used only to obtain the current append position —
            // presumably `write_all_parallel` returns the reserved offset.
            let position = self.writer.write_all_parallel(&[], 1);
            // Checkpoint covering the passthrough data that follows.
            self.checkpoints.lock().push(CheckpointData {
                offset: position as u64,
                data: data.clone(),
            });
            unsafe {
                // TODO: Be sure that the file has exclusive access
                passtrough_range.copy_to_unsync(&self.writer);
            }
        }

        // Open a new checkpoint at the (possibly advanced) current position so
        // subsequent `write_data` output falls under the new payload.
        let position = self.writer.write_all_parallel(&[], 1);
        self.checkpoints.lock().push(CheckpointData {
            offset: position as u64,
            data,
        });
    }

    /// Total bytes appended via `write_data`. Relaxed load: may lag writes
    /// still in flight on other threads.
    fn get_bucket_size(&self) -> u64 {
        self.file_size.load(Ordering::Relaxed)
    }

    /// Appends `bytes` atomically (one parallel write) and returns the bucket
    /// size after this write. When the cumulative size crosses a multiple of
    /// 2^checkpoint_max_size_log2, a checkpoint is recorded at this write's
    /// offset carrying the current checkpoint payload.
    fn write_data(&self, bytes: &[u8]) -> u64 {
        // RAII counter for debug stats: number of threads currently writing.
        let stat_raii = AtomicCounterGuardSum::new(&THREADS_BUSY_WRITING, 1);

        // let _lock = self.checkpoints.lock();
        let position = self.writer.write_all_parallel(bytes, 1);

        let old_size = self
            .file_size
            .fetch_add(bytes.len() as u64, Ordering::Relaxed);
        // Compare the checkpoint-span index before and after this write; a
        // change means the write crossed a 2^log2 boundary.
        if old_size >> self.checkpoint_max_size_log2
            != (old_size + bytes.len() as u64) >> self.checkpoint_max_size_log2
        {
            self.checkpoints.lock().push(CheckpointData {
                offset: position as u64,
                data: self.checkpoint_data.lock().clone(),
            });
        }

        drop(stat_raii);

        old_size + bytes.len() as u64
    }

    fn get_path(&self) -> PathBuf {
        self.writer.get_path()
    }
    /// Writes the footer/header via `finalize_bucket_file`. Checkpoints are
    /// sorted and deduped here because concurrent threads may have pushed
    /// them out of offset order (and `set_checkpoint_data` can record two
    /// entries at the same position).
    fn finalize(self) {
        finalize_bucket_file(
            self.writer,
            LOCK_FREE_BUCKET_MAGIC,
            {
                let mut checkpoints = self.checkpoints.into_inner();
                checkpoints.sort();
                checkpoints.dedup();
                checkpoints
            },
            &self.data_format_info,
        );
    }
}