use scirs2_core::ndarray::{Array2, ArrayViewMut1, ArrayViewMut2};
use std::alloc::{GlobalAlloc, Layout, System};
use std::collections::VecDeque;
use std::ptr::NonNull;
use std::sync::Mutex;
#[cfg(any(target_os = "linux", target_os = "android"))]
use libc;
#[cfg(target_os = "linux")]
use std::fs;
use std::sync::atomic::Ordering;
#[cfg(test)]
use num_cpus;
#[cfg(not(test))]
mod num_cpus {
    //! Minimal stand-in for the `num_cpus` crate, used in non-test builds.

    /// Number of logical CPUs; falls back to 4 when the OS query fails.
    pub fn get() -> usize {
        std::thread::available_parallelism().map_or(4, |n| n.get())
    }
}
/// Tuning knobs for [`DistancePool`] and [`ClusteringArena`].
#[derive(Debug, Clone)]
pub struct MemoryPoolConfig {
    /// Maximum number of buffers retained per pool.
    pub max_pool_size: usize,
    /// Target alignment for allocations, in bytes.
    pub cache_line_size: usize,
    /// Enable NUMA-aware allocation paths.
    pub numa_aware: bool,
    /// Prefetch lookahead hint (not referenced in this file).
    pub prefetch_distance: usize,
    /// Size of each arena block, in bytes.
    pub arena_block_size: usize,
    /// Preferred NUMA node; negative means auto-detect.
    pub numa_node_hint: i32,
    /// Auto-discover NUMA topology (not referenced in this file).
    pub auto_numa_discovery: bool,
    /// Pin threads to NUMA-local CPUs (not referenced in this file).
    pub enable_thread_affinity: bool,
    /// Pre-fault pages of fresh buffers before handing them out.
    pub enable_memory_warming: bool,
    /// Byte size above which a buffer uses the large-object pool.
    pub large_object_threshold: usize,
    /// Soft cap on tracked memory usage, in bytes.
    pub max_memory_usage: usize,
}

