1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
38use std::sync::Arc;
39
40use parking_lot::RwLock;
41
/// Origin of an I/O request.
///
/// The workload type decides whether direct I/O is preferred and how much
/// cache weight the traffic is given.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IoWorkloadType {
    /// Query traffic: no direct-I/O preference, cache weight 80.
    Query,
    /// Compaction traffic: prefers direct I/O, cache weight 10.
    Compaction,
    /// Backup traffic: prefers direct I/O, cache weight 0.
    Backup,
    /// WAL traffic: no direct-I/O preference, cache weight 5.
    Wal,
    /// Warm-up traffic: no direct-I/O preference, cache weight 5.
    Warmup,
}

impl IoWorkloadType {
    /// Returns `true` when this workload prefers direct I/O.
    ///
    /// Only `Compaction` and `Backup` do; every other workload returns `false`.
    pub fn prefers_direct_io(&self) -> bool {
        // Arms are grouped by result but stay exhaustive, so adding a variant
        // forces a decision here.
        match *self {
            Self::Compaction | Self::Backup => true,
            Self::Query | Self::Wal | Self::Warmup => false,
        }
    }

    /// Returns the cache weight assigned to this workload.
    ///
    /// Values range from 0 (`Backup`) to 80 (`Query`).
    pub fn cache_weight(&self) -> u32 {
        match *self {
            Self::Query => 80,
            Self::Compaction => 10,
            Self::Wal | Self::Warmup => 5,
            Self::Backup => 0,
        }
    }
}
80
/// Shape of an I/O request stream, used to estimate how much the request
/// benefits from caching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AccessPattern {
    /// Random reads (cache benefit 0.8).
    RandomRead,
    /// Sequential scan (cache benefit 0.2).
    SequentialScan,
    /// Random writes (cache benefit 0.5).
    RandomWrite,
    /// Sequential writes (cache benefit 0.1).
    SequentialWrite,
    /// Mixed access (cache benefit 0.5).
    Mixed,
}

impl AccessPattern {
    /// Returns the estimated probability, in `0.0..=1.0`, that caching helps
    /// a request with this access pattern.
    pub fn cache_benefit_probability(&self) -> f64 {
        // Listed from most to least cache-friendly.
        match *self {
            Self::RandomRead => 0.8,
            Self::RandomWrite | Self::Mixed => 0.5,
            Self::SequentialScan => 0.2,
            Self::SequentialWrite => 0.1,
        }
    }
}
108
/// A named share of the cache with a fixed byte budget.
///
/// Usage and hit/miss/eviction counts are kept in atomic counters, so every
/// method takes `&self`.
#[derive(Debug)]
pub struct CachePartition {
    /// Label copied into [`PartitionStats::name`].
    pub name: String,
    /// Upper bound on the number of bytes that may be allocated at once.
    pub max_bytes: usize,
    /// Bytes currently charged to this partition.
    current_bytes: AtomicUsize,
    /// Number of recorded cache hits.
    hits: AtomicU64,
    /// Number of recorded cache misses.
    misses: AtomicU64,
    /// Number of recorded evictions.
    evictions: AtomicU64,
}

impl CachePartition {
    /// Creates an empty partition called `name` with a budget of `max_bytes`.
    pub fn new(name: &str, max_bytes: usize) -> Self {
        Self {
            name: name.to_string(),
            max_bytes,
            current_bytes: AtomicUsize::new(0),
            hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            evictions: AtomicU64::new(0),
        }
    }

