egglog_core_relations/offsets/
mod.rs

1use std::{cmp, fmt, mem};
2
3use crate::numeric_id::{NumericId, define_id};
4
5use crate::{
6    Pool,
7    pool::{Clear, Pooled, with_pool_set},
8};
9
10define_id!(pub RowId, u32, "a numeric offset into a table");
11
12#[cfg(test)]
13mod tests;
14
15/// A trait for types that represent a sequence of sorted offsets into a table.
16///
17/// NB: this trait may have outlived its usefulness. We may want to just get rid
18/// of it.
19pub(crate) trait Offsets {
20    // A half-open range enclosing the offsets in this sequence.
21    fn bounds(&self) -> Option<(RowId, RowId)>;
22    fn is_empty(&self) -> bool {
23        self.bounds().is_none_or(|(lo, hi)| lo == hi)
24    }
25    fn offsets(&self, f: impl FnMut(RowId));
26}
27
28#[derive(PartialEq, Eq, Debug, Clone, Copy, Hash)]
29pub struct OffsetRange {
30    pub(crate) start: RowId,
31    pub(crate) end: RowId,
32}
33
34impl Offsets for OffsetRange {
35    fn bounds(&self) -> Option<(RowId, RowId)> {
36        Some((self.start, self.end))
37    }
38
39    fn offsets(&self, f: impl FnMut(RowId)) {
40        RowId::range(self.start, self.end).for_each(f)
41    }
42}
43
44impl OffsetRange {
45    pub fn new(start: RowId, end: RowId) -> OffsetRange {
46        assert!(
47            start <= end,
48            "attempting to create malformed range {start:?}..{end:?}"
49        );
50        OffsetRange { start, end }
51    }
52    pub(crate) fn size(&self) -> usize {
53        self.end.index() - self.start.index()
54    }
55}
56
57#[derive(Default, Clone, PartialEq, Eq, Debug, Hash)]
58pub struct SortedOffsetVector(Vec<RowId>);
59
60impl SortedOffsetVector {
61    pub(crate) fn slice(&self) -> &SortedOffsetSlice {
62        // SAFETY: self.0 is sorted.
63        unsafe { SortedOffsetSlice::new_unchecked(&self.0) }
64    }
65
66    pub(crate) fn push(&mut self, offset: RowId) {
67        assert!(self.0.last().is_none_or(|last| last <= &offset));
68        // SAFETY: we just checked the invariant
69        unsafe { self.push_unchecked(offset) }
70    }
71
72    pub(crate) unsafe fn push_unchecked(&mut self, offset: RowId) {
73        self.0.push(offset)
74    }
75
76    pub(crate) fn retain(&mut self, mut f: impl FnMut(RowId) -> bool) {
77        self.0.retain(|off| f(*off))
78    }
79
80    pub(crate) fn extend_nonoverlapping(&mut self, other: &SortedOffsetSlice) {
81        if other.inner().is_empty() {
82            return;
83        }
84        if self.0.is_empty() {
85            self.0.extend(other.iter());
86            return;
87        }
88        if self.0.last().unwrap() <= other.inner().first().unwrap() {
89            self.0.extend(other.iter());
90            return;
91        }
92        panic!("attempting to extend with overlapping offsets")
93    }
94
95    /// Overwrite the contents of the current vector with those of the offset range.
96    pub(crate) fn fill_from_dense(&mut self, range: &OffsetRange) {
97        self.0.clear();
98        self.0
99            .extend((range.start.index()..range.end.index()).map(RowId::from_usize));
100    }
101}
102
103impl Clear for SortedOffsetVector {
104    fn clear(&mut self) {
105        self.0.clear()
106    }
107    fn reuse(&self) -> bool {
108        self.0.capacity() > 0
109    }
110    fn bytes(&self) -> usize {
111        self.0.capacity() * mem::size_of::<RowId>()
112    }
113}
114
115impl Offsets for SortedOffsetVector {
116    fn bounds(&self) -> Option<(RowId, RowId)> {
117        self.slice().bounds()
118    }
119
120    fn offsets(&self, f: impl FnMut(RowId)) {
121        self.slice().offsets(f)
122    }
123}
124
125#[derive(PartialEq, Eq)]
126#[repr(transparent)]
127pub struct SortedOffsetSlice([RowId]);
128
129impl fmt::Debug for SortedOffsetSlice {
130    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131        f.debug_list().entries(self.0.iter()).finish()
132    }
133}
134
135impl SortedOffsetSlice {
136    pub(crate) unsafe fn new_unchecked(slice: &[RowId]) -> &SortedOffsetSlice {
137        debug_assert!(
138            slice.windows(2).all(|w| w[0] <= w[1]),
139            "slice is not sorted: {slice:?}"
140        );
141        // SAFETY: SortedOffsetSlice is repr(transparent), so the two layouts are compatible.
142        unsafe { mem::transmute::<&[RowId], &SortedOffsetSlice>(slice) }
143    }
144    fn len(&self) -> usize {
145        self.0.len()
146    }
147
148    pub(crate) fn iter(&self) -> impl Iterator<Item = RowId> + '_ {
149        self.0.iter().copied()
150    }
151
152    pub(crate) fn inner(&self) -> &[RowId] {
153        &self.0
154    }
155
156    pub(crate) fn subslice(&self, lo: usize, hi: usize) -> &SortedOffsetSlice {
157        // Safety: any subslice of a sorted slice is sorted.
158        unsafe { SortedOffsetSlice::new_unchecked(&self.inner()[lo..hi]) }
159    }
160
161    /// Return the index of the first offset in the slice that is greater than or equal to `target`.
162    pub(crate) fn binary_search_by_id(&self, target: RowId) -> usize {
163        self.binary_search_from(0, target)
164    }
165    fn binary_search_from(&self, start: usize, target: RowId) -> usize {
166        match self.inner()[start..].binary_search(&target) {
167            Ok(mut found) => {
168                found += start;
169                // This is O(n), but offset slices probably won't have duplicates at all.
170                while found > 0 && self.inner()[found - 1] == target {
171                    found -= 1;
172                }
173                found
174            }
175            Err(x) => start + x,
176        }
177    }
178
179    fn scan_for_offset(&self, start: usize, target: RowId) -> Result<usize, usize> {
180        let i = self.binary_search_from(start, target);
181        if i < self.len() && self.inner()[i] == target {
182            Ok(i)
183        } else {
184            Err(i)
185        }
186    }
187}
188
189impl Offsets for SortedOffsetSlice {
190    fn bounds(&self) -> Option<(RowId, RowId)> {
191        Some((
192            *self.0.first()?,
193            RowId::from_usize(self.0.last()?.index() + 1),
194        ))
195    }
196
197    fn offsets(&self, f: impl FnMut(RowId)) {
198        self.0.iter().copied().for_each(f)
199    }
200}
201
202impl Offsets for &'_ SortedOffsetSlice {
203    fn bounds(&self) -> Option<(RowId, RowId)> {
204        Some((
205            *self.0.first()?,
206            RowId::from_usize(self.0.last()?.index() + 1),
207        ))
208    }
209
210    fn offsets(&self, f: impl FnMut(RowId)) {
211        self.0.iter().copied().for_each(f)
212    }
213}
214
215#[derive(Copy, Clone)]
216pub enum SubsetRef<'a> {
217    Dense(OffsetRange),
218    Sparse(&'a SortedOffsetSlice),
219}
220
221impl Offsets for SubsetRef<'_> {
222    fn bounds(&self) -> Option<(RowId, RowId)> {
223        match self {
224            SubsetRef::Dense(r) => r.bounds(),
225            SubsetRef::Sparse(s) => s.bounds(),
226        }
227    }
228    fn offsets(&self, f: impl FnMut(RowId)) {
229        match self {
230            SubsetRef::Dense(r) => r.offsets(f),
231            SubsetRef::Sparse(s) => s.offsets(f),
232        }
233    }
234}
235
236impl SubsetRef<'_> {
237    pub(crate) fn size(&self) -> usize {
238        match self {
239            SubsetRef::Dense(range) => range.size(),
240            SubsetRef::Sparse(vec) => vec.0.len(),
241        }
242    }
243
244    pub(crate) fn to_owned(self, pool: &Pool<SortedOffsetVector>) -> Subset {
245        match self {
246            SubsetRef::Dense(r) => Subset::Dense(r),
247            SubsetRef::Sparse(s) => {
248                let mut vec = pool.get();
249                vec.extend_nonoverlapping(s);
250                Subset::Sparse(vec)
251            }
252        }
253    }
254
255    /// Get the underlying slice of a sparse subset. Used for debugging.
256    pub(crate) fn _slice(&self) -> &[RowId] {
257        match self {
258            SubsetRef::Dense(_) => panic!("getting slice from dense subset"),
259            SubsetRef::Sparse(slc) => slc.inner(),
260        }
261    }
262    pub(crate) fn iter_bounded(
263        self,
264        start: usize,
265        end: usize,
266        mut f: impl FnMut(RowId),
267    ) -> Option<usize> {
268        match self {
269            SubsetRef::Dense(r) => {
270                let mut cur = start;
271                for row in (r.start.index() + start.index())
272                    ..cmp::min(r.start.index().saturating_add(end), r.end.index())
273                {
274                    f(RowId::new(row as _));
275                    cur += 1;
276                }
277                if cur + r.start.index() < r.end.index() {
278                    Some(cur)
279                } else {
280                    None
281                }
282            }
283            SubsetRef::Sparse(vec) => {
284                let end = cmp::min(vec.0.len(), end);
285                let next = if end == vec.0.len() { None } else { Some(end) };
286                vec.0[start..end].iter().copied().for_each(f);
287                next
288            }
289        }
290    }
291}
292
293/// Either or an offset range or a sorted offset vector.
294#[derive(Debug, Hash, PartialEq, Eq)]
295pub enum Subset {
296    Dense(OffsetRange),
297    Sparse(Pooled<SortedOffsetVector>),
298}
299
300impl Offsets for Subset {
301    fn bounds(&self) -> Option<(RowId, RowId)> {
302        match self {
303            Subset::Dense(r) => r.bounds(),
304            Subset::Sparse(s) => s.slice().bounds(),
305        }
306    }
307    fn offsets(&self, f: impl FnMut(RowId)) {
308        match self {
309            Subset::Dense(r) => r.offsets(f),
310            Subset::Sparse(s) => s.slice().offsets(f),
311        }
312    }
313}
314
315impl Clone for Subset {
316    fn clone(&self) -> Self {
317        match self {
318            Subset::Dense(r) => Subset::Dense(*r),
319            Subset::Sparse(s) => Subset::Sparse(Pooled::cloned(s)),
320        }
321    }
322}
323
324// TODO: consider making Subset::Sparse an Rc, so copies are shallow?
325
326impl Subset {
327    /// The size of the subset.
328    pub fn size(&self) -> usize {
329        match self {
330            Subset::Dense(range) => range.size(),
331            Subset::Sparse(vec) => vec.0.len(),
332        }
333    }
334
335    pub(crate) fn is_dense(&self) -> bool {
336        matches!(self, Subset::Dense(_))
337    }
338
339    pub fn as_ref(&self) -> SubsetRef<'_> {
340        match self {
341            Subset::Dense(r) => SubsetRef::Dense(*r),
342            Subset::Sparse(s) => SubsetRef::Sparse(s.slice()),
343        }
344    }
345
346    pub(crate) fn retain(&mut self, mut filter: impl FnMut(RowId) -> bool) {
347        match self {
348            Subset::Dense(offs) => {
349                let mut res = Subset::empty();
350                offs.offsets(|row| {
351                    if filter(row) {
352                        res.add_row_sorted(row);
353                    }
354                });
355                *self = res;
356            }
357            Subset::Sparse(offs) => offs.retain(filter),
358        }
359    }
360    /// Remove any elements of the current subset not present in `other`.
361    pub(crate) fn intersect(&mut self, other: SubsetRef, pool: &Pool<SortedOffsetVector>) {
362        match (self, other) {
363            (Subset::Dense(cur), SubsetRef::Dense(other)) => {
364                let resl = cmp::max(cur.start, other.start);
365                let resr = cmp::min(cur.end, other.end);
366                if resl >= resr {
367                    *cur = OffsetRange::new(resl, resl);
368                } else {
369                    *cur = OffsetRange::new(resl, resr);
370                }
371            }
372            (x @ Subset::Dense(_), SubsetRef::Sparse(sparse)) => {
373                let (low, hi) = x.bounds().unwrap();
374                if sparse.bounds().is_some() {
375                    let mut res = pool.get();
376                    let l = sparse.binary_search_by_id(low);
377                    let r = sparse.binary_search_by_id(hi);
378                    let subslice = sparse.subslice(l, r);
379                    res.extend_nonoverlapping(subslice);
380                    *x = Subset::Sparse(res);
381                } else {
382                    // empty range
383                    *x = Subset::Dense(OffsetRange::new(RowId::new(0), RowId::new(0)));
384                }
385            }
386            (Subset::Sparse(sparse), SubsetRef::Dense(dense)) => {
387                let r = sparse.slice().binary_search_by_id(dense.end);
388                sparse.0.truncate(r);
389                sparse.retain(|row| row >= dense.start);
390            }
391            (Subset::Sparse(cur), SubsetRef::Sparse(other)) => {
392                let mut other_off = 0;
393                cur.retain(|rowid| match other.scan_for_offset(other_off, rowid) {
394                    Ok(found) => {
395                        other_off = found + 1;
396                        true
397                    }
398                    Err(next_off) => {
399                        other_off = next_off;
400                        false
401                    }
402                })
403            }
404        }
405    }
406
407    /// Append the given row id to the Subset.
408    ///
409    /// # Panics
410    /// The row id in question must be greater than or equal to the upper bound
411    /// of the subset. This method will panic if it is not.
412    pub(crate) fn add_row_sorted(&mut self, row: RowId) {
413        match self {
414            Subset::Dense(range) => {
415                if range.end == range.start {
416                    range.start = row;
417                    range.end = row.inc();
418                    return;
419                }
420                if range.end == row {
421                    range.end = row.inc();
422                    return;
423                }
424                let mut vec = with_pool_set(|pool_set| pool_set.get::<SortedOffsetVector>());
425                vec.fill_from_dense(range);
426                vec.push(row);
427                *self = Subset::Sparse(vec);
428            }
429            Subset::Sparse(s) => {
430                s.push(row);
431            }
432        }
433    }
434
435    pub(crate) fn empty() -> Subset {
436        Subset::Dense(OffsetRange::new(RowId::new(0), RowId::new(0)))
437    }
438}