1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
38
/// Category of I/O issued against the storage layer.
///
/// The category selects a cache partition and influences whether the page
/// cache is bypassed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IoWorkloadType {
    /// Query-path I/O.
    Query,
    /// Compaction I/O.
    Compaction,
    /// Backup I/O.
    Backup,
    /// Write-ahead-log I/O.
    Wal,
    /// Cache-warmup I/O.
    Warmup,
}

impl IoWorkloadType {
    /// Whether this workload bypasses the page cache unconditionally.
    ///
    /// Only `Compaction` and `Backup` do; the others are left to the
    /// size/pattern heuristics of the caller.
    pub fn prefers_direct_io(&self) -> bool {
        match self {
            Self::Compaction | Self::Backup => true,
            Self::Query | Self::Wal | Self::Warmup => false,
        }
    }

    /// Relative cache weight of this workload (higher means more deserving of cache).
    pub fn cache_weight(&self) -> u32 {
        match self {
            Self::Query => 80,
            Self::Compaction => 10,
            Self::Wal | Self::Warmup => 5,
            Self::Backup => 0,
        }
    }
}
77
/// Shape of an I/O request stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AccessPattern {
    /// Random-offset reads.
    RandomRead,
    /// Sequential reads over a range.
    SequentialScan,
    /// Random-offset writes.
    RandomWrite,
    /// Sequential writes.
    SequentialWrite,
    /// A mix of the above.
    Mixed,
}

impl AccessPattern {
    /// Heuristic probability, in `0.0..=1.0`, that caching data accessed with
    /// this pattern pays off.
    pub fn cache_benefit_probability(&self) -> f64 {
        match self {
            Self::RandomRead => 0.8,
            Self::RandomWrite | Self::Mixed => 0.5,
            Self::SequentialScan => 0.2,
            Self::SequentialWrite => 0.1,
        }
    }
}
105
/// A byte-budgeted region of the cache with lock-free usage and hit/miss counters.
///
/// The partition only does bookkeeping: it tracks how many bytes are reserved
/// against `max_bytes` but holds no cached data itself.
pub struct CachePartition {
    /// Partition name, copied into [`PartitionStats`].
    pub name: String,
    /// Byte budget; `try_allocate` never lets usage exceed it.
    pub max_bytes: usize,
    /// Bytes currently reserved against `max_bytes`.
    current_bytes: AtomicUsize,
    /// Number of `record_hit` calls.
    hits: AtomicU64,
    /// Number of `record_miss` calls.
    misses: AtomicU64,
    /// Number of `record_eviction` calls.
    evictions: AtomicU64,
}

impl CachePartition {
    /// Creates an empty partition called `name` with a budget of `max_bytes`.
    pub fn new(name: &str, max_bytes: usize) -> Self {
        Self {
            name: name.to_string(),
            max_bytes,
            current_bytes: AtomicUsize::new(0),
            hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            evictions: AtomicU64::new(0),
        }
    }

