parallel_processor/memory_fs/allocator.rs

use crate::memory_data_size::MemoryDataSize;
use crate::memory_fs::{MemoryFs, FILES_FLUSH_HASH_MAP};
use parking_lot::lock_api::RawMutex as _;
use parking_lot::{Condvar, Mutex, MutexGuard};
use std::alloc::{alloc, dealloc, Layout};
use std::cmp::{max, min};
use std::slice::{from_raw_parts, from_raw_parts_mut};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;

// Alignment (in bytes) of the large buffers requested from the system allocator.
const ALLOCATOR_ALIGN: usize = 4096;
// Size of each emergency allocation performed when the chunk pool is exhausted.
const OUT_OF_MEMORY_ALLOCATION_SIZE: MemoryDataSize = MemoryDataSize::from_mebioctets(256);
// Chunk sizes are clamped to the [2^12, 2^18] byte range (4 KiB to 256 KiB).
const MAXIMUM_CHUNK_SIZE_LOG: usize = 18;
const MINIMUM_CHUNK_SIZE_LOG: usize = 12;

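// With the `track-usage` feature enabled this macro builds a `ChunkUsage_` value;
// otherwise it expands to `()`, so call sites do not need any cfg of their own.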
#[macro_export]
#[cfg(feature = "track-usage")]
macro_rules! chunk_usage {
    ($mode:ident $({ $($param:ident : $value:expr),* })?) => {
        $crate::memory_fs::allocator::ChunkUsage_::$mode $({
            $($param: $value),*
        })?
    }
}

#[macro_export]
#[cfg(not(feature = "track-usage"))]
macro_rules! chunk_usage {
    ($mode:ident $({ $($param:ident : $value:expr),* })?) => {
        ()
    };
}

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum ChunkUsage_ {
    FileBuffer { path: String },
    InMemoryFile { path: String },
    ReadStriped { path: String },
    TemporarySpace,
}

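/// A single chunk of memory obtained from the global chunk pool.
/// `memory` is the base address, `len` the number of bytes currently in use;
/// on drop the chunk is handed back to the pool through `dealloc_fn`.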
pub struct AllocatedChunk {
    memory: usize,
    len: AtomicUsize,
    max_len_log2: usize,
    dealloc_fn: fn(usize, usize),
    #[cfg(feature = "track-usage")]
    _usage: ChunkUsage_,
}
unsafe impl Sync for AllocatedChunk {}
unsafe impl Send for AllocatedChunk {}

impl Clone for AllocatedChunk {
    #[track_caller]
    fn clone(&self) -> Self {
        panic!("This method should not be called, check for vector cloning!");
    }
}

impl AllocatedChunk {
    pub const INVALID: Self = Self {
        memory: 0,
        len: AtomicUsize::new(0),
        max_len_log2: 0,
        dealloc_fn: |_, _| {},
        #[cfg(feature = "track-usage")]
        _usage: ChunkUsage_::TemporarySpace,
    };

    #[inline(always)]
    #[allow(dead_code)]
    fn zero_memory(&mut self) {
        unsafe {
            std::ptr::write_bytes(self.memory as *mut u8, 0, 1 << self.max_len_log2);
        }
    }

    /// Safety: the caller must have exclusive access to this chunk, since the
    /// length is updated through a non-atomic mutable reference.
    #[inline(always)]
    #[allow(mutable_transmutes)]
    pub unsafe fn write_bytes_noextend_single_thread(&self, data: *const u8, len: usize) {
        let off_len = std::mem::transmute::<&AtomicUsize, &mut usize>(&self.len);
        std::ptr::copy_nonoverlapping(data, (self.memory + *off_len) as *mut u8, len);
        *off_len += len;
    }

    /// Safety: same exclusive-access requirement as
    /// `write_bytes_noextend_single_thread`.
    #[inline(always)]
    #[allow(mutable_transmutes)]
    pub unsafe fn write_zero_bytes_noextend_single_thread(&self, len: usize) {
        let off_len = std::mem::transmute::<&AtomicUsize, &mut usize>(&self.len);
        std::ptr::write_bytes((self.memory + *off_len) as *mut u8, 0, len);
        *off_len += len;
    }

    /// Safety: same exclusive-access requirement as
    /// `write_bytes_noextend_single_thread`.
    #[inline(always)]
    #[allow(mutable_transmutes)]
    pub unsafe fn prealloc_bytes_single_thread(&self, len: usize) -> &'static mut [u8] {
        let off_len = std::mem::transmute::<&AtomicUsize, &mut usize>(&self.len);
        let slice = from_raw_parts_mut((self.memory + *off_len) as *mut u8, len);
        *off_len += len;
        slice
    }

    /// Atomically reserves `data.len()` bytes and copies `data` into the chunk,
    /// returning the offset at which it was written, or `None` if it does not fit.
    pub fn write_bytes_noextend(&self, data: &[u8]) -> Option<u64> {
        let result = self
            .len
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| {
                if value + data.len() <= (1 << self.max_len_log2) {
                    Some(value + data.len())
                } else {
                    None
                }
            });

        match result {
            Ok(addr_offset) => {
                unsafe {
                    std::ptr::copy_nonoverlapping(
                        data.as_ptr(),
                        (self.memory + addr_offset) as *mut u8,
                        data.len(),
                    );
                }
                Some(addr_offset as u64)
            }
            Err(_) => None,
        }
    }

    #[inline(always)]
    pub fn has_space_for(&self, len: usize) -> bool {
        self.len.load(Ordering::Relaxed) + len <= (1 << self.max_len_log2)
    }

    #[inline(always)]
    pub unsafe fn get_mut_slice(&self) -> &'static mut [u8] {
        from_raw_parts_mut(self.memory as *mut u8, self.len.load(Ordering::Relaxed))
    }

    #[inline(always)]
    pub unsafe fn get_mut_ptr(&self) -> *mut u8 {
        self.memory as *mut u8
    }

    #[inline(always)]
    pub unsafe fn get_object_reference_mut<T>(&self, offset_in_bytes: usize) -> &'static mut T {
        &mut *((self.memory as *mut u8).add(offset_in_bytes) as *mut T)
    }

    #[inline(always)]
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    #[inline(always)]
    pub fn max_len(&self) -> usize {
        1 << self.max_len_log2
    }

    #[inline(always)]
    pub fn remaining_bytes(&self) -> usize {
        (1 << self.max_len_log2) - self.len.load(Ordering::Relaxed)
    }

    #[inline(always)]
    pub fn clear(&self) {
        self.len.store(0, Ordering::Relaxed);
    }

    #[inline(always)]
    pub fn get(&self) -> &[u8] {
        unsafe { from_raw_parts(self.memory as *mut u8, self.len.load(Ordering::Relaxed)) }
    }

    #[inline(always)]
    pub unsafe fn set_len(&self, len: usize) {
        self.len.store(len, Ordering::Relaxed);
    }
}

impl Drop for AllocatedChunk {
    fn drop(&mut self) {
        (self.dealloc_fn)(self.memory, self.max_len_log2);
    }
}

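/// Limits used by the optional background memory watcher (Linux only):
/// `max_usage` caps the memory of the whole process, while `min_fs_usage` is
/// the amount kept available to the in-memory filesystem.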
pub struct MemoryAllocationLimits {
    pub max_usage: MemoryDataSize,
    pub min_fs_usage: MemoryDataSize,
}

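/// Global pool of equally sized memory chunks. Chunks are carved out of a few
/// large contiguous buffers and recycled through a free list, so allocation is
/// a simple pop/push plus a condition variable for waiters.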
pub struct ChunksAllocator {
    /// Base address and chunk count of every big buffer obtained from the system allocator.
    buffers_list: Mutex<Vec<(usize, usize)>>,
    chunks_wait_condvar: Condvar,
    /// Free list of chunk base addresses.
    chunks: Mutex<Vec<usize>>,
    /// Chunks whose pages were returned to the OS by the memory watcher.
    cleared_chunks: Mutex<Vec<usize>>,
    /// Lowest number of free chunks observed, used to estimate reserved memory.
    min_free_chunks: AtomicUsize,
    chunks_total_count: AtomicUsize,
    is_active: AtomicBool,
    chunk_padded_size: AtomicUsize,
    chunk_usable_size: AtomicUsize,
    chunks_log_size: AtomicUsize,
}
unsafe impl Sync for ChunksAllocator {}
unsafe impl Send for ChunksAllocator {}

#[cfg(feature = "track-usage")]
static USAGE_MAP: Mutex<Option<std::collections::HashMap<ChunkUsage_, usize>>> =
    Mutex::const_new(parking_lot::RawMutex::INIT, None);

impl ChunksAllocator {
    const fn new() -> ChunksAllocator {
        ChunksAllocator {
            buffers_list: Mutex::new(vec![]),
            chunks_wait_condvar: Condvar::new(),
            chunks: Mutex::const_new(parking_lot::RawMutex::INIT, Vec::new()),
            cleared_chunks: Mutex::const_new(parking_lot::RawMutex::INIT, Vec::new()),
            min_free_chunks: AtomicUsize::new(0),
            chunks_total_count: AtomicUsize::new(0),
            is_active: AtomicBool::new(false),
            chunk_padded_size: AtomicUsize::new(0),
            chunk_usable_size: AtomicUsize::new(0),
            chunks_log_size: AtomicUsize::new(0),
        }
    }

    fn allocate_contiguous_chunk(
        &self,
        chunks_count: usize,
        buffers_list: &mut MutexGuard<Vec<(usize, usize)>>,
    ) -> impl Iterator<Item = usize> {
        let chunk_padded_size = self.chunk_padded_size.load(Ordering::Relaxed);

        self.chunks_total_count
            .fetch_add(chunks_count, Ordering::Relaxed);

        self.min_free_chunks.store(chunks_count, Ordering::Relaxed);

        let data = unsafe {
            alloc(Layout::from_size_align_unchecked(
                chunks_count * chunk_padded_size,
                ALLOCATOR_ALIGN,
            ))
        };

        #[cfg(feature = "memory-guards")]
        unsafe {
            let first_guard = data.add(self.chunk_usable_size.load(Ordering::Relaxed));
            for i in 0..chunks_count {
                let guard = first_guard.add(i * chunk_padded_size);
                libc::mprotect(guard as *mut libc::c_void, 4096, libc::PROT_NONE);
            }
        }

        buffers_list.push((data as usize, chunks_count));

        (0..chunks_count)
            .into_iter()
            .rev()
            .map(move |c| data as usize + (c * chunk_padded_size))
    }
    pub fn initialize(
        &'static self,
        memory: MemoryDataSize,
        mut chunks_log_size: usize,
        min_chunks_count: usize,
        alloc_limits: Option<MemoryAllocationLimits>,
    ) {
        self.is_active.swap(true, Ordering::Relaxed);

        #[cfg(target_os = "linux")]
        if let Some(alloc_limits) = alloc_limits {
            use crate::simple_process_stats::ProcessStats;

            std::thread::spawn(move || {
                let min_chunks_count = alloc_limits.min_fs_usage.as_bytes()
                    / self.chunk_usable_size.load(Ordering::Relaxed);

                while self.is_active.load(Ordering::Relaxed) {
                    const MIN_FREE_CHUNKS_COUNT: usize = 4;

                    if let Ok(stats) = ProcessStats::get() {
                        if stats.memory_usage_bytes as usize > alloc_limits.max_usage.as_bytes() {
                            MemoryFs::reduce_pressure();
                            let mut chunks_to_free = vec![];
                            {
                                let cleared_chunks_count = self.cleared_chunks.lock().len();
                                let mut chunks = self.chunks.lock();
                                while chunks.len() > MIN_FREE_CHUNKS_COUNT
                                    && ((stats.memory_usage_bytes as usize).saturating_sub(
                                        chunks_to_free.len()
                                            * self.chunk_padded_size.load(Ordering::Relaxed),
                                    )) > alloc_limits.max_usage.as_bytes()
                                    && min_chunks_count + cleared_chunks_count
                                        + chunks_to_free.len()
                                        < self.chunks_total_count.load(Ordering::Relaxed)
                                {
                                    chunks_to_free.push(chunks.pop().unwrap());
                                }
                                drop(chunks);
                            }
                            let mut cleared_chunks = self.cleared_chunks.lock();
                            for chunk_start in chunks_to_free {
                                unsafe {
                                    libc::madvise(
                                        chunk_start as *mut libc::c_void,
                                        self.chunk_padded_size.load(Ordering::Relaxed),
                                        libc::MADV_DONTNEED,
                                    );
                                }
                                cleared_chunks.push(chunk_start);
                            }
                        } else {
                            let allowed_space = alloc_limits.max_usage.as_bytes()
                                - stats.memory_usage_bytes as usize;
                            let allowed_chunks =
                                allowed_space / self.chunk_padded_size.load(Ordering::Relaxed);
                            if allowed_chunks > 0 {
                                let mut cleared_chunks = self.cleared_chunks.lock();
                                let mut chunks = self.chunks.lock();
                                while chunks.len() < allowed_chunks && cleared_chunks.len() > 0 {
                                    chunks.push(cleared_chunks.pop().unwrap());
                                }
                            }
                        }
                    }

                    std::thread::sleep(Duration::from_secs(1));
                }
                let cleared_chunks = self.cleared_chunks.lock().drain(..).collect::<Vec<_>>();
                self.chunks.lock().extend(cleared_chunks);
            });
        }

        if self.buffers_list.lock().len() > 0 {
            return;
        }

        #[cfg(feature = "track-usage")]
        {
            *USAGE_MAP.lock() = Some(std::collections::HashMap::new());
        }

        chunks_log_size = min(
            MAXIMUM_CHUNK_SIZE_LOG,
            max(MINIMUM_CHUNK_SIZE_LOG, chunks_log_size),
        );

        let chunk_usable_size = 1usize << chunks_log_size;
        let chunk_padded_size = chunk_usable_size
            + if cfg!(feature = "memory-guards") {
                4096
            } else {
                0
            };
        let total_padded_mem_size: MemoryDataSize =
            MemoryDataSize::from_octets(chunk_padded_size as f64);

        let chunks_count = max(min_chunks_count, (memory / total_padded_mem_size) as usize);

        self.chunk_padded_size
            .store(chunk_padded_size, Ordering::Relaxed);
        self.chunk_usable_size
            .store(chunk_usable_size, Ordering::Relaxed);
        self.chunks_log_size
            .store(chunks_log_size, Ordering::Relaxed);

        let chunks_iter =
            self.allocate_contiguous_chunk(chunks_count, &mut self.buffers_list.lock());

        self.chunks.lock().extend(chunks_iter);

        crate::log_info!(
            "Allocator initialized: mem: {} chunks: {} log2: {}",
            memory,
            chunks_count,
            chunks_log_size
        );
    }

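    /// Tells the kernel (`madvise(MADV_DONTNEED)`) that the pages of all
    /// currently free chunks can be reclaimed. No-op on Windows.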
    pub fn giveback_free_memory(&self) {
        #[cfg(not(target_os = "windows"))]
        {
            let pagesize = page_size::get();

            let pages_per_chunk = self.chunk_padded_size.load(Ordering::Relaxed) / pagesize;

            let chunks = self.chunks.lock();
            for chunk_start in chunks.iter() {
                unsafe {
                    libc::madvise(
                        *chunk_start as *mut libc::c_void,
                        pages_per_chunk * pagesize,
                        libc::MADV_DONTNEED,
                    );
                }
            }
        }
    }

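    /// Flushes the in-memory filesystem to disk, waits until every chunk is back
    /// in the pool, then releases the pages of all backing buffers to the OS.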
    pub fn giveback_all_memory(&self) {
        MemoryFs::flush_to_disk(false);

        loop {
            {
                if self.chunks.lock().len() == self.chunks_total_count.load(Ordering::Relaxed) {
                    break;
                }
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        #[cfg(not(target_os = "windows"))]
        unsafe {
            for (buffer, chunks_count) in self.buffers_list.lock().iter() {
                libc::madvise(
                    *buffer as *mut libc::c_void,
                    chunks_count * self.chunk_padded_size.load(Ordering::Relaxed),
                    libc::MADV_DONTNEED,
                );
            }
        }
    }

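    /// Pops a free chunk from the pool. If none is available it asks `MemoryFs`
    /// to reduce pressure and waits on the condition variable; after repeated
    /// failures it allocates extra chunks (`OUT_OF_MEMORY_ALLOCATION_SIZE`,
    /// doubled for every additional buffer already allocated).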
    pub fn request_chunk(
        &self,
        #[cfg(feature = "track-usage")] usage: ChunkUsage_,
        #[cfg(not(feature = "track-usage"))] _: (),
    ) -> AllocatedChunk {
        let mut tries_count = 0;
        let mut chunks_lock = self.chunks.lock();

        loop {
            let el = chunks_lock.pop();
            let free_count = chunks_lock.len();
            drop(chunks_lock);

            match el.map(|chunk| AllocatedChunk {
                memory: chunk,
                len: AtomicUsize::new(0),
                max_len_log2: self.chunks_log_size.load(Ordering::Relaxed),
                #[cfg(feature = "track-usage")]
                _usage: usage.clone(),
                dealloc_fn: |ptr, _size_log2| {
                    CHUNKS_ALLOCATOR.chunks.lock().push(ptr);
                    CHUNKS_ALLOCATOR.chunks_wait_condvar.notify_one();
                },
            }) {
                None => {
                    if !MemoryFs::reduce_pressure() {
                        tries_count += 1;
                    }

                    chunks_lock = self.chunks.lock();
                    if chunks_lock.len() == 0 {
                        if !self
                            .chunks_wait_condvar
                            .wait_for(&mut chunks_lock, Duration::from_millis(25))
                            .timed_out()
                        {
                            tries_count = 0;
                            continue;
                        }
                    }

                    if tries_count > 10 {
                        #[cfg(feature = "track-usage")]
                        {
                            super::file::internal::MemoryFileInternal::debug_dump_files();
                            crate::log_info!(
                                "Usages: {:?}",
                                USAGE_MAP
                                    .lock()
                                    .as_ref()
                                    .unwrap()
                                    .iter()
                                    .filter(|x| *x.1 != 0)
                                    .map(|x| format!("{:?}", x))
                                    .collect::<Vec<_>>()
                                    .join("\n")
                            )
                        }

                        let mut buffers_list = self.buffers_list.lock();
                        if chunks_lock.len() == 0 {
                            let alloc_multiplier = 1 << (buffers_list.len().saturating_sub(1));

                            let extra_chunks_count = (OUT_OF_MEMORY_ALLOCATION_SIZE.as_bytes()
                                * alloc_multiplier)
                                / self.chunk_usable_size.load(Ordering::Relaxed);
                            let chunks_iter = self
                                .allocate_contiguous_chunk(extra_chunks_count, &mut buffers_list);
                            chunks_lock.extend(chunks_iter);

                            crate::log_info!(
                                "Allocated {} extra chunks for temporary files ({})",
                                extra_chunks_count,
                                OUT_OF_MEMORY_ALLOCATION_SIZE * alloc_multiplier as f64
                            )
                        }
                        tries_count = 0;
                    }
                }
                Some(chunk) => {
                    self.min_free_chunks
                        .fetch_min(free_count, Ordering::Relaxed);
                    #[cfg(feature = "track-usage")]
                    {
                        *USAGE_MAP
                            .lock()
                            .as_mut()
                            .unwrap()
                            .entry(usage.clone())
                            .or_insert(0) += 1;
                    }

                    return chunk;
                }
            }
        }
    }

    pub fn get_free_memory(&self) -> MemoryDataSize {
        MemoryDataSize::from_octets(
            (self.chunks.lock().len() * self.chunk_usable_size.load(Ordering::Relaxed)) as f64,
        )
    }

    pub fn get_reserved_memory(&self) -> MemoryDataSize {
        MemoryDataSize::from_octets(
            ((self.chunks_total_count.load(Ordering::Relaxed)
                - self.min_free_chunks.load(Ordering::Relaxed))
                * self.chunk_usable_size.load(Ordering::Relaxed)) as f64,
        )
    }

    pub fn get_total_memory(&self) -> MemoryDataSize {
        MemoryDataSize::from_octets(
            (self.chunks_total_count.load(Ordering::Relaxed)
                * self.chunk_usable_size.load(Ordering::Relaxed)) as f64,
        )
    }

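    /// Marks the allocator inactive, waits until every chunk has been returned
    /// (periodically warning if data cannot be flushed), then frees all the
    /// backing buffers.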
    pub fn deinitialize(&self) {
        self.is_active.store(false, Ordering::Relaxed);
        let mut chunks = self.chunks.lock();

        let mut counter = 0;
        while chunks.len() != self.chunks_total_count.load(Ordering::Relaxed) {
            drop(chunks);
            std::thread::sleep(Duration::from_millis(200));

            counter += 1;
            if counter % 256 == 0 {
                crate::log_warn!("WARNING: Cannot flush all the data!");
            }

            chunks = self.chunks.lock();
        }

        FILES_FLUSH_HASH_MAP.lock().take();

        {
            chunks.clear();
            self.chunks_total_count.swap(0, Ordering::Relaxed);

            for (addr, chunks_count) in self.buffers_list.lock().drain(..) {
                unsafe {
                    dealloc(
                        addr as *mut u8,
                        Layout::from_size_align_unchecked(
                            chunks_count * self.chunk_padded_size.load(Ordering::Relaxed),
                            ALLOCATOR_ALIGN,
                        ),
                    )
                }
            }
        }
    }
}

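/// Global allocator instance shared by the whole in-memory filesystem.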
pub static CHUNKS_ALLOCATOR: ChunksAllocator = ChunksAllocator::new();

#[cfg(test)]
mod tests {
    use crate::memory_data_size::MemoryDataSize;
    use crate::memory_fs::allocator::CHUNKS_ALLOCATOR;
    use rayon::prelude::*;

    #[test]
    fn allocate_memory() {
        // No allocation limits for the test (the last parameter of `initialize` is optional).
        CHUNKS_ALLOCATOR.initialize(MemoryDataSize::from_gibioctets(8), 22, 0, None);
        for _ in 0..5 {
            let mut allocated_chunks: Vec<_> = std::iter::from_fn(move || {
                Some(CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(TemporarySpace)))
            })
            .take(1024 * 2)
            .collect();

            allocated_chunks.par_iter_mut().for_each(|x| {
                x.zero_memory();
            });
        }
        CHUNKS_ALLOCATOR.deinitialize();
    }
}