    /// Tries to charge `bytes` against the partition budget.
    ///
    /// Returns `true` and records the allocation when the new total stays
    /// within `max_bytes`; otherwise returns `false` and changes nothing.
    /// A request so large that the running total would overflow `usize` is
    /// rejected instead of wrapping around.
    pub fn try_allocate(&self, bytes: usize) -> bool {
        self.current_bytes
            .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |used| {
                // `checked_add` yields `None` on overflow, which aborts the
                // update exactly like an over-budget request does.
                used.checked_add(bytes)
                    .filter(|&next| next <= self.max_bytes)
            })
            .is_ok()
    }

    /// Returns `bytes` to the partition budget.
    ///
    /// Releasing more than is currently allocated clamps usage at zero
    /// instead of wrapping to a huge value.
    pub fn release(&self, bytes: usize) {
        self.give_back(bytes);
    }

    /// Counts one cache hit.
    pub fn record_hit(&self) {
        self.hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one cache miss.
    pub fn record_miss(&self) {
        self.misses.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one eviction and returns the evicted `bytes` to the budget.
    ///
    /// Usage is clamped at zero, as in [`CachePartition::release`].
    pub fn record_eviction(&self, bytes: usize) {
        self.evictions.fetch_add(1, Ordering::Relaxed);
        self.give_back(bytes);
    }

    /// Fraction of recorded lookups that were hits.
    ///
    /// Returns `1.0` when no hit or miss has been recorded yet.
    pub fn hit_rate(&self) -> f64 {
        Self::hit_ratio(
            self.hits.load(Ordering::Relaxed),
            self.misses.load(Ordering::Relaxed),
        )
    }

    /// Fraction of the byte budget currently in use.
    ///
    /// A partition with `max_bytes == 0` reports `0.0` rather than NaN.
    pub fn utilization(&self) -> f64 {
        Self::fill_ratio(self.current_bytes.load(Ordering::Relaxed), self.max_bytes)
    }

    /// Takes a snapshot of all counters.
    ///
    /// Each counter is read once and the derived `hit_rate` / `utilization`
    /// are computed from those same reads, so the snapshot is internally
    /// consistent even while other threads keep updating the partition.
    pub fn stats(&self) -> PartitionStats {
        let current_bytes = self.current_bytes.load(Ordering::Relaxed);
        let hits = self.hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);

        PartitionStats {
            name: self.name.clone(),
            max_bytes: self.max_bytes,
            current_bytes,
            hits,
            misses,
            evictions: self.evictions.load(Ordering::Relaxed),
            hit_rate: Self::hit_ratio(hits, misses),
            utilization: Self::fill_ratio(current_bytes, self.max_bytes),
        }
    }

    /// Subtracts `bytes` from the usage counter, saturating at zero.
    fn give_back(&self, bytes: usize) {
        // The closure always returns `Some`, so the update cannot fail.
        let _ = self
            .current_bytes
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
                Some(used.saturating_sub(bytes))
            });
    }

    /// Hit ratio for the given counts; `1.0` when both are zero.
    fn hit_ratio(hits: u64, misses: u64) -> f64 {
        let lookups = hits + misses;
        if lookups == 0 {
            1.0
        } else {
            hits as f64 / lookups as f64
        }
    }

    /// `used / capacity`, defined as `0.0` for a zero capacity.
    fn fill_ratio(used: usize, capacity: usize) -> f64 {
        if capacity == 0 {
            0.0
        } else {
            used as f64 / capacity as f64
        }
    }
}

/// Point-in-time copy of a [`CachePartition`]'s counters, as produced by
/// [`CachePartition::stats`].
#[derive(Debug, Clone)]
pub struct PartitionStats {
    /// Partition name.
    pub name: String,
    /// Byte budget of the partition.
    pub max_bytes: usize,
    /// Bytes allocated at snapshot time.
    pub current_bytes: usize,
    /// Recorded hits.
    pub hits: u64,
    /// Recorded misses.
    pub misses: u64,
    /// Recorded evictions.
    pub evictions: u64,
    /// `hits / (hits + misses)`, or `1.0` when there were no lookups.
    pub hit_rate: f64,
    /// `current_bytes / max_bytes`, or `0.0` for a zero-capacity partition.
    pub utilization: f64,
}
223
/// Tunables for [`IoIsolationManager`].
#[derive(Debug, Clone)]
pub struct IoIsolationConfig {
    /// Total cache budget in bytes, split between the partitions.
    pub total_cache_bytes: usize,
    /// Percentage of the total budget given to the query partition.
    pub query_partition_pct: u8,
    /// Percentage of the total budget given to the compaction partition.
    pub compaction_partition_pct: u8,
    /// Percentage of the total budget given to the WAL partition.
    pub wal_partition_pct: u8,
    /// Enables the size/pattern based direct-I/O heuristic.
    pub auto_direct_io: bool,
    /// Minimum request size in bytes for the direct-I/O heuristic to apply.
    pub direct_io_threshold: usize,
    /// When `false`, `maybe_evict` never requests eviction.
    pub prefer_eviction: bool,
    /// Mean partition utilization above which memory pressure is reported.
    pub memory_pressure_threshold: f64,
}