    /// Atomically reserves `bytes` from the budget.
    ///
    /// Returns `true` if the reservation fits within `max_bytes` and was
    /// recorded, `false` (usage unchanged) otherwise. A request whose sum with
    /// current usage would overflow `usize` is rejected instead of wrapping
    /// around and slipping past the budget check.
    pub fn try_allocate(&self, bytes: usize) -> bool {
        self.current_bytes
            .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |current| {
                current
                    .checked_add(bytes)
                    .filter(|&next| next <= self.max_bytes)
            })
            .is_ok()
    }

    /// Returns `bytes` previously reserved with [`try_allocate`](Self::try_allocate).
    ///
    /// Usage saturates at zero. Over-releasing (a caller bug) clamps to 0
    /// rather than wrapping to a huge value that would make the partition
    /// look full forever.
    pub fn release(&self, bytes: usize) {
        self.saturating_release(bytes);
    }

    /// Counts one cache hit.
    pub fn record_hit(&self) {
        self.hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one cache miss.
    pub fn record_miss(&self) {
        self.misses.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one eviction and returns its `bytes` to the budget (saturating at zero).
    pub fn record_eviction(&self, bytes: usize) {
        self.evictions.fetch_add(1, Ordering::Relaxed);
        self.saturating_release(bytes);
    }

    /// Fraction of recorded lookups that hit; `1.0` before any hit or miss is recorded.
    pub fn hit_rate(&self) -> f64 {
        Self::hit_ratio(
            self.hits.load(Ordering::Relaxed),
            self.misses.load(Ordering::Relaxed),
        )
    }

    /// Fraction of `max_bytes` currently reserved.
    ///
    /// Returns `0.0` (instead of NaN) for a zero-byte budget, which can never
    /// hold any bytes.
    pub fn utilization(&self) -> f64 {
        Self::usage_ratio(self.current_bytes.load(Ordering::Relaxed), self.max_bytes)
    }

    /// Point-in-time snapshot of all counters.
    ///
    /// The derived `hit_rate` and `utilization` are computed from the same
    /// loaded values as the raw counters, so the snapshot is self-consistent.
    pub fn stats(&self) -> PartitionStats {
        let current_bytes = self.current_bytes.load(Ordering::Relaxed);
        let hits = self.hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);
        PartitionStats {
            name: self.name.clone(),
            max_bytes: self.max_bytes,
            current_bytes,
            hits,
            misses,
            evictions: self.evictions.load(Ordering::Relaxed),
            hit_rate: Self::hit_ratio(hits, misses),
            utilization: Self::usage_ratio(current_bytes, self.max_bytes),
        }
    }

    /// Subtracts `bytes` from current usage, clamping at zero.
    fn saturating_release(&self, bytes: usize) {
        // The closure always returns `Some`, so this update cannot fail.
        let _ = self
            .current_bytes
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                Some(current.saturating_sub(bytes))
            });
    }

    /// `hits / (hits + misses)`, or `1.0` when both are zero.
    fn hit_ratio(hits: u64, misses: u64) -> f64 {
        if hits == 0 && misses == 0 {
            return 1.0;
        }
        // Sum in f64 so enormous counters cannot overflow a u64 addition.
        hits as f64 / (hits as f64 + misses as f64)
    }

    /// `current / max`, or `0.0` when `max` is zero.
    fn usage_ratio(current: usize, max: usize) -> f64 {
        if max == 0 {
            0.0
        } else {
            current as f64 / max as f64
        }
    }
}

/// Point-in-time snapshot of a [`CachePartition`], produced by [`CachePartition::stats`].
#[derive(Debug, Clone)]
pub struct PartitionStats {
    /// Partition name.
    pub name: String,
    /// Byte budget.
    pub max_bytes: usize,
    /// Bytes reserved at snapshot time.
    pub current_bytes: usize,
    /// Recorded hits.
    pub hits: u64,
    /// Recorded misses.
    pub misses: u64,
    /// Recorded evictions.
    pub evictions: u64,
    /// `hits / (hits + misses)`, or `1.0` when nothing was recorded.
    pub hit_rate: f64,
    /// `current_bytes / max_bytes`, or `0.0` when `max_bytes` is zero.
    pub utilization: f64,
}
221
/// Tuning knobs for the I/O isolation manager.
#[derive(Debug, Clone)]
pub struct IoIsolationConfig {
    /// Total cache budget in bytes, divided among partitions by the `*_pct` fields.
    pub total_cache_bytes: usize,
    /// Percentage of `total_cache_bytes` assigned to the query partition.
    pub query_partition_pct: u8,
    /// Percentage of `total_cache_bytes` assigned to the compaction partition.
    pub compaction_partition_pct: u8,
    /// Percentage of `total_cache_bytes` assigned to the WAL partition.
    pub wal_partition_pct: u8,
    /// Allow large, low-cache-benefit requests to be routed to direct I/O.
    pub auto_direct_io: bool,
    /// Request size in bytes at or above which `auto_direct_io` may apply.
    pub direct_io_threshold: usize,
    /// Whether eviction is requested when the cache is under memory pressure.
    pub prefer_eviction: bool,
    /// Utilization (0.0–1.0) above which the cache counts as under memory pressure.
    pub memory_pressure_threshold: f64,
}

