parallel_processor/buckets/writers/compressed_binary_writer.rs

use crate::buckets::writers::{finalize_bucket_file, initialize_bucket_file, THREADS_BUSY_WRITING};
use crate::buckets::{CheckpointData, LockFreeBucket};
use crate::memory_data_size::MemoryDataSize;
use crate::memory_fs::file::flush::GlobalFlush;
use crate::memory_fs::file::internal::MemoryFileMode;
use crate::memory_fs::file::reader::FileRangeReference;
use crate::memory_fs::file::writer::FileWriter;
use crate::utils::memory_size_to_log2;
use crate::DEFAULT_BINCODE_CONFIG;
use bincode::Encode;
use lz4::{BlockMode, BlockSize, ContentChecksum};
use mt_debug_counters::counter::AtomicCounterGuardSum;
use parking_lot::Mutex;
use replace_with::replace_with_or_abort;
use std::io::Write;
use std::path::{Path, PathBuf};

use super::BucketHeader;

pub const COMPRESSED_BUCKET_MAGIC: &[u8; 16] = b"CPLZ4_INTR_BKT_M";

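/// Maximum size of a chunk between two checkpoints, stored as a base-2
/// logarithm: the actual limit in bytes is `1 << value`.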
#[derive(Clone)]
pub struct CompressedCheckpointSize(u8);
impl CompressedCheckpointSize {
    pub const fn new_from_size(size: MemoryDataSize) -> Self {
        Self(memory_size_to_log2(size))
    }
    pub const fn new_from_log2(val: u8) -> Self {
        Self(val)
    }
}

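/// Builds an lz4 encoder over `writer`, picking the compression level from the
/// current pressure on the global flush queue: when the queue is more than
/// half full the disk is lagging behind, so the `slow_disk` level is used to
/// trade CPU time for smaller writes.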
fn create_lz4_stream<W: Write>(writer: W, level: CompressionLevelInfo) -> lz4::Encoder<W> {
    let (queue_occupation, queue_size) = GlobalFlush::global_queue_occupation();

    let level = if queue_size < 2 * queue_occupation {
        level.slow_disk
    } else {
        level.fast_disk
    };

    lz4::EncoderBuilder::new()
        .level(level)
        .checksum(ContentChecksum::NoChecksum)
        .block_mode(BlockMode::Linked)
        .block_size(BlockSize::Max64KB)
        .build(writer)
        .unwrap()
}

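/// Writer state guarded by the mutex in [`CompressedBinaryWriter`]: the live
/// lz4 stream, the checkpoints collected so far and the size of the chunk
/// currently being written.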
struct CompressedBinaryWriterInternal {
    writer: lz4::Encoder<FileWriter>,
    checkpoint_max_size: u64,
    checkpoints: Vec<CheckpointData>,
    current_chunk_size: u64,
    level: CompressionLevelInfo,
    data_format_info: Vec<u8>,

    checkpoint_data: Option<Vec<u8>>,
}

pub struct CompressedBinaryWriter {
    inner: Mutex<CompressedBinaryWriterInternal>,
    path: PathBuf,
}
// SAFETY: the inner lz4 encoder is not auto-`Send` (it holds raw pointers to
// its compression context), but the context is owned exclusively by this
// writer, so moving the writer to another thread is sound.
unsafe impl Send for CompressedBinaryWriter {}

impl CompressedBinaryWriterInternal {
    /// Finishes the current lz4 stream and starts a new one, recording a
    /// checkpoint at the boundary so readers can seek to it independently.
    fn create_new_block(&mut self, passtrough_range: Option<FileRangeReference>) {
        replace_with_or_abort(&mut self.writer, |writer| {
            let (file_buf, res) = writer.finish();
            res.unwrap();

            let checkpoint_pos = if let Some(passtrough_range) = passtrough_range {
                // Add an optional passthrough block, with its own checkpoint
                // placed at the block's start
                self.checkpoints.push(CheckpointData {
                    offset: file_buf.len() as u64,
                    data: self.checkpoint_data.clone(),
                });

                unsafe {
                    passtrough_range.copy_to_unsync(&file_buf);
                }

                // The next checkpoint starts after the appended passthrough data
                file_buf.len()
            } else {
                file_buf.len()
            };

            self.checkpoints.push(CheckpointData {
                offset: checkpoint_pos as u64,
                data: self.checkpoint_data.clone(),
            });

            create_lz4_stream(file_buf, self.level)
        });
        self.current_chunk_size = 0;
    }
}

impl CompressedBinaryWriter {
    /// A log2 chunk limit so large (2^62 bytes) that no checkpoint is ever
    /// created on size grounds.
    pub const CHECKPOINT_SIZE_UNLIMITED: CompressedCheckpointSize =
        CompressedCheckpointSize::new_from_log2(62);
}

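/// Lz4 compression levels to use depending on how busy the global flush queue
/// is, sampled each time a new compressed stream is created (see
/// `create_lz4_stream`).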
#[derive(Copy, Clone)]
pub struct CompressionLevelInfo {
    pub fast_disk: u32,
    pub slow_disk: u32,
}

impl LockFreeBucket for CompressedBinaryWriter {
    type InitData = (
        MemoryFileMode,
        CompressedCheckpointSize,
        CompressionLevelInfo,
    );

    fn new_serialized_data_format(
        path_prefix: &Path,
        (file_mode, checkpoint_max_size, compression_level): &(
            MemoryFileMode,
            CompressedCheckpointSize,
            CompressionLevelInfo,
        ),
        index: usize,
        data_format_info: &[u8],
    ) -> Self {
        assert!(
            data_format_info.len() <= BucketHeader::MAX_DATA_FORMAT_INFO_SIZE,
            "Serialized data format info is too big, this is a bug"
        );

        let path = path_prefix.parent().unwrap().join(format!(
            "{}.{}",
            path_prefix.file_name().unwrap().to_str().unwrap(),
            index
        ));

        let mut file = FileWriter::create(&path, *file_mode);

        let first_checkpoint = initialize_bucket_file(&mut file);

        let writer = create_lz4_stream(file, *compression_level);

        Self {
            inner: Mutex::new(CompressedBinaryWriterInternal {
                writer,
                checkpoint_max_size: (1 << checkpoint_max_size.0),
                checkpoints: vec![CheckpointData {
                    offset: first_checkpoint,
                    data: None,
                }],
                current_chunk_size: 0,
                level: *compression_level,
                data_format_info: data_format_info.to_vec(),
                checkpoint_data: None,
            }),
            path,
        }
    }

    fn set_checkpoint_data<T: Encode>(
        &self,
        data: Option<&T>,
        passtrough_range: Option<FileRangeReference>,
    ) {
        let mut inner = self.inner.lock();
        inner.checkpoint_data =
            data.map(|data| bincode::encode_to_vec(data, DEFAULT_BINCODE_CONFIG).unwrap());
        // Always create a new block on checkpoint data change
        inner.create_new_block(passtrough_range);
    }

174
175    fn get_bucket_size(&self) -> u64 {
176        self.inner.lock().writer.writer().len()
177    }
178
179    fn write_data(&self, bytes: &[u8]) -> u64 {
180        let stat_raii = AtomicCounterGuardSum::new(&THREADS_BUSY_WRITING, 1);
181        //
182        let mut inner = self.inner.lock();
183
184        inner.writer.write_all(bytes).unwrap();
185        inner.current_chunk_size += bytes.len() as u64;
186        if inner.current_chunk_size > inner.checkpoint_max_size {
187            inner.create_new_block(None);
188        }
189
190        drop(stat_raii);
191        inner.writer.writer().len()
192    }
193
    fn get_path(&self) -> PathBuf {
        self.path.clone()
    }

    fn finalize(self) {
        let mut inner = self.inner.into_inner();

        inner.writer.flush().unwrap();
        let (file, res) = inner.writer.finish();
        res.unwrap();

        finalize_bucket_file(
            file,
            COMPRESSED_BUCKET_MAGIC,
            inner.checkpoints,
            &inner.data_format_info,
        );
    }
}
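
// Usage sketch (illustrative only): the typical lifecycle of this bucket
// writer through the `LockFreeBucket` interface. The `MemoryFileMode` value
// and the compression levels below are placeholder assumptions chosen for the
// example, not values taken from this crate:
//
//     let bucket = CompressedBinaryWriter::new_serialized_data_format(
//         Path::new("/tmp/buckets/bucket"),
//         &(
//             file_mode, // a crate::memory_fs MemoryFileMode chosen by the caller
//             CompressedCheckpointSize::new_from_log2(22), // 4 MiB chunks
//             CompressionLevelInfo { fast_disk: 0, slow_disk: 2 },
//         ),
//         0,   // bucket index, appended to the file name
//         &[], // serialized data format info stored in the bucket header
//     );
//     bucket.write_data(b"payload bytes");
//     bucket.finalize(); // writes checkpoints and header, consumes the writer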