parallel_processor/buckets/
mod.rs

1use arc_swap::ArcSwap;
2use parking_lot::Mutex;
3use serde::{Deserialize, Serialize};
4use std::num::NonZeroU64;
5use std::path::{Path, PathBuf};
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8
9use crate::memory_fs::file::reader::FileRangeReference;
10
11pub mod bucket_writer;
12pub mod concurrent;
13pub mod readers;
14pub mod single;
15pub mod writers;
16
/// Specifies how the bucket portions created after setting checkpoint data
/// should be handled when read back.
/// With the `Passtrough` strategy the portion's binary data can be read
/// directly and copied somewhere else without decoding it.
#[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq)]
pub enum CheckpointStrategy {
    /// Decode/decompress the checkpointed portion when reading it.
    Decompress,
    /// Treat the portion as raw bytes suitable for direct copying.
    /// (The variant keeps its existing misspelled name, as renaming it would
    /// break callers and the serialized representation.)
    Passtrough,
}
25
/// A checkpoint inside a bucket file: a byte offset plus optional
/// associated data.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub(crate) struct CheckpointData {
    // Byte offset of the checkpoint within the bucket file.
    offset: u64,
    // Optional checkpoint payload.
    // NOTE(review): presumably the serialized value passed to
    // `LockFreeBucket::set_checkpoint_data` — confirm in the writers module.
    data: Option<Vec<u8>>,
}
31
32impl PartialOrd for CheckpointData {
33    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
34        self.offset.partial_cmp(&other.offset)
35    }
36}
37
38impl Ord for CheckpointData {
39    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
40        self.offset.cmp(&other.offset)
41    }
42}
43
/// A bucket file that can be written to without locking.
pub trait LockFreeBucket: Sized {
    /// Extra data needed to create a bucket instance; cloned and shared by
    /// `MultiThreadBuckets` to spawn replacement buckets.
    type InitData: Clone;

    /// Creates a new bucket file under `path` with the given `index`, using
    /// an already-serialized `data_format` header.
    fn new_serialized_data_format(
        path: &Path,
        data: &Self::InitData,
        index: usize,
        data_format: &[u8],
    ) -> Self;

    /// Convenience constructor that bincode-serializes `data_format` and
    /// delegates to [`Self::new_serialized_data_format`].
    fn new<T: Serialize>(
        path: &Path,
        data: &Self::InitData,
        index: usize,
        data_format: &T,
    ) -> Self {
        Self::new_serialized_data_format(
            path,
            data,
            index,
            &bincode::serialize(data_format).unwrap(),
        )
    }

    /// Records checkpoint data for the portion written after this call.
    /// NOTE(review): `passtrough_range` presumably lets implementations copy
    /// a raw file range when the `Passtrough` strategy is in use — confirm in
    /// the writers module.
    fn set_checkpoint_data<T: Serialize>(
        &self,
        data: Option<&T>,
        passtrough_range: Option<FileRangeReference>,
    );

