// parallel_processor/memory_fs/allocator.rs

use crate::memory_data_size::MemoryDataSize;
use crate::memory_fs::{MemoryFs, FILES_FLUSH_HASH_MAP};
use parking_lot::lock_api::RawMutex as _;
use parking_lot::{Condvar, Mutex, MutexGuard};
use std::alloc::{alloc, dealloc, Layout};
use std::cmp::{max, min};
use std::slice::{from_raw_parts, from_raw_parts_mut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

/// Alignment requested for the backing buffers (one page).
const ALLOCATOR_ALIGN: usize = 4096;
/// Base size of each extra allocation performed when the chunk pool is exhausted.
const OUT_OF_MEMORY_ALLOCATION_SIZE: MemoryDataSize = MemoryDataSize::from_mebioctets(256);
/// Chunk sizes are clamped to `2^MINIMUM_CHUNK_SIZE_LOG..=2^MAXIMUM_CHUNK_SIZE_LOG` bytes.
const MAXIMUM_CHUNK_SIZE_LOG: usize = 18;
const MINIMUM_CHUNK_SIZE_LOG: usize = 12;

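// With the `track-usage` feature enabled, `chunk_usage!` builds the requested
// `ChunkUsage_` variant so allocations can be attributed to their users;
// without the feature it expands to `()` and tracking has no runtime cost.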
#[macro_export]
#[cfg(feature = "track-usage")]
macro_rules! chunk_usage {
    ($mode:ident $({ $($param:ident : $value:expr),* })?) => {
        $crate::memory_fs::allocator::ChunkUsage_::$mode $({
            $($param: $value),*
        })?
    }
}

#[macro_export]
#[cfg(not(feature = "track-usage"))]
macro_rules! chunk_usage {
    ($mode:ident $({ $($param:ident : $value:expr),* })?) => {
        ()
    };
}

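/// Categories used to attribute chunk allocations when the `track-usage`
/// feature is enabled.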
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum ChunkUsage_ {
    FileBuffer { path: String },
    InMemoryFile { path: String },
    ReadStriped { path: String },
    TemporarySpace,
}

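/// A fixed-size memory chunk handed out by [`ChunksAllocator`]. The chunk keeps
/// its current length in an atomic counter and, on drop, runs `dealloc_fn`,
/// which for pool-allocated chunks pushes the memory back into the global pool.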
pub struct AllocatedChunk {
    memory: usize,
    len: AtomicUsize,
    max_len_log2: usize,
    dealloc_fn: fn(usize, usize),
    #[cfg(feature = "track-usage")]
    _usage: ChunkUsage_,
}
unsafe impl Sync for AllocatedChunk {}
unsafe impl Send for AllocatedChunk {}

impl Clone for AllocatedChunk {
    #[track_caller]
    fn clone(&self) -> Self {
        panic!("This method should not be called, check for vector cloning!");
    }
}

impl AllocatedChunk {
    pub const INVALID: Self = Self {
        memory: 0,
        len: AtomicUsize::new(0),
        max_len_log2: 0,
        dealloc_fn: |_, _| {},
        #[cfg(feature = "track-usage")]
        _usage: ChunkUsage_::TemporarySpace,
    };

    #[inline(always)]
    #[allow(dead_code)]
    fn zero_memory(&mut self) {
        unsafe {
            std::ptr::write_bytes(self.memory as *mut u8, 0, 1 << self.max_len_log2);
        }
    }

    /// # Safety
    /// The caller must guarantee exclusive access to this chunk: the length is
    /// updated through a non-atomic mutable reference and no bounds check is done.
    #[inline(always)]
    #[allow(mutable_transmutes)]
    pub unsafe fn write_bytes_noextend_single_thread(&self, data: *const u8, len: usize) {
        let off_len = std::mem::transmute::<&AtomicUsize, &mut usize>(&self.len);
        std::ptr::copy_nonoverlapping(data, (self.memory + *off_len) as *mut u8, len);
        *off_len += len;
    }

    /// # Safety
    /// Same requirements as [`Self::write_bytes_noextend_single_thread`].
    #[inline(always)]
    #[allow(mutable_transmutes)]
    pub unsafe fn write_zero_bytes_noextend_single_thread(&self, len: usize) {
        let off_len = std::mem::transmute::<&AtomicUsize, &mut usize>(&self.len);
        std::ptr::write_bytes((self.memory + *off_len) as *mut u8, 0, len);
        *off_len += len;
    }

    /// # Safety
    /// Same requirements as [`Self::write_bytes_noextend_single_thread`]; the
    /// returned slice aliases the chunk's memory and must not outlive it.
    #[inline(always)]
    #[allow(mutable_transmutes)]
    pub unsafe fn prealloc_bytes_single_thread(&self, len: usize) -> &'static mut [u8] {
        let off_len = std::mem::transmute::<&AtomicUsize, &mut usize>(&self.len);
        let slice = from_raw_parts_mut((self.memory + *off_len) as *mut u8, len);
        *off_len += len;
        slice
    }

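    /// Atomically reserves space for `data` and copies it into the chunk.
    /// Returns the offset at which the data was written, or `None` if the
    /// chunk does not have enough remaining space.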
    pub fn write_bytes_noextend(&self, data: &[u8]) -> Option<u64> {
        let result = self
            .len
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| {
                if value + data.len() <= (1 << self.max_len_log2) {
                    Some(value + data.len())
                } else {
                    None
                }
            });

        match result {
            Ok(addr_offset) => {
                unsafe {
                    std::ptr::copy_nonoverlapping(
                        data.as_ptr(),
                        (self.memory + addr_offset) as *mut u8,
                        data.len(),
                    );
                }
                Some(addr_offset as u64)
            }
            Err(_) => None,
        }
    }

    #[inline(always)]
    pub fn has_space_for(&self, len: usize) -> bool {
        self.len.load(Ordering::Relaxed) + len <= (1 << self.max_len_log2)
    }

    #[inline(always)]
    pub unsafe fn get_mut_slice(&self) -> &'static mut [u8] {
        from_raw_parts_mut(self.memory as *mut u8, self.len.load(Ordering::Relaxed))
    }

    #[inline(always)]
    pub unsafe fn get_mut_ptr(&self) -> *mut u8 {
        self.memory as *mut u8
    }

    #[inline(always)]
    pub unsafe fn get_object_reference_mut<T>(&self, offset_in_bytes: usize) -> &'static mut T {
        &mut *((self.memory as *mut u8).add(offset_in_bytes) as *mut T)
    }

    #[inline(always)]
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    #[inline(always)]
    pub fn max_len(&self) -> usize {
        1 << self.max_len_log2
    }

    #[inline(always)]
    pub fn remaining_bytes(&self) -> usize {
        (1 << self.max_len_log2) - self.len.load(Ordering::Relaxed)
    }

    #[inline(always)]
    pub fn clear(&self) {
        self.len.store(0, Ordering::Relaxed);
    }

    #[inline(always)]
    pub fn get(&self) -> &[u8] {
        unsafe { from_raw_parts(self.memory as *mut u8, self.len.load(Ordering::Relaxed)) }
    }

    #[inline(always)]
    pub unsafe fn set_len(&self, len: usize) {
        self.len.store(len, Ordering::Relaxed);
    }
}

impl Drop for AllocatedChunk {
    fn drop(&mut self) {
        (self.dealloc_fn)(self.memory, self.max_len_log2);
    }
}

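/// Global pool of equally-sized memory chunks. Chunks are carved out of a few
/// large buffers and handed out via [`ChunksAllocator::request_chunk`]; dropped
/// chunks are pushed back into the pool and waiters are notified through a condvar.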
pub struct ChunksAllocator {
    buffers_list: Mutex<Vec<(usize, usize)>>,
    chunks_wait_condvar: Condvar,
    chunks: Mutex<Vec<usize>>,
    min_free_chunks: AtomicUsize,
    chunks_total_count: AtomicUsize,
    chunk_padded_size: AtomicUsize,
    chunk_usable_size: AtomicUsize,
    chunks_log_size: AtomicUsize,
}
unsafe impl Sync for ChunksAllocator {}
unsafe impl Send for ChunksAllocator {}

#[cfg(feature = "track-usage")]
static USAGE_MAP: Mutex<Option<std::collections::HashMap<ChunkUsage_, usize>>> =
    Mutex::const_new(parking_lot::RawMutex::INIT, None);

impl ChunksAllocator {
    const fn new() -> ChunksAllocator {
        ChunksAllocator {
            buffers_list: Mutex::new(vec![]),
            chunks_wait_condvar: Condvar::new(),
            chunks: Mutex::const_new(parking_lot::RawMutex::INIT, Vec::new()),
            min_free_chunks: AtomicUsize::new(0),
            chunks_total_count: AtomicUsize::new(0),
            chunk_padded_size: AtomicUsize::new(0),
            chunk_usable_size: AtomicUsize::new(0),
            chunks_log_size: AtomicUsize::new(0),
        }
    }

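    /// Allocates one contiguous buffer large enough for `chunks_count` padded
    /// chunks, registers it in `buffers_list`, and returns an iterator over the
    /// start addresses of the new chunks.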
    fn allocate_contiguous_chunk(
        &self,
        chunks_count: usize,
        buffers_list: &mut MutexGuard<Vec<(usize, usize)>>,
    ) -> impl Iterator<Item = usize> {
        let chunk_padded_size = self.chunk_padded_size.load(Ordering::Relaxed);

        self.chunks_total_count
            .fetch_add(chunks_count, Ordering::Relaxed);

        self.min_free_chunks.store(chunks_count, Ordering::Relaxed);

        let data = unsafe {
            alloc(Layout::from_size_align_unchecked(
                chunks_count * chunk_padded_size,
                ALLOCATOR_ALIGN,
            ))
        };

        #[cfg(feature = "memory-guards")]
        unsafe {
            // Make the padding page at the end of each chunk inaccessible, so
            // out-of-bounds writes fault immediately.
            let first_guard = data.add(self.chunk_usable_size.load(Ordering::Relaxed));
            for i in 0..chunks_count {
                let guard = first_guard.add(i * chunk_padded_size);
                libc::mprotect(guard as *mut libc::c_void, 4096, libc::PROT_NONE);
            }
        }

        buffers_list.push((data as usize, chunks_count));

        (0..chunks_count)
            .rev()
            .map(move |c| data as usize + (c * chunk_padded_size))
    }

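    /// Initializes the pool: clamps `chunks_log_size` to the allowed range,
    /// derives the chunk count from the `memory` budget (at least
    /// `min_chunks_count`), and allocates the backing buffer. Calling it again
    /// after initialization is a no-op.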
    pub fn initialize(
        &self,
        memory: MemoryDataSize,
        mut chunks_log_size: usize,
        min_chunks_count: usize,
    ) {
        if self.buffers_list.lock().len() > 0 {
            return;
        }

        #[cfg(feature = "track-usage")]
        {
            *USAGE_MAP.lock() = Some(std::collections::HashMap::new());
        }

        chunks_log_size = min(
            MAXIMUM_CHUNK_SIZE_LOG,
            max(MINIMUM_CHUNK_SIZE_LOG, chunks_log_size),
        );

        let chunk_usable_size = 1usize << chunks_log_size;
        let chunk_padded_size = chunk_usable_size
            + if cfg!(feature = "memory-guards") {
                4096
            } else {
                0
            };
        let total_padded_mem_size: MemoryDataSize =
            MemoryDataSize::from_octets(chunk_padded_size as f64);

        let chunks_count = max(min_chunks_count, (memory / total_padded_mem_size) as usize);

        self.chunk_padded_size
            .store(chunk_padded_size, Ordering::Relaxed);
        self.chunk_usable_size
            .store(chunk_usable_size, Ordering::Relaxed);
        self.chunks_log_size
            .store(chunks_log_size, Ordering::Relaxed);

        let chunks_iter =
            self.allocate_contiguous_chunk(chunks_count, &mut self.buffers_list.lock());

        self.chunks.lock().extend(chunks_iter);

        crate::log_info!(
            "Allocator initialized: mem: {} chunks: {} log2: {}",
            memory,
            chunks_count,
            chunks_log_size
        );
    }

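    /// On non-Windows targets, advises the OS (`madvise(MADV_DONTNEED)`) that
    /// the pages of currently free chunks can be reclaimed; the chunk addresses
    /// stay in the pool.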
    pub fn giveback_free_memory(&self) {
        #[cfg(not(target_os = "windows"))]
        {
            let pagesize = page_size::get();

            let pages_per_chunk = self.chunk_padded_size.load(Ordering::Relaxed) / pagesize;

            let chunks = self.chunks.lock();
            for chunk_start in chunks.iter() {
                unsafe {
                    libc::madvise(
                        *chunk_start as *mut libc::c_void,
                        pages_per_chunk * pagesize,
                        libc::MADV_DONTNEED,
                    );
                }
            }
        }
    }

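    /// Flushes all in-memory files to disk, waits until every chunk has been
    /// returned to the pool and, on non-Windows targets, advises the OS that
    /// the pages of the backing buffers can be reclaimed.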
    pub fn giveback_all_memory(&self) {
        MemoryFs::flush_all_to_disk();

        loop {
            {
                if self.chunks.lock().len() == self.chunks_total_count.load(Ordering::Relaxed) {
                    break;
                }
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        #[cfg(not(target_os = "windows"))]
        unsafe {
            for (buffer, chunks_count) in self.buffers_list.lock().iter() {
                libc::madvise(
                    *buffer as *mut libc::c_void,
                    chunks_count * self.chunk_padded_size.load(Ordering::Relaxed),
                    libc::MADV_DONTNEED,
                );
            }
        }
    }

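    /// Pops a free chunk from the pool. If none is available, the call asks
    /// [`MemoryFs::reduce_pressure`] to free chunks, waits briefly on the pool
    /// condvar, and after repeated failures grows the pool with an extra
    /// out-of-memory allocation; it loops until a chunk can be returned.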
    pub fn request_chunk(
        &self,
        #[cfg(feature = "track-usage")] usage: ChunkUsage_,
        #[cfg(not(feature = "track-usage"))] _: (),
    ) -> AllocatedChunk {
        let mut tries_count = 0;
        let mut chunks_lock = self.chunks.lock();

        loop {
            let el = chunks_lock.pop();
            let free_count = chunks_lock.len();
            drop(chunks_lock);

            match el.map(|chunk| AllocatedChunk {
                memory: chunk,
                len: AtomicUsize::new(0),
                max_len_log2: self.chunks_log_size.load(Ordering::Relaxed),
                #[cfg(feature = "track-usage")]
                _usage: usage.clone(),
                dealloc_fn: |ptr, _size_log2| {
                    // Return the chunk to the pool and wake up a waiting allocation.
                    CHUNKS_ALLOCATOR.chunks.lock().push(ptr);
                    CHUNKS_ALLOCATOR.chunks_wait_condvar.notify_one();
                },
            }) {
                None => {
                    if !MemoryFs::reduce_pressure() {
                        tries_count += 1;
                    }

                    chunks_lock = self.chunks.lock();
                    if chunks_lock.len() == 0 {
                        if !self
                            .chunks_wait_condvar
                            .wait_for(&mut chunks_lock, Duration::from_millis(25))
                            .timed_out()
                        {
                            tries_count = 0;
                            continue;
                        }
                    }

                    if tries_count > 10 {
                        #[cfg(feature = "track-usage")]
                        {
                            super::file::internal::MemoryFileInternal::debug_dump_files();
                            crate::log_info!(
                                "Usages: {:?}",
                                USAGE_MAP
                                    .lock()
                                    .as_ref()
                                    .unwrap()
                                    .iter()
                                    .filter(|x| *x.1 != 0)
                                    .map(|x| format!("{:?}", x))
                                    .collect::<Vec<_>>()
                                    .join("\n")
                            )
                        }

                        let mut buffers_list = self.buffers_list.lock();
                        if chunks_lock.len() == 0 {
                            // Pool is still empty after repeated retries: grow it with an
                            // extra allocation whose size doubles with each new buffer.
                            let alloc_multiplier = 1 << (buffers_list.len().saturating_sub(1));

                            let extra_chunks_count = (OUT_OF_MEMORY_ALLOCATION_SIZE.as_bytes()
                                * alloc_multiplier)
                                / self.chunk_usable_size.load(Ordering::Relaxed);
                            let chunks_iter = self
                                .allocate_contiguous_chunk(extra_chunks_count, &mut buffers_list);
                            chunks_lock.extend(chunks_iter);

                            crate::log_info!(
                                "Allocated {} extra chunks for temporary files ({})",
                                extra_chunks_count,
                                OUT_OF_MEMORY_ALLOCATION_SIZE * alloc_multiplier as f64
                            );
                        }
                        tries_count = 0;
                    }
                }
                Some(chunk) => {
                    self.min_free_chunks
                        .fetch_min(free_count, Ordering::Relaxed);
                    #[cfg(feature = "track-usage")]
                    {
                        *USAGE_MAP
                            .lock()
                            .as_mut()
                            .unwrap()
                            .entry(usage.clone())
                            .or_insert(0) += 1;
                    }

                    return chunk;
                }
            }
        }
    }

    pub fn get_free_memory(&self) -> MemoryDataSize {
        MemoryDataSize::from_octets(
            (self.chunks.lock().len() * self.chunk_usable_size.load(Ordering::Relaxed)) as f64,
        )
    }

    pub fn get_reserved_memory(&self) -> MemoryDataSize {
        MemoryDataSize::from_octets(
            ((self.chunks_total_count.load(Ordering::Relaxed)
                - self.min_free_chunks.load(Ordering::Relaxed))
                * self.chunk_usable_size.load(Ordering::Relaxed)) as f64,
        )
    }

    pub fn get_total_memory(&self) -> MemoryDataSize {
        MemoryDataSize::from_octets(
            (self.chunks_total_count.load(Ordering::Relaxed)
                * self.chunk_usable_size.load(Ordering::Relaxed)) as f64,
        )
    }

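    /// Tears down the pool: waits (warning periodically) until every chunk has
    /// been returned, clears the flush map, and deallocates all backing buffers.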
    pub fn deinitialize(&self) {
        let mut chunks = self.chunks.lock();

        let mut counter = 0;
        while chunks.len() != self.chunks_total_count.load(Ordering::Relaxed) {
            drop(chunks);
            std::thread::sleep(Duration::from_millis(200));

            counter += 1;
            if counter % 256 == 0 {
                crate::log_warn!("WARNING: Cannot flush all the data!");
            }

            chunks = self.chunks.lock();
        }

        FILES_FLUSH_HASH_MAP.lock().take();

        {
            chunks.clear();
            self.chunks_total_count.swap(0, Ordering::Relaxed);

            for (addr, chunks_count) in self.buffers_list.lock().drain(..) {
                unsafe {
                    dealloc(
                        addr as *mut u8,
                        Layout::from_size_align_unchecked(
                            chunks_count * self.chunk_padded_size.load(Ordering::Relaxed),
                            ALLOCATOR_ALIGN,
                        ),
                    )
                }
            }
        }
    }
}

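/// The global chunk allocator used by the in-memory filesystem.
///
/// A minimal usage sketch (mirroring the test below; the memory budget and
/// chunk-size log are arbitrary example values, and with the `track-usage`
/// feature disabled `chunk_usage!` expands to `()`):
///
/// ```ignore
/// CHUNKS_ALLOCATOR.initialize(MemoryDataSize::from_mebioctets(64), 16, 0);
/// let chunk = CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(TemporarySpace));
/// assert!(chunk.write_bytes_noextend(b"hello").is_some());
/// drop(chunk); // the chunk is pushed back into the pool
/// CHUNKS_ALLOCATOR.deinitialize();
/// ```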
pub static CHUNKS_ALLOCATOR: ChunksAllocator = ChunksAllocator::new();

#[cfg(test)]
mod tests {
    use crate::memory_data_size::MemoryDataSize;
    use crate::memory_fs::allocator::CHUNKS_ALLOCATOR;
    use rayon::prelude::*;

    #[test]
    fn allocate_memory() {
        CHUNKS_ALLOCATOR.initialize(MemoryDataSize::from_gibioctets(8), 22, 0);
        for _ in 0..5 {
            let mut allocated_chunks: Vec<_> = std::iter::from_fn(move || {
                Some(CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(TemporarySpace)))
            })
            .take(1024 * 2)
            .collect();

            allocated_chunks.par_iter_mut().for_each(|x| {
                x.zero_memory();
            });
        }
        CHUNKS_ALLOCATOR.deinitialize();
    }
}