impl Default for IoIsolationConfig {
    /// Defaults: 1 GiB of cache split 70/20/10 between query, compaction and
    /// WAL; automatic direct I/O from 64 MiB; eviction enabled; pressure
    /// threshold 0.85.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        const GIB: usize = 1024 * MIB;

        Self {
            total_cache_bytes: GIB,
            query_partition_pct: 70,
            compaction_partition_pct: 20,
            wal_partition_pct: 10,
            auto_direct_io: true,
            direct_io_threshold: 64 * MIB,
            prefer_eviction: true,
            memory_pressure_threshold: 0.85,
        }
    }
}
259
260pub struct IoIsolationManager {
262 config: IoIsolationConfig,
263 query_partition: CachePartition,
265 compaction_partition: CachePartition,
267 wal_partition: CachePartition,
269 total_read_bytes: AtomicU64,
271 total_write_bytes: AtomicU64,
273 direct_io_bytes: AtomicU64,
275 buffered_io_bytes: AtomicU64,
277}
278
279impl IoIsolationManager {
280 pub fn new(config: IoIsolationConfig) -> Self {
282 let total = config.total_cache_bytes;
283 let query_bytes = total * config.query_partition_pct as usize / 100;
284 let compaction_bytes = total * config.compaction_partition_pct as usize / 100;
285 let wal_bytes = total * config.wal_partition_pct as usize / 100;
286
287 Self {
288 config,
289 query_partition: CachePartition::new("query", query_bytes),
290 compaction_partition: CachePartition::new("compaction", compaction_bytes),
291 wal_partition: CachePartition::new("wal", wal_bytes),
292 total_read_bytes: AtomicU64::new(0),
293 total_write_bytes: AtomicU64::new(0),
294 direct_io_bytes: AtomicU64::new(0),
295 buffered_io_bytes: AtomicU64::new(0),
296 }
297 }
298
299 pub fn partition_for(&self, workload: IoWorkloadType) -> &CachePartition {
301 match workload {
302 IoWorkloadType::Query | IoWorkloadType::Warmup => &self.query_partition,
303 IoWorkloadType::Compaction | IoWorkloadType::Backup => &self.compaction_partition,
304 IoWorkloadType::Wal => &self.wal_partition,
305 }
306 }
307
308 pub fn should_use_direct_io(
310 &self,
311 workload: IoWorkloadType,
312 pattern: AccessPattern,
313 size_bytes: usize,
314 ) -> bool {
315 if workload.prefers_direct_io() {
317 return true;
318 }
319
320 if self.config.auto_direct_io {
322 if size_bytes >= self.config.direct_io_threshold {
323 if pattern.cache_benefit_probability() < 0.3 {
325 return true;
326 }
327 }
328 }
329
330 false
331 }
332
333 pub fn record_io(&self, bytes: usize, is_write: bool, is_direct: bool) {
335 if is_write {
336 self.total_write_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
337 } else {
338 self.total_read_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
339 }
340
341 if is_direct {
342 self.direct_io_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
343 } else {
344 self.buffered_io_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
345 }
346 }
347
348 pub fn under_memory_pressure(&self) -> bool {
350 let total_util = (self.query_partition.utilization()
351 + self.compaction_partition.utilization()
352 + self.wal_partition.utilization())
353 / 3.0;
354 total_util > self.config.memory_pressure_threshold
355 }
356
357 pub fn maybe_evict(&self, target_bytes: usize) -> usize {
359 if !self.under_memory_pressure() {
360 return 0;
361 }
362
363 if !self.config.prefer_eviction {
364 return 0;
365 }
366
367 target_bytes
370 }
371
372 pub fn all_stats(&self) -> Vec<PartitionStats> {
374 vec![
375 self.query_partition.stats(),
376 self.compaction_partition.stats(),
377 self.wal_partition.stats(),
378 ]
379 }
380
381 pub fn io_stats(&self) -> IoStats {
383 IoStats {
384 total_read_bytes: self.total_read_bytes.load(Ordering::Relaxed),
385 total_write_bytes: self.total_write_bytes.load(Ordering::Relaxed),
386 direct_io_bytes: self.direct_io_bytes.load(Ordering::Relaxed),
387 buffered_io_bytes: self.buffered_io_bytes.load(Ordering::Relaxed),
388 direct_io_ratio: {
389 let direct = self.direct_io_bytes.load(Ordering::Relaxed);
390 let buffered = self.buffered_io_bytes.load(Ordering::Relaxed);
391 let total = direct + buffered;
392 if total == 0 { 0.0 } else { direct as f64 / total as f64 }
393 },
394 }
395 }
396}
397
398#[derive(Debug, Clone)]
400pub struct IoStats {
401 pub total_read_bytes: u64,
402 pub total_write_bytes: u64,
403 pub direct_io_bytes: u64,
404 pub buffered_io_bytes: u64,
405 pub direct_io_ratio: f64,
406}
407
/// Alignment requirements for a buffer address, a file offset and a transfer
/// size.
///
/// An alignment of `0` is degenerate: only the value `0` counts as aligned
/// to it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AlignmentContract {
    /// Required alignment of the buffer address, in bytes.
    pub buffer_alignment: usize,
    /// Required alignment of the file offset, in bytes.
    pub offset_alignment: usize,
    /// Required alignment of the transfer size, in bytes.
    pub size_alignment: usize,
}

