parallel_processor/memory_fs/
allocator.rs

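//! Chunk-based memory allocator backing the in-memory file system.
//!
//! Memory is reserved up front as large 4 KiB-aligned buffers that are split
//! into fixed power-of-two chunks; chunks are handed out and recycled through
//! the global [`CHUNKS_ALLOCATOR`], and the pool grows only when it runs dry.
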
use crate::memory_data_size::MemoryDataSize;
use crate::memory_fs::{MemoryFs, FILES_FLUSH_HASH_MAP};
use parking_lot::lock_api::RawMutex as _;
use parking_lot::{Condvar, Mutex, MutexGuard};
use std::alloc::{alloc, dealloc, Layout};
use std::cmp::{max, min};
use std::slice::{from_raw_parts, from_raw_parts_mut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

/// Alignment (in bytes) of every buffer requested from the system allocator.
const ALLOCATOR_ALIGN: usize = 4096;
/// Size of each extra buffer allocated when the preallocated chunks are exhausted.
const OUT_OF_MEMORY_ALLOCATION_SIZE: MemoryDataSize = MemoryDataSize::from_mebioctets(256);
/// Chunk sizes are clamped to the range 2^12 (4 KiB) ..= 2^18 (256 KiB).
const MAXIMUM_CHUNK_SIZE_LOG: usize = 18;
const MINIMUM_CHUNK_SIZE_LOG: usize = 12;

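// `chunk_usage!` expands to the corresponding `ChunkUsage_` variant when the
// `track-usage` feature is enabled, and to `()` otherwise, so call sites can
// pass a usage tag without any cfg handling of their own.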
#[macro_export]
#[cfg(feature = "track-usage")]
macro_rules! chunk_usage {
    ($mode:ident $({ $($param:ident : $value:expr),* })?) => {
        $crate::memory_fs::allocator::ChunkUsage_::$mode $({
            $($param: $value),*
        })?
    }
}

#[macro_export]
#[cfg(not(feature = "track-usage"))]
macro_rules! chunk_usage {
    ($mode:ident $({ $($param:ident : $value:expr),* })?) => {
        ()
    };
}

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum ChunkUsage_ {
    FileBuffer { path: String },
    InMemoryFile { path: String },
    ReadStriped { path: String },
    TemporarySpace,
}

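/// A fixed-capacity chunk of memory handed out by [`CHUNKS_ALLOCATOR`].
///
/// `len` tracks how many bytes are currently in use, `max_len_log2` gives the
/// capacity as a power of two, and `dealloc_fn` returns the memory to the
/// allocator when the chunk is dropped.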
pub struct AllocatedChunk {
    memory: usize,
    len: AtomicUsize,
    max_len_log2: usize,
    dealloc_fn: fn(usize, usize),
    #[cfg(feature = "track-usage")]
    _usage: ChunkUsage_,
}
unsafe impl Sync for AllocatedChunk {}
unsafe impl Send for AllocatedChunk {}

impl Clone for AllocatedChunk {
    #[track_caller]
    fn clone(&self) -> Self {
        panic!("This method should not be called; check for vector cloning!");
    }
}

impl AllocatedChunk {
    pub const INVALID: Self = Self {
        memory: 0,
        len: AtomicUsize::new(0),
        max_len_log2: 0,
        dealloc_fn: |_, _| {},
        #[cfg(feature = "track-usage")]
        _usage: ChunkUsage_::TemporarySpace,
    };

    #[inline(always)]
    #[allow(dead_code)]
    fn zero_memory(&mut self) {
        unsafe {
            std::ptr::write_bytes(self.memory as *mut u8, 0, 1 << self.max_len_log2);
        }
    }

    /// Appends `len` bytes from `data` without checking the remaining capacity.
    ///
    /// Safety: the caller must have exclusive access to the chunk, since the
    /// length is updated through a non-atomic mutable reference.
    #[inline(always)]
    #[allow(mutable_transmutes)]
    pub unsafe fn write_bytes_noextend_single_thread(&self, data: *const u8, len: usize) {
        let off_len = std::mem::transmute::<&AtomicUsize, &mut usize>(&self.len);
        std::ptr::copy_nonoverlapping(data, (self.memory + *off_len) as *mut u8, len);
        *off_len += len;
    }

    /// Appends `len` zero bytes; same safety requirements as
    /// [`Self::write_bytes_noextend_single_thread`].
    #[inline(always)]
    #[allow(mutable_transmutes)]
    pub unsafe fn write_zero_bytes_noextend_single_thread(&self, len: usize) {
        let off_len = std::mem::transmute::<&AtomicUsize, &mut usize>(&self.len);
        std::ptr::write_bytes((self.memory + *off_len) as *mut u8, 0, len);
        *off_len += len;
    }

    /// Reserves `len` bytes and returns a mutable slice over them; same safety
    /// requirements as [`Self::write_bytes_noextend_single_thread`].
    #[inline(always)]
    #[allow(mutable_transmutes)]
    pub unsafe fn prealloc_bytes_single_thread(&self, len: usize) -> &'static mut [u8] {
        let off_len = std::mem::transmute::<&AtomicUsize, &mut usize>(&self.len);
        let slice = from_raw_parts_mut((self.memory + *off_len) as *mut u8, len);
        *off_len += len;
        slice
    }

    /// Atomically appends `data`, returning the offset it was written at, or
    /// `None` if the chunk does not have enough remaining space.
    pub fn write_bytes_noextend(&self, data: &[u8]) -> Option<u64> {
        let result = self
            .len
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| {
                if value + data.len() <= (1 << self.max_len_log2) {
                    Some(value + data.len())
                } else {
                    None
                }
            });

        match result {
            Ok(addr_offset) => {
                unsafe {
                    std::ptr::copy_nonoverlapping(
                        data.as_ptr(),
                        (self.memory + addr_offset) as *mut u8,
                        data.len(),
                    );
                }
                Some(addr_offset as u64)
            }
            Err(_) => None,
        }
    }

    #[inline(always)]
    pub fn has_space_for(&self, len: usize) -> bool {
        self.len.load(Ordering::Relaxed) + len <= (1 << self.max_len_log2)
    }

    #[inline(always)]
    pub unsafe fn get_mut_slice(&self) -> &'static mut [u8] {
        from_raw_parts_mut(self.memory as *mut u8, self.len.load(Ordering::Relaxed))
    }

    #[inline(always)]
    pub unsafe fn get_mut_ptr(&self) -> *mut u8 {
        self.memory as *mut u8
    }

    #[inline(always)]
    pub unsafe fn get_object_reference_mut<T>(&self, offset_in_bytes: usize) -> &'static mut T {
        &mut *((self.memory as *mut u8).add(offset_in_bytes) as *mut T)
    }

    #[inline(always)]
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    #[inline(always)]
    pub fn max_len(&self) -> usize {
        1 << self.max_len_log2
    }

    #[inline(always)]
    pub fn remaining_bytes(&self) -> usize {
        (1 << self.max_len_log2) - self.len.load(Ordering::Relaxed)
    }

    #[inline(always)]
    pub fn clear(&self) {
        self.len.store(0, Ordering::Relaxed);
    }

    #[inline(always)]
    pub fn get(&self) -> &[u8] {
        unsafe { from_raw_parts(self.memory as *mut u8, self.len.load(Ordering::Relaxed)) }
    }

    #[inline(always)]
    pub unsafe fn set_len(&self, len: usize) {
        self.len.store(len, Ordering::Relaxed);
    }
}

impl Drop for AllocatedChunk {
    fn drop(&mut self) {
        (self.dealloc_fn)(self.memory, self.max_len_log2);
    }
}

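/// Pool of fixed-size memory chunks shared by the whole memory file system.
///
/// `buffers_list` records every contiguous allocation as an
/// `(address, chunk count)` pair, `chunks` holds the start addresses of the
/// currently free chunks, and the atomic counters describe the pool geometry
/// used by the memory accounting getters below.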
pub struct ChunksAllocator {
    buffers_list: Mutex<Vec<(usize, usize)>>,
    chunks_wait_condvar: Condvar,
    chunks: Mutex<Vec<usize>>,
    min_free_chunks: AtomicUsize,
    chunks_total_count: AtomicUsize,
    chunk_padded_size: AtomicUsize,
    chunk_usable_size: AtomicUsize,
    chunks_log_size: AtomicUsize,
}
unsafe impl Sync for ChunksAllocator {}
unsafe impl Send for ChunksAllocator {}

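/// Per-usage chunk counters, available only with the `track-usage` feature.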
#[cfg(feature = "track-usage")]
static USAGE_MAP: Mutex<Option<std::collections::HashMap<ChunkUsage_, usize>>> =
    Mutex::const_new(parking_lot::RawMutex::INIT, None);

impl ChunksAllocator {
    const fn new() -> ChunksAllocator {
        ChunksAllocator {
            buffers_list: Mutex::new(vec![]),
            chunks_wait_condvar: Condvar::new(),
            chunks: Mutex::const_new(parking_lot::RawMutex::INIT, Vec::new()),
            min_free_chunks: AtomicUsize::new(0),
            chunks_total_count: AtomicUsize::new(0),
            chunk_padded_size: AtomicUsize::new(0),
            chunk_usable_size: AtomicUsize::new(0),
            chunks_log_size: AtomicUsize::new(0),
        }
    }

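    /// Allocates one contiguous buffer able to hold `chunks_count` padded chunks,
    /// records it in `buffers_list`, and returns the start address of each new
    /// chunk (with a `PROT_NONE` guard page after each chunk when the
    /// `memory-guards` feature is enabled).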
    fn allocate_contiguous_chunk(
        &self,
        chunks_count: usize,
        buffers_list: &mut MutexGuard<Vec<(usize, usize)>>,
    ) -> impl Iterator<Item = usize> {
        let chunk_padded_size = self.chunk_padded_size.load(Ordering::Relaxed);

        self.chunks_total_count
            .fetch_add(chunks_count, Ordering::Relaxed);

        self.min_free_chunks.store(chunks_count, Ordering::Relaxed);

        let data = unsafe {
            alloc(Layout::from_size_align_unchecked(
                chunks_count * chunk_padded_size,
                ALLOCATOR_ALIGN,
            ))
        };

        #[cfg(feature = "memory-guards")]
        unsafe {
            let first_guard = data.add(self.chunk_usable_size.load(Ordering::Relaxed));
            for i in 0..chunks_count {
                let guard = first_guard.add(i * chunk_padded_size);
                libc::mprotect(guard as *mut libc::c_void, 4096, libc::PROT_NONE);
            }
        }

        buffers_list.push((data as usize, chunks_count));

        (0..chunks_count)
            .rev()
            .map(move |c| data as usize + (c * chunk_padded_size))
    }

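    /// Reserves the backing memory for the pool: `chunks_log_size` is clamped to
    /// `[MINIMUM_CHUNK_SIZE_LOG, MAXIMUM_CHUNK_SIZE_LOG]`, the chunk count is
    /// derived from `memory` (but never below `min_chunks_count`), and a single
    /// contiguous buffer is allocated. Subsequent calls are no-ops.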
    pub fn initialize(
        &self,
        memory: MemoryDataSize,
        mut chunks_log_size: usize,
        min_chunks_count: usize,
    ) {
        if self.buffers_list.lock().len() > 0 {
            // Already allocated
            return;
        }

        #[cfg(feature = "track-usage")]
        {
            *USAGE_MAP.lock() = Some(std::collections::HashMap::new());
        }

        chunks_log_size = min(
            MAXIMUM_CHUNK_SIZE_LOG,
            max(MINIMUM_CHUNK_SIZE_LOG, chunks_log_size),
        );

        let chunk_usable_size = 1usize << chunks_log_size;
        let chunk_padded_size = chunk_usable_size
            + if cfg!(feature = "memory-guards") {
                4096
            } else {
                0
            };
        let total_padded_mem_size: MemoryDataSize =
            MemoryDataSize::from_octets(chunk_padded_size as f64);

        let chunks_count = max(min_chunks_count, (memory / total_padded_mem_size) as usize);

        self.chunk_padded_size
            .store(chunk_padded_size, Ordering::Relaxed);
        self.chunk_usable_size
            .store(chunk_usable_size, Ordering::Relaxed);
        self.chunks_log_size
            .store(chunks_log_size, Ordering::Relaxed);

        let chunks_iter =
            self.allocate_contiguous_chunk(chunks_count, &mut self.buffers_list.lock());

        self.chunks.lock().extend(chunks_iter);

        crate::log_info!(
            "Allocator initialized: mem: {} chunks: {} log2: {}",
            memory,
            chunks_count,
            chunks_log_size
        );
    }

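    /// Hints the OS (via `madvise(MADV_DONTNEED)`) that the pages backing the
    /// currently free chunks can be reclaimed. No-op on Windows.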
    pub fn giveback_free_memory(&self) {
        #[cfg(not(target_os = "windows"))]
        {
            let pagesize = page_size::get();

            let pages_per_chunk = self.chunk_padded_size.load(Ordering::Relaxed) / pagesize;

            let chunks = self.chunks.lock();
            for chunk_start in chunks.iter() {
                unsafe {
                    libc::madvise(
                        *chunk_start as *mut libc::c_void,
                        pages_per_chunk * pagesize,
                        libc::MADV_DONTNEED,
                    );
                }
            }
        }
    }

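    /// Flushes all in-memory files to disk, waits until every chunk has been
    /// returned to the pool, and then releases the pages of all buffers back to
    /// the OS (no-op on Windows).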
    pub fn giveback_all_memory(&self) {
        MemoryFs::flush_all_to_disk();

        loop {
            {
                // Wait for all the chunks to be freed
                if self.chunks.lock().len() == self.chunks_total_count.load(Ordering::Relaxed) {
                    break;
                }
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        #[cfg(not(target_os = "windows"))]
        unsafe {
            for (buffer, chunks_count) in self.buffers_list.lock().iter() {
                libc::madvise(
                    *buffer as *mut libc::c_void,
                    chunks_count * self.chunk_padded_size.load(Ordering::Relaxed),
                    libc::MADV_DONTNEED,
                );
            }
        }
    }

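    /// Pops a free chunk from the pool, blocking until one becomes available.
    ///
    /// When the pool is empty it first asks [`MemoryFs::reduce_pressure`] to free
    /// chunks and waits on the condition variable; after repeated failures it
    /// grows the pool by an extra buffer of `OUT_OF_MEMORY_ALLOCATION_SIZE`
    /// (doubling with each buffer already allocated).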
    pub fn request_chunk(
        &self,
        #[cfg(feature = "track-usage")] usage: ChunkUsage_,
        #[cfg(not(feature = "track-usage"))] _: (),
    ) -> AllocatedChunk {
        let mut tries_count = 0;
        let mut chunks_lock = self.chunks.lock();

        loop {
            let el = chunks_lock.pop();
            let free_count = chunks_lock.len();
            drop(chunks_lock);

            match el.map(|chunk| AllocatedChunk {
                memory: chunk,
                len: AtomicUsize::new(0),
                max_len_log2: self.chunks_log_size.load(Ordering::Relaxed),
                #[cfg(feature = "track-usage")]
                _usage: usage.clone(),
                dealloc_fn: |ptr, _size_log2| {
                    CHUNKS_ALLOCATOR.chunks.lock().push(ptr);
                    CHUNKS_ALLOCATOR.chunks_wait_condvar.notify_one();
                },
            }) {
                None => {
                    if !MemoryFs::reduce_pressure() {
                        tries_count += 1;
                    }

                    chunks_lock = self.chunks.lock();
                    if chunks_lock.len() == 0 {
                        if !self
                            .chunks_wait_condvar
                            .wait_for(&mut chunks_lock, Duration::from_millis(25))
                            .timed_out()
                        {
                            tries_count = 0;
                            continue;
                        }
                    }

                    if tries_count > 10 {
                        #[cfg(feature = "track-usage")]
                        {
                            super::file::internal::MemoryFileInternal::debug_dump_files();
                            crate::log_info!(
                                "Usages: {:?}",
                                USAGE_MAP
                                    .lock()
                                    .as_ref()
                                    .unwrap()
                                    .iter()
                                    .filter(|x| *x.1 != 0)
                                    .map(|x| format!("{:?}", x))
                                    .collect::<Vec<_>>()
                                    .join("\n")
                            )
                        }

                        let mut buffers_list = self.buffers_list.lock();
                        // We're still out of memory, so try to allocate another buffer of chunks.
                        // The emptiness check is repeated here to avoid multiple threads
                        // allocating extra contiguous buffers at once on memory exhaustion.
                        if chunks_lock.len() == 0 {
                            let alloc_multiplier = 1 << (buffers_list.len().saturating_sub(1));

                            let extra_chunks_count = (OUT_OF_MEMORY_ALLOCATION_SIZE.as_bytes()
                                * alloc_multiplier)
                                / self.chunk_usable_size.load(Ordering::Relaxed);
                            let chunks_iter = self
                                .allocate_contiguous_chunk(extra_chunks_count, &mut buffers_list);
                            chunks_lock.extend(chunks_iter);

                            crate::log_info!(
                                "Allocated {} extra chunks for temporary files ({})",
                                extra_chunks_count,
                                OUT_OF_MEMORY_ALLOCATION_SIZE * alloc_multiplier as f64
                            );
                        }
                        // Reset the tries counter
                        tries_count = 0;
                    }
                }
                Some(chunk) => {
                    self.min_free_chunks
                        .fetch_min(free_count, Ordering::Relaxed);
                    #[cfg(feature = "track-usage")]
                    {
                        *USAGE_MAP
                            .lock()
                            .as_mut()
                            .unwrap()
                            .entry(usage.clone())
                            .or_insert(0) += 1;
                    }

                    return chunk;
                }
            }
        }
    }

    /// Total size of the chunks currently available in the pool.
    pub fn get_free_memory(&self) -> MemoryDataSize {
        MemoryDataSize::from_octets(
            (self.chunks.lock().len() * self.chunk_usable_size.load(Ordering::Relaxed)) as f64,
        )
    }

    /// Memory that has been reserved from the pool, computed from the lowest
    /// free-chunk count observed so far.
    pub fn get_reserved_memory(&self) -> MemoryDataSize {
        MemoryDataSize::from_octets(
            ((self.chunks_total_count.load(Ordering::Relaxed)
                - self.min_free_chunks.load(Ordering::Relaxed))
                * self.chunk_usable_size.load(Ordering::Relaxed)) as f64,
        )
    }

    /// Total usable memory managed by the allocator.
    pub fn get_total_memory(&self) -> MemoryDataSize {
        MemoryDataSize::from_octets(
            (self.chunks_total_count.load(Ordering::Relaxed)
                * self.chunk_usable_size.load(Ordering::Relaxed)) as f64,
        )
    }

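    /// Waits until every chunk has been returned to the pool (warning
    /// periodically while data is still being flushed), then frees all the
    /// backing buffers.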
    pub fn deinitialize(&self) {
        let mut chunks = self.chunks.lock();

        let mut counter = 0;
        // Wait for all the chunks to be flushed to disk and returned to the pool
        while chunks.len() != self.chunks_total_count.load(Ordering::Relaxed) {
            drop(chunks);
            std::thread::sleep(Duration::from_millis(200));

            counter += 1;
            if counter % 256 == 0 {
                crate::log_warn!("WARNING: Cannot flush all the data!");
            }

            chunks = self.chunks.lock();
        }

        FILES_FLUSH_HASH_MAP.lock().take();

        {
            chunks.clear();
            self.chunks_total_count.swap(0, Ordering::Relaxed);

            for (addr, chunks_count) in self.buffers_list.lock().drain(..) {
                unsafe {
                    dealloc(
                        addr as *mut u8,
                        Layout::from_size_align_unchecked(
                            chunks_count * self.chunk_padded_size.load(Ordering::Relaxed),
                            ALLOCATOR_ALIGN,
                        ),
                    )
                }
            }
        }
    }
}

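/// Global allocator instance used by the whole memory file system.
///
/// Minimal usage sketch (illustrative only: the parameter values are arbitrary
/// and the `use` paths assume the crate's public module layout):
/// ```ignore
/// use parallel_processor::chunk_usage;
/// use parallel_processor::memory_data_size::MemoryDataSize;
/// use parallel_processor::memory_fs::allocator::CHUNKS_ALLOCATOR;
///
/// // 512 MiB pool of 2^16 = 64 KiB chunks, with at least 8 chunks available.
/// CHUNKS_ALLOCATOR.initialize(MemoryDataSize::from_mebioctets(512), 16, 8);
/// let chunk = CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(TemporarySpace));
/// assert!(chunk.has_space_for(128));
/// drop(chunk); // the chunk goes back to the free list
/// CHUNKS_ALLOCATOR.deinitialize();
/// ```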
pub static CHUNKS_ALLOCATOR: ChunksAllocator = ChunksAllocator::new();

#[cfg(test)]
mod tests {
    use crate::memory_data_size::MemoryDataSize;
    use crate::memory_fs::allocator::CHUNKS_ALLOCATOR;
    use rayon::prelude::*;

    #[test]
    fn allocate_memory() {
        CHUNKS_ALLOCATOR.initialize(MemoryDataSize::from_gibioctets(8), 22, 0);
        for _ in 0..5 {
            let mut allocated_chunks: Vec<_> = std::iter::from_fn(move || {
                Some(CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(TemporarySpace)))
            })
            .take(1024 * 2)
            .collect();

            allocated_chunks.par_iter_mut().for_each(|x| {
                x.zero_memory();
            });
        }
        CHUNKS_ALLOCATOR.deinitialize();
    }
}