parallel_processor/memory_fs/allocator.rs

use crate::memory_data_size::MemoryDataSize;
use crate::memory_fs::{MemoryFs, FILES_FLUSH_HASH_MAP};
use parking_lot::lock_api::RawMutex as _;
use parking_lot::{Condvar, Mutex, MutexGuard};
use std::alloc::{alloc, dealloc, Layout};
use std::cmp::{max, min};
use std::slice::{from_raw_parts, from_raw_parts_mut};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;

const ALLOCATOR_ALIGN: usize = 4096;
const OUT_OF_MEMORY_ALLOCATION_SIZE: MemoryDataSize = MemoryDataSize::from_mebioctets(256);
const MAXIMUM_CHUNK_SIZE_LOG: usize = 18;
const MINIMUM_CHUNK_SIZE_LOG: usize = 12;

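// The `chunk_usage!` macro below builds the usage descriptor passed to
// `ChunksAllocator::request_chunk`. With the `track-usage` feature enabled it
// expands to a `ChunkUsage_` value, otherwise to `()`. Illustrative example
// (the `path` variable is hypothetical):
//
//     chunk_usage!(FileBuffer { path: path.clone() })
//     // with `track-usage`:    ChunkUsage_::FileBuffer { path: path.clone() }
//     // without `track-usage`: ()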
#[macro_export]
#[cfg(feature = "track-usage")]
macro_rules! chunk_usage {
    ($mode:ident $({ $($param:ident : $value:expr),* })?) => {
        $crate::memory_fs::allocator::ChunkUsage_::$mode $({
            $($param: $value),*
        })?
    }
}

#[macro_export]
#[cfg(not(feature = "track-usage"))]
macro_rules! chunk_usage {
    ($mode:ident $({ $($param:ident : $value:expr),* })?) => {
        ()
    };
}

/// Describes what an allocated chunk is used for; recorded only when the
/// `track-usage` feature is enabled.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum ChunkUsage_ {
    FileBuffer { path: String },
    InMemoryFile { path: String },
    ReadStriped { path: String },
    TemporarySpace,
}

/// A fixed-size chunk of memory handed out by the global chunks allocator.
/// `len` tracks how many bytes have been written into the chunk so far; the
/// chunk is returned to the pool through `dealloc_fn` when dropped.
pub struct AllocatedChunk {
    memory: usize,
    len: AtomicUsize,
    max_len_log2: usize,
    dealloc_fn: fn(usize, usize),
    #[cfg(feature = "track-usage")]
    _usage: ChunkUsage_,
}
unsafe impl Sync for AllocatedChunk {}
unsafe impl Send for AllocatedChunk {}

impl Clone for AllocatedChunk {
    #[track_caller]
    fn clone(&self) -> Self {
        panic!("This method should not be called, check for vector cloning!");
    }
}

impl AllocatedChunk {
    pub const INVALID: Self = Self {
        memory: 0,
        len: AtomicUsize::new(0),
        max_len_log2: 0,
        dealloc_fn: |_, _| {},
        #[cfg(feature = "track-usage")]
        _usage: ChunkUsage_::TemporarySpace,
    };

    #[inline(always)]
    #[allow(dead_code)]
    fn zero_memory(&mut self) {
        unsafe {
            std::ptr::write_bytes(self.memory as *mut u8, 0, 1 << self.max_len_log2);
        }
    }

    /// Safety: the caller must have exclusive access to this chunk and must
    /// guarantee that `len` bytes fit in the remaining space, since the length
    /// counter is updated non-atomically here.
    #[inline(always)]
    #[allow(mutable_transmutes)]
    pub unsafe fn write_bytes_noextend_single_thread(&self, data: *const u8, len: usize) {
        let off_len = std::mem::transmute::<&AtomicUsize, &mut usize>(&self.len);
        std::ptr::copy_nonoverlapping(data, (self.memory + *off_len) as *mut u8, len);
        *off_len += len;
    }

    /// Safety: same requirements as `write_bytes_noextend_single_thread`.
    #[inline(always)]
    #[allow(mutable_transmutes)]
    pub unsafe fn write_zero_bytes_noextend_single_thread(&self, len: usize) {
        let off_len = std::mem::transmute::<&AtomicUsize, &mut usize>(&self.len);
        std::ptr::write_bytes((self.memory + *off_len) as *mut u8, 0, len);
        *off_len += len;
    }

    /// Safety: same requirements as `write_bytes_noextend_single_thread`; the
    /// returned slice aliases the chunk's memory.
    #[inline(always)]
    #[allow(mutable_transmutes)]
    pub unsafe fn prealloc_bytes_single_thread(&self, len: usize) -> &'static mut [u8] {
        let off_len = std::mem::transmute::<&AtomicUsize, &mut usize>(&self.len);
        let slice = from_raw_parts_mut((self.memory + *off_len) as *mut u8, len);
        *off_len += len;
        slice
    }

    pub fn write_bytes_noextend(&self, data: &[u8]) -> Option<u64> {
        let result = self
            .len
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| {
                if value + data.len() <= (1 << self.max_len_log2) {
                    Some(value + data.len())
                } else {
                    None
                }
            });

        match result {
            Ok(addr_offset) => {
                unsafe {
                    std::ptr::copy_nonoverlapping(
                        data.as_ptr(),
                        (self.memory + addr_offset) as *mut u8,
                        data.len(),
                    );
                }
                Some(addr_offset as u64)
            }
            Err(_) => None,
        }
    }

    #[inline(always)]
    pub fn has_space_for(&self, len: usize) -> bool {
        self.len.load(Ordering::Relaxed) + len <= (1 << self.max_len_log2)
    }

    #[inline(always)]
    pub unsafe fn get_mut_slice(&self) -> &'static mut [u8] {
        from_raw_parts_mut(self.memory as *mut u8, self.len.load(Ordering::Relaxed))
    }

    #[inline(always)]
    pub unsafe fn get_mut_ptr(&self) -> *mut u8 {
        self.memory as *mut u8
    }

    #[inline(always)]
    pub unsafe fn get_object_reference_mut<T>(&self, offset_in_bytes: usize) -> &'static mut T {
        &mut *((self.memory as *mut u8).add(offset_in_bytes) as *mut T)
    }

    #[inline(always)]
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    #[inline(always)]
    pub fn max_len(&self) -> usize {
        1 << self.max_len_log2
    }

    #[inline(always)]
    pub fn remaining_bytes(&self) -> usize {
        (1 << self.max_len_log2) - self.len.load(Ordering::Relaxed)
    }

    #[inline(always)]
    pub fn clear(&self) {
        self.len.store(0, Ordering::Relaxed);
    }

    #[inline(always)]
    pub fn get(&self) -> &[u8] {
        unsafe { from_raw_parts(self.memory as *mut u8, self.len.load(Ordering::Relaxed)) }
    }

    #[inline(always)]
    pub unsafe fn set_len(&self, len: usize) {
        self.len.store(len, Ordering::Relaxed);
    }
}

impl Drop for AllocatedChunk {
    fn drop(&mut self) {
        (self.dealloc_fn)(self.memory, self.max_len_log2);
    }
}

/// Soft memory limits enforced by the background watcher thread spawned in
/// `ChunksAllocator::initialize` (Linux only).
pub struct MemoryAllocationLimits {
    pub max_usage: MemoryDataSize,
    pub min_fs_usage: MemoryDataSize,
}

/// Global pool of pre-allocated, fixed-size memory chunks backing the
/// in-memory file system.
pub struct ChunksAllocator {
    buffers_list: Mutex<Vec<(usize, usize)>>,
    chunks_wait_condvar: Condvar,
    chunks: Mutex<Vec<usize>>,
    cleared_chunks: Mutex<Vec<usize>>,
    min_free_chunks: AtomicUsize,
    chunks_total_count: AtomicUsize,
    is_active: AtomicBool,
    chunk_padded_size: AtomicUsize,
    chunk_usable_size: AtomicUsize,
    chunks_log_size: AtomicUsize,
}
unsafe impl Sync for ChunksAllocator {}
unsafe impl Send for ChunksAllocator {}

#[cfg(feature = "track-usage")]
static USAGE_MAP: Mutex<Option<std::collections::HashMap<ChunkUsage_, usize>>> =
    Mutex::const_new(parking_lot::RawMutex::INIT, None);

impl ChunksAllocator {
    const fn new() -> ChunksAllocator {
        ChunksAllocator {
            buffers_list: Mutex::new(vec![]),
            chunks_wait_condvar: Condvar::new(),
            chunks: Mutex::const_new(parking_lot::RawMutex::INIT, Vec::new()),
            cleared_chunks: Mutex::const_new(parking_lot::RawMutex::INIT, Vec::new()),
            min_free_chunks: AtomicUsize::new(0),
            chunks_total_count: AtomicUsize::new(0),
            is_active: AtomicBool::new(false),
            chunk_padded_size: AtomicUsize::new(0),
            chunk_usable_size: AtomicUsize::new(0),
            chunks_log_size: AtomicUsize::new(0),
        }
    }

    /// Allocates one contiguous buffer holding `chunks_count` chunks and
    /// returns the start address of each chunk.
    fn allocate_contiguous_chunk(
        &self,
        chunks_count: usize,
        buffers_list: &mut MutexGuard<Vec<(usize, usize)>>,
    ) -> impl Iterator<Item = usize> {
        let chunk_padded_size = self.chunk_padded_size.load(Ordering::Relaxed);

        self.chunks_total_count
            .fetch_add(chunks_count, Ordering::Relaxed);

        self.min_free_chunks.store(chunks_count, Ordering::Relaxed);

        let data = unsafe {
            alloc(Layout::from_size_align_unchecked(
                chunks_count * chunk_padded_size,
                ALLOCATOR_ALIGN,
            ))
        };

        #[cfg(feature = "memory-guards")]
        unsafe {
            let first_guard = data.add(self.chunk_usable_size.load(Ordering::Relaxed));
            for i in 0..chunks_count {
                let guard = first_guard.add(i * chunk_padded_size);
                libc::mprotect(guard as *mut libc::c_void, 4096, libc::PROT_NONE);
            }
        }

        buffers_list.push((data as usize, chunks_count));

        (0..chunks_count)
            .into_iter()
            .rev()
            .map(move |c| data as usize + (c * chunk_padded_size))
    }

    /// Initializes the global pool, allocating at least `min_chunks_count`
    /// chunks of `2^chunks_log_size` bytes each (the log size is clamped to
    /// the [`MINIMUM_CHUNK_SIZE_LOG`, `MAXIMUM_CHUNK_SIZE_LOG`] range).
    pub fn initialize(
        &'static self,
        memory: MemoryDataSize,
        mut chunks_log_size: usize,
        min_chunks_count: usize,
        alloc_limits: Option<MemoryAllocationLimits>,
    ) {
        self.is_active.swap(true, Ordering::Relaxed);

        #[cfg(target_os = "linux")]
        if let Some(alloc_limits) = alloc_limits {
            use crate::simple_process_stats::ProcessStats;

            // Try to softly enforce the requested allocation limits
            std::thread::spawn(move || {
                let min_chunks_count = alloc_limits.min_fs_usage.as_bytes()
                    / self.chunk_usable_size.load(Ordering::Relaxed);

                while self.is_active.load(Ordering::Relaxed) {
                    const MIN_FREE_CHUNKS_COUNT: usize = 4;

                    if let Ok(stats) = ProcessStats::get() {
                        if stats.memory_usage_bytes as usize > alloc_limits.max_usage.as_bytes() {
                            MemoryFs::reduce_pressure();
                            let mut chunks_to_free = vec![];
                            {
                                let cleared_chunks_count = self.cleared_chunks.lock().len();
                                let mut chunks = self.chunks.lock();
                                while chunks.len() > MIN_FREE_CHUNKS_COUNT
                                    && ((stats.memory_usage_bytes as usize).saturating_sub(
                                        chunks_to_free.len()
                                            * self.chunk_padded_size.load(Ordering::Relaxed),
                                    )) > alloc_limits.max_usage.as_bytes()
                                    && min_chunks_count // Ensure that a minimum number of chunks stays allocated
                                        + cleared_chunks_count
                                        + chunks_to_free.len()
                                        < self.chunks_total_count.load(Ordering::Relaxed)
                                {
                                    chunks_to_free.push(chunks.pop().unwrap());
                                }
                                drop(chunks);
                            }
                            let mut cleared_chunks = self.cleared_chunks.lock();
                            for chunk_start in chunks_to_free {
                                unsafe {
                                    libc::madvise(
                                        chunk_start as *mut libc::c_void,
                                        self.chunk_padded_size.load(Ordering::Relaxed),
                                        libc::MADV_DONTNEED,
                                    );
                                }
                                cleared_chunks.push(chunk_start);
                            }
                        } else {
                            let allowed_space = alloc_limits.max_usage.as_bytes()
                                - stats.memory_usage_bytes as usize;
                            let allowed_chunks =
                                allowed_space / self.chunk_padded_size.load(Ordering::Relaxed);
                            if allowed_chunks > 0 {
                                let mut cleared_chunks = self.cleared_chunks.lock();
                                let mut chunks = self.chunks.lock();
                                while chunks.len() < allowed_chunks && cleared_chunks.len() > 0 {
                                    chunks.push(cleared_chunks.pop().unwrap());
                                }
                            }
                        }
                    }

                    std::thread::sleep(Duration::from_secs(1));
                }
                // Restore all the cleared chunks on termination
                let cleared_chunks = self.cleared_chunks.lock().drain(..).collect::<Vec<_>>();
                self.chunks.lock().extend(cleared_chunks);
            });
        }

        if self.buffers_list.lock().len() > 0 {
            // Already allocated
            return;
        }

        #[cfg(feature = "track-usage")]
        {
            *USAGE_MAP.lock() = Some(std::collections::HashMap::new());
        }

        chunks_log_size = min(
            MAXIMUM_CHUNK_SIZE_LOG,
            max(MINIMUM_CHUNK_SIZE_LOG, chunks_log_size),
        );

        let chunk_usable_size = 1usize << chunks_log_size;
        let chunk_padded_size = chunk_usable_size
            + if cfg!(feature = "memory-guards") {
                4096
            } else {
                0
            };
        let total_padded_mem_size: MemoryDataSize =
            MemoryDataSize::from_octets(chunk_padded_size as f64);

        let chunks_count = max(min_chunks_count, (memory / total_padded_mem_size) as usize);

        self.chunk_padded_size
            .store(chunk_padded_size, Ordering::Relaxed);
        self.chunk_usable_size
            .store(chunk_usable_size, Ordering::Relaxed);
        self.chunks_log_size
            .store(chunks_log_size, Ordering::Relaxed);

        let chunks_iter =
            self.allocate_contiguous_chunk(chunks_count, &mut self.buffers_list.lock());

        self.chunks.lock().extend(chunks_iter);

        crate::log_info!(
            "Allocator initialized: mem: {} chunks: {} log2: {}",
            memory,
            chunks_count,
            chunks_log_size
        );
    }

    /// Advises the OS that the memory of currently free chunks can be
    /// reclaimed (no-op on Windows).
    pub fn giveback_free_memory(&self) {
        #[cfg(not(target_os = "windows"))]
        {
            let pagesize = page_size::get();

            let pages_per_chunk = self.chunk_padded_size.load(Ordering::Relaxed) / pagesize;

            let chunks = self.chunks.lock();
            for chunk_start in chunks.iter() {
                unsafe {
                    libc::madvise(
                        *chunk_start as *mut libc::c_void,
                        pages_per_chunk * pagesize,
                        libc::MADV_DONTNEED,
                    );
                }
            }
        }
    }

    /// Flushes the in-memory file system to disk, waits for every chunk to be
    /// released, then advises the OS that all backing buffers can be reclaimed.
    pub fn giveback_all_memory(&self) {
        MemoryFs::flush_to_disk(false);

        loop {
            {
                // Wait for all the chunks to be freed
                if self.chunks.lock().len() == self.chunks_total_count.load(Ordering::Relaxed) {
                    break;
                }
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        #[cfg(not(target_os = "windows"))]
        unsafe {
            for (buffer, chunks_count) in self.buffers_list.lock().iter() {
                libc::madvise(
                    *buffer as *mut libc::c_void,
                    chunks_count * self.chunk_padded_size.load(Ordering::Relaxed),
                    libc::MADV_DONTNEED,
                );
            }
        }
    }

    /// Pops a free chunk from the pool, reducing file-system memory pressure
    /// and, as a last resort, allocating extra chunks when none are available.
    pub fn request_chunk(
        &self,
        #[cfg(feature = "track-usage")] usage: ChunkUsage_,
        #[cfg(not(feature = "track-usage"))] _: (),
    ) -> AllocatedChunk {
        let mut tries_count = 0;
        let mut chunks_lock = self.chunks.lock();

        loop {
            let el = chunks_lock.pop();
            let free_count = chunks_lock.len();
            drop(chunks_lock);

            match el.map(|chunk| AllocatedChunk {
                memory: chunk,
                len: AtomicUsize::new(0),
                max_len_log2: self.chunks_log_size.load(Ordering::Relaxed),
                #[cfg(feature = "track-usage")]
                _usage: usage.clone(),
                dealloc_fn: |ptr, _size_log2| {
                    CHUNKS_ALLOCATOR.chunks.lock().push(ptr);
                    CHUNKS_ALLOCATOR.chunks_wait_condvar.notify_one();
                },
            }) {
                None => {
                    if !MemoryFs::reduce_pressure() {
                        tries_count += 1;
                    }

                    chunks_lock = self.chunks.lock();
                    if chunks_lock.len() == 0 {
                        if !self
                            .chunks_wait_condvar
                            .wait_for(&mut chunks_lock, Duration::from_millis(25))
                            .timed_out()
                        {
                            tries_count = 0;
                            continue;
                        }
                    }

                    if tries_count > 10 {
                        #[cfg(feature = "track-usage")]
                        {
                            super::file::internal::MemoryFileInternal::debug_dump_files();
                            crate::log_info!(
                                "Usages: {:?}",
                                USAGE_MAP
                                    .lock()
                                    .as_ref()
                                    .unwrap()
                                    .iter()
                                    .filter(|x| *x.1 != 0)
                                    .map(|x| format!("{:?}", x))
                                    .collect::<Vec<_>>()
                                    .join("\n")
                            )
                        }

                        let mut buffers_list = self.buffers_list.lock();
                        // We are still out of memory, so allocate another contiguous block.
                        // The check is repeated here to avoid allocating multiple contiguous
                        // blocks when several threads run out of memory at the same time.
                        if chunks_lock.len() == 0 {
                            let alloc_multiplier = 1 << (buffers_list.len().saturating_sub(1));

                            let extra_chunks_count = (OUT_OF_MEMORY_ALLOCATION_SIZE.as_bytes()
                                * alloc_multiplier)
                                / self.chunk_usable_size.load(Ordering::Relaxed);
                            let chunks_iter = self
                                .allocate_contiguous_chunk(extra_chunks_count, &mut buffers_list);
                            chunks_lock.extend(chunks_iter);

                            crate::log_info!(
                                "Allocated {} extra chunks for temporary files ({})",
                                extra_chunks_count,
                                OUT_OF_MEMORY_ALLOCATION_SIZE * alloc_multiplier as f64
                            );
                        }
                        // Reset the tries counter
                        tries_count = 0;
                    }
                }
                Some(chunk) => {
                    self.min_free_chunks
                        .fetch_min(free_count, Ordering::Relaxed);
                    #[cfg(feature = "track-usage")]
                    {
                        *USAGE_MAP
                            .lock()
                            .as_mut()
                            .unwrap()
                            .entry(usage.clone())
                            .or_insert(0) += 1;
                    }

                    return chunk;
                }
            }
        }
    }

    pub fn get_free_memory(&self) -> MemoryDataSize {
        MemoryDataSize::from_octets(
            (self.chunks.lock().len() * self.chunk_usable_size.load(Ordering::Relaxed)) as f64,
        )
    }

    pub fn get_reserved_memory(&self) -> MemoryDataSize {
        MemoryDataSize::from_octets(
            ((self.chunks_total_count.load(Ordering::Relaxed)
                - self.min_free_chunks.load(Ordering::Relaxed))
                * self.chunk_usable_size.load(Ordering::Relaxed)) as f64,
        )
    }

    pub fn get_total_memory(&self) -> MemoryDataSize {
        MemoryDataSize::from_octets(
            (self.chunks_total_count.load(Ordering::Relaxed)
                * self.chunk_usable_size.load(Ordering::Relaxed)) as f64,
        )
    }

    /// Waits for every chunk to be returned to the pool, then frees all the
    /// backing buffers.
    pub fn deinitialize(&self) {
        self.is_active.store(false, Ordering::Relaxed);
        let mut chunks = self.chunks.lock();

        let mut counter = 0;
        // Wait for all the chunks to be written to disk
        while chunks.len() != self.chunks_total_count.load(Ordering::Relaxed) {
            drop(chunks);
            std::thread::sleep(Duration::from_millis(200));

            counter += 1;
            if counter % 256 == 0 {
                crate::log_warn!("WARNING: Cannot flush all the data!");
            }

            chunks = self.chunks.lock();
        }

        FILES_FLUSH_HASH_MAP.lock().take();

        {
            chunks.clear();
            self.chunks_total_count.swap(0, Ordering::Relaxed);

            for (addr, chunks_count) in self.buffers_list.lock().drain(..) {
                unsafe {
                    dealloc(
                        addr as *mut u8,
                        Layout::from_size_align_unchecked(
                            chunks_count * self.chunk_padded_size.load(Ordering::Relaxed),
                            ALLOCATOR_ALIGN,
                        ),
                    )
                }
            }
        }
    }
}

pub static CHUNKS_ALLOCATOR: ChunksAllocator = ChunksAllocator::new();
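
// Usage sketch (illustrative only; parameter values are arbitrary): initialize
// the pool once at startup, request chunks as needed, let each `AllocatedChunk`
// drop to return its memory to the pool, and call `deinitialize` only after
// every chunk has been released:
//
//     CHUNKS_ALLOCATOR.initialize(MemoryDataSize::from_gibioctets(2), 17, 8, None);
//     let chunk = CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(TemporarySpace));
//     let _offset = chunk.write_bytes_noextend(b"payload");
//     drop(chunk); // the chunk is pushed back onto the free list
//     CHUNKS_ALLOCATOR.deinitialize();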

#[cfg(test)]
mod tests {
    use crate::memory_data_size::MemoryDataSize;
    use crate::memory_fs::allocator::CHUNKS_ALLOCATOR;
    use rayon::prelude::*;

    #[test]
    fn allocate_memory() {
        CHUNKS_ALLOCATOR.initialize(MemoryDataSize::from_gibioctets(8), 22, 0, None);
        for _ in 0..5 {
            let mut allocated_chunks: Vec<_> = std::iter::from_fn(move || {
                Some(CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(TemporarySpace)))
            })
            .take(1024 * 2)
            .collect();

            allocated_chunks.par_iter_mut().for_each(|x| {
                x.zero_memory();
            });
        }
        CHUNKS_ALLOCATOR.deinitialize();
    }
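
    // Additional sketch test (assumption: meant to be run on its own, e.g. with
    // `cargo test -- --ignored --test-threads=1`, since it shares the global
    // CHUNKS_ALLOCATOR with `allocate_memory`); it exercises the append-style
    // write API of `AllocatedChunk`.
    #[test]
    #[ignore]
    fn write_and_recycle_chunk() {
        CHUNKS_ALLOCATOR.initialize(MemoryDataSize::from_mebioctets(64), 16, 1, None);

        let chunk = CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(TemporarySpace));
        assert_eq!(chunk.len(), 0);

        let data = [42u8; 128];
        let offset = chunk
            .write_bytes_noextend(&data)
            .expect("a fresh chunk should have space for 128 bytes");
        assert_eq!(offset, 0);
        assert_eq!(chunk.len(), data.len());
        assert_eq!(&chunk.get()[..data.len()], &data[..]);
        assert!(chunk.has_space_for(1));

        // Dropping the chunk pushes it back onto the allocator's free list.
        drop(chunk);
        CHUNKS_ALLOCATOR.deinitialize();
    }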
}