s2_common/
read_extent.rs

1use crate::record::Timestamp;
2
3#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy)]
4pub struct CountOrBytes {
5    pub count: usize,
6    pub bytes: usize,
7}
8
9impl CountOrBytes {
10    pub const ZERO: CountOrBytes = CountOrBytes { count: 0, bytes: 0 };
11
12    pub const MAX: CountOrBytes = CountOrBytes {
13        count: usize::MAX,
14        bytes: usize::MAX,
15    };
16}
17
18#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy)]
19pub enum ReadLimit {
20    #[default]
21    Unbounded,
22    Count(usize),
23    Bytes(usize),
24    CountOrBytes(CountOrBytes),
25}
26
27#[derive(PartialEq, Debug)]
28pub enum EvaluatedReadLimit {
29    Remaining(ReadLimit),
30    Exhausted,
31}
32
33impl ReadLimit {
34    pub fn is_unbounded(&self) -> bool {
35        matches!(self, ReadLimit::Unbounded)
36    }
37
38    pub fn is_bounded(&self) -> bool {
39        !matches!(self, ReadLimit::Unbounded)
40    }
41
42    pub fn from_count_and_bytes(count: Option<usize>, bytes: Option<usize>) -> Self {
43        match (count, bytes) {
44            (None, None) => Self::Unbounded,
45            (Some(0), _) | (_, Some(0)) => Self::Count(0),
46            (Some(count), None) => Self::Count(count),
47            (None, Some(bytes)) => Self::Bytes(bytes),
48            (Some(count), Some(bytes)) => Self::CountOrBytes(CountOrBytes { count, bytes }),
49        }
50    }
51
52    pub fn count(&self) -> Option<usize> {
53        match self {
54            ReadLimit::Unbounded => None,
55            ReadLimit::Count(count) => Some(*count),
56            ReadLimit::Bytes(_) => None,
57            ReadLimit::CountOrBytes(CountOrBytes { count, .. }) => Some(*count),
58        }
59    }
60
61    pub fn bytes(&self) -> Option<usize> {
62        match self {
63            ReadLimit::Unbounded => None,
64            ReadLimit::Count(_) => None,
65            ReadLimit::Bytes(bytes) => Some(*bytes),
66            ReadLimit::CountOrBytes(CountOrBytes { bytes, .. }) => Some(*bytes),
67        }
68    }
69
70    pub fn into_allowance(self, max: CountOrBytes) -> CountOrBytes {
71        match self {
72            ReadLimit::Unbounded => max,
73            ReadLimit::Count(count) => CountOrBytes {
74                count: count.min(max.count),
75                bytes: max.bytes,
76            },
77            ReadLimit::Bytes(bytes) => CountOrBytes {
78                count: max.count,
79                bytes: bytes.min(max.bytes),
80            },
81            ReadLimit::CountOrBytes(CountOrBytes { count, bytes }) => CountOrBytes {
82                count: count.min(max.count),
83                bytes: bytes.min(max.bytes),
84            },
85        }
86    }
87
88    pub fn allow(&self, additional_count: usize, additional_bytes: usize) -> bool {
89        match self {
90            ReadLimit::Unbounded => true,
91            ReadLimit::Count(count) => additional_count <= *count,
92            ReadLimit::Bytes(bytes) => additional_bytes <= *bytes,
93            ReadLimit::CountOrBytes(CountOrBytes { count, bytes }) => {
94                additional_count <= *count && additional_bytes <= *bytes
95            }
96        }
97    }
98
99    pub fn deny(&self, additional_count: usize, additional_bytes: usize) -> bool {
100        match self {
101            ReadLimit::Unbounded => false,
102            ReadLimit::Count(count) => additional_count > *count,
103            ReadLimit::Bytes(bytes) => additional_bytes > *bytes,
104            ReadLimit::CountOrBytes(CountOrBytes { count, bytes }) => {
105                additional_count > *count || additional_bytes > *bytes
106            }
107        }
108    }
109
110    /// Given the amount of records already consumed, generate a new `ReadLimit` representing
111    /// the remaining limit, or none if the limit has been met.
112    pub fn remaining(&self, consumed_count: usize, consumed_bytes: usize) -> EvaluatedReadLimit {
113        let remaining = match self {
114            ReadLimit::Unbounded => Some(ReadLimit::Unbounded),
115            ReadLimit::Count(count) => {
116                (consumed_count < *count).then(|| ReadLimit::Count(count - consumed_count))
117            }
118            ReadLimit::Bytes(bytes) => {
119                (consumed_bytes < *bytes).then(|| ReadLimit::Bytes(bytes - consumed_bytes))
120            }
121            ReadLimit::CountOrBytes(CountOrBytes { count, bytes }) => {
122                (consumed_count < *count && consumed_bytes < *bytes).then(|| {
123                    ReadLimit::CountOrBytes(CountOrBytes {
124                        count: count - consumed_count,
125                        bytes: bytes - consumed_bytes,
126                    })
127                })
128            }
129        };
130
131        match remaining {
132            Some(limit) => EvaluatedReadLimit::Remaining(limit),
133            None => EvaluatedReadLimit::Exhausted,
134        }
135    }
136}
137
138#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy)]
139pub enum ReadUntil {
140    #[default]
141    Unbounded,
142    Timestamp(Timestamp),
143}
144
145impl From<Option<Timestamp>> for ReadUntil {
146    fn from(timestamp: Option<Timestamp>) -> Self {
147        match timestamp {
148            Some(ts) => ReadUntil::Timestamp(ts),
149            None => ReadUntil::Unbounded,
150        }
151    }
152}
153
154impl From<ReadUntil> for Option<Timestamp> {
155    fn from(until: ReadUntil) -> Self {
156        match until {
157            ReadUntil::Unbounded => None,
158            ReadUntil::Timestamp(ts) => Some(ts),
159        }
160    }
161}
162
163impl ReadUntil {
164    pub fn is_unbounded(&self) -> bool {
165        matches!(self, ReadUntil::Unbounded)
166    }
167
168    pub fn is_timestamp(&self) -> bool {
169        matches!(self, ReadUntil::Timestamp(_))
170    }
171
172    pub fn allow(&self, timestamp: Timestamp) -> bool {
173        match self {
174            ReadUntil::Unbounded => true,
175            ReadUntil::Timestamp(t) => timestamp < *t,
176        }
177    }
178
179    pub fn deny(&self, timestamp: Timestamp) -> bool {
180        match self {
181            ReadUntil::Unbounded => false,
182            ReadUntil::Timestamp(t) => timestamp >= *t,
183        }
184    }
185}
186
187#[cfg(test)]
188mod test {
189    use rstest::rstest;
190
191    use super::{CountOrBytes, EvaluatedReadLimit, ReadLimit};
192
193    #[rstest]
194    #[case(
195        ReadLimit::Count(100),
196        10,
197        100000,
198        EvaluatedReadLimit::Remaining(ReadLimit::Count(90))
199    )]
200    #[case(ReadLimit::Count(100), 100, 100000, EvaluatedReadLimit::Exhausted)]
201    #[case(
202        ReadLimit::Bytes(100),
203        1000,
204        99,
205        EvaluatedReadLimit::Remaining(ReadLimit::Bytes(1))
206    )]
207    #[case(ReadLimit::CountOrBytes(CountOrBytes{count: 50, bytes: 50}), 40, 45, EvaluatedReadLimit::Remaining(ReadLimit::CountOrBytes(CountOrBytes{count: 10, bytes: 5})))]
208    #[case(ReadLimit::CountOrBytes(CountOrBytes{count: 50, bytes: 50}), 51, 45, EvaluatedReadLimit::Exhausted)]
209    fn remaining(
210        #[case] old_limit: ReadLimit,
211        #[case] consumed_count: usize,
212        #[case] consumed_bytes: usize,
213        #[case] remaining_limit: EvaluatedReadLimit,
214    ) {
215        assert_eq!(
216            old_limit.remaining(consumed_count, consumed_bytes),
217            remaining_limit
218        )
219    }
220}