parallel_processor/buckets/
mod.rs1use arc_swap::ArcSwap;
2use bincode::{Decode, Encode};
3use parking_lot::Mutex;
4use std::num::NonZeroU64;
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7
8use crate::memory_fs::file::reader::FileRangeReference;
9use crate::DEFAULT_BINCODE_CONFIG;
10
11pub mod bucket_writer;
12pub mod concurrent;
13pub mod readers;
14pub mod single;
15pub mod writers;
16
/// Selects how checkpointed data is handled when it is read back.
#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq, Eq)]
pub enum CheckpointStrategy {
    /// Data for this checkpoint should be decompressed on read.
    Decompress,
    /// Data is forwarded as-is (see the `passtrough_range` parameter of
    /// `LockFreeBucket::set_checkpoint_data`).
    /// NOTE(review): "Passtrough" is a typo for "Passthrough", but renaming
    /// would break the public API and serialized data — kept as-is.
    Passtrough,
}
25
/// Metadata for a single checkpoint inside a bucket.
///
/// Checkpoints are ordered by `offset` only (see the `Ord`/`PartialOrd`
/// impls in this module); `data` does not participate in the ordering.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub(crate) struct CheckpointData {
    // Position of the checkpoint — presumably a byte offset within the
    // bucket file; TODO confirm against the writers.
    offset: u64,
    // Optional serialized payload attached to the checkpoint.
    data: Option<Vec<u8>>,
}
31
32impl PartialOrd for CheckpointData {
33 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
34 self.offset.partial_cmp(&other.offset)
35 }
36}
37
38impl Ord for CheckpointData {
39 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
40 self.offset.cmp(&other.offset)
41 }
42}
43
/// A bucket that multiple threads can append to without a per-write lock
/// (implementors live in the `writers` submodule).
pub trait LockFreeBucket: Sized {
    /// Per-bucket initialization data; cloned so each new chunk of the same
    /// bucket can be created with identical settings.
    type InitData: Clone;

    /// Creates a bucket whose data-format header is already bincode-encoded.
    ///
    /// `index` identifies the bucket/chunk; `data_format` is the serialized
    /// header produced by `bincode::encode_to_vec`.
    fn new_serialized_data_format(
        path: &Path,
        data: &Self::InitData,
        index: usize,
        data_format: &[u8],
    ) -> Self;

    /// Convenience constructor: encodes `data_format` with the crate's
    /// default bincode config and forwards to `new_serialized_data_format`.
    ///
    /// Panics if `data_format` fails to encode.
    fn new<T: Encode>(path: &Path, data: &Self::InitData, index: usize, data_format: &T) -> Self {
        Self::new_serialized_data_format(
            path,
            data,
            index,
            &bincode::encode_to_vec(data_format, DEFAULT_BINCODE_CONFIG).unwrap(),
        )
    }

    /// Records checkpoint information for subsequently written data.
    /// `passtrough_range` optionally references a file range to forward
    /// verbatim — presumably paired with `CheckpointStrategy::Passtrough`;
    /// TODO confirm against the implementors.
    fn set_checkpoint_data<T: Encode>(
        &self,
        data: Option<&T>,
        passtrough_range: Option<FileRangeReference>,
    );

