1use crate::record::Timestamp;
2
/// A count paired with a byte size.
///
/// Both fields are plain `usize` values; [`CountOrBytes::ZERO`] and
/// [`CountOrBytes::MAX`] give the two extreme pairs.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub struct CountOrBytes {
    /// The count component.
    pub count: usize,
    /// The bytes component.
    pub bytes: usize,
}

impl CountOrBytes {
    /// Pair with both components set to zero (equal to `Default::default()`).
    pub const ZERO: Self = Self { count: 0, bytes: 0 };

    /// Pair with both components set to `usize::MAX`.
    pub const MAX: Self = Self {
        count: usize::MAX,
        bytes: usize::MAX,
    };
}
17
/// A limit expressed as a count, a number of bytes, both, or nothing at all.
#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy)]
pub enum ReadLimit {
    /// No limit in either dimension; this is the `Default` variant.
    #[default]
    Unbounded,
    /// Limit on the count only.
    Count(usize),
    /// Limit on the number of bytes only.
    Bytes(usize),
    /// Limits on both the count and the number of bytes.
    CountOrBytes(CountOrBytes),
}
26
27#[derive(PartialEq, Debug)]
28pub enum EvaluatedReadLimit {
29 Remaining(ReadLimit),
30 Exhausted,
31}
32
33impl ReadLimit {
34 pub fn is_unbounded(&self) -> bool {
35 matches!(self, ReadLimit::Unbounded)
36 }
37
38 pub fn is_bounded(&self) -> bool {
39 !matches!(self, ReadLimit::Unbounded)
40 }
41
42 pub fn from_count_and_bytes(count: Option<usize>, bytes: Option<usize>) -> Self {
43 match (count, bytes) {
44 (None, None) => Self::Unbounded,
45 (Some(0), _) | (_, Some(0)) => Self::Count(0),
46 (Some(count), None) => Self::Count(count),
47 (None, Some(bytes)) => Self::Bytes(bytes),
48 (Some(count), Some(bytes)) => Self::CountOrBytes(CountOrBytes { count, bytes }),
49 }
50 }
51
52 pub fn count(&self) -> Option<usize> {
53 match self {
54 ReadLimit::Unbounded => None,
55 ReadLimit::Count(count) => Some(*count),
56 ReadLimit::Bytes(_) => None,
57 ReadLimit::CountOrBytes(CountOrBytes { count, .. }) => Some(*count),
58 }
59 }
60
61 pub fn bytes(&self) -> Option<usize> {
62 match self {
63 ReadLimit::Unbounded => None,
64 ReadLimit::Count(_) => None,
65 ReadLimit::Bytes(bytes) => Some(*bytes),
66 ReadLimit::CountOrBytes(CountOrBytes { bytes, .. }) => Some(*bytes),
67 }
68 }
69
70 pub fn into_allowance(self, max: CountOrBytes) -> CountOrBytes {
71 match self {
72 ReadLimit::Unbounded => max,
73 ReadLimit::Count(count) => CountOrBytes {
74 count: count.min(max.count),
75 bytes: max.bytes,
76 },
77 ReadLimit::Bytes(bytes) => CountOrBytes {
78 count: max.count,
79 bytes: bytes.min(max.bytes),
80 },
81 ReadLimit::CountOrBytes(CountOrBytes { count, bytes }) => CountOrBytes {
82 count: count.min(max.count),
83 bytes: bytes.min(max.bytes),
84 },
85 }
86 }
87
88 pub fn allow(&self, additional_count: usize, additional_bytes: usize) -> bool {
89 match self {
90 ReadLimit::Unbounded => true,
91 ReadLimit::Count(count) => additional_count <= *count,
92 ReadLimit::Bytes(bytes) => additional_bytes <= *bytes,
93 ReadLimit::CountOrBytes(CountOrBytes { count, bytes }) => {
94 additional_count <= *count && additional_bytes <= *bytes
95 }
96 }
97 }
98
99 pub fn deny(&self, additional_count: usize, additional_bytes: usize) -> bool {
100 match self {
101 ReadLimit::Unbounded => false,
102 ReadLimit::Count(count) => additional_count > *count,
103 ReadLimit::Bytes(bytes) => additional_bytes > *bytes,
104 ReadLimit::CountOrBytes(CountOrBytes { count, bytes }) => {
105 additional_count > *count || additional_bytes > *bytes
106 }
107 }
108 }
109
110 pub fn remaining(&self, consumed_count: usize, consumed_bytes: usize) -> EvaluatedReadLimit {
113 let remaining = match self {
114 ReadLimit::Unbounded => Some(ReadLimit::Unbounded),
115 ReadLimit::Count(count) => {
116 (consumed_count < *count).then(|| ReadLimit::Count(count - consumed_count))
117 }
118 ReadLimit::Bytes(bytes) => {
119 (consumed_bytes < *bytes).then(|| ReadLimit::Bytes(bytes - consumed_bytes))
120 }
121 ReadLimit::CountOrBytes(CountOrBytes { count, bytes }) => {
122 (consumed_count < *count && consumed_bytes < *bytes).then(|| {
123 ReadLimit::CountOrBytes(CountOrBytes {
124 count: count - consumed_count,
125 bytes: bytes - consumed_bytes,
126 })
127 })
128 }
129 };
130
131 match remaining {
132 Some(limit) => EvaluatedReadLimit::Remaining(limit),
133 None => EvaluatedReadLimit::Exhausted,
134 }
135 }
136}
137
/// An optional upper bound expressed as a `Timestamp`.
#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy)]
pub enum ReadUntil {
    /// No timestamp bound; this is the `Default` variant.
    #[default]
    Unbounded,
    /// Bound at the given timestamp. `allow` accepts only timestamps strictly
    /// less than it, so the bound itself is excluded.
    Timestamp(Timestamp),
}
144
145impl From<Option<Timestamp>> for ReadUntil {
146 fn from(timestamp: Option<Timestamp>) -> Self {
147 match timestamp {
148 Some(ts) => ReadUntil::Timestamp(ts),
149 None => ReadUntil::Unbounded,
150 }
151 }
152}
153
154impl From<ReadUntil> for Option<Timestamp> {
155 fn from(until: ReadUntil) -> Self {
156 match until {
157 ReadUntil::Unbounded => None,
158 ReadUntil::Timestamp(ts) => Some(ts),
159 }
160 }
161}
162
163impl ReadUntil {
164 pub fn is_unbounded(&self) -> bool {
165 matches!(self, ReadUntil::Unbounded)
166 }
167
168 pub fn is_timestamp(&self) -> bool {
169 matches!(self, ReadUntil::Timestamp(_))
170 }
171
172 pub fn allow(&self, timestamp: Timestamp) -> bool {
173 match self {
174 ReadUntil::Unbounded => true,
175 ReadUntil::Timestamp(t) => timestamp < *t,
176 }
177 }
178
179 pub fn deny(&self, timestamp: Timestamp) -> bool {
180 match self {
181 ReadUntil::Unbounded => false,
182 ReadUntil::Timestamp(t) => timestamp >= *t,
183 }
184 }
185}
186
#[cfg(test)]
mod test {
    use rstest::rstest;

