parallel_processor/memory_fs/allocator.rs

use crate::memory_data_size::MemoryDataSize;
use crate::memory_fs::{MemoryFs, FILES_FLUSH_HASH_MAP};
use crate::simple_process_stats::ProcessStats;
use parking_lot::lock_api::RawMutex as _;
use parking_lot::{Condvar, Mutex, MutexGuard};
use std::alloc::{alloc, dealloc, Layout};
use std::cmp::{max, min};
use std::slice::{from_raw_parts, from_raw_parts_mut};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;

const ALLOCATOR_ALIGN: usize = 4096;
const OUT_OF_MEMORY_ALLOCATION_SIZE: MemoryDataSize = MemoryDataSize::from_mebioctets(256);
const MAXIMUM_CHUNK_SIZE_LOG: usize = 18;
const MINIMUM_CHUNK_SIZE_LOG: usize = 12;

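/// Builds the `ChunkUsage_` tag describing who requested a chunk when the
/// `track-usage` feature is enabled; with the feature disabled the macro
/// expands to `()` so call sites stay unchanged.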
#[macro_export]
#[cfg(feature = "track-usage")]
macro_rules! chunk_usage {
    ($mode:ident $({ $($param:ident : $value:expr),* })?) => {
        $crate::memory_fs::allocator::ChunkUsage_::$mode $({
            $($param: $value),*
        })?
    }
}

#[macro_export]
#[cfg(not(feature = "track-usage"))]
macro_rules! chunk_usage {
    ($mode:ident $({ $($param:ident : $value:expr),* })?) => {
        ()
    };
}

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum ChunkUsage_ {
    FileBuffer { path: String },
    InMemoryFile { path: String },
    ReadStriped { path: String },
    TemporarySpace,
}

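/// A fixed-size memory chunk handed out by [`ChunksAllocator`]: `memory` is the
/// base address, `len` the number of bytes written so far, `max_len_log2` the
/// log2 of the chunk capacity, and `dealloc_fn` returns the chunk to the
/// allocator when the handle is dropped.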
pub struct AllocatedChunk {
    memory: usize,
    len: AtomicUsize,
    max_len_log2: usize,
    dealloc_fn: fn(usize, usize),
    #[cfg(feature = "track-usage")]
    _usage: ChunkUsage_,
}
unsafe impl Sync for AllocatedChunk {}
unsafe impl Send for AllocatedChunk {}

impl Clone for AllocatedChunk {
    #[track_caller]
    fn clone(&self) -> Self {
        panic!("This method should not be called, check for vector cloning!");
    }
}

impl AllocatedChunk {
    pub const INVALID: Self = Self {
        memory: 0,
        len: AtomicUsize::new(0),
        max_len_log2: 0,
        dealloc_fn: |_, _| {},
        #[cfg(feature = "track-usage")]
        _usage: ChunkUsage_::TemporarySpace,
    };

    #[inline(always)]
    #[allow(dead_code)]
    fn zero_memory(&mut self) {
        unsafe {
            std::ptr::write_bytes(self.memory as *mut u8, 0, 1 << self.max_len_log2);
        }
    }

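    // The *_single_thread methods below bypass atomic updates: the shared `len`
    // counter is transmuted to `&mut usize`, so they are only sound while a
    // single thread has exclusive use of the chunk and the data fits in the
    // remaining space.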
    #[inline(always)]
    #[allow(mutable_transmutes)]
    pub unsafe fn write_bytes_noextend_single_thread(&self, data: *const u8, len: usize) {
        let off_len = std::mem::transmute::<&AtomicUsize, &mut usize>(&self.len);
        std::ptr::copy_nonoverlapping(data, (self.memory + *off_len) as *mut u8, len);
        *off_len += len;
    }

    #[inline(always)]
    #[allow(mutable_transmutes)]
    pub unsafe fn write_zero_bytes_noextend_single_thread(&self, len: usize) {
        let off_len = std::mem::transmute::<&AtomicUsize, &mut usize>(&self.len);
        std::ptr::write_bytes((self.memory + *off_len) as *mut u8, 0, len);
        *off_len += len;
    }

    #[inline(always)]
    #[allow(mutable_transmutes)]
    pub unsafe fn prealloc_bytes_single_thread(&self, len: usize) -> &'static mut [u8] {
        let off_len = std::mem::transmute::<&AtomicUsize, &mut usize>(&self.len);
        let slice = from_raw_parts_mut((self.memory + *off_len) as *mut u8, len);
        *off_len += len;
        slice
    }

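    /// Thread-safe append: atomically reserves space by advancing `len` with
    /// `fetch_update`, then copies `data` into the reserved range. Returns the
    /// offset the data was written at, or `None` if it does not fit.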
    pub fn write_bytes_noextend(&self, data: &[u8]) -> Option<u64> {
        let result = self
            .len
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| {
                if value + data.len() <= (1 << self.max_len_log2) {
                    Some(value + data.len())
                } else {
                    None
                }
            });

        match result {
            Ok(addr_offset) => {
                unsafe {
                    std::ptr::copy_nonoverlapping(
                        data.as_ptr(),
                        (self.memory + addr_offset) as *mut u8,
                        data.len(),
                    );
                }
                Some(addr_offset as u64)
            }
            Err(_) => None,
        }
    }

    #[inline(always)]
    pub fn has_space_for(&self, len: usize) -> bool {
        self.len.load(Ordering::Relaxed) + len <= (1 << self.max_len_log2)
    }

    #[inline(always)]
    pub unsafe fn get_mut_slice(&self) -> &'static mut [u8] {
        from_raw_parts_mut(self.memory as *mut u8, self.len.load(Ordering::Relaxed))
    }

    #[inline(always)]
    pub unsafe fn get_mut_ptr(&self) -> *mut u8 {
        self.memory as *mut u8
    }

    #[inline(always)]
    pub unsafe fn get_object_reference_mut<T>(&self, offset_in_bytes: usize) -> &'static mut T {
        &mut *((self.memory as *mut u8).add(offset_in_bytes) as *mut T)
    }

    #[inline(always)]
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    #[inline(always)]
    pub fn max_len(&self) -> usize {
        1 << self.max_len_log2
    }

    #[inline(always)]
    pub fn remaining_bytes(&self) -> usize {
        (1 << self.max_len_log2) - self.len.load(Ordering::Relaxed)
    }

    #[inline(always)]
    pub fn clear(&self) {
        self.len.store(0, Ordering::Relaxed);
    }

    #[inline(always)]
    pub fn get(&self) -> &[u8] {
        unsafe { from_raw_parts(self.memory as *mut u8, self.len.load(Ordering::Relaxed)) }
    }

    #[inline(always)]
    pub unsafe fn set_len(&self, len: usize) {
        self.len.store(len, Ordering::Relaxed);
    }
}

impl Drop for AllocatedChunk {
    fn drop(&mut self) {
        (self.dealloc_fn)(self.memory, self.max_len_log2);
    }
}

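/// Limits used by the background memory watcher spawned in
/// [`ChunksAllocator::initialize`]: `max_usage` is the process memory ceiling,
/// `min_fs_usage` the minimum memory kept available for the in-memory filesystem.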
pub struct MemoryAllocationLimits {
    pub max_usage: MemoryDataSize,
    pub min_fs_usage: MemoryDataSize,
}

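/// Global chunk allocator: memory is obtained in large contiguous buffers
/// (`buffers_list`), divided into equally sized chunks that are recycled through
/// the `chunks` free list. `cleared_chunks` tracks chunks whose pages were given
/// back to the OS, and `chunks_wait_condvar` wakes threads waiting for a free
/// chunk.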
pub struct ChunksAllocator {
    buffers_list: Mutex<Vec<(usize, usize)>>,
    chunks_wait_condvar: Condvar,
    chunks: Mutex<Vec<usize>>,
    cleared_chunks: Mutex<Vec<usize>>,
    min_free_chunks: AtomicUsize,
    chunks_total_count: AtomicUsize,
    is_active: AtomicBool,
    chunk_padded_size: AtomicUsize,
    chunk_usable_size: AtomicUsize,
    chunks_log_size: AtomicUsize,
}
unsafe impl Sync for ChunksAllocator {}
unsafe impl Send for ChunksAllocator {}

#[cfg(feature = "track-usage")]
static USAGE_MAP: Mutex<Option<std::collections::HashMap<ChunkUsage_, usize>>> =
    Mutex::const_new(parking_lot::RawMutex::INIT, None);

impl ChunksAllocator {
    const fn new() -> ChunksAllocator {
        ChunksAllocator {
            buffers_list: Mutex::new(vec![]),
            chunks_wait_condvar: Condvar::new(),
            chunks: Mutex::const_new(parking_lot::RawMutex::INIT, Vec::new()),
            cleared_chunks: Mutex::const_new(parking_lot::RawMutex::INIT, Vec::new()),
            min_free_chunks: AtomicUsize::new(0),
            chunks_total_count: AtomicUsize::new(0),
            is_active: AtomicBool::new(false),
            chunk_padded_size: AtomicUsize::new(0),
            chunk_usable_size: AtomicUsize::new(0),
            chunks_log_size: AtomicUsize::new(0),
        }
    }

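    /// Allocates one contiguous buffer holding `chunks_count` padded chunks,
    /// optionally placing a guard page after each chunk (`memory-guards` feature),
    /// and returns an iterator over the start addresses of the new chunks.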
    fn allocate_contiguous_chunk(
        &self,
        chunks_count: usize,
        buffers_list: &mut MutexGuard<Vec<(usize, usize)>>,
    ) -> impl Iterator<Item = usize> {
        let chunk_padded_size = self.chunk_padded_size.load(Ordering::Relaxed);

        self.chunks_total_count
            .fetch_add(chunks_count, Ordering::Relaxed);

        self.min_free_chunks.store(chunks_count, Ordering::Relaxed);

        let data = unsafe {
            alloc(Layout::from_size_align_unchecked(
                chunks_count * chunk_padded_size,
                ALLOCATOR_ALIGN,
            ))
        };

        #[cfg(feature = "memory-guards")]
        unsafe {
            let first_guard = data.add(self.chunk_usable_size.load(Ordering::Relaxed));
            for i in 0..chunks_count {
                let guard = first_guard.add(i * chunk_padded_size);
                libc::mprotect(guard as *mut libc::c_void, 4096, libc::PROT_NONE);
            }
        }

        buffers_list.push((data as usize, chunks_count));

        (0..chunks_count)
            .into_iter()
            .rev()
            .map(move |c| data as usize + (c * chunk_padded_size))
    }

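    /// Initializes the allocator: clamps `chunks_log_size` to
    /// `[MINIMUM_CHUNK_SIZE_LOG, MAXIMUM_CHUNK_SIZE_LOG]`, computes how many chunks
    /// fit in `memory`, allocates them in a single contiguous buffer and, when
    /// `alloc_limits` is provided (non-Windows), spawns a watcher thread that
    /// releases free chunks to the OS or reclaims them to stay within the limits.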
    pub fn initialize(
        &'static self,
        memory: MemoryDataSize,
        mut chunks_log_size: usize,
        min_chunks_count: usize,
        alloc_limits: Option<MemoryAllocationLimits>,
    ) {
        self.is_active.swap(true, Ordering::Relaxed);

        #[cfg(not(target_os = "windows"))]
        if let Some(alloc_limits) = alloc_limits {
            std::thread::spawn(move || {
                let min_chunks_count = alloc_limits.min_fs_usage.as_bytes()
                    / self.chunk_usable_size.load(Ordering::Relaxed);

                while self.is_active.load(Ordering::Relaxed) {
                    const MIN_FREE_CHUNKS_COUNT: usize = 4;

                    if let Ok(stats) = ProcessStats::get() {
                        if stats.memory_usage_bytes as usize > alloc_limits.max_usage.as_bytes() {
                            MemoryFs::reduce_pressure();
                            let mut chunks_to_free = vec![];
                            {
                                let cleared_chunks_count = self.cleared_chunks.lock().len();
                                let mut chunks = self.chunks.lock();
                                while chunks.len() > MIN_FREE_CHUNKS_COUNT
                                    && ((stats.memory_usage_bytes as usize).saturating_sub(
                                        chunks_to_free.len()
                                            * self.chunk_padded_size.load(Ordering::Relaxed),
                                    )) > alloc_limits.max_usage.as_bytes()
                                    && min_chunks_count + cleared_chunks_count
                                        + chunks_to_free.len()
                                        < self.chunks_total_count.load(Ordering::Relaxed)
                                {
                                    chunks_to_free.push(chunks.pop().unwrap());
                                }
                                drop(chunks);
                            }
                            let mut cleared_chunks = self.cleared_chunks.lock();
                            for chunk_start in chunks_to_free {
                                unsafe {
                                    libc::madvise(
                                        chunk_start as *mut libc::c_void,
                                        self.chunk_padded_size.load(Ordering::Relaxed),
                                        libc::MADV_DONTNEED,
                                    );
                                }
                                cleared_chunks.push(chunk_start);
                            }
                        } else {
                            let allowed_space = alloc_limits.max_usage.as_bytes()
                                - stats.memory_usage_bytes as usize;
                            let allowed_chunks =
                                allowed_space / self.chunk_padded_size.load(Ordering::Relaxed);
                            if allowed_chunks > 0 {
                                let mut cleared_chunks = self.cleared_chunks.lock();
                                let mut chunks = self.chunks.lock();
                                while chunks.len() < allowed_chunks && cleared_chunks.len() > 0 {
                                    chunks.push(cleared_chunks.pop().unwrap());
                                }
                            }
                        }
                    }

                    std::thread::sleep(Duration::from_secs(1));
                }
                let cleared_chunks = self.cleared_chunks.lock().drain(..).collect::<Vec<_>>();
                self.chunks.lock().extend(cleared_chunks);
            });
        }

        if self.buffers_list.lock().len() > 0 {
            return;
        }

        #[cfg(feature = "track-usage")]
        {
            *USAGE_MAP.lock() = Some(std::collections::HashMap::new());
        }

        chunks_log_size = min(
            MAXIMUM_CHUNK_SIZE_LOG,
            max(MINIMUM_CHUNK_SIZE_LOG, chunks_log_size),
        );

        let chunk_usable_size = 1usize << chunks_log_size;
        let chunk_padded_size = chunk_usable_size
            + if cfg!(feature = "memory-guards") {
                4096
            } else {
                0
            };
        let total_padded_mem_size: MemoryDataSize =
            MemoryDataSize::from_octets(chunk_padded_size as f64);

        let chunks_count = max(min_chunks_count, (memory / total_padded_mem_size) as usize);

        self.chunk_padded_size
            .store(chunk_padded_size, Ordering::Relaxed);
        self.chunk_usable_size
            .store(chunk_usable_size, Ordering::Relaxed);
        self.chunks_log_size
            .store(chunks_log_size, Ordering::Relaxed);

        let chunks_iter =
            self.allocate_contiguous_chunk(chunks_count, &mut self.buffers_list.lock());

        self.chunks.lock().extend(chunks_iter);

        crate::log_info!(
            "Allocator initialized: mem: {} chunks: {} log2: {}",
            memory,
            chunks_count,
            chunks_log_size
        );
    }

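    /// Advises the OS (`madvise(MADV_DONTNEED)`) that the pages of all currently
    /// free chunks can be reclaimed; no-op on Windows.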
    pub fn giveback_free_memory(&self) {
        #[cfg(not(target_os = "windows"))]
        {
            let pagesize = page_size::get();

            let pages_per_chunk = self.chunk_padded_size.load(Ordering::Relaxed) / pagesize;

            let chunks = self.chunks.lock();
            for chunk_start in chunks.iter() {
                unsafe {
                    libc::madvise(
                        *chunk_start as *mut libc::c_void,
                        pages_per_chunk * pagesize,
                        libc::MADV_DONTNEED,
                    );
                }
            }
        }
    }

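    /// Flushes the in-memory filesystem to disk, waits until every chunk is back
    /// in the free list, then releases all backing pages to the OS.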
    pub fn giveback_all_memory(&self) {
        MemoryFs::flush_to_disk(false);

        loop {
            {
                if self.chunks.lock().len() == self.chunks_total_count.load(Ordering::Relaxed) {
                    break;
                }
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        #[cfg(not(target_os = "windows"))]
        unsafe {
            for (buffer, chunks_count) in self.buffers_list.lock().iter() {
                libc::madvise(
                    *buffer as *mut libc::c_void,
                    chunks_count * self.chunk_padded_size.load(Ordering::Relaxed),
                    libc::MADV_DONTNEED,
                );
            }
        }
    }

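    /// Pops a free chunk, blocking until one is available: on failure it asks
    /// `MemoryFs` to reduce pressure and waits on the condvar; after repeated
    /// timeouts it allocates an extra out-of-memory buffer (sized by
    /// `OUT_OF_MEMORY_ALLOCATION_SIZE`, doubled for each existing buffer) so
    /// temporary files can keep being written.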
    pub fn request_chunk(
        &self,
        #[cfg(feature = "track-usage")] usage: ChunkUsage_,
        #[cfg(not(feature = "track-usage"))] _: (),
    ) -> AllocatedChunk {
        let mut tries_count = 0;
        let mut chunks_lock = self.chunks.lock();

        loop {
            let el = chunks_lock.pop();
            let free_count = chunks_lock.len();
            drop(chunks_lock);

            match el.map(|chunk| AllocatedChunk {
                memory: chunk,
                len: AtomicUsize::new(0),
                max_len_log2: self.chunks_log_size.load(Ordering::Relaxed),
                #[cfg(feature = "track-usage")]
                _usage: usage.clone(),
                dealloc_fn: |ptr, _size_log2| {
                    CHUNKS_ALLOCATOR.chunks.lock().push(ptr);
                    CHUNKS_ALLOCATOR.chunks_wait_condvar.notify_one();
                },
            }) {
                None => {
                    if !MemoryFs::reduce_pressure() {
                        tries_count += 1;
                    }

                    chunks_lock = self.chunks.lock();
                    if chunks_lock.len() == 0 {
                        if !self
                            .chunks_wait_condvar
                            .wait_for(&mut chunks_lock, Duration::from_millis(25))
                            .timed_out()
                        {
                            tries_count = 0;
                            continue;
                        }
                    }

                    if tries_count > 10 {
                        #[cfg(feature = "track-usage")]
                        {
                            super::file::internal::MemoryFileInternal::debug_dump_files();
                            crate::log_info!(
                                "Usages: {:?}",
                                USAGE_MAP
                                    .lock()
                                    .as_ref()
                                    .unwrap()
                                    .iter()
                                    .filter(|x| *x.1 != 0)
                                    .map(|x| format!("{:?}", x))
                                    .collect::<Vec<_>>()
                                    .join("\n")
                            )
                        }

                        let mut buffers_list = self.buffers_list.lock();
                        if chunks_lock.len() == 0 {
                            let alloc_multiplier = 1 << (buffers_list.len().saturating_sub(1));

                            let extra_chunks_count = (OUT_OF_MEMORY_ALLOCATION_SIZE.as_bytes()
                                * alloc_multiplier)
                                / self.chunk_usable_size.load(Ordering::Relaxed);
                            let chunks_iter = self
                                .allocate_contiguous_chunk(extra_chunks_count, &mut buffers_list);
                            chunks_lock.extend(chunks_iter);

                            crate::log_info!(
                                "Allocated {} extra chunks for temporary files ({})",
                                extra_chunks_count,
                                OUT_OF_MEMORY_ALLOCATION_SIZE * alloc_multiplier as f64
                            );
                        }
                        tries_count = 0;
                    }
                }
                Some(chunk) => {
                    self.min_free_chunks
                        .fetch_min(free_count, Ordering::Relaxed);
                    #[cfg(feature = "track-usage")]
                    {
                        *USAGE_MAP
                            .lock()
                            .as_mut()
                            .unwrap()
                            .entry(usage.clone())
                            .or_insert(0) += 1;
                    }

                    return chunk;
                }
            }
        }
    }

    pub fn get_free_memory(&self) -> MemoryDataSize {
        MemoryDataSize::from_octets(
            (self.chunks.lock().len() * self.chunk_usable_size.load(Ordering::Relaxed)) as f64,
        )
    }

    pub fn get_reserved_memory(&self) -> MemoryDataSize {
        MemoryDataSize::from_octets(
            ((self.chunks_total_count.load(Ordering::Relaxed)
                - self.min_free_chunks.load(Ordering::Relaxed))
                * self.chunk_usable_size.load(Ordering::Relaxed)) as f64,
        )
    }

    pub fn get_total_memory(&self) -> MemoryDataSize {
        MemoryDataSize::from_octets(
            (self.chunks_total_count.load(Ordering::Relaxed)
                * self.chunk_usable_size.load(Ordering::Relaxed)) as f64,
        )
    }

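    /// Shuts the allocator down: stops the watcher thread, waits (logging a
    /// warning periodically) until every chunk has been returned, then frees all
    /// backing buffers.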
    pub fn deinitialize(&self) {
        self.is_active.store(false, Ordering::Relaxed);
        let mut chunks = self.chunks.lock();

        let mut counter = 0;
        while chunks.len() != self.chunks_total_count.load(Ordering::Relaxed) {
            drop(chunks);
            std::thread::sleep(Duration::from_millis(200));

            counter += 1;
            if counter % 256 == 0 {
                crate::log_warn!("WARNING: Cannot flush all the data!");
            }

            chunks = self.chunks.lock();
        }

        FILES_FLUSH_HASH_MAP.lock().take();

        {
            chunks.clear();
            self.chunks_total_count.swap(0, Ordering::Relaxed);

            for (addr, chunks_count) in self.buffers_list.lock().drain(..) {
                unsafe {
                    dealloc(
                        addr as *mut u8,
                        Layout::from_size_align_unchecked(
                            chunks_count * self.chunk_padded_size.load(Ordering::Relaxed),
                            ALLOCATOR_ALIGN,
                        ),
                    )
                }
            }
        }
    }
}

pub static CHUNKS_ALLOCATOR: ChunksAllocator = ChunksAllocator::new();

#[cfg(test)]
mod tests {
    use crate::memory_data_size::MemoryDataSize;
    use crate::memory_fs::allocator::CHUNKS_ALLOCATOR;
    use rayon::prelude::*;

    #[test]
    fn allocate_memory() {
        // No allocation limits are passed, so no background watcher thread is spawned.
        CHUNKS_ALLOCATOR.initialize(MemoryDataSize::from_gibioctets(8), 22, 0, None);
        for _ in 0..5 {
            let mut allocated_chunks: Vec<_> = std::iter::from_fn(move || {
                Some(CHUNKS_ALLOCATOR.request_chunk(chunk_usage!(TemporarySpace)))
            })
            .take(1024 * 2)
            .collect();

            allocated_chunks.par_iter_mut().for_each(|x| {
                x.zero_memory();
            });
        }
        CHUNKS_ALLOCATOR.deinitialize();
    }
}
615}