impl Default for IoIsolationConfig {
    /// 1 GiB cache split 70/20/10 (query/compaction/WAL), automatic direct I/O
    /// for requests of 64 MiB or more, eviction enabled, pressure threshold 0.85.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        Self {
            total_cache_bytes: 1024 * MIB,
            query_partition_pct: 70,
            compaction_partition_pct: 20,
            wal_partition_pct: 10,
            auto_direct_io: true,
            direct_io_threshold: 64 * MIB,
            prefer_eviction: true,
            memory_pressure_threshold: 0.85,
        }
    }
}
257
258pub struct IoIsolationManager {
260 config: IoIsolationConfig,
261 query_partition: CachePartition,
263 compaction_partition: CachePartition,
265 wal_partition: CachePartition,
267 total_read_bytes: AtomicU64,
269 total_write_bytes: AtomicU64,
271 direct_io_bytes: AtomicU64,
273 buffered_io_bytes: AtomicU64,
275}
276
277impl IoIsolationManager {
278 pub fn new(config: IoIsolationConfig) -> Self {
280 let total = config.total_cache_bytes;
281 let query_bytes = total * config.query_partition_pct as usize / 100;
282 let compaction_bytes = total * config.compaction_partition_pct as usize / 100;
283 let wal_bytes = total * config.wal_partition_pct as usize / 100;
284
285 Self {
286 config,
287 query_partition: CachePartition::new("query", query_bytes),
288 compaction_partition: CachePartition::new("compaction", compaction_bytes),
289 wal_partition: CachePartition::new("wal", wal_bytes),
290 total_read_bytes: AtomicU64::new(0),
291 total_write_bytes: AtomicU64::new(0),
292 direct_io_bytes: AtomicU64::new(0),
293 buffered_io_bytes: AtomicU64::new(0),
294 }
295 }
296
297 pub fn partition_for(&self, workload: IoWorkloadType) -> &CachePartition {
299 match workload {
300 IoWorkloadType::Query | IoWorkloadType::Warmup => &self.query_partition,
301 IoWorkloadType::Compaction | IoWorkloadType::Backup => &self.compaction_partition,
302 IoWorkloadType::Wal => &self.wal_partition,
303 }
304 }
305
306 pub fn should_use_direct_io(
308 &self,
309 workload: IoWorkloadType,
310 pattern: AccessPattern,
311 size_bytes: usize,
312 ) -> bool {
313 if workload.prefers_direct_io() {
315 return true;
316 }
317
318 if self.config.auto_direct_io {
320 if size_bytes >= self.config.direct_io_threshold {
321 if pattern.cache_benefit_probability() < 0.3 {
323 return true;
324 }
325 }
326 }
327
328 false
329 }
330
331 pub fn record_io(&self, bytes: usize, is_write: bool, is_direct: bool) {
333 if is_write {
334 self.total_write_bytes
335 .fetch_add(bytes as u64, Ordering::Relaxed);
336 } else {
337 self.total_read_bytes
338 .fetch_add(bytes as u64, Ordering::Relaxed);
339 }
340
341 if is_direct {
342 self.direct_io_bytes
343 .fetch_add(bytes as u64, Ordering::Relaxed);
344 } else {
345 self.buffered_io_bytes
346 .fetch_add(bytes as u64, Ordering::Relaxed);
347 }
348 }
349
350 pub fn under_memory_pressure(&self) -> bool {
352 let total_util = (self.query_partition.utilization()
353 + self.compaction_partition.utilization()
354 + self.wal_partition.utilization())
355 / 3.0;
356 total_util > self.config.memory_pressure_threshold
357 }
358
359 pub fn maybe_evict(&self, target_bytes: usize) -> usize {
361 if !self.under_memory_pressure() {
362 return 0;
363 }
364
365 if !self.config.prefer_eviction {
366 return 0;
367 }
368
369 target_bytes
372 }
373
374 pub fn all_stats(&self) -> Vec<PartitionStats> {
376 vec![
377 self.query_partition.stats(),
378 self.compaction_partition.stats(),
379 self.wal_partition.stats(),
380 ]
381 }
382
383 pub fn io_stats(&self) -> IoStats {
385 IoStats {
386 total_read_bytes: self.total_read_bytes.load(Ordering::Relaxed),
387 total_write_bytes: self.total_write_bytes.load(Ordering::Relaxed),
388 direct_io_bytes: self.direct_io_bytes.load(Ordering::Relaxed),
389 buffered_io_bytes: self.buffered_io_bytes.load(Ordering::Relaxed),
390 direct_io_ratio: {
391 let direct = self.direct_io_bytes.load(Ordering::Relaxed);
392 let buffered = self.buffered_io_bytes.load(Ordering::Relaxed);
393 let total = direct + buffered;
394 if total == 0 {
395 0.0
396 } else {
397 direct as f64 / total as f64
398 }
399 },
400 }
401 }
402}
403
404#[derive(Debug, Clone)]
406pub struct IoStats {
407 pub total_read_bytes: u64,
408 pub total_write_bytes: u64,
409 pub direct_io_bytes: u64,
410 pub buffered_io_bytes: u64,
411 pub direct_io_ratio: f64,
412}
413
/// Alignment, in bytes, that direct-I/O requests must satisfy.
///
/// An alignment of 0 or 1 imposes no constraint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AlignmentContract {
    /// Required alignment of the I/O buffer's start address.
    pub buffer_alignment: usize,
    /// Required alignment of the file offset.
    pub offset_alignment: usize,
    /// Required granularity of the transfer length.
    pub size_alignment: usize,
}

