parallel_processor/memory_fs/
allocator.rs

use crate::memory_data_size::MemoryDataSize;
use crate::memory_fs::{MemoryFs, FILES_FLUSH_HASH_MAP};
use crate::simple_process_stats::ProcessStats;
use parking_lot::lock_api::RawMutex as _;
use parking_lot::{Condvar, Mutex, MutexGuard};
use std::alloc::{alloc, dealloc, Layout};
use std::cmp::{max, min};
use std::slice::{from_raw_parts, from_raw_parts_mut};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;

const ALLOCATOR_ALIGN: usize = 4096;
const OUT_OF_MEMORY_ALLOCATION_SIZE: MemoryDataSize = MemoryDataSize::from_mebioctets(256);
const MAXIMUM_CHUNK_SIZE_LOG: usize = 18;
const MINIMUM_CHUNK_SIZE_LOG: usize = 12;

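// Builds a `ChunkUsage_` tag when the `track-usage` feature is enabled; without the
// feature it expands to `()`, so call sites do not need their own cfg switches.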
#[macro_export]
#[cfg(feature = "track-usage")]
macro_rules! chunk_usage {
    ($mode:ident $({ $($param:ident : $value:expr),* })?) => {
        $crate::memory_fs::allocator::ChunkUsage_::$mode $({
            $($param: $value),*
        })?
    }
}

#[macro_export]
#[cfg(not(feature = "track-usage"))]
macro_rules! chunk_usage {
    ($mode:ident $({ $($param:ident : $value:expr),* })?) => {
        ()
    };
}

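/// What a chunk is being used for; stored on each chunk only when the
/// `track-usage` feature is enabled, to help diagnose which consumers hold memory.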
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum ChunkUsage_ {
    FileBuffer { path: String },
    InMemoryFile { path: String },
    ReadStriped { path: String },
    TemporarySpace,
}

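/// A fixed-capacity memory chunk handed out by [`ChunksAllocator`].
/// `len` tracks the bytes currently in use, the capacity is `1 << max_len_log2`,
/// and dropping the chunk gives the memory back to the allocator via `dealloc_fn`.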
pub struct AllocatedChunk {
    memory: usize,
    len: AtomicUsize,
    max_len_log2: usize,
    dealloc_fn: fn(usize, usize),
    #[cfg(feature = "track-usage")]
    _usage: ChunkUsage_,
}
unsafe impl Sync for AllocatedChunk {}
unsafe impl Send for AllocatedChunk {}

impl Clone for AllocatedChunk {
    #[track_caller]
    fn clone(&self) -> Self {
        panic!("This method should not be called, check for vector cloning!");
    }
}

impl AllocatedChunk {
    pub const INVALID: Self = Self {
        memory: 0,
        len: AtomicUsize::new(0),
        max_len_log2: 0,
        dealloc_fn: |_, _| {},
        #[cfg(feature = "track-usage")]
        _usage: ChunkUsage_::TemporarySpace,
    };

    #[inline(always)]
    #[allow(dead_code)]
    fn zero_memory(&mut self) {
        unsafe {
            std::ptr::write_bytes(self.memory as *mut u8, 0, 1 << self.max_len_log2);
        }
    }

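    // The `*_single_thread` methods below bypass atomic updates by transmuting the
    // atomic length counter into a plain `&mut usize`; callers must guarantee
    // exclusive access to the chunk, hence the `unsafe` and the naming.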
    #[inline(always)]
    #[allow(mutable_transmutes)]
    pub unsafe fn write_bytes_noextend_single_thread(&self, data: *const u8, len: usize) {
        let off_len = std::mem::transmute::<&AtomicUsize, &mut usize>(&self.len);
        std::ptr::copy_nonoverlapping(data, (self.memory + *off_len) as *mut u8, len);
        *off_len += len;
    }

    #[inline(always)]
    #[allow(mutable_transmutes)]
    pub unsafe fn write_zero_bytes_noextend_single_thread(&self, len: usize) {
        let off_len = std::mem::transmute::<&AtomicUsize, &mut usize>(&self.len);
        std::ptr::write_bytes((self.memory + *off_len) as *mut u8, 0, len);
        *off_len += len;
    }

    #[inline(always)]
    #[allow(mutable_transmutes)]
    pub unsafe fn prealloc_bytes_single_thread(&self, len: usize) -> &'static mut [u8] {
        let off_len = std::mem::transmute::<&AtomicUsize, &mut usize>(&self.len);
        let slice = from_raw_parts_mut((self.memory + *off_len) as *mut u8, len);
        *off_len += len;
        slice
    }

    pub fn write_bytes_noextend(&self, data: &[u8]) -> Option<u64> {
        let result = self
            .len
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| {
                if value + data.len() <= (1 << self.max_len_log2) {
                    Some(value + data.len())
                } else {
                    None
                }
            });

        match result {
            Ok(addr_offset) => {
                unsafe {
                    std::ptr::copy_nonoverlapping(
                        data.as_ptr(),
                        (self.memory + addr_offset) as *mut u8,
                        data.len(),
                    );
                }
                Some(addr_offset as u64)
            }
            Err(_) => None,
        }
    }

    #[inline(always)]
    pub fn has_space_for(&self, len: usize) -> bool {
        self.len.load(Ordering::Relaxed) + len <= (1 << self.max_len_log2)
    }

    #[inline(always)]
    pub unsafe fn get_mut_slice(&self) -> &'static mut [u8] {
        from_raw_parts_mut(self.memory as *mut u8, self.len.load(Ordering::Relaxed))
    }

    #[inline(always)]
    pub unsafe fn get_mut_ptr(&self) -> *mut u8 {
        self.memory as *mut u8
    }

    #[inline(always)]
    pub unsafe fn get_object_reference_mut<T>(&self, offset_in_bytes: usize) -> &'static mut T {
        &mut *((self.memory as *mut u8).add(offset_in_bytes) as *mut T)
    }

    #[inline(always)]
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    #[inline(always)]
    pub fn max_len(&self) -> usize {
        1 << self.max_len_log2
    }

    #[inline(always)]
    pub fn remaining_bytes(&self) -> usize {
        (1 << self.max_len_log2) - self.len.load(Ordering::Relaxed)
    }

    #[inline(always)]
    pub fn clear(&self) {
        self.len.store(0, Ordering::Relaxed);
    }

    #[inline(always)]
    pub fn get(&self) -> &[u8] {
        unsafe { from_raw_parts(self.memory as *mut u8, self.len.load(Ordering::Relaxed)) }
    }

    #[inline(always)]
    pub unsafe fn set_len(&self, len: usize) {
        self.len.store(len, Ordering::Relaxed);
    }
}

impl Drop for AllocatedChunk {
    fn drop(&mut self) {
        (self.dealloc_fn)(self.memory, self.max_len_log2);
    }
}

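/// Soft memory limits, enforced best-effort by the monitor thread spawned in
/// [`ChunksAllocator::initialize`]: free chunks are released back to the OS when the
/// process exceeds `max_usage`, while keeping at least `min_fs_usage` worth of
/// chunks available to the in-memory filesystem.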
pub struct MemoryAllocationLimits {
    pub max_usage: MemoryDataSize,
    pub min_fs_usage: MemoryDataSize,
}

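/// Global chunk allocator. Memory is reserved up front in large contiguous buffers,
/// split into equally sized chunks and recycled through the `chunks` free list;
/// `cleared_chunks` holds chunks whose pages were temporarily returned to the OS
/// when the soft memory limits were exceeded.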
pub struct ChunksAllocator {
    buffers_list: Mutex<Vec<(usize, usize)>>,
    chunks_wait_condvar: Condvar,
    chunks: Mutex<Vec<usize>>,
    cleared_chunks: Mutex<Vec<usize>>,
    min_free_chunks: AtomicUsize,
    chunks_total_count: AtomicUsize,
    is_active: AtomicBool,
    chunk_padded_size: AtomicUsize,
    chunk_usable_size: AtomicUsize,
    chunks_log_size: AtomicUsize,
}
unsafe impl Sync for ChunksAllocator {}
unsafe impl Send for ChunksAllocator {}

#[cfg(feature = "track-usage")]
static USAGE_MAP: Mutex<Option<std::collections::HashMap<ChunkUsage_, usize>>> =
    Mutex::const_new(parking_lot::RawMutex::INIT, None);

impl ChunksAllocator {
    const fn new() -> ChunksAllocator {
        ChunksAllocator {
            buffers_list: Mutex::new(vec![]),
            chunks_wait_condvar: Condvar::new(),
            chunks: Mutex::const_new(parking_lot::RawMutex::INIT, Vec::new()),
            cleared_chunks: Mutex::const_new(parking_lot::RawMutex::INIT, Vec::new()),
            min_free_chunks: AtomicUsize::new(0),
            chunks_total_count: AtomicUsize::new(0),
            is_active: AtomicBool::new(false),
            chunk_padded_size: AtomicUsize::new(0),
            chunk_usable_size: AtomicUsize::new(0),
            chunks_log_size: AtomicUsize::new(0),
        }
    }

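    /// Allocates one contiguous buffer large enough for `chunks_count` padded chunks
    /// and returns an iterator over the start address of each new chunk. With the
    /// `memory-guards` feature, a `PROT_NONE` guard page follows every chunk.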
    fn allocate_contiguous_chunk(
        &self,
        chunks_count: usize,
        buffers_list: &mut MutexGuard<Vec<(usize, usize)>>,
    ) -> impl Iterator<Item = usize> {
        let chunk_padded_size = self.chunk_padded_size.load(Ordering::Relaxed);

        self.chunks_total_count
            .fetch_add(chunks_count, Ordering::Relaxed);

        self.min_free_chunks.store(chunks_count, Ordering::Relaxed);

        let data = unsafe {
            alloc(Layout::from_size_align_unchecked(
                chunks_count * chunk_padded_size,
                ALLOCATOR_ALIGN,
            ))
        };

        #[cfg(feature = "memory-guards")]
        unsafe {
            let first_guard = data.add(self.chunk_usable_size.load(Ordering::Relaxed));
            for i in 0..chunks_count {
                let guard = first_guard.add(i * chunk_padded_size);
                libc::mprotect(guard as *mut libc::c_void, 4096, libc::PROT_NONE);
            }
        }

        buffers_list.push((data as usize, chunks_count));

        (0..chunks_count)
            .into_iter()
            .rev()
            .map(move |c| data as usize + (c * chunk_padded_size))
    }

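    /// Reserves the backing memory for the allocator. `memory` is split into chunks of
    /// `1 << chunks_log_size` usable bytes (clamped to the supported range), with at
    /// least `min_chunks_count` chunks. When `alloc_limits` is provided (not on
    /// Windows), a background thread periodically checks the process memory usage and
    /// releases or restores free chunks to stay within the limits.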
    pub fn initialize(
        &'static self,
        memory: MemoryDataSize,
        mut chunks_log_size: usize,
        min_chunks_count: usize,
        alloc_limits: Option<MemoryAllocationLimits>,
    ) {
        self.is_active.swap(true, Ordering::Relaxed);

        #[cfg(not(target_os = "windows"))]
        if let Some(alloc_limits) = alloc_limits {
            // Try to softly enforce the requested allocation limits
            std::thread::spawn(move || {
                let min_chunks_count = alloc_limits.min_fs_usage.as_bytes()
                    / self.chunk_usable_size.load(Ordering::Relaxed);

                while self.is_active.load(Ordering::Relaxed) {
                    const MIN_FREE_CHUNKS_COUNT: usize = 4;

                    if let Ok(stats) = ProcessStats::get() {
                        if stats.memory_usage_bytes as usize > alloc_limits.max_usage.as_bytes() {
                            MemoryFs::reduce_pressure();
                            let mut chunks_to_free = vec![];
                            {
                                let cleared_chunks_count = self.cleared_chunks.lock().len();
                                let mut chunks = self.chunks.lock();
                                while chunks.len() > MIN_FREE_CHUNKS_COUNT
                                    && ((stats.memory_usage_bytes as usize).saturating_sub(
                                        chunks_to_free.len()
                                            * self.chunk_padded_size.load(Ordering::Relaxed),
                                    )) > alloc_limits.max_usage.as_bytes()
                                    && min_chunks_count // Ensure that a minimum number of chunks remains
                                        + cleared_chunks_count
                                        + chunks_to_free.len()
                                        < self.chunks_total_count.load(Ordering::Relaxed)
                                {
                                    chunks_to_free.push(chunks.pop().unwrap());
                                }
                                drop(chunks);
                            }
                            let mut cleared_chunks = self.cleared_chunks.lock();
                            for chunk_start in chunks_to_free {
                                unsafe {
                                    libc::madvise(
                                        chunk_start as *mut libc::c_void,
                                        self.chunk_padded_size.load(Ordering::Relaxed),
                                        libc::MADV_DONTNEED,
                                    );
                                }
                                cleared_chunks.push(chunk_start);
                            }
                        } else {
                            let allowed_space = alloc_limits.max_usage.as_bytes()
                                - stats.memory_usage_bytes as usize;
                            let allowed_chunks =
                                allowed_space / self.chunk_padded_size.load(Ordering::Relaxed);
                            if allowed_chunks > 0 {
                                let mut cleared_chunks = self.cleared_chunks.lock();
                                let mut chunks = self.chunks.lock();
                                while chunks.len() < allowed_chunks && cleared_chunks.len() > 0 {
                                    chunks.push(cleared_chunks.pop().unwrap());
                                }
                            }
                        }
                    }

                    std::thread::sleep(Duration::from_secs(1));
                }
                // Restore all the cleared chunks on termination
                let cleared_chunks = self.cleared_chunks.lock().drain(..).collect::<Vec<_>>();
                self.chunks.lock().extend(cleared_chunks);
            });
        }

        if self.buffers_list.lock().len() > 0 {
            // Already allocated
            return;
        }

        #[cfg(feature = "track-usage")]
        {
            *USAGE_MAP.lock() = Some(std::collections::HashMap::new());
        }

        chunks_log_size = min(
            MAXIMUM_CHUNK_SIZE_LOG,
            max(MINIMUM_CHUNK_SIZE_LOG, chunks_log_size),
        );

        let chunk_usable_size = 1usize << chunks_log_size;
        let chunk_padded_size = chunk_usable_size
            + if cfg!(feature = "memory-guards") {
                4096
            } else {
                0
            };
        let total_padded_mem_size: MemoryDataSize =
            MemoryDataSize::from_octets(chunk_padded_size as f64);

        let chunks_count = max(min_chunks_count, (memory / total_padded_mem_size) as usize);

        self.chunk_padded_size
            .store(chunk_padded_size, Ordering::Relaxed);
        self.chunk_usable_size
            .store(chunk_usable_size, Ordering::Relaxed);
        self.chunks_log_size
            .store(chunks_log_size, Ordering::Relaxed);

        let chunks_iter =
            self.allocate_contiguous_chunk(chunks_count, &mut self.buffers_list.lock());

        self.chunks.lock().extend(chunks_iter);

        crate::log_info!(
            "Allocator initialized: mem: {} chunks: {} log2: {}",
            memory,
            chunks_count,
            chunks_log_size
        );
    }

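    /// Tells the OS (via `madvise(MADV_DONTNEED)`) that the pages of the currently
    /// free chunks are not needed, releasing physical memory without touching the
    /// free list. No-op on Windows.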
    pub fn giveback_free_memory(&self) {
        #[cfg(not(target_os = "windows"))]
        {
            let pagesize = page_size::get();

            let pages_per_chunk = self.chunk_padded_size.load(Ordering::Relaxed) / pagesize;

            let chunks = self.chunks.lock();
            for chunk_start in chunks.iter() {
                unsafe {
                    libc::madvise(
                        *chunk_start as *mut libc::c_void,
                        pages_per_chunk * pagesize,
                        libc::MADV_DONTNEED,
                    );
                }
            }
        }
    }

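    /// Flushes the in-memory filesystem to disk, waits until every chunk has been
    /// returned to the free list, then releases the physical pages of all backing
    /// buffers (non-Windows only); the virtual reservation itself is kept.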
    pub fn giveback_all_memory(&self) {
        MemoryFs::flush_to_disk(false);

        loop {
            {
                // Wait for all the chunks to be freed
                if self.chunks.lock().len() == self.chunks_total_count.load(Ordering::Relaxed) {
                    break;
                }
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        #[cfg(not(target_os = "windows"))]
        unsafe {
            for (buffer, chunks_count) in self.buffers_list.lock().iter() {
                libc::madvise(
                    *buffer as *mut libc::c_void,
                    chunks_count * self.chunk_padded_size.load(Ordering::Relaxed),
                    libc::MADV_DONTNEED,
                );
            }
        }
    }

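    /// Pops a free chunk, blocking until one becomes available. On exhaustion it asks
    /// `MemoryFs` to reduce pressure, waits on the condition variable, and after
    /// repeated failures grows the pool with an extra out-of-memory allocation.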
    pub fn request_chunk(
        &self,
        #[cfg(feature = "track-usage")] usage: ChunkUsage_,
        #[cfg(not(feature = "track-usage"))] _: (),
    ) -> AllocatedChunk {
        let mut tries_count = 0;
        let mut chunks_lock = self.chunks.lock();

        loop {
            let el = chunks_lock.pop();
            let free_count = chunks_lock.len();
            drop(chunks_lock);

            match el.map(|chunk| AllocatedChunk {
                memory: chunk,
                len: AtomicUsize::new(0),
                max_len_log2: self.chunks_log_size.load(Ordering::Relaxed),
                #[cfg(feature = "track-usage")]
                _usage: usage.clone(),
                dealloc_fn: |ptr, _size_log2| {
                    CHUNKS_ALLOCATOR.chunks.lock().push(ptr);
                    CHUNKS_ALLOCATOR.chunks_wait_condvar.notify_one();
                },
            }) {
                None => {
                    if !MemoryFs::reduce_pressure() {
                        tries_count += 1;
                    }

                    chunks_lock = self.chunks.lock();
                    if chunks_lock.len() == 0 {
                        if !self
                            .chunks_wait_condvar
                            .wait_for(&mut chunks_lock, Duration::from_millis(25))
                            .timed_out()
                        {
                            tries_count = 0;
                            continue;
                        }
                    }

                    if tries_count > 10 {
                        #[cfg(feature = "track-usage")]
                        {
                            super::file::internal::MemoryFileInternal::debug_dump_files();
                            crate::log_info!(
                                "Usages: {:?}",
                                USAGE_MAP
                                    .lock()
                                    .as_ref()
                                    .unwrap()
                                    .iter()
                                    .filter(|x| *x.1 != 0)
                                    .map(|x| format!("{:?}", x))
                                    .collect::<Vec<_>>()
                                    .join("\n")
                            )
                        }

                        let mut buffers_list = self.buffers_list.lock();
                        // We're still out of memory, let's try to allocate another chunk
                        // The check is repeated to avoid allocating multiple contiguous
                        // buffers when several threads hit memory exhaustion at once
                        if chunks_lock.len() == 0 {
                            let alloc_multiplier = 1 << (buffers_list.len().saturating_sub(1));

                            let extra_chunks_count = (OUT_OF_MEMORY_ALLOCATION_SIZE.as_bytes()
                                * alloc_multiplier)
                                / self.chunk_usable_size.load(Ordering::Relaxed);
                            let chunks_iter = self
                                .allocate_contiguous_chunk(extra_chunks_count, &mut buffers_list);
                            chunks_lock.extend(chunks_iter);

                            crate::log_info!(
                                "Allocated {} extra chunks for temporary files ({})",
                                extra_chunks_count,
                                OUT_OF_MEMORY_ALLOCATION_SIZE * alloc_multiplier as f64
                            );
                        }
                        // Reset the tries counter
                        tries_count = 0;
                    }
                }
                Some(chunk) => {
                    self.min_free_chunks
                        .fetch_min(free_count, Ordering::Relaxed);
                    #[cfg(feature = "track-usage")]
                    {
                        *USAGE_MAP
                            .lock()
                            .as_mut()
                            .unwrap()
                            .entry(usage.clone())
                            .or_insert(0) += 1;
                    }

                    return chunk;
                }
            }
        }
    }

    pub fn get_free_memory(&self) -> MemoryDataSize {
        MemoryDataSize::from_octets(
            (self.chunks.lock().len() * self.chunk_usable_size.load(Ordering::Relaxed)) as f64,
        )
    }

    pub fn get_reserved_memory(&self) -> MemoryDataSize {
        MemoryDataSize::from_octets(
            ((self.chunks_total_count.load(Ordering::Relaxed)
                - self.min_free_chunks.load(Ordering::Relaxed))
                * self.chunk_usable_size.load(Ordering::Relaxed)) as f64,
        )
    }

    pub fn get_total_memory(&self) -> MemoryDataSize {
        MemoryDataSize::from_octets(
            (self.chunks_total_count.load(Ordering::Relaxed)
                * self.chunk_usable_size.load(Ordering::Relaxed)) as f64,
        )
    }

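    /// Shuts the allocator down: waits until every chunk has been returned (i.e. all
    /// data has been flushed), logging a warning if flushing stalls, then frees all
    /// backing buffers.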
    pub fn deinitialize(&self) {
        self.is_active.store(false, Ordering::Relaxed);
        let mut chunks = self.chunks.lock();

        let mut counter = 0;
        // Wait for the chunks to be written on disk
        while chunks.len() != self.chunks_total_count.load(Ordering::Relaxed) {
            drop(chunks);
            std::thread::sleep(Duration::from_millis(200));

            counter += 1;
            if counter % 256 == 0 {
                crate::log_warn!("WARNING: Cannot flush all the data!");
            }

            chunks = self.chunks.lock();
        }

        FILES_FLUSH_HASH_MAP.lock().take();

        {
            chunks.clear();
            self.chunks_total_count.swap(0, Ordering::Relaxed);

            for (addr, chunks_count) in self.buffers_list.lock().drain(..) {
                unsafe {
                    dealloc(
                        addr as *mut u8,
                        Layout::from_size_align_unchecked(
                            chunks_count * self.chunk_padded_size.load(Ordering::Relaxed),
                            ALLOCATOR_ALIGN,
                        ),
                    )
                }
            }
        }
    }
}

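/// Global allocator instance: call [`ChunksAllocator::initialize`] once at startup
/// and [`ChunksAllocator::deinitialize`] when the in-memory filesystem is done.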
pub static CHUNKS_ALLOCATOR: ChunksAllocator = ChunksAllocator::new();

#[cfg(test)]
mod tests {
    use crate::memory_data_size::MemoryDataSize;
    use crate::memory_fs::allocator::CHUNKS_ALLOCATOR;
    use rayon::prelude::*;

    #[test]
    fn allocate_memory() {
        CHUNKS_ALLOCATOR.initialize(MemoryDataSize::from_gibioctets(8), 22, 0, None);
        for _ in 0..5 {
            let mut allocated_chunks: Vec<_> = std::iter::from_fn(move || {
                Some(CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(TemporarySpace)))
            })
            .take(1024 * 2)
            .collect();

            allocated_chunks.par_iter_mut().for_each(|x| {
                x.zero_memory();
            });
        }
        CHUNKS_ALLOCATOR.deinitialize();
    }
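
    // A minimal sketch of the single-chunk write API (an added example, not part of
    // the original test suite): request a chunk, append bytes with
    // `write_bytes_noextend`, then drop it to return the memory to the free list.
    // Marked #[ignore] because it shares the global CHUNKS_ALLOCATOR with the test
    // above and cargo runs tests in parallel by default.
    #[test]
    #[ignore]
    fn write_into_chunk() {
        CHUNKS_ALLOCATOR.initialize(MemoryDataSize::from_mebioctets(64), 16, 1, None);

        let chunk = CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(TemporarySpace));
        assert_eq!(chunk.len(), 0);

        let data = [1u8, 2, 3, 4];
        // `write_bytes_noextend` returns the offset at which the data was written
        let offset = chunk.write_bytes_noextend(&data).unwrap();
        assert_eq!(offset, 0);
        assert_eq!(chunk.get(), &data[..]);

        // Dropping the chunk pushes it back onto the allocator's free list
        drop(chunk);
        CHUNKS_ALLOCATOR.deinitialize();
    }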
}