impl Default for MemoryPoolConfig {
    /// Defaults: 1000 pooled buffers, 64-byte cache lines, NUMA on,
    /// 1 MiB arena blocks, 64 KiB large-object cutoff, 1 GiB memory cap.
    fn default() -> Self {
        Self {
            max_pool_size: 1000,
            cache_line_size: 64,
            numa_aware: true,
            prefetch_distance: 8,
            arena_block_size: 1 << 20,
            numa_node_hint: -1,
            auto_numa_discovery: true,
            enable_thread_affinity: true,
            enable_memory_warming: true,
            large_object_threshold: 1 << 16,
            max_memory_usage: 1 << 30,
        }
    }
}
/// NUMA-aware pool of reusable scratch buffers for distance computations.
pub struct DistancePool {
    /// Pool configuration (caps, alignment, NUMA behavior).
    config: MemoryPoolConfig,
    /// Small `f64` scratch buffers, reused best-fit.
    distance_buffers: Mutex<VecDeque<Box<[f64]>>>,
    /// `usize` scratch buffers.
    index_buffers: Mutex<VecDeque<Box<[usize]>>>,
    /// Pooled 2-D matrices.
    matrix_buffers: Mutex<VecDeque<Array2<f64>>>,
    /// Buffers above `large_object_threshold`, matched by exact length.
    large_buffers: Mutex<VecDeque<Box<[f64]>>>,
    /// Hit/miss counters.
    stats: PoolStatistics,
    /// Byte counter used by the `max_memory_usage` check.
    memory_usage: std::sync::atomic::AtomicUsize,
    /// NUMA node this pool prefers.
    numa_node: std::sync::atomic::AtomicI32,
}
impl DistancePool {
/// Create a pool with the default configuration.
pub fn new(capacity: usize) -> Self {
    Self::with_config(capacity, MemoryPoolConfig::default())
}
/// Create a pool with an explicit configuration.
///
/// The NUMA node comes from `numa_node_hint` when NUMA awareness is on
/// and the hint is non-negative; otherwise it is auto-detected.
pub fn with_config(capacity: usize, config: MemoryPoolConfig) -> Self {
    let use_hint = config.numa_aware && config.numa_node_hint >= 0;
    let numa_node = if use_hint {
        config.numa_node_hint
    } else {
        Self::detect_numa_node()
    };
    Self {
        config,
        distance_buffers: Mutex::new(VecDeque::with_capacity(capacity)),
        index_buffers: Mutex::new(VecDeque::with_capacity(capacity)),
        matrix_buffers: Mutex::new(VecDeque::with_capacity(capacity / 4)),
        large_buffers: Mutex::new(VecDeque::with_capacity(capacity / 10)),
        stats: PoolStatistics::new(),
        memory_usage: std::sync::atomic::AtomicUsize::new(0),
        numa_node: std::sync::atomic::AtomicI32::new(numa_node),
    }
}
pub fn get_distance_buffer(&self, size: usize) -> DistanceBuffer {
let buffer_size_bytes = size * std::mem::size_of::<f64>();
let is_large = buffer_size_bytes > self.config.large_object_threshold;
let current_usage = self.memory_usage.load(std::sync::atomic::Ordering::Relaxed);
if current_usage + buffer_size_bytes > self.config.max_memory_usage {
self.cleanup_excess_memory();
}
let buffer = if is_large {
self.get_large_buffer(size)
} else {
let mut buffers = self.distance_buffers.lock().expect("Operation failed");
for i in 0..buffers.len() {
if buffers[i].len() >= size && buffers[i].len() <= size * 2 {
let buffer = buffers.remove(i).expect("Operation failed");
self.stats.record_hit();
return DistanceBuffer::new(buffer, self);
}
}
self.stats.record_miss();
self.create_aligned_buffer(size)
};
self.memory_usage
.fetch_add(buffer_size_bytes, std::sync::atomic::Ordering::Relaxed);
DistanceBuffer::new(buffer, self)
}
/// Fetch (or allocate) a large `f64` buffer of exactly `size` elements.
/// Unlike the small pool, large buffers are matched by exact length only.
fn get_large_buffer(&self, size: usize) -> Box<[f64]> {
    let mut pooled = self.large_buffers.lock().expect("Operation failed");
    if let Some(i) = pooled.iter().position(|b| b.len() == size) {
        let buffer = pooled.remove(i).expect("Operation failed");
        self.stats.record_hit();
        return buffer;
    }
    // Release the lock before allocating.
    drop(pooled);
    self.stats.record_miss();
    if self.config.numa_aware {
        self.create_numa_aligned_buffer(size)
    } else {
        self.create_aligned_buffer(size)
    }
}
/// Check out a `usize` scratch buffer with at least `size` (at most
/// `2*size`) elements.
///
/// NOTE: a reused buffer keeps its previous contents, while a freshly
/// allocated one is zero-filled — callers must initialize what they read.
pub fn get_index_buffer(&self, size: usize) -> IndexBuffer {
    let mut pooled = self.index_buffers.lock().expect("Operation failed");
    let fit = pooled
        .iter()
        .position(|b| b.len() >= size && b.len() <= size * 2);
    if let Some(i) = fit {
        let buffer = pooled.remove(i).expect("Operation failed");
        self.stats.record_hit();
        return IndexBuffer::new(buffer, self);
    }
    drop(pooled);
    self.stats.record_miss();
    IndexBuffer::new(vec![0usize; size].into_boxed_slice(), self)
}
/// Check out a zeroed `rows x cols` matrix.
///
/// BUGFIX: the old code reused any pooled matrix up to twice the requested
/// size, then sliced it and called `.to_owned()` — which allocated a brand
/// new matrix anyway (no reuse benefit) *and* copied stale values into it,
/// whereas a pool miss produced zeros. We now reuse only exact-dimension
/// matrices and explicitly zero them, so the caller always receives a
/// zeroed matrix and a pool hit genuinely avoids an allocation.
pub fn get_matrix_buffer(&self, rows: usize, cols: usize) -> MatrixBuffer {
    let mut pooled = self.matrix_buffers.lock().expect("Operation failed");
    if let Some(i) = pooled.iter().position(|m| m.dim() == (rows, cols)) {
        let mut matrix = pooled.remove(i).expect("Operation failed");
        // Clear stale contents from the previous user.
        matrix.fill(0.0);
        self.stats.record_hit();
        return MatrixBuffer::new(matrix, self);
    }
    drop(pooled);
    self.stats.record_miss();
    MatrixBuffer::new(Array2::zeros((rows, cols)), self)
}
fn create_aligned_buffer(&self, size: usize) -> Box<[f64]> {
let layout = Layout::from_size_align(
size * std::mem::size_of::<f64>(),
self.config.cache_line_size,
)
.expect("Operation failed");
unsafe {
let ptr = System.alloc(layout) as *mut f64;
if ptr.is_null() {
panic!("Failed to allocate aligned memory");
}
if self.config.enable_memory_warming {
std::ptr::write_bytes(ptr, 0, size);
}
Box::from_raw(std::ptr::slice_from_raw_parts_mut(ptr, size))
}
}
// Allocate a large buffer, preferring the platform NUMA path when NUMA
// awareness is enabled and a node has been resolved; falls back to the
// plain aligned allocator on failure or on unsupported platforms.
fn create_numa_aligned_buffer(&self, size: usize) -> Box<[f64]> {
let numa_node = self.numa_node.load(Ordering::Relaxed);
#[cfg(target_os = "linux")]
{
if self.config.numa_aware && numa_node >= 0 {
match Self::allocate_on_numa_node_linux(size, numa_node as u32) {
Ok(buffer) => {
// Pre-fault pages before handing the buffer out.
if self.config.enable_memory_warming {
Self::warm_memory(&buffer);
}
return buffer;
}
// NUMA allocation failure is non-fatal; fall through to the
// generic allocator below.
Err(_) => {
}
}
}
}
#[cfg(target_os = "windows")]
{
if self.config.numa_aware && numa_node >= 0 {
match Self::allocate_on_numa_node_windows(size, numa_node as u32) {
Ok(buffer) => {
if self.config.enable_memory_warming {
Self::warm_memory(&buffer);
}
return buffer;
}
Err(_) => {
}
}
}
}
// Generic fallback: plain aligned allocation, optionally warmed.
let buffer = self.create_aligned_buffer(size);
if self.config.enable_memory_warming {
Self::warm_memory(&buffer);
}
buffer
}
#[cfg(target_os = "linux")]
/// Allocate a zeroed `f64` buffer intended for `node`.
///
/// NOTE(review): no actual NUMA binding is performed here — that would
/// require `mbind`/`numa_alloc_onnode`; `node` is accepted only for
/// interface compatibility. TODO: wire up real node binding.
///
/// BUGFIX: the old implementation paired `System.alloc` with 64-byte
/// alignment against `Box::from_raw`, whose drop deallocates with
/// `Layout::array::<f64>` (8-byte alignment) — a layout mismatch that is
/// undefined behavior. Allocating through `Vec` makes the layouts match
/// and guarantees zero-initialization.
fn allocate_on_numa_node_linux(
    size: usize,
    node: u32,
) -> Result<Box<[f64]>, Box<dyn std::error::Error>> {
    let _ = node;
    Ok(vec![0.0_f64; size].into_boxed_slice())
}
// Stub: Windows NUMA allocation is not implemented; callers fall back to
// the generic allocator. Parameters are unused.
#[cfg(target_os = "windows")]
fn allocate_on_numa_node_windows(
size: usize,
node: u32,
) -> Result<Box<[f64]>, Box<dyn std::error::Error>> {
Err("Windows NUMA allocation not implemented".into())
}
// Best-effort: pin the calling thread to the CPUs of NUMA node `node`.
// On unsupported platforms this is a no-op that reports success.
pub fn bind_thread_to_numa_node(node: u32) -> Result<(), Box<dyn std::error::Error>> {
#[cfg(target_os = "linux")]
{
Self::bind_thread_to_numa_node_linux(node)
}
#[cfg(target_os = "windows")]
{
Self::bind_thread_to_numa_node_windows(node)
}
#[cfg(not(any(target_os = "linux", target_os = "windows")))]
{
Ok(()) }
}
#[cfg(target_os = "linux")]
/// Pin the calling thread to the CPUs listed in
/// `/sys/devices/system/node/node<N>/cpulist` (format like "0-3,8,10-11").
///
/// Best-effort and always returns `Ok`: a missing sysfs entry or a failed
/// `sched_setaffinity` simply leaves the affinity unchanged, matching the
/// original contract.
fn bind_thread_to_numa_node_linux(node: u32) -> Result<(), Box<dyn std::error::Error>> {
    // Only attempt binding when the node exposes a CPU list at all.
    if Self::get_node_cpu_count(node).is_none() {
        return Ok(());
    }
    let cpulist_path = format!("/sys/devices/system/node/node{}/cpulist", node);
    let cpulist = match fs::read_to_string(&cpulist_path) {
        Ok(s) => s,
        Err(_) => return Ok(()),
    };
    let mut cpu_set: libc::cpu_set_t = unsafe { std::mem::zeroed() };
    // BUGFIX: CPU_SET on an index >= CPU_SETSIZE writes outside the fixed
    // cpu_set_t bitmask (undefined behavior); the old code had no guard.
    let set_cpu = |cpu: u32, set: &mut libc::cpu_set_t| {
        if (cpu as usize) < libc::CPU_SETSIZE as usize {
            // SAFETY: index verified in-range for the cpu_set_t bitmask.
            unsafe { libc::CPU_SET(cpu as usize, set) };
        }
    };
    for range in cpulist.trim().split(',') {
        if let Some((start, end)) = range.split_once('-') {
            if let (Ok(s), Ok(e)) = (start.parse::<u32>(), end.parse::<u32>()) {
                for cpu in s..=e {
                    set_cpu(cpu, &mut cpu_set);
                }
            }
        } else if let Ok(cpu) = range.parse::<u32>() {
            set_cpu(cpu, &mut cpu_set);
        }
    }
    // SAFETY: cpu_set is fully initialized; pid 0 means "calling thread".
    // The return value is deliberately ignored (best-effort binding).
    unsafe {
        libc::sched_setaffinity(
            0,
            std::mem::size_of::<libc::cpu_set_t>(),
            &cpu_set,
        );
    }
    Ok(())
}
// Stub: Windows thread affinity is not implemented; reports success
// without doing anything. `node` is unused.
#[cfg(target_os = "windows")]
fn bind_thread_to_numa_node_windows(node: u32) -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
/// Touch one element per 4 KiB page with a volatile read so the OS faults
/// every page in before the buffer hits the hot path.
fn warm_memory(buffer: &[f64]) {
    if buffer.is_empty() {
        return;
    }
    const PAGE_SIZE: usize = 4096;
    let stride = PAGE_SIZE / std::mem::size_of::<f64>();
    let mut idx = 0;
    while idx < buffer.len() {
        // SAFETY: idx < buffer.len(), so the reference is valid; the
        // volatile read keeps the access from being optimized away.
        unsafe { std::ptr::read_volatile(&buffer[idx]) };
        idx += stride;
    }
}
// Detect the NUMA node to prefer for this pool; defaults to node 0 when
// detection fails or the platform has no NUMA support.
fn detect_numa_node() -> i32 {
#[cfg(target_os = "linux")]
{
Self::detect_numa_node_linux().unwrap_or(0)
}
#[cfg(target_os = "windows")]
{
Self::detect_numa_node_windows().unwrap_or(0)
}
#[cfg(not(any(target_os = "linux", target_os = "windows")))]
{
0 }
}
#[cfg(target_os = "linux")]
/// Resolve the NUMA node of the calling thread: try the `getcpu` syscall
/// first, then fall back to scanning sysfs.
///
/// FIX: removed a `gettid()` syscall whose result was bound to an unused
/// variable and discarded — pure per-call overhead with no effect.
fn detect_numa_node_linux() -> Option<i32> {
    match Self::get_current_numa_node_linux() {
        Ok(node) => Some(node),
        Err(_) => Self::detect_numa_from_cpu_linux(),
    }
}
// Ask the kernel which NUMA node the calling thread is currently running
// on via the getcpu(2) syscall (third argument is the unused legacy
// tcache pointer). Errors out when the syscall is unavailable or fails.
#[cfg(target_os = "linux")]
fn get_current_numa_node_linux() -> Result<i32, Box<dyn std::error::Error>> {
let mut cpu: u32 = 0;
let mut node: u32 = 0;
let result = unsafe {
libc::syscall(
libc::SYS_getcpu,
&mut cpu as *mut u32,
&mut node as *mut u32,
std::ptr::null_mut::<libc::c_void>(),
)
};
// getcpu returns 0 on success, -1 on failure.
if result == 0 {
Ok(node as i32)
} else {
Err("getcpu syscall failed".into())
}
}
// Fallback detection: scan /sys/devices/system/node and return the first
// "nodeN" entry found. NOTE(review): despite the name, this does NOT look
// up the current CPU's node — read_dir order is unspecified, so this is
// effectively "any existing node"; acceptable only as a fallback.
#[cfg(target_os = "linux")]
fn detect_numa_from_cpu_linux() -> Option<i32> {
if let Ok(entries) = fs::read_dir("/sys/devices/system/node") {
for entry in entries.flatten() {
let name = entry.file_name();
if let Some(name_str) = name.to_str() {
if let Some(stripped) = name_str.strip_prefix("node") {
if let Ok(node_num) = stripped.parse::<i32>() {
return Some(node_num);
}
}
}
}
}
None
}
// Stub: Windows detection always reports node 0.
#[cfg(target_os = "windows")]
fn detect_numa_node_windows() -> Option<i32> {
Some(0)
}
// Discover the machine's NUMA layout; returns the single-node default
// topology on unsupported platforms.
pub fn get_numa_topology() -> NumaTopology {
#[cfg(target_os = "linux")]
{
Self::get_numa_topology_linux()
}
#[cfg(target_os = "windows")]
{
Self::get_numa_topology_windows()
}
#[cfg(not(any(target_os = "linux", target_os = "windows")))]
{
NumaTopology::default()
}
}
// Build the topology from /sys/devices/system/node/node*/meminfo, one
// NumaNode per sysfs entry whose meminfo parses. If nothing parses, a
// single synthetic node is created from /proc/meminfo totals (with 8 GiB /
// 4 GiB fallbacks when even that fails).
#[cfg(target_os = "linux")]
fn get_numa_topology_linux() -> NumaTopology {
let mut topology = NumaTopology::default();
if let Ok(entries) = fs::read_dir("/sys/devices/system/node") {
for entry in entries.flatten() {
let name = entry.file_name();
if let Some(name_str) = name.to_str() {
if let Some(stripped) = name_str.strip_prefix("node") {
if let Ok(_nodeid) = stripped.parse::<u32>() {
let meminfo_path =
format!("/sys/devices/system/node/{name_str}/meminfo");
if let Ok(meminfo) = fs::read_to_string(&meminfo_path) {
if let Some(total_kb) = Self::parse_meminfo_total(&meminfo) {
// NOTE(review): available bytes are approximated
// by the total; per-node MemFree is not parsed.
topology.nodes.push(NumaNode {
id: _nodeid,
total_memory_bytes: total_kb * 1024,
available_memory_bytes: total_kb * 1024, cpu_count: Self::get_node_cpu_count(_nodeid).unwrap_or(1),
});
}
}
}
}
}
}
}
// Sysfs yielded nothing: fall back to a single whole-machine node.
if topology.nodes.is_empty() {
topology.nodes.push(NumaNode {
id: 0,
total_memory_bytes: Self::get_total_system_memory()
.unwrap_or(8 * 1024 * 1024 * 1024), available_memory_bytes: Self::get_available_system_memory()
.unwrap_or(4 * 1024 * 1024 * 1024), cpu_count: num_cpus::get() as u32,
});
}
topology
}
#[cfg(target_os = "linux")]
/// Extract the MemTotal value (in kB) from a per-node sysfs meminfo blob.
///
/// Lines look like `Node 0 MemTotal:       16301972 kB`, so after
/// whitespace-splitting the number is the *fourth* field.
///
/// BUGFIX: the old code parsed `parts[2]`, which is the literal
/// `"MemTotal:"` token — the parse always failed and the function always
/// returned `None`, so real nodes were never added to the topology.
fn parse_meminfo_total(meminfo: &str) -> Option<u64> {
    for line in meminfo.lines() {
        if line.starts_with("Node") && line.contains("MemTotal:") {
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() >= 4 {
                return parts[3].parse().ok();
            }
        }
    }
    None
}
#[cfg(target_os = "linux")]
/// Count the CPUs in a node's sysfs cpulist (format like "0-3,8,10-11").
/// Returns `None` when the sysfs file cannot be read; unparseable tokens
/// are silently skipped.
fn get_node_cpu_count(_nodeid: u32) -> Option<u32> {
    let path = format!("/sys/devices/system/node/node{}/cpulist", _nodeid);
    let cpulist = fs::read_to_string(&path).ok()?;
    let mut total = 0u32;
    for token in cpulist.trim().split(',') {
        match token.split_once('-') {
            Some((lo, hi)) => {
                if let (Ok(lo), Ok(hi)) = (lo.parse::<u32>(), hi.parse::<u32>()) {
                    total += hi - lo + 1;
                }
            }
            None => {
                if token.parse::<u32>().is_ok() {
                    total += 1;
                }
            }
        }
    }
    Some(total)
}
#[cfg(target_os = "linux")]
/// Total system memory in bytes, from the first `MemTotal:` line of
/// /proc/meminfo (value is reported in kB there).
fn get_total_system_memory() -> Option<u64> {
    let meminfo = fs::read_to_string("/proc/meminfo").ok()?;
    for line in meminfo.lines() {
        if let Some(rest) = line.strip_prefix("MemTotal:") {
            if let Some(kb) = rest.split_whitespace().next() {
                return kb.parse::<u64>().ok().map(|kb| kb * 1024);
            }
        }
    }
    None
}
#[cfg(target_os = "linux")]
/// Available system memory in bytes, from the first `MemAvailable:` line
/// of /proc/meminfo (value is reported in kB there).
fn get_available_system_memory() -> Option<u64> {
    let meminfo = fs::read_to_string("/proc/meminfo").ok()?;
    for line in meminfo.lines() {
        if let Some(rest) = line.strip_prefix("MemAvailable:") {
            if let Some(kb) = rest.split_whitespace().next() {
                return kb.parse::<u64>().ok().map(|kb| kb * 1024);
            }
        }
    }
    None
}
// Stub: Windows topology discovery is not implemented; returns the
// single-node default.
#[cfg(target_os = "windows")]
fn get_numa_topology_windows() -> NumaTopology {
NumaTopology::default()
}
/// Evict ~25% of the idle buffers from each `f64` pool to release memory
/// when the usage cap is about to be exceeded.
///
/// BUGFIX: do NOT decrement `memory_usage` here. Pooled buffers were
/// already subtracted from the counter by `return_distance_buffer` when
/// they came back, so subtracting again on eviction double-counted the
/// bytes and underflowed the `AtomicUsize` (which wraps silently),
/// permanently poisoning the cap check.
fn cleanup_excess_memory(&self) {
    let cleanup_ratio = 0.25;
    {
        let mut pooled = self.distance_buffers.lock().expect("Operation failed");
        let evict = (pooled.len() as f64 * cleanup_ratio) as usize;
        for _ in 0..evict {
            // Dropping the Box frees the memory immediately.
            pooled.pop_back();
        }
    }
    {
        let mut pooled = self.large_buffers.lock().expect("Operation failed");
        let evict = (pooled.len() as f64 * cleanup_ratio) as usize;
        for _ in 0..evict {
            pooled.pop_back();
        }
    }
}
/// Accept a buffer back from a dropped `DistanceBuffer`.
///
/// The buffer's bytes come off the usage counter; the buffer itself is
/// retained for reuse only while its pool is below capacity (large-object
/// pool is capped at a tenth of `max_pool_size`), otherwise it is freed.
fn return_distance_buffer(&self, buffer: Box<[f64]>) {
    let bytes = buffer.len() * std::mem::size_of::<f64>();
    self.memory_usage
        .fetch_sub(bytes, std::sync::atomic::Ordering::Relaxed);
    if bytes > self.config.large_object_threshold {
        let mut pooled = self.large_buffers.lock().expect("Operation failed");
        if pooled.len() < self.config.max_pool_size / 10 {
            pooled.push_back(buffer);
        }
    } else {
        let mut pooled = self.distance_buffers.lock().expect("Operation failed");
        if pooled.len() < self.config.max_pool_size {
            pooled.push_back(buffer);
        }
    }
}
/// Accept an index buffer back from a dropped `IndexBuffer`; kept only
/// while the pool is below `max_pool_size`.
fn return_index_buffer(&self, buffer: Box<[usize]>) {
    let mut pooled = self.index_buffers.lock().expect("Operation failed");
    if pooled.len() < self.config.max_pool_size {
        pooled.push_back(buffer);
    }
}
/// Accept a matrix back from a dropped `MatrixBuffer`; the matrix pool is
/// capped at a quarter of `max_pool_size`.
fn return_matrix_buffer(&self, matrix: Array2<f64>) {
    let mut pooled = self.matrix_buffers.lock().expect("Operation failed");
    if pooled.len() < self.config.max_pool_size / 4 {
        pooled.push_back(matrix);
    }
}
pub fn statistics(&self) -> PoolStatistics {
self.stats.clone()
}
pub fn memory_usage(&self) -> usize {
self.memory_usage.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn current_numa_node(&self) -> i32 {
self.numa_node.load(std::sync::atomic::Ordering::Relaxed)
}
/// Point-in-time snapshot of pool occupancy, memory usage, and hit rate.
/// The four pool lengths are read under separate locks, so the snapshot
/// is not atomic across pools.
pub fn pool_info(&self) -> PoolInfo {
    PoolInfo {
        distance_buffer_count: self.distance_buffers.lock().expect("Operation failed").len(),
        index_buffer_count: self.index_buffers.lock().expect("Operation failed").len(),
        matrix_buffer_count: self.matrix_buffers.lock().expect("Operation failed").len(),
        large_buffer_count: self.large_buffers.lock().expect("Operation failed").len(),
        total_memory_usage: self.memory_usage(),
        numa_node: self.current_numa_node(),
        hit_rate: self.stats.hit_rate(),
    }
}
pub fn clear(&self) {
self.distance_buffers
.lock()
.expect("Operation failed")
.clear();
self.index_buffers.lock().expect("Operation failed").clear();
self.matrix_buffers
.lock()
.expect("Operation failed")
.clear();
self.large_buffers.lock().expect("Operation failed").clear();
self.memory_usage
.store(0, std::sync::atomic::Ordering::Relaxed);
self.stats.reset();
}
}
use scirs2_core::ndarray::s;
/// RAII handle to a pooled `f64` scratch buffer; hands the buffer back to
/// its pool when dropped.
pub struct DistanceBuffer<'a> {
    /// Always `Some` until `drop` takes the buffer back.
    buffer: Option<Box<[f64]>>,
    pool: &'a DistancePool,
}
impl<'a> DistanceBuffer<'a> {
    fn new(buffer: Box<[f64]>, pool: &'a DistancePool) -> Self {
        Self {
            buffer: Some(buffer),
            pool,
        }
    }
    /// Mutable element access.
    pub fn as_mut_slice(&mut self) -> &mut [f64] {
        let buf = self.buffer.as_mut().expect("Operation failed");
        &mut buf[..]
    }
    /// Shared element access.
    pub fn as_slice(&self) -> &[f64] {
        let buf = self.buffer.as_ref().expect("Operation failed");
        &buf[..]
    }
    /// Length of the underlying buffer (may exceed the requested size —
    /// the pool reuses buffers up to twice as large).
    pub fn len(&self) -> usize {
        self.as_slice().len()
    }
    /// True when the buffer holds zero elements.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
    /// 1-D ndarray view over the whole buffer.
    pub fn as_array_mut(&mut self) -> ArrayViewMut1<f64> {
        ArrayViewMut1::from(self.as_mut_slice())
    }
}
impl Drop for DistanceBuffer<'_> {
    fn drop(&mut self) {
        match self.buffer.take() {
            Some(buffer) => self.pool.return_distance_buffer(buffer),
            None => {}
        }
    }
}
/// RAII handle to a pooled `usize` scratch buffer; hands the buffer back
/// to its pool when dropped.
pub struct IndexBuffer<'a> {
    /// Always `Some` until `drop` takes the buffer back.
    buffer: Option<Box<[usize]>>,
    pool: &'a DistancePool,
}
impl<'a> IndexBuffer<'a> {
    fn new(buffer: Box<[usize]>, pool: &'a DistancePool) -> Self {
        Self {
            buffer: Some(buffer),
            pool,
        }
    }
    /// Mutable element access.
    pub fn as_mut_slice(&mut self) -> &mut [usize] {
        let buf = self.buffer.as_mut().expect("Operation failed");
        &mut buf[..]
    }
    /// Shared element access.
    pub fn as_slice(&self) -> &[usize] {
        let buf = self.buffer.as_ref().expect("Operation failed");
        &buf[..]
    }
    /// Length of the underlying buffer (may exceed the requested size).
    pub fn len(&self) -> usize {
        self.as_slice().len()
    }
    /// True when the buffer holds zero elements.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
impl Drop for IndexBuffer<'_> {
    fn drop(&mut self) {
        match self.buffer.take() {
            Some(buffer) => self.pool.return_index_buffer(buffer),
            None => {}
        }
    }
}
/// RAII handle to a pooled matrix; hands the matrix back to its pool when
/// dropped.
pub struct MatrixBuffer<'a> {
    /// Always `Some` until `drop` takes the matrix back.
    matrix: Option<Array2<f64>>,
    pool: &'a DistancePool,
}
impl<'a> MatrixBuffer<'a> {
    fn new(matrix: Array2<f64>, pool: &'a DistancePool) -> Self {
        Self {
            matrix: Some(matrix),
            pool,
        }
    }
    /// Mutable 2-D view of the matrix.
    pub fn as_mut(&mut self) -> ArrayViewMut2<f64> {
        self.matrix.as_mut().expect("Operation failed").view_mut()
    }
    /// Matrix dimensions as `(rows, cols)`.
    ///
    /// FIX: now takes `&self`; it previously demanded `&mut self` for a
    /// read-only query. Relaxing the receiver is backward compatible —
    /// existing `&mut` callers still work.
    pub fn dim(&self) -> (usize, usize) {
        self.matrix.as_ref().expect("Operation failed").dim()
    }
    /// Set every element to `value`.
    pub fn fill(&mut self, value: f64) {
        self.matrix.as_mut().expect("Operation failed").fill(value);
    }
}
impl Drop for MatrixBuffer<'_> {
    fn drop(&mut self) {
        if let Some(matrix) = self.matrix.take() {
            self.pool.return_matrix_buffer(matrix);
        }
    }
}
// Bump-allocator arena for short-lived clustering scratch data. New
// allocations come from `current_block`; exhausted blocks are parked in
// `full_blocks` until `reset`.
pub struct ClusteringArena {
config: MemoryPoolConfig,
current_block: Mutex<Option<ArenaBlock>>,
full_blocks: Mutex<Vec<ArenaBlock>>,
stats: ArenaStatistics,
}
impl ClusteringArena {
/// Create an arena with the default configuration.
pub fn new() -> Self {
    Self::with_config(MemoryPoolConfig::default())
}
/// Create an arena using `config` (notably `arena_block_size`).
pub fn with_config(config: MemoryPoolConfig) -> Self {
    let current_block = Mutex::new(None);
    let full_blocks = Mutex::new(Vec::new());
    Self {
        config,
        current_block,
        full_blocks,
        stats: ArenaStatistics::new(),
    }
}
/// Allocate a `size`-element vector of `T::default()` values out of the
/// arena.
///
/// NOTE: the returned `ArenaVec` never runs element destructors and its
/// memory is reclaimed wholesale on `reset`, so this is only suitable for
/// plain-data `T`. (The `Clone` bound is kept for interface compatibility
/// but is not used.)
///
/// BUGFIX: the old loop advanced the raw `*mut u8` with
/// `ptr.as_ptr().add(i)` — one BYTE per element — so for any `T` wider
/// than one byte the element writes overlapped and most of the buffer was
/// left uninitialized. We now advance a typed `*mut T` pointer, stepping
/// one element at a time.
pub fn alloc_temp_vec<T: Default + Clone>(&self, size: usize) -> ArenaVec<T> {
    let layout = Layout::array::<T>(size).expect("Operation failed");
    let ptr = self.allocate_raw(layout);
    let data = ptr.as_ptr() as *mut T;
    unsafe {
        // SAFETY: allocate_raw returned at least `layout.size()` bytes
        // with `layout.align()` alignment, so `data.add(i)` is in-bounds
        // and properly aligned for every i < size.
        for i in 0..size {
            std::ptr::write(data.add(i), T::default());
        }
        ArenaVec::new(data, size)
    }
}
// Bump-allocate `layout` from the current block, rolling over to a fresh
// block when the current one is missing or full.
// NOTE(review): a request larger than `arena_block_size` (or with
// alignment > 64) can never fit even a fresh block, and `allocate` will
// panic via its assert — callers must keep requests within block size.
fn allocate_raw(&self, layout: Layout) -> NonNull<u8> {
let mut current = self.current_block.lock().expect("Operation failed");
if current.is_none()
|| !current
.as_ref()
.expect("Operation failed")
.can_allocate(layout)
{
// Park the exhausted block; its memory stays live until reset/drop.
if let Some(old_block) = current.take() {
self.full_blocks
.lock()
.expect("Operation failed")
.push(old_block);
}
*current = Some(ArenaBlock::new(self.config.arena_block_size));
}
current.as_mut().expect("Operation failed").allocate(layout)
}
// Rewind every block's bump pointer and keep one block ready for reuse.
// SAFETY HAZARD (review): this takes &self, so it can be called while
// ArenaVec handles are still alive; their memory is then handed out again.
// Callers must drop all ArenaVecs before resetting.
pub fn reset(&self) {
let mut current = self.current_block.lock().expect("Operation failed");
let mut full_blocks = self.full_blocks.lock().expect("Operation failed");
if let Some(block) = current.take() {
full_blocks.push(block);
}
// Rewind offsets; block memory itself is retained, not freed.
for block in full_blocks.iter_mut() {
block.reset();
}
if let Some(block) = full_blocks.pop() {
*current = Some(block);
}
self.stats.reset();
}
// Snapshot of the arena's statistics counters.
pub fn statistics(&self) -> ArenaStatistics {
self.stats.clone()
}
}
impl Default for ClusteringArena {
fn default() -> Self {
Self::new()
}
}
// One fixed-size chunk of arena memory with a bump-pointer `offset`.
struct ArenaBlock {
memory: NonNull<u8>,
size: usize,
offset: usize,
}
// SAFETY (review): the raw pointer makes ArenaBlock !Send/!Sync by
// default. These impls rely on ClusteringArena guarding all access behind
// Mutexes; do not use ArenaBlock unsynchronized.
unsafe impl Send for ArenaBlock {}
unsafe impl Sync for ArenaBlock {}
impl ArenaBlock {
// Allocate a `size`-byte block, 64-byte aligned, uninitialized.
// Panics on allocation failure or a zero/overflowing layout.
fn new(size: usize) -> Self {
let layout = Layout::from_size_align(size, 64).expect("Operation failed"); let memory =
unsafe { NonNull::new(System.alloc(layout)).expect("Failed to allocate arena block") };
Self {
memory,
size,
offset: 0,
}
}
// True when `layout` fits after aligning the bump pointer up.
// NOTE(review): correctness assumes layout.align() is a power of two
// (guaranteed by Layout) and <= 64 (the block's base alignment).
fn can_allocate(&self, layout: Layout) -> bool {
let aligned_offset = (self.offset + layout.align() - 1) & !(layout.align() - 1);
aligned_offset + layout.size() <= self.size
}
// Bump-allocate `layout`; caller must have checked `can_allocate`
// (asserted here). Returned memory is uninitialized.
fn allocate(&mut self, layout: Layout) -> NonNull<u8> {
assert!(self.can_allocate(layout));
self.offset = (self.offset + layout.align() - 1) & !(layout.align() - 1);
let ptr = unsafe { NonNull::new_unchecked(self.memory.as_ptr().add(self.offset)) };
self.offset += layout.size();
ptr
}
// Rewind the bump pointer; prior allocations become reusable garbage.
fn reset(&mut self) {
self.offset = 0;
}
}
impl Drop for ArenaBlock {
fn drop(&mut self) {
// Deallocate with the exact layout used in `new` (size, align 64).
let layout = Layout::from_size_align(self.size, 64).expect("Operation failed");
unsafe {
System.dealloc(self.memory.as_ptr(), layout);
}
}
}
/// Fixed-length slice handed out by [`ClusteringArena`].
///
/// Holds a raw pointer into an arena block: it does NOT own the
/// allocation and never runs element destructors, so it is only suitable
/// for plain-data `T`, and the memory is valid only until the arena is
/// reset or dropped.
pub struct ArenaVec<T> {
    ptr: *mut T,
    len: usize,
    phantom: std::marker::PhantomData<T>,
}
impl<T> ArenaVec<T> {
    fn new(ptr: *mut T, len: usize) -> Self {
        Self {
            ptr,
            len,
            phantom: std::marker::PhantomData,
        }
    }
    /// Mutable view of the elements.
    pub fn as_mut_slice(&mut self) -> &mut [T] {
        // SAFETY: `ptr` points at `len` initialized elements written by
        // `alloc_temp_vec`, and `&mut self` gives exclusive access.
        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len) }
    }
    /// Shared view of the elements.
    ///
    /// FIX: now takes `&self`; it previously demanded `&mut self` for a
    /// read-only view (backward compatible — `&mut` callers still work).
    pub fn as_slice(&self) -> &[T] {
        // SAFETY: see `as_mut_slice`; shared access only reads.
        unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
    }
    /// Number of elements. (FIX: previously required `&mut self`.)
    pub fn len(&self) -> usize {
        self.len
    }
    /// True when the vector has zero elements.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
}
// Point-in-time snapshot of DistancePool occupancy, produced by
// `DistancePool::pool_info`.
#[derive(Debug, Clone)]
pub struct PoolInfo {
// Buffers idle in the small f64 pool.
pub distance_buffer_count: usize,
// Buffers idle in the usize pool.
pub index_buffer_count: usize,
// Matrices idle in the matrix pool.
pub matrix_buffer_count: usize,
// Buffers idle in the large-object pool.
pub large_buffer_count: usize,
// Value of the pool's memory-usage counter, in bytes.
pub total_memory_usage: usize,
// NUMA node the pool resolved at construction.
pub numa_node: i32,
// Hit rate as a percentage (0.0-100.0).
pub hit_rate: f64,
}
/// Lock-free hit/miss counters for the buffer pools.
#[derive(Debug)]
pub struct PoolStatistics {
    hits: std::sync::atomic::AtomicUsize,
    misses: std::sync::atomic::AtomicUsize,
    total_allocations: std::sync::atomic::AtomicUsize,
}
impl PoolStatistics {
    fn new() -> Self {
        Self {
            hits: std::sync::atomic::AtomicUsize::new(0),
            misses: std::sync::atomic::AtomicUsize::new(0),
            total_allocations: std::sync::atomic::AtomicUsize::new(0),
        }
    }
    /// Count a pool hit (buffer reused).
    fn record_hit(&self) {
        self.hits.fetch_add(1, Ordering::Relaxed);
    }
    /// Count a pool miss; every miss implies a fresh allocation.
    fn record_miss(&self) {
        self.misses.fetch_add(1, Ordering::Relaxed);
        self.total_allocations.fetch_add(1, Ordering::Relaxed);
    }
    /// Zero all counters.
    fn reset(&self) {
        self.hits.store(0, Ordering::Relaxed);
        self.misses.store(0, Ordering::Relaxed);
        self.total_allocations.store(0, Ordering::Relaxed);
    }
    /// Hit rate as a percentage (0.0-100.0); 0.0 before any request.
    pub fn hit_rate(&self) -> f64 {
        let hits = self.hits.load(Ordering::Relaxed) as f64;
        let misses = self.misses.load(Ordering::Relaxed) as f64;
        let total = hits + misses;
        if total == 0.0 {
            0.0
        } else {
            hits / total * 100.0
        }
    }
    /// Total buffer requests (hits + misses).
    pub fn total_requests(&self) -> usize {
        let hits = self.hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);
        hits + misses
    }
    /// Number of fresh allocations (equals the miss count).
    pub fn total_allocations(&self) -> usize {
        self.total_allocations.load(Ordering::Relaxed)
    }
}
impl Clone for PoolStatistics {
    /// Snapshot clone; the three counters are read individually, so the
    /// copy is not atomic as a set.
    fn clone(&self) -> Self {
        let snapshot = Self::new();
        snapshot
            .hits
            .store(self.hits.load(Ordering::Relaxed), Ordering::Relaxed);
        snapshot
            .misses
            .store(self.misses.load(Ordering::Relaxed), Ordering::Relaxed);
        snapshot.total_allocations.store(
            self.total_allocations.load(Ordering::Relaxed),
            Ordering::Relaxed,
        );
        snapshot
    }
}
/// Lock-free counters describing arena usage.
///
/// NOTE: nothing in this file currently increments these counters; they
/// are only reset and read.
#[derive(Debug)]
pub struct ArenaStatistics {
    blocks_allocated: std::sync::atomic::AtomicUsize,
    total_memory: std::sync::atomic::AtomicUsize,
    active_objects: std::sync::atomic::AtomicUsize,
}
impl ArenaStatistics {
    fn new() -> Self {
        Self {
            blocks_allocated: std::sync::atomic::AtomicUsize::new(0),
            total_memory: std::sync::atomic::AtomicUsize::new(0),
            active_objects: std::sync::atomic::AtomicUsize::new(0),
        }
    }
    /// Zero all counters.
    fn reset(&self) {
        self.blocks_allocated.store(0, Ordering::Relaxed);
        self.total_memory.store(0, Ordering::Relaxed);
        self.active_objects.store(0, Ordering::Relaxed);
    }
    /// Number of arena blocks allocated.
    pub fn blocks_allocated(&self) -> usize {
        self.blocks_allocated.load(Ordering::Relaxed)
    }
    /// Total bytes held by arena blocks.
    pub fn total_memory(&self) -> usize {
        self.total_memory.load(Ordering::Relaxed)
    }
    /// Number of live objects handed out.
    pub fn active_objects(&self) -> usize {
        self.active_objects.load(Ordering::Relaxed)
    }
}
impl Clone for ArenaStatistics {
    /// Snapshot clone; counters are read individually (not atomic as a set).
    fn clone(&self) -> Self {
        let snapshot = Self::new();
        snapshot.blocks_allocated.store(
            self.blocks_allocated.load(Ordering::Relaxed),
            Ordering::Relaxed,
        );
        snapshot
            .total_memory
            .store(self.total_memory.load(Ordering::Relaxed), Ordering::Relaxed);
        snapshot.active_objects.store(
            self.active_objects.load(Ordering::Relaxed),
            Ordering::Relaxed,
        );
        snapshot
    }
}
/// Description of the machine's NUMA layout.
#[derive(Debug, Clone)]
pub struct NumaTopology {
    pub nodes: Vec<NumaNode>,
}
/// Per-node memory capacity and CPU count.
#[derive(Debug, Clone)]
pub struct NumaNode {
    pub id: u32,
    pub total_memory_bytes: u64,
    pub available_memory_bytes: u64,
    pub cpu_count: u32,
}
impl Default for NumaTopology {
    /// Single-node fallback: 8 GiB total, 4 GiB available, 4 CPUs.
    fn default() -> Self {
        let node = NumaNode {
            id: 0,
            total_memory_bytes: 8 << 30,
            available_memory_bytes: 4 << 30,
            cpu_count: 4,
        };
        Self { nodes: vec![node] }
    }
}
impl NumaTopology {
    /// Node to prefer for new allocations: the first listed node, or 0
    /// when the topology is empty.
    pub fn get_optimal_node(&self) -> u32 {
        self.nodes.first().map_or(0, |node| node.id)
    }
    /// Id of the node with the most available memory, if any.
    pub fn get_node_with_most_memory(&self) -> Option<u32> {
        self.nodes
            .iter()
            .max_by_key(|node| node.available_memory_bytes)
            .map(|node| node.id)
    }
    /// Sum of total memory across all nodes, in bytes.
    pub fn total_system_memory(&self) -> u64 {
        self.nodes.iter().map(|node| node.total_memory_bytes).sum()
    }
    /// Sum of available memory across all nodes, in bytes.
    pub fn total_available_memory(&self) -> u64 {
        self.nodes
            .iter()
            .fold(0, |acc, node| acc + node.available_memory_bytes)
    }
    /// Whether a node with the given id exists.
    pub fn has_node(&self, _nodeid: u32) -> bool {
        self.get_node_info(_nodeid).is_some()
    }
    /// Lookup a node by id.
    pub fn get_node_info(&self, _nodeid: u32) -> Option<&NumaNode> {
        self.nodes.iter().find(|node| node.id == _nodeid)
    }
}
// Process-wide lazily-initialized pool and arena singletons.
static GLOBAL_DISTANCE_POOL: std::sync::OnceLock<DistancePool> = std::sync::OnceLock::new();
static GLOBAL_CLUSTERING_ARENA: std::sync::OnceLock<ClusteringArena> = std::sync::OnceLock::new();
// Shared DistancePool (capacity 1000), created on first use.
#[allow(dead_code)]
pub fn global_distance_pool() -> &'static DistancePool {
GLOBAL_DISTANCE_POOL.get_or_init(|| DistancePool::new(1000))
}
// Shared ClusteringArena with default configuration, created on first use.
#[allow(dead_code)]
pub fn global_clustering_arena() -> &'static ClusteringArena {
GLOBAL_CLUSTERING_ARENA.get_or_init(ClusteringArena::new)
}
#[allow(dead_code)]
pub fn create_numa_optimized_pool(capacity: usize) -> DistancePool {
let config = MemoryPoolConfig {
numa_aware: true,
auto_numa_discovery: true,
enable_thread_affinity: true,
..Default::default()
};
DistancePool::with_config(capacity, config)
}
// Convenience wrapper around `DistancePool::get_numa_topology`.
#[allow(dead_code)]
pub fn get_numa_topology() -> NumaTopology {
DistancePool::get_numa_topology()
}
// Probe the platform's NUMA capabilities.
#[allow(dead_code)]
pub fn test_numa_capabilities() -> NumaCapabilities {
NumaCapabilities::detect()
}
// Summary of what NUMA features the current platform supports.
#[derive(Debug, Clone)]
pub struct NumaCapabilities {
// Whether any NUMA information is exposed by the OS.
pub numa_available: bool,
// Number of NUMA nodes detected.
pub num_nodes: u32,
// Whether memory can be bound to a node.
pub memory_binding_supported: bool,
// Whether threads can be pinned to node-local CPUs.
pub thread_affinity_supported: bool,
// Human-readable platform description.
pub platform_details: String,
}
impl NumaCapabilities {
// Probe the current platform; unsupported platforms report a single
// node with no NUMA features.
pub fn detect() -> Self {
#[cfg(target_os = "linux")]
{
Self::detect_linux()
}
#[cfg(target_os = "windows")]
{
Self::detect_windows()
}
#[cfg(not(any(target_os = "linux", target_os = "windows")))]
{
Self {
numa_available: false,
num_nodes: 1,
memory_binding_supported: false,
thread_affinity_supported: false,
platform_details: "Unsupported platform".to_string(),
}
}
}
// Linux: NUMA is considered available when the sysfs node directory
// exists; the node count comes from the discovered topology.
#[cfg(target_os = "linux")]
fn detect_linux() -> Self {
let numa_available = std::path::Path::new("/sys/devices/system/node").exists();
let num_nodes = if numa_available {
DistancePool::get_numa_topology().nodes.len() as u32
} else {
1
};
Self {
numa_available,
num_nodes,
memory_binding_supported: numa_available,
thread_affinity_supported: true, platform_details: format!("Linux with {num_nodes} NUMA nodes"),
}
}
// Windows: hard-coded optimistic stub (claims support, one node);
// NOTE(review): no actual Windows API probing is performed.
#[cfg(target_os = "windows")]
fn detect_windows() -> Self {
Self {
numa_available: true, num_nodes: 1, memory_binding_supported: true,
thread_affinity_supported: true,
platform_details: "Windows NUMA support".to_string(),
}
}
// NUMA optimizations only pay off with more than one node.
pub fn should_enable_numa(&self) -> bool {
self.numa_available && self.num_nodes > 1
}
// Human-readable recommendation derived from `should_enable_numa`.
pub fn recommended_memory_strategy(&self) -> &'static str {
if self.should_enable_numa() {
"NUMA-aware"
} else {
"Standard"
}
}
}
#[cfg(test)]
mod tests {
use super::*;
// Buffers round-trip through the pool: values stick, and a dropped
// buffer can be reused by a later same-size request.
#[test]
fn test_distance_pool() {
let pool = DistancePool::new(10);
let mut buffer1 = pool.get_distance_buffer(100);
assert_eq!(buffer1.len(), 100);
buffer1.as_mut_slice()[0] = 42.0;
assert_eq!(buffer1.as_slice()[0], 42.0);
let buffer2 = pool.get_distance_buffer(50);
assert_eq!(buffer2.len(), 50);
drop(buffer1);
let buffer3 = pool.get_distance_buffer(100);
assert_eq!(buffer3.len(), 100);
}
// Arena hands out writable vectors and survives a reset.
#[test]
fn test_arena_allocator() {
let arena = ClusteringArena::new();
let mut vec1 = arena.alloc_temp_vec::<f64>(100);
let mut vec2 = arena.alloc_temp_vec::<usize>(50);
vec1.as_mut_slice()[0] = std::f64::consts::PI;
vec2.as_mut_slice()[0] = 42;
assert_eq!(vec1.as_slice()[0], std::f64::consts::PI);
assert_eq!(vec2.as_slice()[0], 42);
arena.reset();
let mut vec3 = arena.alloc_temp_vec::<f64>(200);
vec3.as_mut_slice()[0] = 2.71;
assert_eq!(vec3.as_slice()[0], 2.71);
}
// First request is a miss/allocation; a same-size request after drop is
// a hit, so the allocation count stays at 1. Note hit_rate() is a
// percentage (0-100).
#[test]
fn test_pool_statistics() {
let pool = DistancePool::new(2);
let stats = pool.statistics();
assert_eq!(stats.total_requests(), 0);
assert_eq!(stats.total_allocations(), 0);
let _buffer1 = pool.get_distance_buffer(100);
let stats = pool.statistics();
assert_eq!(stats.total_requests(), 1);
assert_eq!(stats.total_allocations(), 1);
assert!(stats.hit_rate() < 1.0);
drop(_buffer1);
let _buffer2 = pool.get_distance_buffer(100);
let stats = pool.statistics();
assert_eq!(stats.total_requests(), 2);
assert_eq!(stats.total_allocations(), 1); assert!(stats.hit_rate() > 0.0);
}
// Matrix buffers report the requested dimensions and can be refilled.
#[test]
fn test_matrix_buffer() {
let pool = DistancePool::new(5);
let mut matrix = pool.get_matrix_buffer(10, 10);
assert_eq!(matrix.dim(), (10, 10));
matrix.fill(42.0);
drop(matrix);
let mut matrix2 = pool.get_matrix_buffer(8, 8);
assert_eq!(matrix2.dim(), (8, 8));
}
// Smoke test: the lazy global singletons initialize and hand out memory.
#[test]
fn test_global_pools() {
let pool = global_distance_pool();
let arena = global_clustering_arena();
let buffer = pool.get_distance_buffer(10);
let _vec = arena.alloc_temp_vec::<f64>(10);
}
}