impl AlignmentContract {
    /// Default contract on Linux: 512-byte alignment for buffer, offset and
    /// size.
    ///
    /// NOTE(review): presumably the usual logical sector size; devices with
    /// larger sectors may need more — confirm.
    #[cfg(target_os = "linux")]
    pub fn platform_default() -> Self {
        Self {
            buffer_alignment: 512,
            offset_alignment: 512,
            size_alignment: 512,
        }
    }

    /// Default contract on every non-Linux platform (macOS included):
    /// 4096-byte alignment for buffer, offset and size.
    #[cfg(not(target_os = "linux"))]
    pub fn platform_default() -> Self {
        Self {
            buffer_alignment: 4096,
            offset_alignment: 4096,
            size_alignment: 4096,
        }
    }

    /// Checks that the address of `ptr` is a multiple of `buffer_alignment`.
    ///
    /// The pointer is never dereferenced.
    ///
    /// # Errors
    ///
    /// Returns [`AlignmentError::BufferMisaligned`] whose `actual` field is
    /// the address's remainder modulo the alignment. With a zero alignment
    /// the remainder is taken to be the address itself, so this never
    /// divides by zero.
    pub fn validate_buffer(&self, ptr: *const u8) -> Result<(), AlignmentError> {
        let addr = ptr as usize;
        // `checked_rem` is `None` only for a zero alignment.
        match addr.checked_rem(self.buffer_alignment).unwrap_or(addr) {
            0 => Ok(()),
            remainder => Err(AlignmentError::BufferMisaligned {
                actual: remainder,
                required: self.buffer_alignment,
            }),
        }
    }

    /// Checks that `offset` is a multiple of `offset_alignment`.
    ///
    /// The comparison is done in `u64`, so large offsets are not truncated
    /// on targets where `usize` is narrower than 64 bits.
    ///
    /// # Errors
    ///
    /// Returns [`AlignmentError::OffsetMisaligned`] carrying the offending
    /// offset.
    pub fn validate_offset(&self, offset: u64) -> Result<(), AlignmentError> {
        // Widening `usize` to `u64` is lossless on every supported target.
        let alignment = self.offset_alignment as u64;
        if offset.checked_rem(alignment).unwrap_or(offset) == 0 {
            Ok(())
        } else {
            Err(AlignmentError::OffsetMisaligned {
                actual: offset,
                required: self.offset_alignment,
            })
        }
    }

    /// Checks that `size` is a multiple of `size_alignment`.
    ///
    /// # Errors
    ///
    /// Returns [`AlignmentError::SizeMisaligned`] carrying the offending
    /// size.
    pub fn validate_size(&self, size: usize) -> Result<(), AlignmentError> {
        if size.checked_rem(self.size_alignment).unwrap_or(size) == 0 {
            Ok(())
        } else {
            Err(AlignmentError::SizeMisaligned {
                actual: size,
                required: self.size_alignment,
            })
        }
    }

    /// Rounds `size` up to the next multiple of `size_alignment`.
    ///
    /// With a zero alignment there is nothing to round to and `size` is
    /// returned unchanged.
    ///
    /// # Panics
    ///
    /// Panics if the rounded value does not fit in `usize`, because a
    /// silently wrapped (smaller) size would be worse than stopping.
    pub fn align_size(&self, size: usize) -> usize {
        if self.size_alignment == 0 {
            return size;
        }
        size.checked_next_multiple_of(self.size_alignment)
            .expect("aligned size does not fit in usize")
    }
}

