parallel_processor/buckets/
mod.rs

use arc_swap::ArcSwap;
use bincode::{Decode, Encode};
use parking_lot::Mutex;
use std::num::NonZeroU64;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use crate::memory_fs::file::reader::FileRangeReference;
use crate::DEFAULT_BINCODE_CONFIG;

pub mod bucket_writer;
pub mod concurrent;
pub mod readers;
pub mod single;
pub mod writers;

/// Specifies how the bucket portions created after setting the checkpoint
/// data should be handled when read back.
/// With `Passtrough`, the binary data can be read directly and copied
/// elsewhere without being decompressed first.
#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq, Eq)]
pub enum CheckpointStrategy {
    Decompress,
    Passtrough,
}

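/// Checkpoint marker for a bucket: the byte offset at which the checkpoint
/// starts and an optional serialized payload.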
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub(crate) struct CheckpointData {
    offset: u64,
    data: Option<Vec<u8>>,
}

impl PartialOrd for CheckpointData {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for CheckpointData {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.offset.cmp(&other.offset)
    }
}

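/// A bucket that can be written to by multiple threads without external
/// locking. `write_data` returns the bucket size after the write, which
/// `MultiThreadBuckets::add_data` uses to decide when to start a new chunk.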
pub trait LockFreeBucket: Sized {
    type InitData: Clone;

    fn new_serialized_data_format(
        path: &Path,
        data: &Self::InitData,
        index: usize,
        data_format: &[u8],
    ) -> Self;

    fn new<T: Encode>(path: &Path, data: &Self::InitData, index: usize, data_format: &T) -> Self {
        Self::new_serialized_data_format(
            path,
            data,
            index,
            &bincode::encode_to_vec(data_format, DEFAULT_BINCODE_CONFIG).unwrap(),
        )
    }

    fn set_checkpoint_data<T: Encode>(
        &self,
        data: Option<&T>,
        passtrough_range: Option<FileRangeReference>,
    );

    fn get_bucket_size(&self) -> u64;
    fn write_data(&self, bytes: &[u8]) -> u64;
    fn get_path(&self) -> PathBuf;
    fn finalize(self);
}

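/// A bucket whose contents may be split across multiple chunk files on disk.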
#[derive(Debug, Clone)]
pub struct MultiChunkBucket {
    pub index: usize,
    pub chunks: Vec<PathBuf>,
    pub extra_bucket_data: Option<ExtraBucketData>,
}

impl MultiChunkBucket {
    pub fn into_single(mut self) -> SingleBucket {
        assert!(self.chunks.len() == 1);
        SingleBucket {
            index: self.index,
            path: self.chunks.pop().unwrap(),
            extra_bucket_data: self.extra_bucket_data,
        }
    }
}

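/// A bucket backed by exactly one file on disk.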
pub struct SingleBucket {
    pub index: usize,
    pub path: PathBuf,
    pub extra_bucket_data: Option<ExtraBucketData>,
}

impl SingleBucket {
    pub fn to_multi_chunk(self) -> MultiChunkBucket {
        MultiChunkBucket {
            index: self.index,
            chunks: vec![self.path],
            extra_bucket_data: self.extra_bucket_data,
        }
    }
}

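/// Outcome of a write: whether the data landed in the current chunk or
/// triggered the creation of a new one (see `MultiThreadBuckets::add_data`).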
#[derive(Debug, Clone)]
pub enum ChunkingStatus {
    SameChunk,
    NewChunk,
}

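/// Number of buckets in a `MultiThreadBuckets`, split into a power-of-two
/// count of normal buckets plus an optional fixed number of extra buckets.
/// For example, `BucketsCount::new(4, ExtraBuckets::None)` yields
/// `1 << 4 = 16` normal buckets and 16 total.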
#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq, Eq)]
pub struct BucketsCount {
    pub normal_buckets_count: usize,
    pub normal_buckets_count_log: usize,
    pub total_buckets_count: usize,
    pub extra_buckets_count: ExtraBuckets,
}

impl BucketsCount {
    pub const ONE: Self = Self::new(0, ExtraBuckets::None);

    pub fn from_power_of_two(size: usize, extra_buckets: ExtraBuckets) -> Self {
        assert_eq!(size, size.next_power_of_two());
        Self::new(size.ilog2() as usize, extra_buckets)
    }

    pub const fn new(size_log: usize, extra_buckets: ExtraBuckets) -> Self {
        let normal_buckets_count = 1 << size_log;
        let extra_buckets_count = match extra_buckets {
            ExtraBuckets::None => 0,
            ExtraBuckets::Extra { count, .. } => count,
        };

        Self {
            normal_buckets_count,
            normal_buckets_count_log: size_log,
            total_buckets_count: normal_buckets_count + extra_buckets_count,
            extra_buckets_count: extra_buckets,
        }
    }

    pub fn get_extra_buckets_count(&self) -> usize {
        match self.extra_buckets_count {
            ExtraBuckets::None => 0,
            ExtraBuckets::Extra { count, .. } => count,
        }
    }
}

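/// A set of buckets writable from multiple threads: writes go through the
/// lock-free `active_buckets`, while full chunks are moved into
/// `stored_buckets` under a mutex. A minimal usage sketch (assuming some
/// concrete `LockFreeBucket` implementation for `B`; the round-robin
/// distribution below is only illustrative):
///
/// ```ignore
/// fn scatter<B: LockFreeBucket>(buckets: &MultiThreadBuckets<B>, records: &[Vec<u8>]) {
///     let count = buckets.get_buckets_count().total_buckets_count;
///     for (i, record) in records.iter().enumerate() {
///         // `add_data` transparently starts a new chunk when the
///         // chunking size threshold is exceeded
///         buckets.add_data((i % count) as u16, record);
///     }
/// }
/// ```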
pub struct MultiThreadBuckets<B: LockFreeBucket> {
    active_buckets: Vec<ArcSwap<B>>,
    stored_buckets: Vec<Mutex<MultiChunkBucket>>,
    chunking_size_threshold: Option<NonZeroU64>,
    bucket_count_lock: Mutex<usize>,
    base_path: Option<PathBuf>,
    init_data: Option<B::InitData>,
    serialized_format_info: Vec<u8>,
    size: BucketsCount,
}

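/// Data attached to each extra bucket (buckets with
/// `index >= normal_buckets_count`).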
#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq, Eq)]
pub struct ExtraBucketData(pub usize);

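/// Whether the bucket set includes extra buckets beyond the power-of-two
/// normal ones, and the data to attach to them.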
#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq, Eq)]
pub enum ExtraBuckets {
    None,
    Extra { count: usize, data: ExtraBucketData },
}

impl<B: LockFreeBucket> MultiThreadBuckets<B> {
    pub const EMPTY: Self = Self {
        active_buckets: vec![],
        stored_buckets: vec![],
        chunking_size_threshold: None,
        bucket_count_lock: Mutex::new(0),
        base_path: None,
        init_data: None,
        serialized_format_info: vec![],
        size: BucketsCount {
            normal_buckets_count: 0,
            normal_buckets_count_log: 0,
            total_buckets_count: 0,
            extra_buckets_count: ExtraBuckets::None,
        },
    };

    pub fn get_buckets_count(&self) -> &BucketsCount {
        &self.size
    }

    pub fn create_matching_multichunks(&self) -> Vec<Mutex<MultiChunkBucket>> {
        self.stored_buckets
            .iter()
            .map(|s| {
                let s = s.lock();
                Mutex::new(MultiChunkBucket {
                    index: s.index,
                    chunks: vec![],
                    extra_bucket_data: s.extra_bucket_data.clone(),
                })
            })
            .collect()
    }

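    /// Creates `size.total_buckets_count` buckets under `path`. If
    /// `chunking_size_threshold` is a non-zero value, each bucket is split
    /// into a new chunk whenever it grows past that threshold.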
    pub fn new(
        size: BucketsCount,
        path: PathBuf,
        chunking_size_threshold: Option<u64>,
        init_data: &B::InitData,
        format_info: &impl Encode,
    ) -> MultiThreadBuckets<B> {
        let mut buckets = Vec::with_capacity(size.total_buckets_count);

        for i in 0..size.total_buckets_count {
            buckets.push(ArcSwap::from_pointee(B::new(
                &path,
                init_data,
                i,
                format_info,
            )));
        }
        MultiThreadBuckets {
            active_buckets: buckets,
            stored_buckets: (0..size.total_buckets_count)
                .map(|index| {
                    Mutex::new(MultiChunkBucket {
                        index,
                        chunks: vec![],
                        extra_bucket_data: match size.extra_buckets_count {
                            ExtraBuckets::None => None,
                            ExtraBuckets::Extra { data, .. } => {
                                if index >= size.normal_buckets_count {
                                    Some(data)
                                } else {
                                    None
                                }
                            }
                        },
                    })
                })
                .collect(),
            chunking_size_threshold: chunking_size_threshold.and_then(NonZeroU64::new),
            bucket_count_lock: Mutex::new(size.total_buckets_count),
            base_path: Some(path),
            init_data: Some(init_data.clone()),
            serialized_format_info: bincode::encode_to_vec(format_info, DEFAULT_BINCODE_CONFIG)
                .unwrap(),
            size,
        }
    }

    pub fn get_stored_buckets(&self) -> &Vec<Mutex<MultiChunkBucket>> {
        &self.stored_buckets
    }

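    /// Consumes the collection and yields the underlying buckets; only valid
    /// when chunking is disabled and no chunks have been stored yet.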
    pub fn into_buckets(mut self) -> impl Iterator<Item = B> {
        assert!(
            self.stored_buckets
                .iter_mut()
                .all(|bucket| bucket.get_mut().chunks.is_empty())
                && self.chunking_size_threshold.is_none()
        );
        let buckets = std::mem::take(&mut self.active_buckets);
        buckets
            .into_iter()
            .map(|bucket| Arc::into_inner(bucket.into_inner()).unwrap())
    }

    pub fn get_path(&self, bucket: u16) -> PathBuf {
        self.active_buckets[bucket as usize].load().get_path()
    }

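    /// Writes `data` to the bucket at `index`. When a chunking size threshold
    /// is configured and the bucket grows past it, the bucket is swapped for
    /// a fresh one, the full chunk is finalized and recorded, and
    /// `ChunkingStatus::NewChunk` is returned.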
    pub fn add_data(&self, index: u16, data: &[u8]) -> ChunkingStatus {
        let bucket_guard = self.active_buckets[index as usize].load();
        let last_bucket_size = bucket_guard.write_data(data);

        drop(bucket_guard);

        // If a chunking size threshold is set, check whether this bucket exceeded it
        if let Some(chunk_threshold) = self.chunking_size_threshold {
            if last_bucket_size >= chunk_threshold.get() {
                let mut buckets_count = self.bucket_count_lock.lock();

                if self.active_buckets[index as usize].load().get_bucket_size()
                    < chunk_threshold.get()
                {
                    // Do not add a new chunk if a previous writer already swapped it
                    return ChunkingStatus::SameChunk;
                }

                // Swap in a fresh bucket; the full one becomes a stored chunk
                let mut stored_bucket = self.active_buckets[index as usize].swap(Arc::new(
                    B::new_serialized_data_format(
                        self.base_path.as_deref().unwrap(),
                        self.init_data.as_ref().unwrap(),
                        *buckets_count,
                        &self.serialized_format_info,
                    ),
                ));

                let stored_bucket = loop {
                    // Wait for the bucket to end all the pending writes before finalizing it
                    match Arc::try_unwrap(stored_bucket) {
                        Ok(bucket) => break bucket,
                        Err(waiting_arc) => {
                            stored_bucket = waiting_arc;
                            std::hint::spin_loop();
                        }
                    }
                };

                // Add the bucket to the stored buckets and clear its active usage
                let bucket_path = stored_bucket.get_path();
                stored_bucket.finalize();
                self.stored_buckets[index as usize]
                    .lock()
                    .chunks
                    .push(bucket_path);

                *buckets_count += 1;
                ChunkingStatus::NewChunk
            } else {
                ChunkingStatus::SameChunk
            }
        } else {
            ChunkingStatus::SameChunk
        }
    }

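    /// Like `finalize`, but requires chunking to be disabled so that every
    /// bucket maps to exactly one file.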
    pub fn finalize_single(self: Arc<Self>) -> Vec<SingleBucket> {
        assert!(self.chunking_size_threshold.is_none());
        let buckets = self.finalize();
        buckets
            .into_iter()
            .map(|mut bucket| {
                assert!(bucket.chunks.len() == 1);
                SingleBucket {
                    index: bucket.index,
                    path: bucket.chunks.pop().unwrap(),
                    extra_bucket_data: bucket.extra_bucket_data,
                }
            })
            .collect()
    }

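    /// Finalizes all the active buckets and returns every bucket together
    /// with its stored chunks. Panics if other `Arc` references are still
    /// alive.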
    pub fn finalize(self: Arc<Self>) -> Vec<MultiChunkBucket> {
        let mut self_ = Arc::try_unwrap(self)
            .unwrap_or_else(|_| panic!("Cannot take full ownership of multi thread buckets!"));

        self_
            .active_buckets
            .drain(..)
            .zip(self_.stored_buckets.drain(..))
            .map(|(bucket, stored)| {
                let mut stored = stored.into_inner();
                let bucket = Arc::into_inner(bucket.into_inner()).unwrap();
                stored.chunks.push(bucket.get_path());
                bucket.finalize();
                stored
            })
            .collect()
    }
}

impl<B: LockFreeBucket> Drop for MultiThreadBuckets<B> {
    fn drop(&mut self) {
        self.active_buckets.drain(..).for_each(|bucket| {
            let bucket = Arc::into_inner(bucket.into_inner()).unwrap();
            bucket.finalize();
        });
    }
}

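// SAFETY: presumably sound because all shared mutation goes through
// `ArcSwap`/`Mutex` and `LockFreeBucket` writers are designed for concurrent
// use; note these impls do not require `B: Send + Sync`.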
unsafe impl<B: LockFreeBucket> Send for MultiThreadBuckets<B> {}

unsafe impl<B: LockFreeBucket> Sync for MultiThreadBuckets<B> {}