parallel_processor/buckets/mod.rs
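
//! Bucket infrastructure: containers for intermediate data that many threads
//! can append to concurrently, optionally split into multiple chunk files
//! once a configurable size threshold is exceeded.
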
use arc_swap::ArcSwap;
use bincode::{Decode, Encode};
use parking_lot::Mutex;
use std::num::NonZeroU64;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use crate::memory_fs::file::reader::FileRangeReference;
use crate::DEFAULT_BINCODE_CONFIG;

pub mod bucket_writer;
pub mod concurrent;
pub mod readers;
pub mod single;
pub mod writers;

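/// How checkpointed data should be handled when it is read back: either
/// decompressed, or passed through unchanged.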
#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq, Eq)]
pub enum CheckpointStrategy {
    Decompress,
    Passtrough,
}

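/// A checkpoint within a bucket file: a byte offset plus optional serialized
/// metadata, ordered by offset.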
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub(crate) struct CheckpointData {
    offset: u64,
    data: Option<Vec<u8>>,
}

impl PartialOrd for CheckpointData {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for CheckpointData {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.offset.cmp(&other.offset)
    }
}

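/// A bucket that can be created, appended to and finalized without external
/// locking, allowing concurrent writers.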
pub trait LockFreeBucket: Sized {
    type InitData: Clone;

    /// Creates a new bucket from an already-serialized data format descriptor.
    fn new_serialized_data_format(
        path: &Path,
        data: &Self::InitData,
        index: usize,
        data_format: &[u8],
    ) -> Self;

    /// Creates a new bucket, serializing the data format descriptor with bincode.
    fn new<T: Encode>(path: &Path, data: &Self::InitData, index: usize, data_format: &T) -> Self {
        Self::new_serialized_data_format(
            path,
            data,
            index,
            &bincode::encode_to_vec(data_format, DEFAULT_BINCODE_CONFIG).unwrap(),
        )
    }

    fn set_checkpoint_data<T: Encode>(
        &self,
        data: Option<&T>,
        passtrough_range: Option<FileRangeReference>,
    );

    /// Returns the current size of the bucket in bytes.
    fn get_bucket_size(&self) -> u64;
    /// Appends `bytes` to the bucket, returning the resulting bucket size.
    fn write_data(&self, bytes: &[u8]) -> u64;
    fn get_path(&self) -> PathBuf;
    fn finalize(self);
}

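/// A logical bucket whose contents may span multiple chunk files.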
#[derive(Encode, Decode, Debug, Clone)]
pub struct MultiChunkBucket {
    pub index: usize,
    pub chunks: Vec<PathBuf>,
    pub extra_bucket_data: Option<ExtraBucketData>,
}

impl MultiChunkBucket {
    pub fn into_single(mut self) -> SingleBucket {
        assert!(self.chunks.len() == 1);
        SingleBucket {
            index: self.index,
            path: self.chunks.pop().unwrap(),
            extra_bucket_data: self.extra_bucket_data,
        }
    }
}

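/// A logical bucket backed by exactly one file.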
#[derive(Encode, Decode)]
pub struct SingleBucket {
    pub index: usize,
    pub path: PathBuf,
    pub extra_bucket_data: Option<ExtraBucketData>,
}

impl SingleBucket {
    pub fn to_multi_chunk(self) -> MultiChunkBucket {
        MultiChunkBucket {
            index: self.index,
            chunks: vec![self.path],
            extra_bucket_data: self.extra_bucket_data,
        }
    }
}

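/// Result of a write: whether it stayed in the current chunk or caused a new
/// chunk to be started.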
#[derive(Debug, Clone)]
pub enum ChunkingStatus {
    SameChunk,
    NewChunk,
}

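/// Bucket counts: a power-of-two number of normal buckets plus an optional
/// number of extra buckets.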
#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq, Eq)]
pub struct BucketsCount {
    pub normal_buckets_count: usize,
    pub normal_buckets_count_log: usize,
    pub total_buckets_count: usize,
    pub extra_buckets_count: ExtraBuckets,
}

impl BucketsCount {
    pub const ONE: Self = Self::new(0, ExtraBuckets::None);

    pub fn from_power_of_two(size: usize, extra_buckets: ExtraBuckets) -> Self {
        assert_eq!(size, size.next_power_of_two());
        Self::new(size.ilog2() as usize, extra_buckets)
    }

    pub const fn new(size_log: usize, extra_buckets: ExtraBuckets) -> Self {
        let normal_buckets_count = 1 << size_log;
        let extra_buckets_count = match extra_buckets {
            ExtraBuckets::None => 0,
            ExtraBuckets::Extra { count, .. } => count,
        };

        Self {
            normal_buckets_count,
            normal_buckets_count_log: size_log,
            total_buckets_count: normal_buckets_count + extra_buckets_count,
            extra_buckets_count: extra_buckets,
        }
    }

    pub fn get_extra_buckets_count(&self) -> usize {
        match self.extra_buckets_count {
            ExtraBuckets::None => 0,
            ExtraBuckets::Extra { count, .. } => count,
        }
    }
}

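/// A set of buckets that multiple threads can write to concurrently. When a
/// chunking threshold is configured, oversized active buckets are swapped out,
/// finalized and recorded as chunks of their logical bucket.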
pub struct MultiThreadBuckets<B: LockFreeBucket> {
    active_buckets: Vec<ArcSwap<B>>,
    stored_buckets: Vec<Mutex<MultiChunkBucket>>,
    chunking_size_threshold: Option<NonZeroU64>,
    bucket_count_lock: Mutex<usize>,
    base_path: Option<PathBuf>,
    init_data: Option<B::InitData>,
    serialized_format_info: Vec<u8>,
    size: BucketsCount,
}

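/// Marker value attached to the extra (beyond-power-of-two) buckets.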
#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq, Eq)]
pub struct ExtraBucketData(pub usize);

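/// Whether extra buckets exist in addition to the normal power-of-two set.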
#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq, Eq)]
pub enum ExtraBuckets {
    None,
    Extra { count: usize, data: ExtraBucketData },
}

impl<B: LockFreeBucket> MultiThreadBuckets<B> {
    pub const EMPTY: Self = Self {
        active_buckets: vec![],
        stored_buckets: vec![],
        chunking_size_threshold: None,
        bucket_count_lock: Mutex::new(0),
        base_path: None,
        init_data: None,
        serialized_format_info: vec![],
        size: BucketsCount {
            normal_buckets_count: 0,
            normal_buckets_count_log: 0,
            total_buckets_count: 0,
            extra_buckets_count: ExtraBuckets::None,
        },
    };

    pub fn get_buckets_count(&self) -> &BucketsCount {
        &self.size
    }

    pub fn create_matching_multichunks(&self) -> Vec<Mutex<MultiChunkBucket>> {
        self.stored_buckets
            .iter()
            .map(|s| {
                let s = s.lock();
                Mutex::new(MultiChunkBucket {
                    index: s.index,
                    chunks: vec![],
                    extra_bucket_data: s.extra_bucket_data.clone(),
                })
            })
            .collect()
    }

    pub fn new(
        size: BucketsCount,
        path: PathBuf,
        chunking_size_threshold: Option<u64>,
        init_data: &B::InitData,
        format_info: &impl Encode,
    ) -> MultiThreadBuckets<B> {
        let mut buckets = Vec::with_capacity(size.total_buckets_count);

        for i in 0..size.total_buckets_count {
            buckets.push(ArcSwap::from_pointee(B::new(
                &path,
                init_data,
                i,
                format_info,
            )));
        }
        MultiThreadBuckets {
            active_buckets: buckets,
            stored_buckets: (0..size.total_buckets_count)
                .map(|index| {
                    Mutex::new(MultiChunkBucket {
                        index,
                        chunks: vec![],
                        extra_bucket_data: match size.extra_buckets_count {
                            ExtraBuckets::None => None,
                            ExtraBuckets::Extra { data, .. } => {
                                if index >= size.normal_buckets_count {
                                    Some(data)
                                } else {
                                    None
                                }
                            }
                        },
                    })
                })
                .collect(),
            chunking_size_threshold: chunking_size_threshold.and_then(NonZeroU64::new),
            bucket_count_lock: Mutex::new(size.total_buckets_count),
            base_path: Some(path),
            init_data: Some(init_data.clone()),
            serialized_format_info: bincode::encode_to_vec(format_info, DEFAULT_BINCODE_CONFIG)
                .unwrap(),
            size,
        }
    }

    pub fn get_stored_buckets(&self) -> &Vec<Mutex<MultiChunkBucket>> {
        &self.stored_buckets
    }

    pub fn into_buckets(mut self) -> impl Iterator<Item = B> {
        assert!(
            self.stored_buckets
                .iter_mut()
                .all(|bucket| bucket.get_mut().chunks.is_empty())
                && self.chunking_size_threshold.is_none()
        );
        let buckets = std::mem::take(&mut self.active_buckets);
        buckets
            .into_iter()
            .map(|bucket| Arc::into_inner(bucket.into_inner()).unwrap())
    }

    pub fn get_path(&self, bucket: u16) -> PathBuf {
        self.active_buckets[bucket as usize].load().get_path()
    }

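    /// Appends `data` to the bucket at `index`. If a chunking threshold is set
    /// and the bucket grew past it, the bucket is atomically swapped with a
    /// fresh one and the completed chunk is finalized and recorded.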
    pub fn add_data(&self, index: u16, data: &[u8]) -> ChunkingStatus {
        let bucket_guard = self.active_buckets[index as usize].load();
        let last_bucket_size = bucket_guard.write_data(data);

        drop(bucket_guard);

        if let Some(chunk_threshold) = self.chunking_size_threshold {
            if last_bucket_size >= chunk_threshold.get() {
                let mut buckets_count = self.bucket_count_lock.lock();

                // Re-check under the lock: another thread may have already
                // swapped this bucket for a fresh (smaller) one.
                if self.active_buckets[index as usize].load().get_bucket_size()
                    < chunk_threshold.get()
                {
                    return ChunkingStatus::SameChunk;
                }

                let mut stored_bucket = self.active_buckets[index as usize].swap(Arc::new(
                    B::new_serialized_data_format(
                        self.base_path.as_deref().unwrap(),
                        self.init_data.as_ref().unwrap(),
                        *buckets_count,
                        &self.serialized_format_info,
                    ),
                ));

                // Spin until no other thread still holds a reference to the
                // swapped-out bucket, then take ownership of it.
                let stored_bucket = loop {
                    match Arc::try_unwrap(stored_bucket) {
                        Ok(bucket) => break bucket,
                        Err(waiting_arc) => {
                            stored_bucket = waiting_arc;
                            std::hint::spin_loop();
                        }
                    }
                };

                let bucket_path = stored_bucket.get_path();
                stored_bucket.finalize();
                self.stored_buckets[index as usize]
                    .lock()
                    .chunks
                    .push(bucket_path);

                *buckets_count += 1;
                ChunkingStatus::NewChunk
            } else {
                ChunkingStatus::SameChunk
            }
        } else {
            ChunkingStatus::SameChunk
        }
    }

    pub fn finalize_single(self: Arc<Self>) -> Vec<SingleBucket> {
        assert!(self.chunking_size_threshold.is_none());
        let buckets = self.finalize();
        buckets
            .into_iter()
            .map(|mut bucket| {
                assert!(bucket.chunks.len() == 1);
                SingleBucket {
                    index: bucket.index,
                    path: bucket.chunks.pop().unwrap(),
                    extra_bucket_data: bucket.extra_bucket_data,
                }
            })
            .collect()
    }

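    /// Consumes the (uniquely owned) bucket set, finalizing every active
    /// bucket and returning the accumulated multi-chunk buckets.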
    pub fn finalize(self: Arc<Self>) -> Vec<MultiChunkBucket> {
        let mut self_ = Arc::try_unwrap(self)
            .unwrap_or_else(|_| panic!("Cannot take full ownership of multi thread buckets!"));

        self_
            .active_buckets
            .drain(..)
            .zip(self_.stored_buckets.drain(..))
            .map(|(bucket, stored)| {
                let mut stored = stored.into_inner();
                let bucket = Arc::into_inner(bucket.into_inner()).unwrap();
                stored.chunks.push(bucket.get_path());
                bucket.finalize();
                stored
            })
            .collect()
    }
}

impl<B: LockFreeBucket> Drop for MultiThreadBuckets<B> {
    fn drop(&mut self) {
        self.active_buckets.drain(..).for_each(|bucket| {
            let bucket = Arc::into_inner(bucket.into_inner()).unwrap();
            bucket.finalize();
        });
    }
}

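// SAFETY: all shared mutable state lives behind `ArcSwap` and `Mutex`; these
// impls additionally assume that every `LockFreeBucket` implementation is
// itself safe to use from multiple threads.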
unsafe impl<B: LockFreeBucket> Send for MultiThreadBuckets<B> {}

unsafe impl<B: LockFreeBucket> Sync for MultiThreadBuckets<B> {}
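
// What follows is a minimal, illustrative sketch (not part of the original
// module) showing how `MultiThreadBuckets` drives a `LockFreeBucket`
// implementation. `MemoryBucket` is a hypothetical in-memory stand-in for the
// real writers in `writers/`; its names and behavior are assumptions made for
// demonstration only.
#[cfg(test)]
mod example_usage {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Hypothetical bucket that counts written bytes instead of hitting disk.
    struct MemoryBucket {
        path: PathBuf,
        written: AtomicU64,
    }

    impl LockFreeBucket for MemoryBucket {
        type InitData = ();

        fn new_serialized_data_format(
            path: &Path,
            _data: &Self::InitData,
            index: usize,
            _data_format: &[u8],
        ) -> Self {
            Self {
                path: path.join(format!("bucket.{}", index)),
                written: AtomicU64::new(0),
            }
        }

        fn set_checkpoint_data<T: Encode>(
            &self,
            _data: Option<&T>,
            _passtrough_range: Option<FileRangeReference>,
        ) {
        }

        fn get_bucket_size(&self) -> u64 {
            self.written.load(Ordering::Relaxed)
        }

        // Returns the bucket size after the write, as `add_data` expects.
        fn write_data(&self, bytes: &[u8]) -> u64 {
            self.written.fetch_add(bytes.len() as u64, Ordering::Relaxed) + bytes.len() as u64
        }

        fn get_path(&self) -> PathBuf {
            self.path.clone()
        }

        fn finalize(self) {}
    }

    #[test]
    fn write_then_finalize() {
        // Four normal buckets (2^2), no extra buckets, no chunking threshold.
        let buckets = Arc::new(MultiThreadBuckets::<MemoryBucket>::new(
            BucketsCount::new(2, ExtraBuckets::None),
            PathBuf::from("/tmp/buckets"),
            None,
            &(),
            &(),
        ));
        buckets.add_data(0, b"payload");
        let finalized = buckets.finalize_single();
        assert_eq!(finalized.len(), 4);
    }
}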