Skip to main content

reifydb_core/value/column/
mask.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4// Per-row selection bitmap. Set bit = row is selected/kept. This is the dual
5// of `NoneBitmap` - the storage layout is identical but the semantics are
6// opposite, so the two are kept as distinct types to prevent accidental mixing.
7#[derive(Clone, Debug, PartialEq, Eq)]
8pub struct RowMask {
9	words: Vec<u64>,
10	len: usize,
11}
12
13impl RowMask {
14	pub fn all_set(len: usize) -> Self {
15		let word_count = len.div_ceil(64);
16		let mut words = vec![u64::MAX; word_count];
17		let trailing = len % 64;
18		if trailing != 0 && word_count > 0 {
19			words[word_count - 1] = (1u64 << trailing) - 1;
20		}
21		Self {
22			words,
23			len,
24		}
25	}
26
27	pub fn none_set(len: usize) -> Self {
28		Self {
29			words: vec![0u64; len.div_ceil(64)],
30			len,
31		}
32	}
33
34	pub fn len(&self) -> usize {
35		self.len
36	}
37
38	pub fn is_empty(&self) -> bool {
39		self.len == 0
40	}
41
42	pub fn get(&self, row: usize) -> bool {
43		debug_assert!(row < self.len);
44		(self.words[row / 64] >> (row % 64)) & 1 == 1
45	}
46
47	pub fn set(&mut self, row: usize, value: bool) {
48		debug_assert!(row < self.len);
49		let word = &mut self.words[row / 64];
50		let bit = 1u64 << (row % 64);
51		if value {
52			*word |= bit;
53		} else {
54			*word &= !bit;
55		}
56	}
57
58	pub fn popcount(&self) -> usize {
59		let word_count = self.words.len();
60		if word_count == 0 {
61			return 0;
62		}
63		let mut count = 0usize;
64		for &w in &self.words[..word_count - 1] {
65			count += w.count_ones() as usize;
66		}
67		let trailing = self.len - 64 * (word_count - 1);
68		let mask = if trailing == 64 {
69			u64::MAX
70		} else {
71			(1u64 << trailing) - 1
72		};
73		count += (self.words[word_count - 1] & mask).count_ones() as usize;
74		count
75	}
76
77	pub fn and(&self, other: &Self) -> Self {
78		assert_eq!(self.len, other.len, "RowMask::and length mismatch");
79		let words = self.words.iter().zip(&other.words).map(|(a, b)| a & b).collect();
80		Self {
81			words,
82			len: self.len,
83		}
84	}
85
86	pub fn or(&self, other: &Self) -> Self {
87		assert_eq!(self.len, other.len, "RowMask::or length mismatch");
88		let words = self.words.iter().zip(&other.words).map(|(a, b)| a | b).collect();
89		Self {
90			words,
91			len: self.len,
92		}
93	}
94
95	pub fn not(&self) -> Self {
96		let word_count = self.words.len();
97		let mut words: Vec<u64> = self.words.iter().map(|w| !w).collect();
98		let trailing = self.len % 64;
99		if trailing != 0 && word_count > 0 {
100			words[word_count - 1] &= (1u64 << trailing) - 1;
101		}
102		Self {
103			words,
104			len: self.len,
105		}
106	}
107
108	// Extract a sub-mask covering rows `[start, end)`. Used by predicate
109	// pushdown to align a batch-wide mask with each individual chunk slice.
110	pub fn slice(&self, start: usize, end: usize) -> Self {
111		debug_assert!(start <= end, "RowMask::slice: start {start} > end {end}");
112		debug_assert!(end <= self.len, "RowMask::slice: end {end} > len {}", self.len);
113		let new_len = end - start;
114		let mut out = Self::none_set(new_len);
115		for i in 0..new_len {
116			if self.get(start + i) {
117				out.set(i, true);
118			}
119		}
120		out
121	}
122
123	// Concatenate per-chunk masks into a single block-wide mask. Used by
124	// multi-chunk predicate evaluation to assemble a column-spanning mask from
125	// independent per-chunk evaluations.
126	pub fn concat(parts: &[Self]) -> Self {
127		let total: usize = parts.iter().map(|m| m.len).sum();
128		let mut out = Self::none_set(total);
129		let mut row_offset = 0;
130		for part in parts {
131			for i in 0..part.len {
132				if part.get(i) {
133					out.set(row_offset + i, true);
134				}
135			}
136			row_offset += part.len;
137		}
138		out
139	}
140}
141
142#[cfg(test)]
143mod tests {
144	use super::*;
145
146	#[test]
147	fn all_set_counts_every_row() {
148		let m = RowMask::all_set(100);
149		assert_eq!(m.popcount(), 100);
150		assert!(m.get(0));
151		assert!(m.get(99));
152	}
153
154	#[test]
155	fn none_set_has_zero_popcount() {
156		let m = RowMask::none_set(10);
157		assert_eq!(m.popcount(), 0);
158		assert!(!m.get(0));
159	}
160
161	#[test]
162	fn not_inverts_within_length_only() {
163		let mut m = RowMask::none_set(65);
164		m.set(0, true);
165		m.set(64, true);
166		let inverted = m.not();
167		assert_eq!(inverted.popcount(), 63);
168		assert!(!inverted.get(0));
169		assert!(inverted.get(1));
170		assert!(!inverted.get(64));
171	}
172
173	#[test]
174	fn and_or_combine_masks() {
175		let mut a = RowMask::none_set(8);
176		a.set(1, true);
177		a.set(3, true);
178		a.set(5, true);
179		let mut b = RowMask::none_set(8);
180		b.set(3, true);
181		b.set(5, true);
182		b.set(7, true);
183		assert_eq!(a.and(&b).popcount(), 2);
184		assert_eq!(a.or(&b).popcount(), 4);
185	}
186
187	#[test]
188	fn concat_appends_each_part_at_its_offset() {
189		let mut a = RowMask::none_set(3);
190		a.set(0, true);
191		a.set(2, true);
192		let mut b = RowMask::none_set(2);
193		b.set(1, true);
194		let mut c = RowMask::none_set(4);
195		c.set(0, true);
196		c.set(3, true);
197		let combined = RowMask::concat(&[a, b, c]);
198		assert_eq!(combined.len(), 9);
199		assert!(combined.get(0));
200		assert!(!combined.get(1));
201		assert!(combined.get(2));
202		assert!(!combined.get(3));
203		assert!(combined.get(4));
204		assert!(combined.get(5));
205		assert!(!combined.get(6));
206		assert!(!combined.get(7));
207		assert!(combined.get(8));
208	}
209
210	#[test]
211	fn concat_handles_word_boundary_crossings() {
212		let a = RowMask::all_set(70);
213		let b = RowMask::all_set(70);
214		let combined = RowMask::concat(&[a, b]);
215		assert_eq!(combined.len(), 140);
216		assert_eq!(combined.popcount(), 140);
217	}
218
219	#[test]
220	fn concat_empty_parts_yield_empty_mask() {
221		let combined = RowMask::concat(&[]);
222		assert_eq!(combined.len(), 0);
223		assert_eq!(combined.popcount(), 0);
224	}
225
226	#[test]
227	fn slice_extracts_inner_window() {
228		let mut m = RowMask::none_set(8);
229		m.set(1, true);
230		m.set(3, true);
231		m.set(5, true);
232		m.set(7, true);
233		let s = m.slice(2, 6);
234		assert_eq!(s.len(), 4);
235		assert!(!s.get(0));
236		assert!(s.get(1));
237		assert!(!s.get(2));
238		assert!(s.get(3));
239	}
240
241	#[test]
242	fn slice_crosses_word_boundary() {
243		let mut m = RowMask::none_set(140);
244		m.set(60, true);
245		m.set(64, true);
246		m.set(70, true);
247		let s = m.slice(50, 80);
248		assert_eq!(s.len(), 30);
249		assert_eq!(s.popcount(), 3);
250		assert!(s.get(10));
251		assert!(s.get(14));
252		assert!(s.get(20));
253	}
254
255	#[test]
256	fn slice_full_range_equals_self() {
257		let mut m = RowMask::none_set(10);
258		m.set(0, true);
259		m.set(4, true);
260		m.set(9, true);
261		let s = m.slice(0, 10);
262		assert_eq!(s, m);
263	}
264
265	#[test]
266	fn slice_empty_range_yields_empty_mask() {
267		let m = RowMask::all_set(10);
268		let s = m.slice(5, 5);
269		assert_eq!(s.len(), 0);
270		assert_eq!(s.popcount(), 0);
271	}
272}