impl AlignmentContract {
    /// Linux default: 512 bytes for buffer address, offset and size.
    ///
    /// NOTE(review): 512 matches the classic logical sector size. Devices with
    /// a 4096-byte logical block size need 4096 for `O_DIRECT`, so confirm this
    /// against the deployment hardware.
    #[cfg(target_os = "linux")]
    pub fn platform_default() -> Self {
        Self::uniform(512)
    }

    /// Default for macOS and every other non-Linux target: 4096 bytes for
    /// buffer address, offset and size.
    #[cfg(not(target_os = "linux"))]
    pub fn platform_default() -> Self {
        Self::uniform(4096)
    }

    /// Checks that `ptr` is aligned to `buffer_alignment`.
    ///
    /// # Errors
    /// Returns [`AlignmentError::BufferMisaligned`] carrying how many bytes
    /// `ptr` lies past the previous aligned address.
    pub fn validate_buffer(&self, ptr: *const u8) -> Result<(), AlignmentError> {
        let address = ptr as usize;
        match Self::misalignment(address as u64, self.buffer_alignment) {
            0 => Ok(()),
            // The remainder is < `buffer_alignment`, so it fits back into `usize`.
            remainder => Err(AlignmentError::BufferMisaligned {
                actual: remainder as usize,
                required: self.buffer_alignment,
            }),
        }
    }

    /// Checks that the file `offset` is a multiple of `offset_alignment`.
    ///
    /// The check runs in `u64`, so offsets beyond 4 GiB are not truncated on
    /// 32-bit targets.
    ///
    /// # Errors
    /// Returns [`AlignmentError::OffsetMisaligned`] carrying the full offset.
    pub fn validate_offset(&self, offset: u64) -> Result<(), AlignmentError> {
        if Self::misalignment(offset, self.offset_alignment) == 0 {
            Ok(())
        } else {
            Err(AlignmentError::OffsetMisaligned {
                actual: offset,
                required: self.offset_alignment,
            })
        }
    }

    /// Checks that `size` is a multiple of `size_alignment`.
    ///
    /// # Errors
    /// Returns [`AlignmentError::SizeMisaligned`] carrying the full size.
    pub fn validate_size(&self, size: usize) -> Result<(), AlignmentError> {
        if Self::misalignment(size as u64, self.size_alignment) == 0 {
            Ok(())
        } else {
            Err(AlignmentError::SizeMisaligned {
                actual: size,
                required: self.size_alignment,
            })
        }
    }

    /// Rounds `size` up to the nearest multiple of `size_alignment`.
    /// Returns `size` itself when `size_alignment` is 0.
    ///
    /// # Panics
    /// Panics if the rounded-up size does not fit in `usize`. Wrapping would
    /// yield a smaller, wrong size instead.
    pub fn align_size(&self, size: usize) -> usize {
        if self.size_alignment == 0 {
            return size;
        }
        size.checked_next_multiple_of(self.size_alignment)
            .expect("aligned size overflows usize")
    }

    /// A contract requiring the same `alignment` for buffer, offset and size.
    const fn uniform(alignment: usize) -> Self {
        Self {
            buffer_alignment: alignment,
            offset_alignment: alignment,
            size_alignment: alignment,
        }
    }