    /// Current size of the bucket (compared against the chunking threshold
    /// in `MultiThreadBuckets::add_data`).
    fn get_bucket_size(&self) -> u64;
    /// Appends `bytes` and returns the bucket size after the write.
    fn write_data(&self, bytes: &[u8]) -> u64;
    /// Path of the file backing this bucket.
    fn get_path(&self) -> PathBuf;
    /// Consumes the bucket, flushing/closing its backing storage.
    fn finalize(self);
}
74
/// A logical bucket made up of one or more chunk files on disk.
#[derive(Debug, Clone)]
pub struct MultiChunkBucket {
    /// Index of this bucket within its `MultiThreadBuckets` set.
    pub index: usize,
    /// Paths of the chunk files belonging to this bucket.
    pub chunks: Vec<PathBuf>,
    /// Extra data attached to buckets beyond the normal range
    /// (see `ExtraBuckets::Extra`).
    pub extra_bucket_data: Option<ExtraBucketData>,
}
81
82impl MultiChunkBucket {
83 pub fn into_single(mut self) -> SingleBucket {
84 assert!(self.chunks.len() == 1);
85 SingleBucket {
86 index: self.index,
87 path: self.chunks.pop().unwrap(),
88 extra_bucket_data: self.extra_bucket_data,
89 }
90 }
91}
92
/// A bucket consisting of exactly one chunk file.
pub struct SingleBucket {
    /// Index of this bucket within its `MultiThreadBuckets` set.
    pub index: usize,
    /// Path of the single chunk file.
    pub path: PathBuf,
    /// Extra data attached to buckets beyond the normal range
    /// (see `ExtraBuckets::Extra`).
    pub extra_bucket_data: Option<ExtraBucketData>,
}
98
99impl SingleBucket {
100 pub fn to_multi_chunk(self) -> MultiChunkBucket {
101 MultiChunkBucket {
102 index: self.index,
103 chunks: vec![self.path],
104 extra_bucket_data: self.extra_bucket_data,
105 }
106 }
107}
108
/// Outcome of `MultiThreadBuckets::add_data` with respect to chunk rotation.
#[derive(Debug, Clone)]
pub enum ChunkingStatus {
    /// The write went to the already-active chunk.
    SameChunk,
    /// The write pushed the chunk over the size threshold and a fresh chunk
    /// was swapped in.
    NewChunk,
}
114
/// Describes how many buckets a `MultiThreadBuckets` set contains.
#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq, Eq)]
pub struct BucketsCount {
    /// Number of "normal" buckets; always a power of two when built via
    /// `new`/`from_power_of_two`.
    pub normal_buckets_count: usize,
    /// log2 of `normal_buckets_count`.
    pub normal_buckets_count_log: usize,
    /// `normal_buckets_count` plus the extra buckets, if any.
    pub total_buckets_count: usize,
    /// Configuration of the extra (non-normal) buckets.
    pub extra_buckets_count: ExtraBuckets,
}
122
123impl BucketsCount {
124 pub const ONE: Self = Self::new(0, ExtraBuckets::None);
125
126 pub fn from_power_of_two(size: usize, extra_buckets: ExtraBuckets) -> Self {
127 assert_eq!(size, size.next_power_of_two());
128 Self::new(size.ilog2() as usize, extra_buckets)
129 }
130
131 pub const fn new(size_log: usize, extra_buckets: ExtraBuckets) -> Self {
132 let normal_buckets_count = 1 << size_log;
133 let extra_buckets_count = match extra_buckets {
134 ExtraBuckets::None => 0,
135 ExtraBuckets::Extra { count, .. } => count,
136 };
137
138 Self {
139 normal_buckets_count,
140 normal_buckets_count_log: size_log,
141 total_buckets_count: normal_buckets_count + extra_buckets_count,
142 extra_buckets_count: extra_buckets,
143 }
144 }
145
146 pub fn get_extra_buckets_count(&self) -> usize {
147 match self.extra_buckets_count {
148 ExtraBuckets::None => 0,
149 ExtraBuckets::Extra { count, .. } => count,
150 }
151 }
152}
153
/// A set of buckets that many threads can append to concurrently.
///
/// Each slot of `active_buckets` holds the chunk currently accepting writes;
/// rotated-out chunks are recorded in the matching `stored_buckets` entry.
pub struct MultiThreadBuckets<B: LockFreeBucket> {
    // Currently active chunk per bucket index; swapped atomically on rotation.
    active_buckets: Vec<ArcSwap<B>>,
    // Paths of already-finalized chunks, per bucket index.
    stored_buckets: Vec<Mutex<MultiChunkBucket>>,
    // When set, a chunk reaching this size is rotated out (see `add_data`).
    chunking_size_threshold: Option<NonZeroU64>,
    // Running counter used as the index for newly created chunks; its lock
    // also serializes chunk rotations.
    bucket_count_lock: Mutex<usize>,
    // Directory where new chunks are created (`None` only for `EMPTY`).
    base_path: Option<PathBuf>,
    // Init data cloned into each newly created chunk (`None` only for `EMPTY`).
    init_data: Option<B::InitData>,
    // Bincode-serialized format header handed to every new chunk.
    serialized_format_info: Vec<u8>,
    // Configured bucket counts.
    size: BucketsCount,
}
164
/// Opaque payload attached to extra buckets — presumably interpreted by the
/// readers/writers; TODO confirm its meaning at the use sites.
#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq, Eq)]
pub struct ExtraBucketData(pub usize);
167
/// Configuration of buckets beyond the power-of-two "normal" range.
#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq, Eq)]
pub enum ExtraBuckets {
    /// Only normal buckets exist.
    None,
    /// `count` additional buckets, each tagged with `data`.
    Extra { count: usize, data: ExtraBucketData },
}
173
174impl<B: LockFreeBucket> MultiThreadBuckets<B> {
175 pub const EMPTY: Self = Self {
176 active_buckets: vec![],
177 stored_buckets: vec![],
178 chunking_size_threshold: None,
179 bucket_count_lock: Mutex::new(0),
180 base_path: None,
181 init_data: None,
182 serialized_format_info: vec![],
183 size: BucketsCount {
184 normal_buckets_count: 0,
185 normal_buckets_count_log: 0,
186 total_buckets_count: 0,
187 extra_buckets_count: ExtraBuckets::None,
188 },
189 };
190
191 pub fn get_buckets_count(&self) -> &BucketsCount {
192 &self.size
193 }
194
195 pub fn create_matching_multichunks(&self) -> Vec<Mutex<MultiChunkBucket>> {
196 self.stored_buckets
197 .iter()
198 .map(|s| {
199 let s = s.lock();
200 Mutex::new(MultiChunkBucket {
201 index: s.index,
202 chunks: vec![],
203 extra_bucket_data: s.extra_bucket_data.clone(),
204 })
205 })
206 .collect()
207 }
208
209 pub fn new(
210 size: BucketsCount,
211 path: PathBuf,
212 chunking_size_threshold: Option<u64>,
213 init_data: &B::InitData,
214 format_info: &impl Encode,
215 ) -> MultiThreadBuckets<B> {
216 let mut buckets = Vec::with_capacity(size.total_buckets_count);
217
218 for i in 0..size.total_buckets_count {
219 buckets.push(ArcSwap::from_pointee(B::new(
220 &path,
221 init_data,
222 i,
223 format_info,
224 )));
225 }
226 MultiThreadBuckets {
227 active_buckets: buckets,
228 stored_buckets: (0..size.total_buckets_count)
229 .map(|index| {
230 Mutex::new(MultiChunkBucket {
231 index,
232 chunks: vec![],
233 extra_bucket_data: match size.extra_buckets_count {
234 ExtraBuckets::None => None,
235 ExtraBuckets::Extra { data, .. } => {
236 if index >= size.normal_buckets_count {
237 Some(data)
238 } else {
239 None
240 }
241 }
242 },
243 })
244 })
245 .collect(),
246 chunking_size_threshold: chunking_size_threshold.map(NonZeroU64::new).flatten(),
247 bucket_count_lock: Mutex::new(size.total_buckets_count),
248 base_path: Some(path),
249 init_data: Some(init_data.clone()),
250 serialized_format_info: bincode::encode_to_vec(format_info, DEFAULT_BINCODE_CONFIG)
251 .unwrap(),
252 size,
253 }
254 }
255
256 pub fn get_stored_buckets(&self) -> &Vec<Mutex<MultiChunkBucket>> {
257 &self.stored_buckets
258 }
259
260 pub fn into_buckets(mut self) -> impl Iterator<Item = B> {
261 assert!(
262 self.stored_buckets
263 .iter_mut()
264 .all(|bucket| bucket.get_mut().chunks.is_empty())
265 && self.chunking_size_threshold.is_none()
266 );
267 let buckets = std::mem::take(&mut self.active_buckets);
268 buckets
269 .into_iter()
270 .map(|bucket| Arc::into_inner(bucket.into_inner()).unwrap())
271 }
272
273 pub fn get_path(&self, bucket: u16) -> PathBuf {
274 self.active_buckets[bucket as usize].load().get_path()
275 }
276
277 pub fn add_data(&self, index: u16, data: &[u8]) -> ChunkingStatus {
278 let bucket_guard = self.active_buckets[index as usize].load();
279 let last_bucket_size = bucket_guard.write_data(data);
280
281 drop(bucket_guard);
282
283 if let Some(chunk_threshold) = self.chunking_size_threshold {
285 if last_bucket_size >= chunk_threshold.get() {
286 let mut buckets_count = self.bucket_count_lock.lock();
287
288 if self.active_buckets[index as usize].load().get_bucket_size()
289 < chunk_threshold.get()
290 {
291 return ChunkingStatus::SameChunk;
293 }
294
295 let mut stored_bucket = self.active_buckets[index as usize].swap(Arc::new(
297 B::new_serialized_data_format(
298 &self.base_path.as_deref().unwrap(),
299 &self.init_data.as_ref().unwrap(),
300 *buckets_count,
301 &self.serialized_format_info,
302 ),
303 ));
304
305 let stored_bucket = loop {
306 match Arc::try_unwrap(stored_bucket) {
308 Ok(bucket) => break bucket,
309 Err(waiting_arc) => {
310 stored_bucket = waiting_arc;
311 std::hint::spin_loop();
312 }
313 }
314 };
315
316 let bucket_path = stored_bucket.get_path();
318 stored_bucket.finalize();
319 self.stored_buckets[index as usize]
320 .lock()
321 .chunks
322 .push(bucket_path);
323
324 *buckets_count += 1;
325 ChunkingStatus::NewChunk
326 } else {
327 ChunkingStatus::SameChunk
328 }
329 } else {
330 ChunkingStatus::SameChunk
331 }
332 }
333
334 pub fn finalize_single(self: Arc<Self>) -> Vec<SingleBucket> {
335 assert!(self.chunking_size_threshold.is_none());
336 let buckets = self.finalize();
337 buckets
338 .into_iter()
339 .map(|mut bucket| {
340 assert!(bucket.chunks.len() == 1);
341 SingleBucket {
342 index: bucket.index,
343 path: bucket.chunks.pop().unwrap(),
344 extra_bucket_data: bucket.extra_bucket_data,
345 }
346 })
347 .collect()
348 }
349
350 pub fn finalize(self: Arc<Self>) -> Vec<MultiChunkBucket> {
351 let mut self_ = Arc::try_unwrap(self)
352 .unwrap_or_else(|_| panic!("Cannot take full ownership of multi thread buckets!"));
353
354 self_
355 .active_buckets
356 .drain(..)
357 .zip(self_.stored_buckets.drain(..))
358 .map(|(bucket, stored)| {
359 let mut stored = stored.into_inner();
360 let bucket = Arc::into_inner(bucket.into_inner()).unwrap();
361 stored.chunks.push(bucket.get_path());
362 bucket.finalize();
363 stored
364 })
365 .collect()
366 }
367}
368
369impl<B: LockFreeBucket> Drop for MultiThreadBuckets<B> {
370 fn drop(&mut self) {
371 self.active_buckets.drain(..).for_each(|bucket| {
372 let bucket = Arc::into_inner(bucket.into_inner()).unwrap();
373 bucket.finalize();
374 });
375 }
376}
377
// SAFETY(review): these impls are unconditional — they do not require
// `B: Send`/`B: Sync` (nor `B::InitData` to be thread-safe), so soundness
// rests entirely on every `LockFreeBucket` implementor being safe to move
// and share across threads. TODO: confirm, or add `B: Send`/`B: Sync`
// bounds so the compiler enforces it.
unsafe impl<B: LockFreeBucket> Send for MultiThreadBuckets<B> {}

unsafe impl<B: LockFreeBucket> Sync for MultiThreadBuckets<B> {}