/// Memory-budget policy for a run: caps on dense materialization, operator and
/// data caches, and the target chunk size for streamed row access.
#[derive(Clone, Debug)]
pub struct ResourcePolicy {
    /// Hard cap on any single dense materialization.
    pub max_single_materialization_bytes: usize,
    /// Total budget for cached dense operator matrices.
    pub max_operator_cache_bytes: usize,
    /// Total budget for the spatial-distance cache.
    pub max_spatial_distance_cache_bytes: usize,
    /// Total budget for the owned-data cache.
    pub max_owned_data_cache_bytes: usize,
    /// Target size of one row chunk when streaming matrix rows.
    pub row_chunk_target_bytes: usize,
    /// How derivative matrices may be stored.
    pub derivative_storage_mode: DerivativeStorageMode,
}
/// Total budget for the spatial-distance cache (512 MiB).
pub const SPATIAL_DISTANCE_CACHE_MAX_BYTES: usize = 512 * 1024 * 1024;
/// Largest single entry admitted to the spatial-distance cache (256 MiB).
pub const SPATIAL_DISTANCE_CACHE_SINGLE_ENTRY_MAX_BYTES: usize = 256 * 1024 * 1024;
/// Entry cap for the owned-data cache.
pub const OWNED_DATA_CACHE_MAX_ENTRIES: usize = 2;
/// Row count at or above which `ResourcePolicy::for_problem` picks the strict policy.
pub const STRICT_POLICY_NROWS_THRESHOLD: usize = 100_000;
/// Parameter count at or above which `ResourcePolicy::for_problem` picks the strict policy.
pub const STRICT_POLICY_P_THRESHOLD: usize = 5_000;
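/// Caller-supplied hints about the problem that can force the strict policy.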
#[derive(Clone, Copy, Debug, Default)]
pub struct ProblemHints {
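    /// When set, `ResourcePolicy::for_problem` always selects the strict
    /// analytic-operator policy, regardless of the size thresholds.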
pub marginal_slope_biobank_active: bool,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DerivativeStorageMode {
    /// Derivatives must remain analytic operators; no dense materialization at all.
    AnalyticOperatorRequired,
    /// Dense materialization is allowed whenever it fits within the byte limits.
    MaterializeIfSmall,
    /// Dense materialization is allowed only for diagnostics, never for operators.
    DiagnosticsOnly,
}
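/// Per-call limits handed to materialization sites, derived from a
/// [`ResourcePolicy`] via [`ResourcePolicy::material_policy`].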
#[derive(Clone, Debug)]
pub struct MaterializationPolicy {
pub max_single_dense_bytes: usize,
pub max_cached_dense_bytes: usize,
pub row_chunk_target_bytes: usize,
pub allow_operator_materialization: bool,
pub allow_diagnostic_materialization: bool,
}
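/// Why a dense materialization request was refused.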
#[derive(Debug, thiserror::Error)]
pub enum MatrixMaterializationError {
#[error(
"{context}: dense materialization of {nrows}x{ncols} requires {bytes} bytes (limit {limit_bytes})"
)]
TooLarge {
context: &'static str,
nrows: usize,
ncols: usize,
bytes: usize,
limit_bytes: usize,
},
#[error("{context}: operator does not implement chunked row access")]
MissingRowChunk { context: &'static str },
#[error("{context}: materialization forbidden by policy (mode={mode:?})")]
Forbidden {
context: &'static str,
mode: DerivativeStorageMode,
},
}
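/// Types that can report their resident heap footprint, in bytes, for cache
/// accounting.
///
/// A minimal sketch of an implementation for a hypothetical buffer type:
///
/// ```ignore
/// impl ResidentBytes for MyBuffer {
///     fn resident_bytes(&self) -> usize {
///         self.data.len() * std::mem::size_of::<f64>()
///     }
/// }
/// ```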
pub trait ResidentBytes {
fn resident_bytes(&self) -> usize;
}
impl ResourcePolicy {
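    /// Balanced defaults intended for general library use: 256 MiB per dense
    /// materialization, 1 GiB of operator cache, 8 MiB row chunks.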
pub fn default_library() -> Self {
Self {
            max_single_materialization_bytes: 256 * 1024 * 1024,
            max_operator_cache_bytes: 1024 * 1024 * 1024,
            max_spatial_distance_cache_bytes: SPATIAL_DISTANCE_CACHE_MAX_BYTES,
            max_owned_data_cache_bytes: 512 * 1024 * 1024,
            row_chunk_target_bytes: 8 * 1024 * 1024,
            derivative_storage_mode: DerivativeStorageMode::MaterializeIfSmall,
}
}
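    /// The default limits, but derivatives must stay as analytic operators; no
    /// dense materialization of them is permitted, not even for diagnostics.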
pub fn analytic_operator_required() -> Self {
Self {
derivative_storage_mode: DerivativeStorageMode::AnalyticOperatorRequired,
..Self::default_library()
}
}
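    /// Chooses a policy from the problem shape: at or above the row/parameter
    /// thresholds, or when the biobank-scale hint is set, the strict
    /// analytic-operator policy applies; otherwise the library default.
    ///
    /// A usage sketch (the row and parameter counts are illustrative):
    ///
    /// ```ignore
    /// let policy = ResourcePolicy::for_problem(200_000, 1_000, ProblemHints::default());
    /// assert_eq!(
    ///     policy.derivative_storage_mode,
    ///     DerivativeStorageMode::AnalyticOperatorRequired
    /// );
    /// ```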
pub fn for_problem(n_rows: usize, p_estimate: usize, hints: ProblemHints) -> Self {
let strict = n_rows >= STRICT_POLICY_NROWS_THRESHOLD
|| p_estimate >= STRICT_POLICY_P_THRESHOLD
|| hints.marginal_slope_biobank_active;
if strict {
Self::analytic_operator_required()
} else {
Self::default_library()
}
}
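    /// Generous limits (2 GiB dense, 64 MiB chunks) for small problems where
    /// dense materialization is known to be affordable.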
pub fn permissive_small_data() -> Self {
Self {
            max_single_materialization_bytes: 2 * 1024 * 1024 * 1024,
            max_operator_cache_bytes: 2 * 1024 * 1024 * 1024,
max_spatial_distance_cache_bytes: SPATIAL_DISTANCE_CACHE_MAX_BYTES,
max_owned_data_cache_bytes: 512 * 1024 * 1024,
row_chunk_target_bytes: 64 * 1024 * 1024,
derivative_storage_mode: DerivativeStorageMode::MaterializeIfSmall,
}
}
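    /// Projects this policy into the per-call [`MaterializationPolicy`]; the
    /// storage mode determines which kinds of materialization are allowed.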
pub fn material_policy(&self) -> MaterializationPolicy {
MaterializationPolicy {
max_single_dense_bytes: self.max_single_materialization_bytes,
max_cached_dense_bytes: self.max_operator_cache_bytes,
row_chunk_target_bytes: self.row_chunk_target_bytes,
allow_operator_materialization: matches!(
self.derivative_storage_mode,
DerivativeStorageMode::MaterializeIfSmall
),
allow_diagnostic_materialization: !matches!(
self.derivative_storage_mode,
DerivativeStorageMode::AnalyticOperatorRequired
),
}
}
}
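/// Number of `f64` rows of width `cols` that fit within `target_bytes`, clamped
/// to at least one row (and guarding `cols == 0`).
///
/// ```ignore
/// // 8 MiB target, 1_000 columns of f64 (8_000 bytes per row) -> 1_048 rows.
/// assert_eq!(rows_for_target_bytes(8 * 1024 * 1024, 1_000), 1_048);
/// ```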
pub fn rows_for_target_bytes(target_bytes: usize, cols: usize) -> usize {
let bytes_per_row = cols.saturating_mul(std::mem::size_of::<f64>()).max(1);
(target_bytes / bytes_per_row).max(1)
}
use std::collections::{HashMap, VecDeque};
use std::hash::Hash;
use std::sync::{Arc, Mutex};
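/// A thread-safe LRU cache bounded by total resident bytes and, optionally, by
/// entry count. Values are charged via [`ResidentBytes`] at insertion time, and
/// least-recently-used entries are evicted first.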
pub struct ByteLruCache<K: Eq + Hash + Clone, V> {
inner: Mutex<ByteLruInner<K, V>>,
max_bytes: usize,
max_entries: Option<usize>,
}
struct ByteLruInner<K, V> {
    map: HashMap<K, (V, usize)>,
    order: VecDeque<K>,
resident_bytes: usize,
}
impl<K: Eq + Hash + Clone, V: Clone + ResidentBytes> ByteLruCache<K, V> {
pub fn new(max_bytes: usize) -> Self {
Self {
inner: Mutex::new(ByteLruInner {
map: HashMap::new(),
order: VecDeque::new(),
resident_bytes: 0,
}),
max_bytes,
max_entries: None,
}
}
pub fn with_max_entries(max_bytes: usize, max_entries: usize) -> Self {
Self {
inner: Mutex::new(ByteLruInner {
map: HashMap::new(),
order: VecDeque::new(),
resident_bytes: 0,
}),
max_bytes,
max_entries: Some(max_entries),
}
}
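    /// Returns a clone of the cached value, refreshing the key's recency on a hit.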
pub fn get(&self, key: &K) -> Option<V> {
let mut g = self.inner.lock().unwrap();
let v = g.map.get(key)?.0.clone();
if let Some(pos) = g.order.iter().position(|k| k == key) {
let k = g.order.remove(pos).unwrap();
g.order.push_back(k);
}
Some(v)
}
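    /// Inserts `value`, charging it at `value.resident_bytes()`. LRU entries are
    /// evicted until both the byte and entry budgets hold; a value larger than
    /// the whole byte budget is dropped outright.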
pub fn insert(&self, key: K, value: V) {
let charge = value.resident_bytes();
let mut g = self.inner.lock().unwrap();
if let Some((_old, old_charge)) = g.map.remove(&key) {
g.resident_bytes = g.resident_bytes.saturating_sub(old_charge);
if let Some(pos) = g.order.iter().position(|k| k == &key) {
g.order.remove(pos);
}
}
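        // Never admit a value larger than the entire budget; any previous value
        // under this key was already removed above and stays out.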
if charge > self.max_bytes {
return;
}
if let Some(max_entries) = self.max_entries {
if max_entries == 0 {
return;
}
while g.map.len() >= max_entries {
if let Some(evict_key) = g.order.pop_front() {
if let Some((_v, c)) = g.map.remove(&evict_key) {
g.resident_bytes = g.resident_bytes.saturating_sub(c);
}
} else {
break;
}
}
}
        // Evict least-recently-used entries until the new charge fits.
        while g.resident_bytes.saturating_add(charge) > self.max_bytes {
if let Some(evict_key) = g.order.pop_front() {
if let Some((_v, c)) = g.map.remove(&evict_key) {
g.resident_bytes = g.resident_bytes.saturating_sub(c);
}
} else {
break;
}
}
g.map.insert(key.clone(), (value, charge));
g.order.push_back(key);
g.resident_bytes = g.resident_bytes.saturating_add(charge);
}
pub fn resident_bytes(&self) -> usize {
self.inner.lock().unwrap().resident_bytes
}
pub fn max_bytes(&self) -> usize {
self.max_bytes
}
pub fn len(&self) -> usize {
self.inner.lock().unwrap().map.len()
}
pub fn clear(&self) {
let mut g = self.inner.lock().unwrap();
g.map.clear();
g.order.clear();
g.resident_bytes = 0;
}
}
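// A minimal test sketch of the eviction behavior; `TestBuf` is a toy value type
// introduced only for this test.
#[cfg(test)]
mod byte_lru_cache_tests {
    use super::*;

    #[derive(Clone)]
    struct TestBuf(Vec<f64>);

    impl ResidentBytes for TestBuf {
        fn resident_bytes(&self) -> usize {
            self.0.len() * std::mem::size_of::<f64>()
        }
    }

    #[test]
    fn evicts_least_recently_used_entry_when_over_budget() {
        // The budget holds exactly two 512-byte values.
        let cache: ByteLruCache<&'static str, TestBuf> = ByteLruCache::new(1024);
        cache.insert("a", TestBuf(vec![0.0; 64]));
        cache.insert("b", TestBuf(vec![0.0; 64]));
        // Touching "a" makes "b" the least recently used entry.
        assert!(cache.get(&"a").is_some());
        cache.insert("c", TestBuf(vec![0.0; 64]));
        assert!(cache.get(&"b").is_none());
        assert!(cache.get(&"a").is_some());
        assert_eq!(cache.resident_bytes(), 1024);
    }
}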
impl<K: Eq + Hash + Clone, V: Clone + ResidentBytes> std::fmt::Debug for ByteLruCache<K, V> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ByteLruCache")
.field("resident_bytes", &self.resident_bytes())
.field("max_bytes", &self.max_bytes)
.field("max_entries", &self.max_entries)
.finish()
}
}
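/// Charges a dense `f64` matrix at `8 * nrows * ncols` bytes; the `Arc` and
/// array headers are ignored as negligible.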
impl ResidentBytes for Arc<ndarray::Array2<f64>> {
fn resident_bytes(&self) -> usize {
std::mem::size_of::<f64>()
.saturating_mul(self.nrows())
.saturating_mul(self.ncols())
}
}
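/// A `OnceLock` wrapper whose initializers never block on one another: racing
/// callers each compute a candidate, the first `set` wins, and losing candidates
/// are dropped. Unlike `OnceLock::get_or_init`, a caller is never parked waiting
/// for another thread's initializer, which avoids deadlocks when initialization
/// itself dispatches rayon work.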
pub struct RayonSafeOnce<T> {
slot: std::sync::OnceLock<T>,
}
impl<T> RayonSafeOnce<T> {
pub const fn new() -> Self {
Self {
slot: std::sync::OnceLock::new(),
}
}
#[inline]
pub fn get(&self) -> Option<&T> {
self.slot.get()
}
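    /// Returns the stored value, running `init` if the slot is empty. Racing
    /// callers may each run `init`; exactly one result is stored, the rest dropped.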
pub fn get_or_init<F>(&self, init: F) -> &T
where
F: FnOnce() -> T,
{
if let Some(v) = self.slot.get() {
return v;
}
let candidate = init();
let _ = self.slot.set(candidate);
self.slot
.get()
.expect("RayonSafeOnce slot populated by set() above")
}
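    /// Fallible variant of [`Self::get_or_init`]: an `Err` from `init` is
    /// returned and leaves the slot untouched, unless another thread has
    /// already filled it.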
pub fn get_or_try_init<F, E>(&self, init: F) -> Result<&T, E>
where
F: FnOnce() -> Result<T, E>,
{
if let Some(v) = self.slot.get() {
return Ok(v);
}
let candidate = init()?;
let _ = self.slot.set(candidate);
Ok(self
.slot
.get()
.expect("RayonSafeOnce slot populated by set() above"))
}
}
impl<T> Default for RayonSafeOnce<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: Clone> Clone for RayonSafeOnce<T> {
fn clone(&self) -> Self {
let cloned = Self::new();
if let Some(value) = self.slot.get() {
let _ = cloned.slot.set(value.clone());
}
cloned
}
}
impl<T: std::fmt::Debug> std::fmt::Debug for RayonSafeOnce<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RayonSafeOnce")
.field("slot", &self.slot.get())
.finish()
}
}
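// A small usage sketch: once the slot is populated, later initializers never run.
#[cfg(test)]
mod rayon_safe_once_tests {
    use super::*;

    #[test]
    fn first_value_wins_and_later_initializers_do_not_run() {
        let once: RayonSafeOnce<usize> = RayonSafeOnce::new();
        assert_eq!(*once.get_or_init(|| 1), 1);
        // The slot is already populated, so this closure never executes.
        assert_eq!(*once.get_or_init(|| unreachable!()), 1);
        assert_eq!(once.get(), Some(&1));
    }
}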