// parallel_processor/buckets/writers/lock_free_binary_writer.rs

1use crate::buckets::writers::{
2    finalize_bucket_file, initialize_bucket_file, BucketHeader, THREADS_BUSY_WRITING,
3};
4use crate::buckets::{CheckpointData, LockFreeBucket};
5use crate::memory_data_size::MemoryDataSize;
6use crate::memory_fs::file::internal::MemoryFileMode;
7use crate::memory_fs::file::reader::FileRangeReference;
8use crate::memory_fs::file::writer::FileWriter;
9use crate::utils::memory_size_to_log2;
10use mt_debug_counters::counter::AtomicCounterGuardSum;
11use parking_lot::Mutex;
12use serde::Serialize;
13use std::path::{Path, PathBuf};
14use std::sync::atomic::{AtomicU64, Ordering};
15
/// 16-byte magic identifying a lock-free bucket file; written into the file
/// footer by `finalize` via `finalize_bucket_file`.
pub const LOCK_FREE_BUCKET_MAGIC: &[u8; 16] = b"PLAIN_INTR_BKT_M";
17
18#[derive(Clone)]
19pub struct LockFreeCheckpointSize(u8);
20impl LockFreeCheckpointSize {
21    pub const fn new_from_size(size: MemoryDataSize) -> Self {
22        Self(memory_size_to_log2(size))
23    }
24    pub const fn new_from_log2(val: u8) -> Self {
25        Self(val)
26    }
27}
28
/// Writer for a single bucket file that supports concurrent appends without a
/// global write lock. The stream is periodically split into "checkpoints"
/// (a file offset plus an optional serialized payload) that are collected and
/// written out when the bucket is finalized.
pub struct LockFreeBinaryWriter {
    // Underlying file, appended to via `write_all_parallel`.
    writer: FileWriter,
    // Log2 of the maximum payload size between two size-triggered checkpoints.
    checkpoint_max_size_log2: u8,
    // Checkpoints collected so far; entries may be out of order or duplicated
    // under contention — `finalize` sorts and dedups them.
    checkpoints: Mutex<Vec<CheckpointData>>,
    // Serialized payload attached to checkpoints created from now on
    // (set by `set_checkpoint_data`).
    checkpoint_data: Mutex<Option<Vec<u8>>>,

    // Total payload bytes written through `write_data` (excludes the header).
    file_size: AtomicU64,
    // Opaque data-format descriptor persisted with the bucket on finalize.
    data_format_info: Vec<u8>,
}
// SAFETY(review): presumably `FileWriter` is the field preventing an automatic
// Send impl and is actually safe to move across threads — confirm against
// `memory_fs::file::writer` before relying on this.
unsafe impl Send for LockFreeBinaryWriter {}
39
impl LockFreeBinaryWriter {
    /// Sentinel checkpoint size of 2^62 bytes: large enough that the
    /// size-based checkpoint splitting in `write_data` never triggers
    /// in practice.
    pub const CHECKPOINT_SIZE_UNLIMITED: LockFreeCheckpointSize =
        LockFreeCheckpointSize::new_from_log2(62);
}
44
impl LockFreeBucket for LockFreeBinaryWriter {
    /// Creation parameters: backing file mode plus the maximum (log2) size of
    /// a single checkpointed region.
    type InitData = (MemoryFileMode, LockFreeCheckpointSize);

