parallel_processor/buckets/
mod.rs1use arc_swap::ArcSwap;
2use parking_lot::Mutex;
3use serde::{Deserialize, Serialize};
4use std::num::NonZeroU64;
5use std::path::{Path, PathBuf};
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8
9use crate::memory_fs::file::reader::FileRangeReference;
10
11pub mod bucket_writer;
12pub mod concurrent;
13pub mod readers;
14pub mod single;
15pub mod writers;
16
/// How data at a checkpoint boundary should be handled when it is read back.
#[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq)]
pub enum CheckpointStrategy {
    /// Decompress the checkpointed data on read.
    Decompress,
    /// Forward the stored bytes as-is without decompression.
    // NOTE(review): "Passtrough" is a typo for "Passthrough", kept unchanged:
    // renaming the variant would alter its serde-serialized representation
    // and break every external caller matching on it.
    Passtrough,
}
25
/// A checkpoint position inside a bucket, ordered by `offset`
/// (see the `Ord` impl below).
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub(crate) struct CheckpointData {
    // Position of the checkpoint; presumably a byte offset into the
    // bucket file — TODO confirm against the writers/readers modules.
    offset: u64,
    // Optional serialized payload attached to this checkpoint.
    data: Option<Vec<u8>>,
}
31
32impl PartialOrd for CheckpointData {
33 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
34 self.offset.partial_cmp(&other.offset)
35 }
36}
37
impl Ord for CheckpointData {
    // Checkpoints are ordered solely by their offset; `data` is ignored.
    // NOTE(review): the derived `PartialEq` also compares `data`, so two
    // checkpoints with equal offsets but different payloads compare as
    // `Ordering::Equal` while being `!=` — this technically violates the
    // `Ord` contract's consistency with `PartialEq`. Confirm whether equal
    // offsets with different payloads can actually occur.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.offset.cmp(&other.offset)
    }
}
43
/// A bucket file that can be written to without external locking.
///
/// Defines the construction / write / finalize lifecycle used by
/// `MultiThreadBuckets`; the exact concurrency guarantees are left to
/// the implementor.
pub trait LockFreeBucket: Sized {
    /// Per-bucket construction data; cloned so new chunks can be created
    /// on demand (see `MultiThreadBuckets::add_data`).
    type InitData: Clone;

    /// Creates a bucket at `path` with the given bucket `index`, taking a
    /// data-format descriptor that is already bincode-serialized.
    fn new_serialized_data_format(
        path: &Path,
        data: &Self::InitData,
        index: usize,
        data_format: &[u8],
    ) -> Self;

    /// Convenience constructor: bincode-serializes `data_format` and
    /// forwards to `new_serialized_data_format`.
    ///
    /// # Panics
    /// Panics if `data_format` cannot be serialized.
    fn new<T: Serialize>(
        path: &Path,
        data: &Self::InitData,
        index: usize,
        data_format: &T,
    ) -> Self {
        Self::new_serialized_data_format(
            path,
            data,
            index,
            &bincode::serialize(data_format).unwrap(),
        )
    }

    /// Records checkpoint metadata for the data that follows, optionally
    /// attaching serialized `data` and/or a pass-through file range.
    // NOTE(review): the interaction between `data` and `passtrough_range`
    // is implementor-defined; see the writers module for semantics.
    fn set_checkpoint_data<T: Serialize>(
        &self,
        data: Option<&T>,
        passtrough_range: Option<FileRangeReference>,
    );

