//! parallel_processor/buckets/mod.rs — multi-threaded bucket management.

1use arc_swap::ArcSwap;
2use bincode::{Decode, Encode};
3use parking_lot::Mutex;
4use std::num::NonZeroU64;
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7
8use crate::memory_fs::file::reader::FileRangeReference;
9use crate::DEFAULT_BINCODE_CONFIG;
10
11pub mod bucket_writer;
12pub mod concurrent;
13pub mod readers;
14pub mod single;
15pub mod writers;
16
/// This enum serves as a way to specify the behavior of the
/// bucket portions created after setting the checkpoint data.
/// If set to `Passtrough` there is the option to directly read the raw
/// binary data and copy it somewhere else without decoding it.
#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq, Eq)]
pub enum CheckpointStrategy {
    /// Data after the checkpoint is decompressed/decoded when read.
    Decompress,
    /// Data after the checkpoint can be read and copied as raw bytes.
    /// NOTE: the variant name keeps the original "Passtrough" spelling
    /// (sic) because it is part of the public, serialized API.
    Passtrough,
}
25
/// A checkpoint inside a bucket: the byte `offset` at which the checkpoint
/// was placed plus an optional serialized payload attached to it.
/// Checkpoints are ordered by `offset` (see the `Ord`/`PartialOrd` impls).
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub(crate) struct CheckpointData {
    // Byte offset of this checkpoint — presumably within the bucket file;
    // TODO(review): confirm against the bucket writers.
    offset: u64,
    // Optional serialized data associated with the checkpoint.
    data: Option<Vec<u8>>,
}
31
32impl PartialOrd for CheckpointData {
33    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
34        self.offset.partial_cmp(&other.offset)
35    }
36}
37
38impl Ord for CheckpointData {
39    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
40        self.offset.cmp(&other.offset)
41    }
42}
43
/// A bucket file that can be written by multiple threads concurrently
/// without an explicit lock around every write.
pub trait LockFreeBucket: Sized {
    /// Implementation-specific data needed to initialize a bucket.
    type InitData: Clone;

    /// Creates a bucket at `path` with the given `index`, taking the
    /// format info as pre-serialized bytes.
    fn new_serialized_data_format(
        path: &Path,
        data: &Self::InitData,
        index: usize,
        data_format: &[u8],
    ) -> Self;

    /// Creates a bucket, first serializing `data_format` with bincode and
    /// then delegating to [`Self::new_serialized_data_format`].
    fn new<T: Encode>(path: &Path, data: &Self::InitData, index: usize, data_format: &T) -> Self {
        Self::new_serialized_data_format(
            path,
            data,
            index,
            &bincode::encode_to_vec(data_format, DEFAULT_BINCODE_CONFIG).unwrap(),
        )
    }

    /// Sets checkpoint data for the bucket portion written next.
    /// `passtrough_range` optionally references a file range that can be
    /// copied directly without decoding (see `CheckpointStrategy::Passtrough`).
    fn set_checkpoint_data<T: Encode>(
        &self,
        data: Option<&T>,
        passtrough_range: Option<FileRangeReference>,
    );