/// Reason a buffer, offset or size failed an [`AlignmentContract`] check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AlignmentError {
    /// Buffer address is misaligned; `actual` is the remainder of the
    /// address modulo `required`.
    BufferMisaligned { actual: usize, required: usize },
    /// File offset `actual` is not a multiple of `required`.
    OffsetMisaligned { actual: u64, required: usize },
    /// Transfer size `actual` is not a multiple of `required`.
    SizeMisaligned { actual: usize, required: usize },
}

impl std::fmt::Display for AlignmentError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::BufferMisaligned { actual, required } => write!(
                f,
                "Buffer misaligned: offset {} not multiple of {}",
                actual, required
            ),
            Self::OffsetMisaligned { actual, required } => {
                write!(f, "Offset {} not aligned to {} bytes", actual, required)
            }
            Self::SizeMisaligned { actual, required } => {
                write!(f, "Size {} not aligned to {} bytes", actual, required)
            }
        }
    }
}

impl std::error::Error for AlignmentError {}
526
#[cfg(test)]
mod tests {
    use super::*;

    /// Bytes currently charged to `partition`.
    fn used_bytes(partition: &CachePartition) -> usize {
        partition.current_bytes.load(Ordering::Relaxed)
    }

    /// Allocation succeeds up to the budget, fails beyond it, and works
    /// again once bytes are released.
    #[test]
    fn test_cache_partition_allocation() {
        let cache = CachePartition::new("test", 1024);

        // Fill the partition in two equal steps.
        assert!(cache.try_allocate(512));
        assert_eq!(used_bytes(&cache), 512);
        assert!(cache.try_allocate(512));
        assert_eq!(used_bytes(&cache), 1024);

        // The budget is exhausted: even one more byte is refused.
        assert!(!cache.try_allocate(1));

        // Releasing makes room for a new allocation.
        cache.release(512);
        assert!(cache.try_allocate(512));
    }

    /// The stats snapshot reflects usage, hit/miss counts and derived ratios.
    #[test]
    fn test_partition_stats() {
        let cache = CachePartition::new("test", 1000);

        cache.try_allocate(500);
        cache.record_hit();
        cache.record_hit();
        cache.record_miss();

        let snapshot = cache.stats();
        assert_eq!(snapshot.current_bytes, 500);
        assert_eq!(snapshot.hits, 2);
        assert_eq!(snapshot.misses, 1);
        assert!((snapshot.hit_rate - 0.666).abs() < 0.01);
        assert!((snapshot.utilization - 0.5).abs() < 0.01);
    }

    /// Direct I/O is chosen by workload preference or by the size/pattern
    /// heuristic.
    #[test]
    fn test_direct_io_decision() {
        let manager = IoIsolationManager::new(IoIsolationConfig::default());

        // Compaction prefers direct I/O regardless of request size.
        let compaction_small = manager.should_use_direct_io(
            IoWorkloadType::Compaction,
            AccessPattern::SequentialScan,
            1024,
        );
        assert!(compaction_small);

        // A small random query read stays buffered.
        let query_small =
            manager.should_use_direct_io(IoWorkloadType::Query, AccessPattern::RandomRead, 4096);
        assert!(!query_small);

        // A large sequential query scan crosses the threshold and goes direct.
        let query_large_scan = manager.should_use_direct_io(
            IoWorkloadType::Query,
            AccessPattern::SequentialScan,
            100 * 1024 * 1024,
        );
        assert!(query_large_scan);
    }

    /// The platform contract accepts 4096-aligned values, rejects offset 1,
    /// and rounds sizes up to an aligned value.
    #[test]
    fn test_alignment_contract() {
        let contract = AlignmentContract::platform_default();

        assert!(contract.validate_offset(4096).is_ok());
        assert!(contract.validate_size(4096).is_ok());

        // Offset 1 can only be aligned when the alignment is trivial.
        if contract.offset_alignment > 1 {
            assert!(contract.validate_offset(1).is_err());
        }

        let rounded = contract.align_size(5000);
        assert!(rounded >= 5000);
        assert!(rounded.is_multiple_of(contract.size_alignment));
    }
}