/// Memory-budget knobs governing dense materialization and caching.
///
/// Converted into concrete limits by [`ResourcePolicy::material_policy`];
/// preset configurations are provided by `default_library`,
/// `analytic_operator_required`, and `permissive_small_data`.
#[derive(Clone, Debug)]
pub struct ResourcePolicy {
    /// Upper bound, in bytes, for any single dense materialization.
    pub max_single_materialization_bytes: usize,
    /// Byte budget for the operator cache.
    pub max_operator_cache_bytes: usize,
    /// Byte budget for the spatial-distance cache.
    pub max_spatial_distance_cache_bytes: usize,
    /// Byte budget for the owned-data cache.
    pub max_owned_data_cache_bytes: usize,
    /// Target byte size used when sizing row chunks
    /// (see `rows_for_target_bytes`).
    pub row_chunk_target_bytes: usize,
    /// Controls whether derivative data may be materialized densely.
    pub derivative_storage_mode: DerivativeStorageMode,
}
10
/// Total byte cap for the spatial-distance cache (512 MiB).
pub const SPATIAL_DISTANCE_CACHE_MAX_BYTES: usize = 512 * 1024 * 1024;
/// Largest single entry admitted to the spatial-distance cache (256 MiB).
pub const SPATIAL_DISTANCE_CACHE_SINGLE_ENTRY_MAX_BYTES: usize = 256 * 1024 * 1024;
/// Maximum entry count for the owned-data cache.
pub const OWNED_DATA_CACHE_MAX_ENTRIES: usize = 2;
14
/// Governs whether derivative matrices may be stored as dense data.
///
/// The mapping to concrete permissions lives in
/// [`ResourcePolicy::material_policy`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DerivativeStorageMode {
    /// Forbid dense materialization entirely: both operator and
    /// diagnostic materialization are disallowed.
    AnalyticOperatorRequired,
    /// Permit operator and diagnostic materialization, subject to the
    /// policy's byte limits.
    MaterializeIfSmall,
    /// Permit diagnostic materialization only; operator materialization
    /// remains disabled.
    DiagnosticsOnly,
}
24
/// Concrete materialization limits derived from a [`ResourcePolicy`]
/// via [`ResourcePolicy::material_policy`].
#[derive(Clone, Debug)]
pub struct MaterializationPolicy {
    /// Max bytes allowed for one dense materialization.
    pub max_single_dense_bytes: usize,
    /// Max bytes of dense data kept cached.
    pub max_cached_dense_bytes: usize,
    /// Target byte size per row chunk.
    pub row_chunk_target_bytes: usize,
    /// Whether operators may be materialized densely.
    pub allow_operator_materialization: bool,
    /// Whether diagnostics may trigger materialization.
    pub allow_diagnostic_materialization: bool,
}
33
/// Reasons a dense materialization request can be refused.
#[derive(Debug, thiserror::Error)]
pub enum MatrixMaterializationError {
    /// The requested dense matrix would exceed the applicable byte limit.
    #[error(
        "{context}: dense materialization of {nrows}x{ncols} requires {bytes} bytes (limit {limit_bytes})"
    )]
    TooLarge {
        /// Static label identifying where the request originated.
        context: &'static str,
        /// Requested row count.
        nrows: usize,
        /// Requested column count.
        ncols: usize,
        /// Bytes the dense matrix would occupy.
        bytes: usize,
        /// Byte limit that was exceeded.
        limit_bytes: usize,
    },

    /// The operator cannot be materialized incrementally because it
    /// lacks chunked row access.
    #[error("{context}: operator does not implement chunked row access")]
    MissingRowChunk { context: &'static str },

    /// The active [`DerivativeStorageMode`] forbids this materialization.
    #[error("{context}: materialization forbidden by policy (mode={mode:?})")]
    Forbidden {
        /// Static label identifying where the request originated.
        context: &'static str,
        /// The policy mode that forbade the operation.
        mode: DerivativeStorageMode,
    },
}
56
/// Reports the approximate number of bytes a value keeps resident in
/// memory; [`ByteLruCache`] uses this to charge entries against its
/// byte budget.
pub trait ResidentBytes {
    /// Approximate resident size of `self`, in bytes.
    fn resident_bytes(&self) -> usize;
}
60
61impl ResourcePolicy {
62 pub fn default_library() -> Self {
71 Self {
72 max_single_materialization_bytes: 256 * 1024 * 1024, max_operator_cache_bytes: 1024 * 1024 * 1024, max_spatial_distance_cache_bytes: SPATIAL_DISTANCE_CACHE_MAX_BYTES,
75 max_owned_data_cache_bytes: 512 * 1024 * 1024, row_chunk_target_bytes: 8 * 1024 * 1024, derivative_storage_mode: DerivativeStorageMode::MaterializeIfSmall,
78 }
79 }
80
81 pub fn analytic_operator_required() -> Self {
85 Self {
86 derivative_storage_mode: DerivativeStorageMode::AnalyticOperatorRequired,
87 ..Self::default_library()
88 }
89 }
90
91 pub fn permissive_small_data() -> Self {
93 Self {
94 max_single_materialization_bytes: 2 * 1024 * 1024 * 1024, max_operator_cache_bytes: 2 * 1024 * 1024 * 1024,
96 max_spatial_distance_cache_bytes: SPATIAL_DISTANCE_CACHE_MAX_BYTES,
97 max_owned_data_cache_bytes: 512 * 1024 * 1024,
98 row_chunk_target_bytes: 64 * 1024 * 1024,
99 derivative_storage_mode: DerivativeStorageMode::MaterializeIfSmall,
100 }
101 }
102
103 pub fn material_policy(&self) -> MaterializationPolicy {
104 MaterializationPolicy {
105 max_single_dense_bytes: self.max_single_materialization_bytes,
106 max_cached_dense_bytes: self.max_operator_cache_bytes,
107 row_chunk_target_bytes: self.row_chunk_target_bytes,
108 allow_operator_materialization: matches!(
109 self.derivative_storage_mode,
110 DerivativeStorageMode::MaterializeIfSmall
111 ),
112 allow_diagnostic_materialization: !matches!(
113 self.derivative_storage_mode,
114 DerivativeStorageMode::AnalyticOperatorRequired
115 ),
116 }
117 }
118}
119
/// Number of `f64` rows of width `cols` that fit in roughly
/// `target_bytes`, clamped to at least one row so callers always make
/// progress (even for zero-width matrices or tiny budgets).
pub fn rows_for_target_bytes(target_bytes: usize, cols: usize) -> usize {
    // One row occupies cols * sizeof(f64) bytes; the max(1) guards the
    // division against cols == 0.
    let row_bytes = std::mem::size_of::<f64>().saturating_mul(cols).max(1);
    let rows = target_bytes / row_bytes;
    if rows == 0 { 1 } else { rows }
}
126
127use std::collections::{HashMap, VecDeque};
128use std::hash::Hash;
129use std::sync::{Arc, Mutex};
130
/// Thread-safe LRU cache bounded by a byte budget.
///
/// Each entry is charged via [`ResidentBytes`] at insert time; the sum
/// of charges never exceeds `max_bytes`, and an optional entry-count
/// cap may also apply. All state sits behind one `Mutex`, so `&self`
/// methods can mutate the cache.
pub struct ByteLruCache<K: Eq + Hash + Clone, V> {
    /// Map, recency queue, and byte accounting, guarded together.
    inner: Mutex<ByteLruInner<K, V>>,
    /// Maximum total bytes of cached values.
    max_bytes: usize,
    /// Optional cap on entry count (`None` = no count limit).
    max_entries: Option<usize>,
}
146
/// Mutex-protected state of a [`ByteLruCache`].
struct ByteLruInner<K, V> {
    /// Key -> (cached value, byte charge recorded at insert time).
    map: HashMap<K, (V, usize)>,
    /// Recency queue: front is least recently used, back is most recent.
    order: VecDeque<K>,
    /// Sum of the charges of every entry currently in `map`.
    resident_bytes: usize,
}
152
153impl<K: Eq + Hash + Clone, V: Clone + ResidentBytes> ByteLruCache<K, V> {
154 pub fn new(max_bytes: usize) -> Self {
155 Self {
156 inner: Mutex::new(ByteLruInner {
157 map: HashMap::new(),
158 order: VecDeque::new(),
159 resident_bytes: 0,
160 }),
161 max_bytes,
162 max_entries: None,
163 }
164 }
165
166 pub fn with_max_entries(max_bytes: usize, max_entries: usize) -> Self {
167 Self {
168 inner: Mutex::new(ByteLruInner {
169 map: HashMap::new(),
170 order: VecDeque::new(),
171 resident_bytes: 0,
172 }),
173 max_bytes,
174 max_entries: Some(max_entries),
175 }
176 }
177
178 pub fn get(&self, key: &K) -> Option<V> {
179 let mut g = self.inner.lock().unwrap();
180 let v = g.map.get(key)?.0.clone();
181 if let Some(pos) = g.order.iter().position(|k| k == key) {
183 let k = g.order.remove(pos).unwrap();
184 g.order.push_back(k);
185 }
186 Some(v)
187 }
188
189 pub fn insert(&self, key: K, value: V) {
190 let charge = value.resident_bytes();
191 let mut g = self.inner.lock().unwrap();
192
193 if let Some((_old, old_charge)) = g.map.remove(&key) {
196 g.resident_bytes = g.resident_bytes.saturating_sub(old_charge);
197 if let Some(pos) = g.order.iter().position(|k| k == &key) {
198 g.order.remove(pos);
199 }
200 }
201
202 if charge > self.max_bytes {
203 return;
205 }
206
207 if let Some(max_entries) = self.max_entries {
208 if max_entries == 0 {
209 return;
210 }
211 while g.map.len() >= max_entries {
212 if let Some(evict_key) = g.order.pop_front() {
213 if let Some((_v, c)) = g.map.remove(&evict_key) {
214 g.resident_bytes = g.resident_bytes.saturating_sub(c);
215 }
216 } else {
217 break;
218 }
219 }
220 }
221
222 while g.resident_bytes + charge > self.max_bytes {
223 if let Some(evict_key) = g.order.pop_front() {
224 if let Some((_v, c)) = g.map.remove(&evict_key) {
225 g.resident_bytes = g.resident_bytes.saturating_sub(c);
226 }
227 } else {
228 break;
229 }
230 }
231
232 g.map.insert(key.clone(), (value, charge));
233 g.order.push_back(key);
234 g.resident_bytes = g.resident_bytes.saturating_add(charge);
235 }
236
237 pub fn resident_bytes(&self) -> usize {
238 self.inner.lock().unwrap().resident_bytes
239 }
240
241 pub fn max_bytes(&self) -> usize {
242 self.max_bytes
243 }
244
245 pub fn len(&self) -> usize {
246 self.inner.lock().unwrap().map.len()
247 }
248
249 pub fn clear(&self) {
250 let mut g = self.inner.lock().unwrap();
251 g.map.clear();
252 g.order.clear();
253 g.resident_bytes = 0;
254 }
255}
256
257impl<K: Eq + Hash + Clone, V: Clone + ResidentBytes> std::fmt::Debug for ByteLruCache<K, V> {
258 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259 f.debug_struct("ByteLruCache")
260 .field("resident_bytes", &self.resident_bytes())
261 .field("max_bytes", &self.max_bytes)
262 .field("max_entries", &self.max_entries)
263 .finish()
264 }
265}
266
267impl ResidentBytes for Arc<ndarray::Array2<f64>> {
274 fn resident_bytes(&self) -> usize {
275 std::mem::size_of::<f64>()
276 .saturating_mul(self.nrows())
277 .saturating_mul(self.ncols())
278 }
279}