    /// Appends raw bytes to the bucket.
    fn write_data(&self, bytes: &[u8]);
    /// Returns the path of the underlying bucket file.
    fn get_path(&self) -> PathBuf;
    /// Finishes the bucket, consuming it.
    fn finalize(self);
}
78
/// A bucket whose data may be split across multiple chunk files.
#[derive(Debug, Clone)]
pub struct MultiChunkBucket {
    /// Index of this bucket within its collection.
    pub index: usize,
    /// Paths of the chunk files belonging to this bucket.
    pub chunks: Vec<PathBuf>,
    /// Whether the chunks were produced by a compaction step.
    /// NOTE(review): only ever initialized to `false` in this file; set
    /// elsewhere — confirm its writer.
    pub was_compacted: bool,
}
85
86impl MultiChunkBucket {
87    pub fn into_single(mut self) -> SingleBucket {
88        assert!(self.chunks.len() == 1);
89        SingleBucket {
90            index: self.index,
91            path: self.chunks.pop().unwrap(),
92        }
93    }
94}
95
/// A bucket consisting of exactly one chunk file.
pub struct SingleBucket {
    /// Index of this bucket within its collection.
    pub index: usize,
    /// Path to the bucket's single chunk file.
    pub path: PathBuf,
}
100
101impl SingleBucket {
102    pub fn to_multi_chunk(self) -> MultiChunkBucket {
103        MultiChunkBucket {
104            index: self.index,
105            chunks: vec![self.path],
106            was_compacted: false,
107        }
108    }
109}
110
/// Outcome of a `MultiThreadBuckets::add_data` call.
#[derive(Debug, Clone)]
pub enum ChunkingStatus {
    /// The write stayed within the current chunks; nothing was sealed.
    SameChunk,
    /// The active disk usage limit was exceeded: each listed bucket index had
    /// a chunk sealed and moved to the stored buckets.
    NewChunks { bucket_indexes: Vec<u16> },
}
116
/// A set of buckets writable concurrently from multiple threads, optionally
/// bounded by a total active disk usage limit.
pub struct MultiThreadBuckets<B: LockFreeBucket> {
    // Each slot pairs a bucket with its current byte usage; ArcSwap lets a
    // full bucket be swapped for a fresh one without blocking writers.
    active_buckets: Vec<ArcSwap<(AtomicU64, B)>>,
    // Per-bucket lists of sealed chunk paths.
    stored_buckets: Mutex<Vec<MultiChunkBucket>>,
    // Total bytes currently held by all active buckets.
    disk_usage: AtomicU64,
    // When set, exceeding this total triggers sealing of the largest bucket.
    active_disk_usage_limit: Option<NonZeroU64>,
    // Running count of created bucket files; the mutex also serializes the
    // chunk-sealing path in `add_data`.
    bucket_count_lock: Mutex<usize>,
    // Directory where new bucket files are created (None only for EMPTY).
    base_path: Option<PathBuf>,
    // Init data cloned for replacement buckets (None only for EMPTY).
    init_data: Option<B::InitData>,
    // Pre-serialized format header passed to newly created buckets.
    serialized_format_info: Vec<u8>,
}
127
128impl<B: LockFreeBucket> MultiThreadBuckets<B> {
    /// A bucket-less placeholder value with no base path or init data; useful
    /// as a cheap default before real initialization via [`Self::new`].
    pub const EMPTY: Self = Self {
        active_buckets: vec![],
        stored_buckets: Mutex::new(vec![]),
        disk_usage: AtomicU64::new(0),
        active_disk_usage_limit: None,
        bucket_count_lock: Mutex::new(0),
        base_path: None,
        init_data: None,
        serialized_format_info: vec![],
    };
139
140    pub fn new(
141        size: usize,
142        path: PathBuf,
143        active_disk_usage_limit: Option<u64>,
144        init_data: &B::InitData,
145        format_info: &impl Serialize,
146    ) -> MultiThreadBuckets<B> {
147        let mut buckets = Vec::with_capacity(size);
148
149        for i in 0..size {
150            buckets.push(ArcSwap::from_pointee((
151                AtomicU64::new(0),
152                B::new(&path, init_data, i, format_info),
153            )));
154        }
155        MultiThreadBuckets {
156            active_buckets: buckets,
157            stored_buckets: Mutex::new(
158                (0..size)
159                    .map(|index| MultiChunkBucket {
160                        index,
161                        chunks: vec![],
162                        was_compacted: false,
163                    })
164                    .collect(),
165            ),
166            disk_usage: AtomicU64::new(0),
167            active_disk_usage_limit: active_disk_usage_limit.map(NonZeroU64::new).flatten(),
168            bucket_count_lock: Mutex::new(size),
169            base_path: Some(path),
170            init_data: Some(init_data.clone()),
171            serialized_format_info: bincode::serialize(format_info).unwrap(),
172        }
173    }
174
    /// Returns the mutex guarding the per-bucket lists of sealed chunk paths.
    pub fn get_stored_buckets(&self) -> &Mutex<Vec<MultiChunkBucket>> {
        &self.stored_buckets
    }
178
179    pub fn into_buckets(mut self) -> impl Iterator<Item = B> {
180        assert!(
181            self.stored_buckets
182                .lock()
183                .iter()
184                .all(|bucket| bucket.chunks.is_empty())
185                && self.active_disk_usage_limit.is_none()
186        );
187        let buckets = std::mem::take(&mut self.active_buckets);
188        buckets
189            .into_iter()
190            .map(|bucket| Arc::into_inner(bucket.into_inner()).unwrap().1)
191    }
192
193    pub fn get_path(&self, bucket: u16) -> PathBuf {
194        self.active_buckets[bucket as usize].load().1.get_path()
195    }
196
    /// Appends `data` to the active bucket at `index` and accounts for its
    /// size in both the per-bucket and the global usage counters.
    ///
    /// If `active_disk_usage_limit` is set and the global usage exceeds it,
    /// the largest active bucket is repeatedly swapped out for a fresh file,
    /// finalized, and recorded as a new chunk in `stored_buckets` until usage
    /// drops below the limit. The returned [`ChunkingStatus`] reports which
    /// bucket indexes had a chunk sealed, if any.
    pub fn add_data(&self, index: u16, data: &[u8]) -> ChunkingStatus {
        // Loading the ArcSwap keeps this bucket's Arc alive while writing,
        // even if another thread swaps the slot out concurrently below.
        let bucket_guard = self.active_buckets[index as usize].load();
        bucket_guard.1.write_data(data);

        // Add the data size to both the bucket and the global disk usages
        bucket_guard
            .0
            .fetch_add(data.len() as u64, Ordering::Relaxed);
        // fetch_add returns the previous value, so add data.len() once more
        // to obtain the usage that includes this write.
        let mut disk_usage = self
            .disk_usage
            .fetch_add(data.len() as u64, Ordering::Relaxed)
            + data.len() as u64;

        drop(bucket_guard);

        // If the disk usage limit is set, check if the disk usage is not exceeded
        if let Some(max_usage) = self.active_disk_usage_limit {
            let mut new_chunks_bucket_indexes = vec![];
            while disk_usage > max_usage.get() {
                // The lock serializes concurrent sealers and protects the
                // running counter used to index newly created bucket files.
                let mut buckets_count = self.bucket_count_lock.lock();
                // Take the largest bucket and add it to the stored buckets
                let swap_bucket_index = self
                    .active_buckets
                    .iter()
                    .enumerate()
                    .max_by_key(|(_, bucket)| bucket.load().0.load(Ordering::Relaxed))
                    .map(|(i, _bucket)| i)
                    .unwrap();

                // Swap in a fresh, empty bucket; writers already holding a
                // guard on the old Arc keep writing to it until they drop it.
                let mut stored_bucket = self.active_buckets[swap_bucket_index].swap(Arc::new((
                    AtomicU64::new(0),
                    B::new_serialized_data_format(
                        &self.base_path.as_deref().unwrap(),
                        &self.init_data.as_ref().unwrap(),
                        *buckets_count,
                        &self.serialized_format_info,
                    ),
                )));

                let (bucket_usage, stored_bucket) = loop {
                    // Wait for the bucket to end all the pending writes before finalizing it
                    // (try_unwrap succeeds only once this is the last Arc reference).
                    match Arc::try_unwrap(stored_bucket) {
                        Ok(bucket) => break bucket,
                        Err(waiting_arc) => {
                            stored_bucket = waiting_arc;
                            std::hint::spin_loop();
                        }
                    }
                };
                let bucket_usage = bucket_usage.into_inner();

                // Add the bucket to the stored buckets and clear its active usage
                disk_usage =
                    self.disk_usage.fetch_sub(bucket_usage, Ordering::Relaxed) - bucket_usage;
                let bucket_path = stored_bucket.get_path();
                stored_bucket.finalize();
                self.stored_buckets.lock()[swap_bucket_index]
                    .chunks
                    .push(bucket_path);

                new_chunks_bucket_indexes.push(swap_bucket_index as u16);

                // Each sealed chunk consumes a fresh global file index.
                *buckets_count += 1;
            }
            ChunkingStatus::NewChunks {
                bucket_indexes: new_chunks_bucket_indexes,
            }
        } else {
            ChunkingStatus::SameChunk
        }
    }
268
    /// Returns the number of active buckets.
    pub fn count(&self) -> usize {
        self.active_buckets.len()
    }
272
273    pub fn finalize_single(self: Arc<Self>) -> Vec<SingleBucket> {
274        assert!(self.active_disk_usage_limit.is_none());
275        let buckets = self.finalize();
276        buckets
277            .into_iter()
278            .map(|mut bucket| {
279                assert!(bucket.chunks.len() == 1);
280                SingleBucket {
281                    index: bucket.index,
282                    path: bucket.chunks.pop().unwrap(),
283                }
284            })
285            .collect()
286    }
287
    /// Consumes the (uniquely owned) `Arc` and finalizes every active bucket,
    /// appending each one's file path as the final chunk of its corresponding
    /// stored-buckets entry.
    ///
    /// # Panics
    /// Panics if other `Arc` references to `self` are still alive, or if any
    /// inner bucket Arc is still shared.
    pub fn finalize(self: Arc<Self>) -> Vec<MultiChunkBucket> {
        // Full ownership is required so the buckets can be moved out.
        let mut self_ = Arc::try_unwrap(self)
            .unwrap_or_else(|_| panic!("Cannot take full ownership of multi thread buckets!"));

        let mut stored_buckets = self_.stored_buckets.lock();

        // Draining leaves `active_buckets` empty, so the Drop impl (which
        // finalizes leftover buckets) becomes a no-op afterwards.
        self_
            .active_buckets
            .drain(..)
            .zip(stored_buckets.drain(..))
            .map(|(bucket, mut stored)| {
                let bucket = Arc::into_inner(bucket.into_inner()).unwrap();
                stored.chunks.push(bucket.1.get_path());
                bucket.1.finalize();
                stored
            })
            .collect()
    }
306}
307
308impl<B: LockFreeBucket> Drop for MultiThreadBuckets<B> {
309    fn drop(&mut self) {
310        self.active_buckets.drain(..).for_each(|bucket| {
311            let bucket = Arc::into_inner(bucket.into_inner()).unwrap();
312            bucket.1.finalize();
313        });
314    }
315}
316
// SAFETY: NOTE(review): this asserts `Send` for MultiThreadBuckets<B> for
// *any* B, with no `B: Send` bound. That is sound only if every
// `LockFreeBucket` implementation is safe to move across threads — confirm,
// or add the `B: Send` bound.
unsafe impl<B: LockFreeBucket> Send for MultiThreadBuckets<B> {}
318
// SAFETY: NOTE(review): this asserts `Sync` for MultiThreadBuckets<B> for
// *any* B, with no `B: Sync` (or `B: Send`, needed for the swap-out path)
// bound. Sound only if every `LockFreeBucket` implementation is internally
// thread-safe, as the trait's name suggests — confirm, or add bounds.
unsafe impl<B: LockFreeBucket> Sync for MultiThreadBuckets<B> {}