parallel_processor/buckets/writers/
compressed_binary_writer.rs

use crate::buckets::writers::{finalize_bucket_file, initialize_bucket_file, THREADS_BUSY_WRITING};
use crate::buckets::{CheckpointData, LockFreeBucket};
use crate::memory_data_size::MemoryDataSize;
use crate::memory_fs::file::flush::GlobalFlush;
use crate::memory_fs::file::internal::MemoryFileMode;
use crate::memory_fs::file::reader::FileRangeReference;
use crate::memory_fs::file::writer::FileWriter;
use crate::utils::memory_size_to_log2;
use lz4::{BlockMode, BlockSize, ContentChecksum};
use mt_debug_counters::counter::AtomicCounterGuardSum;
use parking_lot::Mutex;
use replace_with::replace_with_or_abort;
use serde::Serialize;
use std::io::Write;
use std::path::{Path, PathBuf};

use super::BucketHeader;

/// Magic bytes identifying an lz4-compressed bucket file, passed to `finalize_bucket_file`
/// when the bucket is sealed.
pub const COMPRESSED_BUCKET_MAGIC: &[u8; 16] = b"CPLZ4_INTR_BKT_M";

/// Target maximum amount of data written between two checkpoints, stored as the log2 of the
/// size in bytes.
#[derive(Clone)]
pub struct CompressedCheckpointSize(u8);
impl CompressedCheckpointSize {
    /// Builds the checkpoint size from an explicit memory size (via `memory_size_to_log2`).
    pub const fn new_from_size(size: MemoryDataSize) -> Self {
        Self(memory_size_to_log2(size))
    }
    /// Builds the checkpoint size directly from a log2 value.
    pub const fn new_from_log2(val: u8) -> Self {
        Self(val)
    }
}

/// Builds an lz4 encoder around `writer`, choosing the compression level adaptively:
/// when the global flush queue is more than half full (a sign that the disk cannot keep up)
/// the `slow_disk` level is used, otherwise the `fast_disk` level.
fn create_lz4_stream<W: Write>(writer: W, level: CompressionLevelInfo) -> lz4::Encoder<W> {
    let (queue_occupation, queue_size) = GlobalFlush::global_queue_occupation();

    let level = if queue_size < 2 * queue_occupation {
        level.slow_disk
    } else {
        level.fast_disk
    };

    lz4::EncoderBuilder::new()
        .level(level)
        .checksum(ContentChecksum::NoChecksum)
        .block_mode(BlockMode::Linked)
        .block_size(BlockSize::Max64KB)
        .build(writer)
        .unwrap()
}

struct CompressedBinaryWriterInternal {
    writer: lz4::Encoder<FileWriter>,
    /// Maximum number of bytes written into a single chunk before a new checkpoint is started.
    checkpoint_max_size: u64,
    checkpoints: Vec<CheckpointData>,
    current_chunk_size: u64,
    level: CompressionLevelInfo,
    data_format_info: Vec<u8>,

    /// Serialized data attached to every checkpoint created from now on.
    checkpoint_data: Option<Vec<u8>>,
}

/// Bucket writer that lz4-compresses incoming data, splitting it into checkpointed chunks.
pub struct CompressedBinaryWriter {
    inner: Mutex<CompressedBinaryWriterInternal>,
    path: PathBuf,
}
unsafe impl Send for CompressedBinaryWriter {}

impl CompressedBinaryWriterInternal {
    /// Finishes the current lz4 stream, records the checkpoint(s) for the data that follows
    /// and starts a fresh lz4 stream on the same underlying file.
    fn create_new_block(&mut self, passtrough_range: Option<FileRangeReference>) {
        replace_with_or_abort(&mut self.writer, |writer| {
            let (file_buf, res) = writer.finish();
            res.unwrap();

            let checkpoint_pos = if let Some(passtrough_range) = passtrough_range {
                // Add an optional passthrough block, copied verbatim from another file range,
                // with its own checkpoint entry
                self.checkpoints.push(CheckpointData {
                    offset: file_buf.len() as u64,
                    data: self.checkpoint_data.clone(),
                });

                unsafe {
                    passtrough_range.copy_to_unsync(&file_buf);
                }

                file_buf.len()
            } else {
                file_buf.len()
            };

            // Checkpoint marking the start of the next compressed chunk
            self.checkpoints.push(CheckpointData {
                offset: checkpoint_pos as u64,
                data: self.checkpoint_data.clone(),
            });

            create_lz4_stream(file_buf, self.level)
        });
        self.current_chunk_size = 0;
    }
}

impl CompressedBinaryWriter {
    /// Checkpoint size so large (2^62 bytes) that in practice a new block is never started
    /// because of size alone.
    pub const CHECKPOINT_SIZE_UNLIMITED: CompressedCheckpointSize =
        CompressedCheckpointSize::new_from_log2(62);
}

/// lz4 compression levels to use depending on how congested the global flush queue is.
#[derive(Copy, Clone)]
pub struct CompressionLevelInfo {
    pub fast_disk: u32,
    pub slow_disk: u32,
}

impl LockFreeBucket for CompressedBinaryWriter {
    type InitData = (
        MemoryFileMode,
        CompressedCheckpointSize,
        CompressionLevelInfo,
    );

    fn new_serialized_data_format(
        path_prefix: &Path,
        (file_mode, checkpoint_max_size, compression_level): &(
            MemoryFileMode,
            CompressedCheckpointSize,
            CompressionLevelInfo,
        ),
        index: usize,
        data_format_info: &[u8],
    ) -> Self {
        assert!(
            data_format_info.len() <= BucketHeader::MAX_DATA_FORMAT_INFO_SIZE,
            "Serialized data format info is too big, this is a bug"
        );

        // The bucket file is named `<prefix>.<index>` inside the prefix's parent directory
        let path = path_prefix.parent().unwrap().join(format!(
            "{}.{}",
            path_prefix.file_name().unwrap().to_str().unwrap(),
            index
        ));

        let mut file = FileWriter::create(&path, *file_mode);

        // `initialize_bucket_file` returns the offset at which the first checkpoint starts
        let first_checkpoint = initialize_bucket_file(&mut file);

        let writer = create_lz4_stream(file, *compression_level);

        Self {
            inner: Mutex::new(CompressedBinaryWriterInternal {
                writer,
                checkpoint_max_size: (1 << checkpoint_max_size.0),
                checkpoints: vec![CheckpointData {
                    offset: first_checkpoint,
                    data: None,
                }],
                current_chunk_size: 0,
                level: *compression_level,
                data_format_info: data_format_info.to_vec(),
                checkpoint_data: None,
            }),
            path,
        }
    }

    fn set_checkpoint_data<T: Serialize>(
        &self,
        data: Option<&T>,
        passtrough_range: Option<FileRangeReference>,
    ) {
        let mut inner = self.inner.lock();
        inner.checkpoint_data = data.map(|data| bincode::serialize(data).unwrap());
        // Always create a new block on checkpoint data change
        inner.create_new_block(passtrough_range);
    }

    fn write_data(&self, bytes: &[u8]) {
        // Count this thread as busy writing for the duration of this call (debug counter)
        let stat_raii = AtomicCounterGuardSum::new(&THREADS_BUSY_WRITING, 1);

        let mut inner = self.inner.lock();

        inner.writer.write_all(bytes).unwrap();
        inner.current_chunk_size += bytes.len() as u64;
        // Start a new checkpointed block once the current chunk exceeds its maximum size
        if inner.current_chunk_size > inner.checkpoint_max_size {
            inner.create_new_block(None);
        }

        drop(stat_raii);
    }

    fn get_path(&self) -> PathBuf {
        self.path.clone()
    }

    fn finalize(self) {
        let mut inner = self.inner.into_inner();

        // Flush and close the last lz4 stream, then hand the file, the magic bytes, the
        // checkpoint list and the data format info to `finalize_bucket_file` to seal the bucket
        inner.writer.flush().unwrap();
        let (file, res) = inner.writer.finish();
        res.unwrap();

        finalize_bucket_file(
            file,
            COMPRESSED_BUCKET_MAGIC,
            inner.checkpoints,
            &inner.data_format_info,
        );
    }
}
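
// Usage sketch (not part of the original module): a minimal example of driving this writer
// through the `LockFreeBucket` trait, based only on the API visible in this file. The path,
// the compression levels and the helper name `write_example_bucket` are illustrative
// assumptions; the caller must supply a suitable `MemoryFileMode` and have the memory
// filesystem initialized.
#[cfg(test)]
#[allow(dead_code)]
mod usage_sketch {
    use super::*;

    // Hypothetical helper: writes two records into bucket `example.0` and seals it,
    // returning the path of the finished bucket file.
    fn write_example_bucket(file_mode: MemoryFileMode) -> PathBuf {
        let init_data = (
            file_mode,
            // Start a new checkpointed block roughly every 2^22 bytes (~4 MiB)
            CompressedCheckpointSize::new_from_log2(22),
            // Compress lightly when the disk keeps up, harder when the flush queue is congested
            CompressionLevelInfo {
                fast_disk: 0,
                slow_disk: 3,
            },
        );

        // Creates the bucket file `example.0` next to the given prefix
        let bucket = CompressedBinaryWriter::new_serialized_data_format(
            Path::new("/tmp/buckets/example"),
            &init_data,
            0,
            &[],
        );

        let path = bucket.get_path();
        bucket.write_data(b"first record");
        bucket.write_data(b"second record");
        // Seals the bucket: closes the lz4 stream and writes the checkpoints and header
        bucket.finalize();
        path
    }
}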