    /// Creates the bucket file `<path_prefix>.<index>`, writes the file
    /// header and records the first checkpoint right after it.
    fn new_serialized_data_format(
        path_prefix: &Path,
        (file_mode, checkpoint_max_size): &(MemoryFileMode, LockFreeCheckpointSize),
        index: usize,
        data_format_info: &[u8],
    ) -> Self {
        // The format info must fit in the fixed-size bucket header.
        assert!(
            data_format_info.len() <= BucketHeader::MAX_DATA_FORMAT_INFO_SIZE,
            "Serialized data format info is too big, this is a bug"
        );

        // Bucket files are named "<prefix file name>.<bucket index>" and are
        // placed in the same directory as the prefix path.
        let path = path_prefix.parent().unwrap().join(format!(
            "{}.{}",
            path_prefix.file_name().unwrap().to_str().unwrap(),
            index
        ));

        let mut writer = FileWriter::create(path, *file_mode);

        // Writes the header; the returned offset — presumably the first byte
        // past the header (TODO confirm) — becomes the initial checkpoint.
        let first_checkpoint = initialize_bucket_file(&mut writer);

        Self {
            writer,
            checkpoint_max_size_log2: checkpoint_max_size.0,
            checkpoints: Mutex::new(vec![CheckpointData {
                offset: first_checkpoint,
                data: None,
            }]),
            // Counts only payload bytes written via `write_data`, not the header.
            file_size: AtomicU64::new(0),
            data_format_info: data_format_info.to_vec(),
            checkpoint_data: Mutex::new(None),
        }
    }

    /// Replaces the payload attached to subsequently created checkpoints and
    /// starts a new checkpoint at the current end of file.
    ///
    /// If `passtrough_range` (sic) is given, the referenced file range is
    /// copied into this bucket as its own checkpointed region first.
    fn set_checkpoint_data<T: Serialize>(
        &self,
        data: Option<&T>,
        passtrough_range: Option<FileRangeReference>,
    ) {
        let data = data.map(|data| bincode::serialize(data).unwrap());
        // Remember the serialized payload so size-triggered checkpoints in
        // `write_data` attach it as well.
        *self.checkpoint_data.lock() = data.clone();
        // Always create a new block on checkpoint data change

        if let Some(passtrough_range) = passtrough_range {
            // A zero-length parallel write is used purely to sample the
            // current (atomically advancing) end-of-file position.
            let position = self.writer.write_all_parallel(&[], 1);
            self.checkpoints.lock().push(CheckpointData {
                offset: position as u64,
                data: data.clone(),
            });
            unsafe {
                // TODO: Be sure that the file has exclusive access
                passtrough_range.copy_to_unsync(&self.writer);
            }
        }

        // Open a fresh checkpoint (after the passthrough copy, if any) so the
        // new payload applies to everything written from here on.
        let position = self.writer.write_all_parallel(&[], 1);
        self.checkpoints.lock().push(CheckpointData {
            offset: position as u64,
            data,
        });
    }

    /// Appends `bytes` to the bucket. Lock-free on the common path; the
    /// checkpoints mutex is taken only when a size boundary is crossed.
    fn write_data(&self, bytes: &[u8]) {
        // Debug metric: counts threads currently busy writing.
        let stat_raii = AtomicCounterGuardSum::new(&THREADS_BUSY_WRITING, 1);

        // let _lock = self.checkpoints.lock();
        let position = self.writer.write_all_parallel(bytes, 1);

        let old_size = self
            .file_size
            .fetch_add(bytes.len() as u64, Ordering::Relaxed);
        // Compare pre- and post-write totals at checkpoint granularity: the
        // shifted values differ iff this write crossed a 2^log2 boundary.
        if old_size >> self.checkpoint_max_size_log2
            != (old_size + bytes.len() as u64) >> self.checkpoint_max_size_log2
        {
            // NOTE(review): under contention `position` need not coincide
            // exactly with the size boundary — checkpoints are approximate
            // split points; ordering/duplicates are normalized in `finalize`.
            self.checkpoints.lock().push(CheckpointData {
                offset: position as u64,
                data: self.checkpoint_data.lock().clone(),
            });
        }

        drop(stat_raii);
    }

    /// Returns the path of the underlying bucket file.
    fn get_path(&self) -> PathBuf {
        self.writer.get_path()
    }
    /// Consumes the writer, normalizing the collected checkpoints and writing
    /// the bucket trailer (magic, checkpoint table, data-format info).
    fn finalize(self) {
        finalize_bucket_file(
            self.writer,
            LOCK_FREE_BUCKET_MAGIC,
            {
                // Concurrent writers may have pushed checkpoints out of order
                // or at identical offsets; sort and dedup before persisting.
                let mut checkpoints = self.checkpoints.into_inner();
                checkpoints.sort();
                checkpoints.dedup();
                checkpoints
            },
            &self.data_format_info,
        );
    }
}