    /// Appends raw bytes to the bucket.
    fn write_data(&self, bytes: &[u8]);
    /// Returns the path of the underlying bucket file.
    fn get_path(&self) -> PathBuf;
    /// Flushes and closes the bucket, consuming it.
    fn finalize(self);
}
78
/// A logical bucket whose data may be split across several chunk files.
#[derive(Debug, Clone)]
pub struct MultiChunkBucket {
    /// Index of this bucket within its owning collection.
    pub index: usize,
    /// Paths of the chunk files that make up this bucket.
    pub chunks: Vec<PathBuf>,
    /// Whether this bucket went through compaction — set by code outside
    /// this module (always initialized to `false` here); confirm semantics
    /// against the compaction pipeline.
    pub was_compacted: bool,
}
85
86impl MultiChunkBucket {
87 pub fn into_single(mut self) -> SingleBucket {
88 assert!(self.chunks.len() == 1);
89 SingleBucket {
90 index: self.index,
91 path: self.chunks.pop().unwrap(),
92 }
93 }
94}
95
/// A bucket backed by exactly one chunk file.
pub struct SingleBucket {
    /// Index of this bucket within its owning collection.
    pub index: usize,
    /// Path of the bucket's single chunk file.
    pub path: PathBuf,
}
100
101impl SingleBucket {
102 pub fn to_multi_chunk(self) -> MultiChunkBucket {
103 MultiChunkBucket {
104 index: self.index,
105 chunks: vec![self.path],
106 was_compacted: false,
107 }
108 }
109}
110
/// Outcome of a `MultiThreadBuckets::add_data` call.
#[derive(Debug, Clone)]
pub enum ChunkingStatus {
    /// No disk-usage limit is configured; data went into the current chunk.
    SameChunk,
    /// A disk-usage limit is configured; lists the indexes of buckets whose
    /// active chunk was rotated out (may be empty if the limit was not hit).
    NewChunks { bucket_indexes: Vec<u16> },
}
116
/// A fixed set of lock-free buckets writable concurrently from many threads.
///
/// Each slot pairs a bytes-written counter with the bucket for its active
/// chunk. When the optional total disk-usage limit is exceeded, the fullest
/// bucket is swapped out for a fresh chunk and the finished chunk's path is
/// recorded in `stored_buckets` (see `add_data`).
pub struct MultiThreadBuckets<B: LockFreeBucket> {
    /// Active chunk per bucket; the `AtomicU64` counts bytes written to it.
    active_buckets: Vec<ArcSwap<(AtomicU64, B)>>,
    /// Finished chunk paths, one `MultiChunkBucket` per bucket index.
    stored_buckets: Mutex<Vec<MultiChunkBucket>>,
    /// Total bytes across all currently active chunks.
    disk_usage: AtomicU64,
    /// If set, `add_data` rotates chunks while usage exceeds this limit.
    active_disk_usage_limit: Option<NonZeroU64>,
    /// Serializes chunk rotation; its value is the next global chunk index.
    bucket_count_lock: Mutex<usize>,
    /// Directory for new chunk files (`None` only in `EMPTY`).
    base_path: Option<PathBuf>,
    /// Construction data for new chunks (`None` only in `EMPTY`).
    init_data: Option<B::InitData>,
    /// bincode-serialized format descriptor reused for every new chunk.
    serialized_format_info: Vec<u8>,
}
127
128impl<B: LockFreeBucket> MultiThreadBuckets<B> {
129 pub const EMPTY: Self = Self {
130 active_buckets: vec![],
131 stored_buckets: Mutex::new(vec![]),
132 disk_usage: AtomicU64::new(0),
133 active_disk_usage_limit: None,
134 bucket_count_lock: Mutex::new(0),
135 base_path: None,
136 init_data: None,
137 serialized_format_info: vec![],
138 };
139
140 pub fn new(
141 size: usize,
142 path: PathBuf,
143 active_disk_usage_limit: Option<u64>,
144 init_data: &B::InitData,
145 format_info: &impl Serialize,
146 ) -> MultiThreadBuckets<B> {
147 let mut buckets = Vec::with_capacity(size);
148
149 for i in 0..size {
150 buckets.push(ArcSwap::from_pointee((
151 AtomicU64::new(0),
152 B::new(&path, init_data, i, format_info),
153 )));
154 }
155 MultiThreadBuckets {
156 active_buckets: buckets,
157 stored_buckets: Mutex::new(
158 (0..size)
159 .map(|index| MultiChunkBucket {
160 index,
161 chunks: vec![],
162 was_compacted: false,
163 })
164 .collect(),
165 ),
166 disk_usage: AtomicU64::new(0),
167 active_disk_usage_limit: active_disk_usage_limit.map(NonZeroU64::new).flatten(),
168 bucket_count_lock: Mutex::new(size),
169 base_path: Some(path),
170 init_data: Some(init_data.clone()),
171 serialized_format_info: bincode::serialize(format_info).unwrap(),
172 }
173 }
174
175 pub fn get_stored_buckets(&self) -> &Mutex<Vec<MultiChunkBucket>> {
176 &self.stored_buckets
177 }
178
179 pub fn into_buckets(mut self) -> impl Iterator<Item = B> {
180 assert!(
181 self.stored_buckets
182 .lock()
183 .iter()
184 .all(|bucket| bucket.chunks.is_empty())
185 && self.active_disk_usage_limit.is_none()
186 );
187 let buckets = std::mem::take(&mut self.active_buckets);
188 buckets
189 .into_iter()
190 .map(|bucket| Arc::into_inner(bucket.into_inner()).unwrap().1)
191 }
192
193 pub fn get_path(&self, bucket: u16) -> PathBuf {
194 self.active_buckets[bucket as usize].load().1.get_path()
195 }
196
197 pub fn add_data(&self, index: u16, data: &[u8]) -> ChunkingStatus {
198 let bucket_guard = self.active_buckets[index as usize].load();
199 bucket_guard.1.write_data(data);
200
201 bucket_guard
203 .0
204 .fetch_add(data.len() as u64, Ordering::Relaxed);
205 let mut disk_usage = self
206 .disk_usage
207 .fetch_add(data.len() as u64, Ordering::Relaxed)
208 + data.len() as u64;
209
210 drop(bucket_guard);
211
212 if let Some(max_usage) = self.active_disk_usage_limit {
214 let mut new_chunks_bucket_indexes = vec![];
215 while disk_usage > max_usage.get() {
216 let mut buckets_count = self.bucket_count_lock.lock();
217 let swap_bucket_index = self
219 .active_buckets
220 .iter()
221 .enumerate()
222 .max_by_key(|(_, bucket)| bucket.load().0.load(Ordering::Relaxed))
223 .map(|(i, _bucket)| i)
224 .unwrap();
225
226 let mut stored_bucket = self.active_buckets[swap_bucket_index].swap(Arc::new((
227 AtomicU64::new(0),
228 B::new_serialized_data_format(
229 &self.base_path.as_deref().unwrap(),
230 &self.init_data.as_ref().unwrap(),
231 *buckets_count,
232 &self.serialized_format_info,
233 ),
234 )));
235
236 let (bucket_usage, stored_bucket) = loop {
237 match Arc::try_unwrap(stored_bucket) {
239 Ok(bucket) => break bucket,
240 Err(waiting_arc) => {
241 stored_bucket = waiting_arc;
242 std::hint::spin_loop();
243 }
244 }
245 };
246 let bucket_usage = bucket_usage.into_inner();
247
248 disk_usage =
250 self.disk_usage.fetch_sub(bucket_usage, Ordering::Relaxed) - bucket_usage;
251 let bucket_path = stored_bucket.get_path();
252 stored_bucket.finalize();
253 self.stored_buckets.lock()[swap_bucket_index]
254 .chunks
255 .push(bucket_path);
256
257 new_chunks_bucket_indexes.push(swap_bucket_index as u16);
258
259 *buckets_count += 1;
260 }
261 ChunkingStatus::NewChunks {
262 bucket_indexes: new_chunks_bucket_indexes,
263 }
264 } else {
265 ChunkingStatus::SameChunk
266 }
267 }
268
269 pub fn count(&self) -> usize {
270 self.active_buckets.len()
271 }
272
273 pub fn finalize_single(self: Arc<Self>) -> Vec<SingleBucket> {
274 assert!(self.active_disk_usage_limit.is_none());
275 let buckets = self.finalize();
276 buckets
277 .into_iter()
278 .map(|mut bucket| {
279 assert!(bucket.chunks.len() == 1);
280 SingleBucket {
281 index: bucket.index,
282 path: bucket.chunks.pop().unwrap(),
283 }
284 })
285 .collect()
286 }
287
288 pub fn finalize(self: Arc<Self>) -> Vec<MultiChunkBucket> {
289 let mut self_ = Arc::try_unwrap(self)
290 .unwrap_or_else(|_| panic!("Cannot take full ownership of multi thread buckets!"));
291
292 let mut stored_buckets = self_.stored_buckets.lock();
293
294 self_
295 .active_buckets
296 .drain(..)
297 .zip(stored_buckets.drain(..))
298 .map(|(bucket, mut stored)| {
299 let bucket = Arc::into_inner(bucket.into_inner()).unwrap();
300 stored.chunks.push(bucket.1.get_path());
301 bucket.1.finalize();
302 stored
303 })
304 .collect()
305 }
306}
307
308impl<B: LockFreeBucket> Drop for MultiThreadBuckets<B> {
309 fn drop(&mut self) {
310 self.active_buckets.drain(..).for_each(|bucket| {
311 let bucket = Arc::into_inner(bucket.into_inner()).unwrap();
312 bucket.1.finalize();
313 });
314 }
315}
316
// SAFETY: NOTE(review): these blanket impls promise `Send`/`Sync` for every
// `B: LockFreeBucket` without requiring `B: Send + Sync`. This is only sound
// if every bucket type used in practice is itself thread-safe — confirm, or
// consider adding `B: Send` / `B: Sync` bounds so the compiler enforces it.
unsafe impl<B: LockFreeBucket> Send for MultiThreadBuckets<B> {}

unsafe impl<B: LockFreeBucket> Sync for MultiThreadBuckets<B> {}