1use std::{fmt::Debug, sync::Arc, time::Duration};
16
17use crate::io::device::statistics::Statistics;
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub enum StorageFilterResult {
22 Admit,
24 Reject,
26 Throttled(Duration),
29}
30
31impl StorageFilterResult {
32 pub fn is_admitted(&self) -> bool {
34 matches!(self, StorageFilterResult::Admit)
35 }
36
37 pub fn is_rejected(&self) -> bool {
39 matches!(self, StorageFilterResult::Reject)
40 }
41}
42
43pub trait StorageFilterCondition: Send + Sync + Debug + 'static {
45 fn filter(&self, stats: &Arc<Statistics>, hash: u64, estimated_size: usize) -> StorageFilterResult;
47}
48
49#[derive(Debug, Default)]
53pub struct StorageFilter {
54 conditions: Vec<Box<dyn StorageFilterCondition>>,
55}
56
57impl StorageFilter {
58 pub fn new() -> Self {
60 Self::default()
61 }
62
63 pub fn with_condition<C: StorageFilterCondition>(mut self, condition: C) -> Self {
65 self.conditions.push(Box::new(condition));
66 self
67 }
68
69 pub fn filter(&self, stats: &Arc<Statistics>, hash: u64, estimated_size: usize) -> StorageFilterResult {
71 let mut duration = Duration::ZERO;
72 for condition in &self.conditions {
73 match condition.filter(stats, hash, estimated_size) {
74 StorageFilterResult::Admit => {}
75 StorageFilterResult::Reject => return StorageFilterResult::Reject,
76 StorageFilterResult::Throttled(dur) => duration += dur,
77 }
78 }
79 if duration.is_zero() {
80 StorageFilterResult::Admit
81 } else {
82 StorageFilterResult::Throttled(duration)
83 }
84 }
85}
86
87pub mod conditions {
88
89 use std::ops::{Bound, Range, RangeBounds};
90
91 pub use super::*;
92
93 #[derive(Debug, Default)]
95 pub struct AdmitAll;
96
97 impl StorageFilterCondition for AdmitAll {
98 fn filter(&self, _: &Arc<Statistics>, _: u64, _: usize) -> StorageFilterResult {
99 StorageFilterResult::Admit
100 }
101 }
102
103 #[derive(Debug, Default)]
105 pub struct RejectAll;
106
107 impl StorageFilterCondition for RejectAll {
108 fn filter(&self, _: &Arc<Statistics>, _: u64, _: usize) -> StorageFilterResult {
109 StorageFilterResult::Reject
110 }
111 }
112
113 #[derive(Debug, Default)]
114 pub struct IoThrottle;
115
116 impl StorageFilterCondition for IoThrottle {
117 fn filter(&self, stats: &Arc<Statistics>, _: u64, _: usize) -> StorageFilterResult {
118 let duration = stats.write_throttle();
119 if duration.is_zero() {
120 StorageFilterResult::Admit
121 } else {
122 StorageFilterResult::Throttled(duration)
123 }
124 }
125 }
126
127 #[derive(Debug)]
129 pub struct EstimatedSize {
130 range: Range<usize>,
131 }
132
133 impl EstimatedSize {
134 pub fn new<R: RangeBounds<usize>>(range: R) -> Self {
136 let start = match range.start_bound() {
137 Bound::Included(v) => *v,
138 Bound::Excluded(v) => *v + 1,
139 Bound::Unbounded => 0,
140 };
141 let end = match range.end_bound() {
142 Bound::Included(v) => *v + 1,
143 Bound::Excluded(v) => *v,
144 Bound::Unbounded => usize::MAX,
145 };
146 Self { range: start..end }
147 }
148 }
149
150 impl StorageFilterCondition for EstimatedSize {
151 fn filter(&self, _: &Arc<Statistics>, _: u64, estimated_size: usize) -> StorageFilterResult {
152 if self.range.contains(&estimated_size) {
153 StorageFilterResult::Admit
154 } else {
155 StorageFilterResult::Reject
156 }
157 }
158 }
159}
160
161#[cfg(test)]
162mod tests {
163 use super::conditions::*;
164 use crate::Throttle;
165
166 #[test]
167 fn test_estimated_size_condition() {
168 let condition = EstimatedSize::new(10..20);
169 let statistics = Arc::new(Statistics::new(Throttle::default()));
170 assert_eq!(condition.filter(&statistics, 0, 15), StorageFilterResult::Admit);
171 assert_eq!(condition.filter(&statistics, 0, 5), StorageFilterResult::Reject);
172 assert_eq!(condition.filter(&statistics, 0, 20), StorageFilterResult::Reject);
173 }
174}