use std::{cmp::Ordering, num::NonZeroU64};

use bao_tree::{ChunkNum, ChunkRanges};
use range_collections::range_set::RangeSetRange;
use serde::{Deserialize, Deserializer, Serialize};
use smallvec::SmallVec;

use crate::store::util::{
    observer::{Combine, CombineInPlace},
    RangeSetExt,
};

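/// Returns true if the size can be considered validated, i.e. the ranges
/// contain the last chunk of a blob of the given size.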
pub(crate) fn is_validated(size: NonZeroU64, ranges: &ChunkRanges) -> bool {
    let size = size.get();
    let last_chunk = ChunkNum::chunks(size) - 1;
    ranges.contains(&last_chunk)
}

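/// Returns true if the ranges cover every chunk of a blob of the given size.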
pub fn is_complete(size: NonZeroU64, ranges: &ChunkRanges) -> bool {
    let complete = ChunkRanges::from(..ChunkNum::chunks(size.get()));
    complete.is_subset(ranges)
}

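/// A set of chunk ranges of a blob, together with the best known size.
///
/// The size counts as validated only once the last chunk is covered.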
#[derive(Debug, PartialEq, Eq, Clone, Default)]
pub struct Bitfield {
    /// The size of the blob in bytes. Only authoritative once validated.
    pub(crate) size: u64,
    /// The chunk ranges that are present.
    pub ranges: ChunkRanges,
}

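/// Observable state of a [`Bitfield`]: whether it is complete and, if known,
/// the validated size.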
#[derive(Debug, PartialEq, Eq, Clone, Copy, Default)]
pub struct BitfieldState {
    pub complete: bool,
    pub validated_size: Option<u64>,
}

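// Serialized as a flat sequence of u64 values: the size first, followed by
// the chunk range boundaries.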
impl Serialize for Bitfield {
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        let mut numbers = SmallVec::<[_; 4]>::new();
        numbers.push(self.size);
        numbers.extend(self.ranges.boundaries().iter().map(|x| x.0));
        numbers.serialize(serializer)
    }
}

impl<'de> Deserialize<'de> for Bitfield {
    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        let mut numbers: SmallVec<[u64; 4]> = SmallVec::deserialize(deserializer)?;

        if numbers.is_empty() {
            return Err(serde::de::Error::custom("Bitfield needs at least size"));
        }

        // The first number is the size, the remaining numbers are the range boundaries.
        let size = numbers.remove(0);
        let mut ranges = SmallVec::<[_; 2]>::new();
        for num in numbers {
            ranges.push(ChunkNum(num));
        }
        let Some(ranges) = ChunkRanges::new(ranges) else {
            return Err(serde::de::Error::custom("Boundaries are not sorted"));
        };
        Ok(Bitfield::new(ranges, size))
    }
}

impl Bitfield {
    /// Create a bitfield without canonicalizing the ranges.
    pub(crate) fn new_unchecked(ranges: ChunkRanges, size: u64) -> Self {
        Self { ranges, size }
    }

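    /// Create a new bitfield, canonicalizing the ranges: ranges that cover all
    /// chunks of `size` become `ChunkRanges::all()`, and ranges that contain
    /// the last chunk are extended past the end.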
    pub fn new(mut ranges: ChunkRanges, size: u64) -> Self {
        if let Some(size) = NonZeroU64::new(size) {
            let end = ChunkNum::chunks(size.get());
            if ChunkRanges::from(..end).is_subset(&ranges) {
                // all chunks are present, so the canonical form is the full range
                ranges = ChunkRanges::all();
            } else if ranges.contains(&(end - 1)) {
                // the last chunk is present, so everything past the end is implied
                ranges |= ChunkRanges::from(end..);
            }
        }
        Self { ranges, size }
    }

    pub fn size(&self) -> u64 {
        self.size
    }

    pub fn empty() -> Self {
        Self {
            ranges: ChunkRanges::empty(),
            size: 0,
        }
    }

    pub fn complete(size: u64) -> Self {
        Self {
            ranges: ChunkRanges::all(),
            size,
        }
    }

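    /// True if the size is validated, i.e. the last chunk is covered by the
    /// ranges (or, for an empty blob, the ranges are `ChunkRanges::all()`).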
    pub fn is_validated(&self) -> bool {
        if let Some(size) = NonZeroU64::new(self.size) {
            is_validated(size, &self.ranges)
        } else {
            self.ranges.is_all()
        }
    }

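    /// True if every chunk of the blob is covered by the ranges.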
    pub fn is_complete(&self) -> bool {
        if let Some(size) = NonZeroU64::new(self.size) {
            is_complete(size, &self.ranges)
        } else {
            self.ranges.is_all()
        }
    }

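    /// The size, if it is validated, otherwise `None`.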
    pub fn validated_size(&self) -> Option<u64> {
        if self.is_validated() {
            Some(self.size)
        } else {
            None
        }
    }

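    /// The total number of bytes covered by the ranges, clamped to the size.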
    pub fn total_bytes(&self) -> u64 {
        let mut total = 0;
        for range in self.ranges.iter() {
            let (start, end) = match range {
                RangeSetRange::Range(range) => {
                    (range.start.to_bytes(), range.end.to_bytes().min(self.size))
                }
                RangeSetRange::RangeFrom(range) => (range.start.to_bytes(), self.size),
            };
            total += end - start;
        }
        total
    }

    pub fn state(&self) -> BitfieldState {
        BitfieldState {
            complete: self.is_complete(),
            validated_size: self.validated_size(),
        }
    }

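    /// Combine this bitfield with `update` in place, reporting whether the
    /// ranges and the observable state changed as a result.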
    pub fn update(&mut self, update: &Bitfield) -> UpdateResult {
        let s0 = self.state();
        let bitfield1 = self.clone().combine(update.clone());
        if bitfield1 != *self {
            let s1 = bitfield1.state();
            *self = bitfield1;
            if s0 != s1 {
                UpdateResult::MajorChange(s0, s1)
            } else {
                UpdateResult::MinorChange(s1)
            }
        } else {
            UpdateResult::NoChange(s0)
        }
    }

    pub fn is_empty(&self) -> bool {
        self.ranges.is_empty() && self.size == 0
    }

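    /// The symmetric difference of two bitfields: the ranges present in
    /// exactly one of them, with the size chosen by [`choose_size`].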
    pub fn diff(&self, that: &Bitfield) -> Self {
        let size = choose_size(self, that);
        let ranges_diff = &self.ranges ^ &that.ranges;
        Self {
            ranges: ranges_diff,
            size,
        }
    }
}

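/// Choose which size to keep when combining two bitfields: the bitfield whose
/// ranges extend further wins, and on a tie the larger size is used.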
pub fn choose_size(a: &Bitfield, b: &Bitfield) -> u64 {
    match (a.ranges.upper_bound(), b.ranges.upper_bound()) {
        (Some(ac), Some(bc)) => match ac.cmp(&bc) {
            Ordering::Less => b.size,
            Ordering::Greater => a.size,
            Ordering::Equal => a.size.max(b.size),
        },
        (Some(_), None) => b.size,
        (None, Some(_)) => a.size,
        (None, None) => a.size.max(b.size),
    }
}

impl Combine for Bitfield {
    fn combine(self, that: Self) -> Self {
        let size = choose_size(&self, &that);
        let ranges = self.ranges | that.ranges;
        Self::new(ranges, size)
    }
}

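// Combining in place returns only the newly added ranges (the delta), or an
// empty bitfield if nothing changed.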
impl CombineInPlace for Bitfield {
    fn combine_with(&mut self, other: Self) -> Self {
        let new = &other.ranges - &self.ranges;
        if new.is_empty() {
            return Bitfield::empty();
        }
        self.ranges.union_with(&new);
        self.size = choose_size(self, &other);
        Bitfield {
            ranges: new,
            size: self.size,
        }
    }

    fn is_neutral(&self) -> bool {
        self.ranges.is_empty() && self.size == 0
    }
}

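/// The result of a [`Bitfield::update`], describing whether and how the
/// observable [`BitfieldState`] changed.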
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Clone, Copy)]
pub enum UpdateResult {
    /// Nothing changed.
    NoChange(BitfieldState),
    /// The ranges changed, but the observable state stayed the same.
    MinorChange(BitfieldState),
    /// The observable state changed from the first to the second value.
    MajorChange(BitfieldState, BitfieldState),
}

impl UpdateResult {
    /// The state after the update.
    pub fn new_state(&self) -> &BitfieldState {
        match self {
            UpdateResult::NoChange(new) => new,
            UpdateResult::MinorChange(new) => new,
            UpdateResult::MajorChange(_, new) => new,
        }
    }

    /// True if this update validated the size for the first time.
    pub fn was_validated(&self) -> bool {
        match self {
            UpdateResult::NoChange(_) => false,
            UpdateResult::MinorChange(_) => false,
            UpdateResult::MajorChange(old, new) => {
                new.validated_size.is_some() && old.validated_size.is_none()
            }
        }
    }

    /// True if the update changed the ranges at all.
    pub fn changed(&self) -> bool {
        match self {
            UpdateResult::NoChange(_) => false,
            UpdateResult::MinorChange(_) => true,
            UpdateResult::MajorChange(_, _) => true,
        }
    }
}

#[cfg(test)]
mod tests {
    use bao_tree::{ChunkNum, ChunkRanges};
    use proptest::prelude::{prop, Strategy};
    use smallvec::SmallVec;
    use test_strategy::proptest;

    use super::Bitfield;
    use crate::store::util::observer::{Combine, CombineInPlace};

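    /// Generate chunk ranges from up to `k` distinct boundaries drawn from `0..=max`.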
    fn gen_chunk_ranges(max: ChunkNum, k: usize) -> impl Strategy<Value = ChunkRanges> {
        prop::collection::btree_set(0..=max.0, 0..=k).prop_map(|vec| {
            let bounds = vec.into_iter().map(ChunkNum).collect::<SmallVec<[_; 2]>>();
            ChunkRanges::new(bounds).unwrap()
        })
    }

    fn gen_bitfields(size: u64, k: usize) -> impl Strategy<Value = Bitfield> {
        (0..size).prop_flat_map(move |size| {
            let chunks = ChunkNum::full_chunks(size);
            gen_chunk_ranges(chunks, k).prop_map(move |ranges| Bitfield::new(ranges, size))
        })
    }

    fn gen_non_empty_bitfields(size: u64, k: usize) -> impl Strategy<Value = Bitfield> {
        gen_bitfields(size, k).prop_filter("non-empty", |x| !x.is_neutral())
    }

    #[proptest]
    fn test_combine_empty(#[strategy(gen_non_empty_bitfields(32768, 4))] a: Bitfield) {
        assert_eq!(a.clone().combine(Bitfield::empty()), a);
        assert_eq!(Bitfield::empty().combine(a.clone()), a);
    }

    #[proptest]
    fn test_combine_order(
        #[strategy(gen_non_empty_bitfields(32768, 4))] a: Bitfield,
        #[strategy(gen_non_empty_bitfields(32768, 4))] b: Bitfield,
    ) {
        let ab = a.clone().combine(b.clone());
        let ba = b.combine(a);
        assert_eq!(ab, ba);
    }

    #[proptest]
    fn test_complete_normalized(#[strategy(gen_non_empty_bitfields(32768, 4))] a: Bitfield) {
        if a.is_complete() {
            assert_eq!(a.ranges, ChunkRanges::all());
        }
    }
}