    /// Returns the current size of the bucket in bytes.
    fn get_bucket_size(&self) -> u64;
    /// Writes `bytes` to the bucket; the returned value is used by
    /// `MultiThreadBuckets::add_data` as the bucket size after the write.
    fn write_data(&self, bytes: &[u8]) -> u64;
    /// Returns the on-disk path of this bucket.
    fn get_path(&self) -> PathBuf;
    /// Consumes the bucket, completing all writes (implementation-defined;
    /// presumably flushes and closes the underlying file).
    fn finalize(self);
}
74
/// A logical bucket whose contents may be split across multiple chunk files.
#[derive(Encode, Decode, Debug, Clone)]
pub struct MultiChunkBucket {
    /// Index of this bucket within its owning collection.
    pub index: usize,
    /// Paths of the chunk files that make up this bucket.
    pub chunks: Vec<PathBuf>,
    /// Extra data carried by extra (beyond-normal) buckets, if any.
    pub extra_bucket_data: Option<ExtraBucketData>,
}
81
82impl MultiChunkBucket {
83    pub fn into_single(mut self) -> SingleBucket {
84        assert!(self.chunks.len() == 1);
85        SingleBucket {
86            index: self.index,
87            path: self.chunks.pop().unwrap(),
88            extra_bucket_data: self.extra_bucket_data,
89        }
90    }
91}
92
/// A bucket consisting of exactly one chunk file.
#[derive(Encode, Decode)]
pub struct SingleBucket {
    /// Index of this bucket within its owning collection.
    pub index: usize,
    /// Path of the single chunk file backing this bucket.
    pub path: PathBuf,
    /// Extra data carried by extra (beyond-normal) buckets, if any.
    pub extra_bucket_data: Option<ExtraBucketData>,
}
99
100impl SingleBucket {
101    pub fn to_multi_chunk(self) -> MultiChunkBucket {
102        MultiChunkBucket {
103            index: self.index,
104            chunks: vec![self.path],
105            extra_bucket_data: self.extra_bucket_data,
106        }
107    }
108}
109
/// Outcome of a `MultiThreadBuckets::add_data` call.
#[derive(Debug, Clone)]
pub enum ChunkingStatus {
    /// The data went into the currently-active chunk.
    SameChunk,
    /// The active chunk reached the size threshold and was rotated out
    /// into the stored chunks; a fresh chunk is now active.
    NewChunk,
}
115
/// Describes how many buckets a collection holds: a power-of-two number of
/// "normal" buckets plus an optional set of extra buckets.
#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq, Eq)]
pub struct BucketsCount {
    /// Number of normal buckets (always `1 << normal_buckets_count_log`).
    pub normal_buckets_count: usize,
    /// log2 of `normal_buckets_count`.
    pub normal_buckets_count_log: usize,
    /// `normal_buckets_count` plus the extra buckets count.
    pub total_buckets_count: usize,
    /// Extra-buckets configuration. NOTE: despite the name, this stores the
    /// whole `ExtraBuckets` enum, not a plain count.
    pub extra_buckets_count: ExtraBuckets,
}
123
124impl BucketsCount {
125    pub const ONE: Self = Self::new(0, ExtraBuckets::None);
126
127    pub fn from_power_of_two(size: usize, extra_buckets: ExtraBuckets) -> Self {
128        assert_eq!(size, size.next_power_of_two());
129        Self::new(size.ilog2() as usize, extra_buckets)
130    }
131
132    pub const fn new(size_log: usize, extra_buckets: ExtraBuckets) -> Self {
133        let normal_buckets_count = 1 << size_log;
134        let extra_buckets_count = match extra_buckets {
135            ExtraBuckets::None => 0,
136            ExtraBuckets::Extra { count, .. } => count,
137        };
138
139        Self {
140            normal_buckets_count,
141            normal_buckets_count_log: size_log,
142            total_buckets_count: normal_buckets_count + extra_buckets_count,
143            extra_buckets_count: extra_buckets,
144        }
145    }
146
147    pub fn get_extra_buckets_count(&self) -> usize {
148        match self.extra_buckets_count {
149            ExtraBuckets::None => 0,
150            ExtraBuckets::Extra { count, .. } => count,
151        }
152    }
153}
154
/// A set of buckets that can be written concurrently from multiple threads,
/// optionally rotating a bucket into a new chunk once it grows past a
/// configurable size threshold.
pub struct MultiThreadBuckets<B: LockFreeBucket> {
    /// Currently-active bucket per index; swapped atomically when the
    /// chunking threshold is reached.
    active_buckets: Vec<ArcSwap<B>>,
    /// Per-index lists of finished chunk paths.
    stored_buckets: Vec<Mutex<MultiChunkBucket>>,
    /// When `Some`, buckets reaching this many bytes are rotated out.
    chunking_size_threshold: Option<NonZeroU64>,
    /// Global counter of created buckets/chunks; its lock also serializes
    /// chunk rotation in `add_data`.
    bucket_count_lock: Mutex<usize>,
    /// Directory used to create new chunk files (`None` only for `EMPTY`).
    base_path: Option<PathBuf>,
    /// Init data passed to newly created chunks (`None` only for `EMPTY`).
    init_data: Option<B::InitData>,
    /// Pre-serialized format info reused for every new chunk.
    serialized_format_info: Vec<u8>,
    /// Bucket-count configuration of this collection.
    size: BucketsCount,
}
165
/// Data attached to extra buckets; its meaning is defined by the users of
/// the buckets API (not visible in this module).
#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq, Eq)]
pub struct ExtraBucketData(pub usize);
168
/// Whether a collection has extra buckets beyond the power-of-two normal
/// ones, and if so how many and with which attached data.
#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq, Eq)]
pub enum ExtraBuckets {
    /// Only the normal buckets exist.
    None,
    /// `count` additional buckets, each tagged with `data`.
    Extra { count: usize, data: ExtraBucketData },
}
174
175impl<B: LockFreeBucket> MultiThreadBuckets<B> {
176    pub const EMPTY: Self = Self {
177        active_buckets: vec![],
178        stored_buckets: vec![],
179        chunking_size_threshold: None,
180        bucket_count_lock: Mutex::new(0),
181        base_path: None,
182        init_data: None,
183        serialized_format_info: vec![],
184        size: BucketsCount {
185            normal_buckets_count: 0,
186            normal_buckets_count_log: 0,
187            total_buckets_count: 0,
188            extra_buckets_count: ExtraBuckets::None,
189        },
190    };
191
192    pub fn get_buckets_count(&self) -> &BucketsCount {
193        &self.size
194    }
195
196    pub fn create_matching_multichunks(&self) -> Vec<Mutex<MultiChunkBucket>> {
197        self.stored_buckets
198            .iter()
199            .map(|s| {
200                let s = s.lock();
201                Mutex::new(MultiChunkBucket {
202                    index: s.index,
203                    chunks: vec![],
204                    extra_bucket_data: s.extra_bucket_data.clone(),
205                })
206            })
207            .collect()
208    }
209
210    pub fn new(
211        size: BucketsCount,
212        path: PathBuf,
213        chunking_size_threshold: Option<u64>,
214        init_data: &B::InitData,
215        format_info: &impl Encode,
216    ) -> MultiThreadBuckets<B> {
217        let mut buckets = Vec::with_capacity(size.total_buckets_count);
218
219        for i in 0..size.total_buckets_count {
220            buckets.push(ArcSwap::from_pointee(B::new(
221                &path,
222                init_data,
223                i,
224                format_info,
225            )));
226        }
227        MultiThreadBuckets {
228            active_buckets: buckets,
229            stored_buckets: (0..size.total_buckets_count)
230                .map(|index| {
231                    Mutex::new(MultiChunkBucket {
232                        index,
233                        chunks: vec![],
234                        extra_bucket_data: match size.extra_buckets_count {
235                            ExtraBuckets::None => None,
236                            ExtraBuckets::Extra { data, .. } => {
237                                if index >= size.normal_buckets_count {
238                                    Some(data)
239                                } else {
240                                    None
241                                }
242                            }
243                        },
244                    })
245                })
246                .collect(),
247            chunking_size_threshold: chunking_size_threshold.map(NonZeroU64::new).flatten(),
248            bucket_count_lock: Mutex::new(size.total_buckets_count),
249            base_path: Some(path),
250            init_data: Some(init_data.clone()),
251            serialized_format_info: bincode::encode_to_vec(format_info, DEFAULT_BINCODE_CONFIG)
252                .unwrap(),
253            size,
254        }
255    }
256
257    pub fn get_stored_buckets(&self) -> &Vec<Mutex<MultiChunkBucket>> {
258        &self.stored_buckets
259    }
260
261    pub fn into_buckets(mut self) -> impl Iterator<Item = B> {
262        assert!(
263            self.stored_buckets
264                .iter_mut()
265                .all(|bucket| bucket.get_mut().chunks.is_empty())
266                && self.chunking_size_threshold.is_none()
267        );
268        let buckets = std::mem::take(&mut self.active_buckets);
269        buckets
270            .into_iter()
271            .map(|bucket| Arc::into_inner(bucket.into_inner()).unwrap())
272    }
273
274    pub fn get_path(&self, bucket: u16) -> PathBuf {
275        self.active_buckets[bucket as usize].load().get_path()
276    }
277
278    pub fn add_data(&self, index: u16, data: &[u8]) -> ChunkingStatus {
279        let bucket_guard = self.active_buckets[index as usize].load();
280        let last_bucket_size = bucket_guard.write_data(data);
281
282        drop(bucket_guard);
283
284        // If the disk usage limit is set, check if the disk usage is not exceeded
285        if let Some(chunk_threshold) = self.chunking_size_threshold {
286            if last_bucket_size >= chunk_threshold.get() {
287                let mut buckets_count = self.bucket_count_lock.lock();
288
289                if self.active_buckets[index as usize].load().get_bucket_size()
290                    < chunk_threshold.get()
291                {
292                    // Do not add a new chunk if a previous writer already swapped it
293                    return ChunkingStatus::SameChunk;
294                }
295
296                // Take the largest bucket and add it to the stored buckets
297                let mut stored_bucket = self.active_buckets[index as usize].swap(Arc::new(
298                    B::new_serialized_data_format(
299                        &self.base_path.as_deref().unwrap(),
300                        &self.init_data.as_ref().unwrap(),
301                        *buckets_count,
302                        &self.serialized_format_info,
303                    ),
304                ));
305
306                let stored_bucket = loop {
307                    // Wait for the bucket to end all the pending writes before finalizing it
308                    match Arc::try_unwrap(stored_bucket) {
309                        Ok(bucket) => break bucket,
310                        Err(waiting_arc) => {
311                            stored_bucket = waiting_arc;
312                            std::hint::spin_loop();
313                        }
314                    }
315                };
316
317                // Add the bucket to the stored buckets and clear its active usage
318                let bucket_path = stored_bucket.get_path();
319                stored_bucket.finalize();
320                self.stored_buckets[index as usize]
321                    .lock()
322                    .chunks
323                    .push(bucket_path);
324
325                *buckets_count += 1;
326                ChunkingStatus::NewChunk
327            } else {
328                ChunkingStatus::SameChunk
329            }
330        } else {
331            ChunkingStatus::SameChunk
332        }
333    }
334
335    pub fn finalize_single(self: Arc<Self>) -> Vec<SingleBucket> {
336        assert!(self.chunking_size_threshold.is_none());
337        let buckets = self.finalize();
338        buckets
339            .into_iter()
340            .map(|mut bucket| {
341                assert!(bucket.chunks.len() == 1);
342                SingleBucket {
343                    index: bucket.index,
344                    path: bucket.chunks.pop().unwrap(),
345                    extra_bucket_data: bucket.extra_bucket_data,
346                }
347            })
348            .collect()
349    }
350
351    pub fn finalize(self: Arc<Self>) -> Vec<MultiChunkBucket> {
352        let mut self_ = Arc::try_unwrap(self)
353            .unwrap_or_else(|_| panic!("Cannot take full ownership of multi thread buckets!"));
354
355        self_
356            .active_buckets
357            .drain(..)
358            .zip(self_.stored_buckets.drain(..))
359            .map(|(bucket, stored)| {
360                let mut stored = stored.into_inner();
361                let bucket = Arc::into_inner(bucket.into_inner()).unwrap();
362                stored.chunks.push(bucket.get_path());
363                bucket.finalize();
364                stored
365            })
366            .collect()
367    }
368}
369
370impl<B: LockFreeBucket> Drop for MultiThreadBuckets<B> {
371    fn drop(&mut self) {
372        self.active_buckets.drain(..).for_each(|bucket| {
373            let bucket = Arc::into_inner(bucket.into_inner()).unwrap();
374            bucket.finalize();
375        });
376    }
377}
378
// SAFETY(review): shared mutation goes through `ArcSwap` and
// `parking_lot::Mutex`, but this impl has no `B: Send` bound, so it asserts
// that any `LockFreeBucket` is safe to send across threads — TODO confirm
// every implementation upholds this.
unsafe impl<B: LockFreeBucket> Send for MultiThreadBuckets<B> {}

// SAFETY(review): as above, this bypasses the usual `B: Sync` requirement;
// verify that bucket implementations tolerate concurrent shared access.
unsafe impl<B: LockFreeBucket> Sync for MultiThreadBuckets<B> {}