    use super::{CountOrBytes, EvaluatedReadLimit, ReadLimit};

    /// Checks `ReadLimit::remaining` for each variant, including the
    /// boundaries where consumption reaches or exceeds the limit.
    #[rstest]
    // Count: partially consumed; the byte figure is ignored.
    #[case(
        ReadLimit::Count(100),
        10,
        100000,
        EvaluatedReadLimit::Remaining(ReadLimit::Count(90))
    )]
    // Count: consuming exactly the limit exhausts it.
    #[case(ReadLimit::Count(100), 100, 100000, EvaluatedReadLimit::Exhausted)]
    // Bytes: partially consumed; the count figure is ignored.
    #[case(
        ReadLimit::Bytes(100),
        1000,
        99,
        EvaluatedReadLimit::Remaining(ReadLimit::Bytes(1))
    )]
    // CountOrBytes: both dimensions still have room.
    #[case(
        ReadLimit::CountOrBytes(CountOrBytes { count: 50, bytes: 50 }),
        40,
        45,
        EvaluatedReadLimit::Remaining(ReadLimit::CountOrBytes(CountOrBytes { count: 10, bytes: 5 }))
    )]
    // CountOrBytes: the count dimension is exceeded.
    #[case(
        ReadLimit::CountOrBytes(CountOrBytes { count: 50, bytes: 50 }),
        51,
        45,
        EvaluatedReadLimit::Exhausted
    )]
    // Unbounded: never exhausted, whatever was consumed.
    #[case(
        ReadLimit::Unbounded,
        1000,
        100000,
        EvaluatedReadLimit::Remaining(ReadLimit::Unbounded)
    )]
    // Count(0): exhausted before anything is consumed.
    #[case(ReadLimit::Count(0), 0, 0, EvaluatedReadLimit::Exhausted)]
    // Count: overshooting the limit must not underflow.
    #[case(ReadLimit::Count(100), 101, 0, EvaluatedReadLimit::Exhausted)]
    // Bytes: consuming exactly the limit exhausts it.
    #[case(ReadLimit::Bytes(100), 0, 100, EvaluatedReadLimit::Exhausted)]
    // CountOrBytes: the byte dimension alone is enough to exhaust.
    #[case(
        ReadLimit::CountOrBytes(CountOrBytes { count: 50, bytes: 50 }),
        10,
        50,
        EvaluatedReadLimit::Exhausted
    )]
    fn remaining(
        #[case] old_limit: ReadLimit,
        #[case] consumed_count: usize,
        #[case] consumed_bytes: usize,
        #[case] remaining_limit: EvaluatedReadLimit,
    ) {
        assert_eq!(
            old_limit.remaining(consumed_count, consumed_bytes),
            remaining_limit
        );
    }
}