parallel_processor/buckets/writers/
mod.rs

1use crate::{memory_fs::file::writer::FileWriter, DEFAULT_BINCODE_CONFIG};
2use bincode::{Decode, Encode};
3use desse::{Desse, DesseSized};
4use mt_debug_counters::counter::{AtomicCounter, SumMode};
5use std::io::Write;
6
7use super::CheckpointData;
8
9pub mod compressed_binary_writer;
10pub mod lock_free_binary_writer;
11
/// Debug counter declared under the name `"threads_busy_writing"` in `SumMode`.
///
/// NOTE(review): presumably counts the threads currently blocked in a bucket write;
/// it is not updated anywhere in this file, so confirm against the writer
/// implementations in the submodules.
pub(crate) static THREADS_BUSY_WRITING: AtomicCounter<SumMode> =
    declare_counter_i64!("threads_busy_writing", SumMode, false);
14
/// Fixed-size header of a bucket file.
///
/// [`initialize_bucket_file`] writes a default instance as a placeholder, and
/// [`finalize_bucket_file`] later writes the populated header via `write_at_start`.
#[derive(Debug, Desse, DesseSized, Default)]
pub(crate) struct BucketHeader {
    /// 16-byte magic value, copied from the `magic` argument of
    /// [`finalize_bucket_file`].
    pub magic: [u8; 16],
    /// File length recorded by [`finalize_bucket_file`] right before it encodes the
    /// [`BucketCheckpoints`], i.e. the position at which the checkpoint index starts.
    pub index_offset: u64,
    /// Format information bytes; [`finalize_bucket_file`] copies its `format_info`
    /// argument to the front and leaves the remaining bytes zero.
    pub data_format_info: [u8; Self::MAX_DATA_FORMAT_INFO_SIZE],
}
21
impl BucketHeader {
    /// Length in bytes of the `data_format_info` header field.
    pub const MAX_DATA_FORMAT_INFO_SIZE: usize = 32;
}
25
/// Checkpoint index that [`finalize_bucket_file`] bincode-encodes into the bucket
/// file; the header's `index_offset` records where the encoded value starts.
#[derive(Encode, Decode)]
pub(crate) struct BucketCheckpoints {
    /// Checkpoint entries, stored in the order they were passed to
    /// [`finalize_bucket_file`].
    pub index: Vec<CheckpointData>,
}
30
31pub(crate) fn initialize_bucket_file(file: &mut FileWriter) -> u64 {
32    // Write empty header
33    file.write_all(&BucketHeader::default().serialize()[..])
34        .unwrap();
35
36    file.len()
37}
38
39pub(crate) fn finalize_bucket_file(
40    mut file: FileWriter,
41    magic: &[u8; 16],
42    checkpoints: Vec<CheckpointData>,
43    format_info: &[u8],
44) {
45    file.flush().unwrap();
46    let index_position = file.len() as u64;
47    bincode::encode_into_writer(
48        &BucketCheckpoints { index: checkpoints },
49        &mut file,
50        DEFAULT_BINCODE_CONFIG,
51    )
52    .unwrap();
53
54    let data_format_info = {
55        let mut array = [0; 32];
56        array[0..format_info.len()].copy_from_slice(format_info);
57        array
58    };
59
60    file.write_at_start(
61        &BucketHeader {
62            magic: *magic,
63            index_offset: index_position,
64            data_format_info,
65        }
66        .serialize()[..],
67    )
68    .unwrap();
69    file.flush_async();
70}