    /// `value mod alignment`, treating an alignment of 0 as "no constraint" (0).
    fn misalignment(value: u64, alignment: usize) -> u64 {
        match alignment {
            0 => 0,
            _ => value % alignment as u64,
        }
    }
}

/// Reason a request failed an [`AlignmentContract`] check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AlignmentError {
    /// Buffer address lies `actual` bytes past the previous `required`-aligned address.
    BufferMisaligned { actual: usize, required: usize },
    /// File offset `actual` is not a multiple of `required`.
    OffsetMisaligned { actual: u64, required: usize },
    /// Transfer size `actual` is not a multiple of `required`.
    SizeMisaligned { actual: usize, required: usize },
}

impl std::fmt::Display for AlignmentError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            AlignmentError::BufferMisaligned { actual, required } => write!(
                f,
                "Buffer misaligned: offset {} not multiple of {}",
                actual, required
            ),
            AlignmentError::OffsetMisaligned { actual, required } => {
                write!(f, "Offset {} not aligned to {} bytes", actual, required)
            }
            AlignmentError::SizeMisaligned { actual, required } => {
                write!(f, "Size {} not aligned to {} bytes", actual, required)
            }
        }
    }
}

impl std::error::Error for AlignmentError {}
524
#[cfg(test)]
mod tests {
    use super::*;

    /// Reservations fill the budget exactly, overflow is refused, and a
    /// release makes room again.
    #[test]
    fn test_cache_partition_allocation() {
        let partition = CachePartition::new("test", 1024);
        let used = || partition.current_bytes.load(Ordering::Relaxed);

        // Two 512-byte reservations exactly fill the 1 KiB budget.
        for expected in [512, 1024] {
            assert!(partition.try_allocate(512));
            assert_eq!(used(), expected);
        }

        // Nothing more fits once the budget is exhausted.
        assert!(!partition.try_allocate(1));

        // Releasing half makes room for another 512-byte reservation.
        partition.release(512);
        assert!(partition.try_allocate(512));
    }

    /// Snapshot counters and derived ratios reflect recorded activity.
    #[test]
    fn test_partition_stats() {
        let partition = CachePartition::new("test", 1000);
        partition.try_allocate(500);
        for _ in 0..2 {
            partition.record_hit();
        }
        partition.record_miss();

        let PartitionStats {
            current_bytes,
            hits,
            misses,
            hit_rate,
            utilization,
            ..
        } = partition.stats();
        assert_eq!(current_bytes, 500);
        assert_eq!(hits, 2);
        assert_eq!(misses, 1);
        assert!((hit_rate - 0.666).abs() < 0.01);
        assert!((utilization - 0.5).abs() < 0.01);
    }

    /// Direct-I/O routing under the default configuration.
    #[test]
    fn test_direct_io_decision() {
        let manager = IoIsolationManager::new(IoIsolationConfig::default());
        let cases = [
            // Compaction always bypasses the cache, whatever the size.
            (IoWorkloadType::Compaction, AccessPattern::SequentialScan, 1024, true),
            // Small random query reads stay buffered.
            (IoWorkloadType::Query, AccessPattern::RandomRead, 4096, false),
            // A 100 MiB scan crosses the threshold with low cache benefit.
            (
                IoWorkloadType::Query,
                AccessPattern::SequentialScan,
                100 * 1024 * 1024,
                true,
            ),
        ];

        for (workload, pattern, size, expected) in cases {
            assert_eq!(
                manager.should_use_direct_io(workload, pattern, size),
                expected,
                "{:?} / {:?} / {} bytes",
                workload,
                pattern,
                size
            );
        }
    }

    /// The platform default accepts 4 KiB-aligned requests and rounds sizes up.
    #[test]
    fn test_alignment_contract() {
        let contract = AlignmentContract::platform_default();

        assert!(contract.validate_offset(4096).is_ok());
        assert!(contract.validate_size(4096).is_ok());

        // Any real alignment requirement rejects a 1-byte offset.
        if contract.offset_alignment > 1 {
            assert!(contract.validate_offset(1).is_err());
        }

        let aligned = contract.align_size(5000);
        assert!(aligned >= 5000);
        assert_eq!(aligned % contract.size_alignment, 0);
    }
}