// hexane/cursor.rs

1use super::aggregate::{Acc, Agg};
2use super::columndata::ColumnData;
3use super::encoder::{Encoder, EncoderState, SpliceEncoder, Writer};
4use super::pack::{MaybePackable, PackError, Packable};
5use super::slab::{Slab, SlabWeight, SlabWriter, SpanWeight};
6use super::Cow;
7
8use std::fmt::Debug;
9use std::ops::Range;
10
11pub trait HasMinMax {
12    fn min(&self) -> Agg;
13    fn max(&self) -> Agg;
14    fn intersects(&self, a: Range<usize>) -> bool {
15        let b = self.min().as_usize()..self.max().as_usize();
16        a.start <= b.end && b.start <= a.end
17    }
18}
19
/// Weight types (slab weights and span weights) that expose a cumulative item count.
pub trait HasPos {
    /// Returns the item count carried by this weight.
    fn pos(&self) -> usize;
}
24
25/// Implemented by slab weight / span-weight types that carry a cumulative [`Acc`] sum.
26pub trait HasAcc {
27    fn acc(&self) -> Acc;
28}
29
30impl HasPos for SlabWeight {
31    fn pos(&self) -> usize {
32        self.pos
33    }
34}
35
36impl HasMinMax for SlabWeight {
37    fn min(&self) -> Agg {
38        self.min
39    }
40    fn max(&self) -> Agg {
41        self.max
42    }
43}
44
45impl HasAcc for SlabWeight {
46    fn acc(&self) -> Acc {
47        self.acc
48    }
49}
50
51/*
52#[derive(Debug, PartialEq)]
53pub enum MyCow<'a, T: PartialEq + ?Sized + ToOwned> {
54  Owned(T::Owned),
55  Borrowed(&'a T)
56}
57*/
58
59/// A single RLE run: `count` consecutive repetitions of `value`.
60///
61/// `value` is `None` for null runs. `Run` also implements `Iterator`, yielding
62/// `Option<Cow<'a, P>>` for each repetition until the count is exhausted.
63#[derive(Debug, PartialEq, Default)]
64pub struct Run<'a, P: Packable + ?Sized> {
65    pub count: usize,
66    pub value: Option<Cow<'a, P>>,
67}
68
69impl<'a, P: Packable + ?Sized> Copy for Run<'a, P> where Cow<'a, P>: Copy {}
70
71impl<P: Packable + ?Sized> Clone for Run<'_, P> {
72    fn clone(&self) -> Self {
73        Run {
74            count: self.count,
75            value: self.value.clone(),
76        }
77    }
78}
79
80impl<'a, P: Packable + ?Sized> Iterator for Run<'a, P> {
81    type Item = Option<Cow<'a, P>>;
82
83    fn next(&mut self) -> Option<Self::Item> {
84        if self.count >= 1 {
85            self.count -= 1;
86            Some(self.value.clone())
87        } else {
88            None
89        }
90    }
91
92    fn nth(&mut self, n: usize) -> Option<Self::Item> {
93        if self.count > n {
94            self.count -= n + 1;
95            Some(self.value.clone())
96        } else {
97            self.count = 0;
98            None
99        }
100    }
101}
102
103impl<'a, T: Packable + ?Sized> Run<'a, T> {
104    pub(crate) fn pop_n(&self, n: usize) -> Option<Run<'a, T>> {
105        if self.count <= n {
106            None
107        } else {
108            let count = self.count - n;
109            let value = self.value.clone();
110            Some(Run { count, value })
111        }
112    }
113
114    pub(crate) fn pop(&self) -> Option<Run<'a, T>> {
115        self.pop_n(1)
116    }
117
118    pub fn agg(&self) -> Agg {
119        self.value.as_ref().map(|i| T::agg(i)).unwrap_or_default()
120    }
121    pub fn acc(&self) -> Acc {
122        self.agg() * self.count
123    }
124}
125
126impl<T: Packable<Owned = T>> Run<'_, T> {
127    pub fn init(count: usize, value: T) -> Self {
128        Self {
129            count,
130            value: Some(Cow::Owned(value)),
131        }
132    }
133}
134
135impl Run<'_, i64> {
136    pub fn delta(&self) -> i64 {
137        self.count as i64 * self.value.as_deref().cloned().unwrap_or(0)
138    }
139}
140
141impl<'a, T: Packable + ?Sized> Run<'a, T> {
142    pub fn new(count: usize, value: Option<Cow<'a, T>>) -> Self {
143        Run { count, value }
144    }
145
146    pub fn plus(mut self, num: usize) -> Self {
147        self.count += num;
148        self
149    }
150}
151
152/// Core trait for cursor types that know how to decode a specific columnar encoding.
153///
154/// A `ColumnCursor` is a stateful byte-level decoder for a single slab. It advances through
155/// the slab byte-by-byte, yielding [`Run`]s of decoded values. The cursor is embedded in
156/// [`RunIter`] and ultimately drives [`ColumnDataIter`](crate::ColumnDataIter).
157///
158/// You rarely need to implement this trait directly; use one of the built-in cursor type
159/// aliases ([`UIntCursor`](crate::UIntCursor), [`StrCursor`](crate::StrCursor), etc.) or
160/// implement [`Packable`] for your type and use `RleCursor<B, T>`.
161///
162/// # Associated Types
163/// - `Item` — the decoded value type (e.g. `u64`, `str`)
164/// - `Export` — the type returned by `to_vec()` (often `Option<Item::Owned>`)
165/// - `SlabIndex` — the weight type stored in the B-tree for this cursor
166/// - `State` / `PostState` — encoder state used during splice
167pub trait ColumnCursor: Debug + Clone + Copy + PartialEq + Default {
168    type Item: Packable + ?Sized;
169    type State<'a>: EncoderState<'a, Self::Item>
170    where
171        <Self as ColumnCursor>::Item: 'a;
172    type PostState<'a>: Debug
173    where
174        Self::Item: 'a;
175    type Export: Debug + PartialEq + Clone;
176    type SlabIndex: Debug + Clone + HasPos + HasAcc + SpanWeight<Slab>;
177
178    // TODO: needs a test
179    #[inline(never)]
180    fn encode<'a, M, I>(out: &mut Vec<u8>, values: I) -> Range<usize>
181    where
182        M: MaybePackable<'a, Self::Item>,
183        I: IntoIterator<Item = M>,
184        Self::Item: 'a,
185    {
186        let start = out.len();
187        let mut state = Self::State::default();
188        for v in values {
189            state.append(out, v.maybe_packable());
190        }
191        state.flush(out);
192        let end = out.len();
193        start..end
194    }
195
196    fn encode_unless_empty<'a, M, I>(out: &mut Vec<u8>, values: I) -> Range<usize>
197    where
198        M: MaybePackable<'a, Self::Item>,
199        I: IntoIterator<Item = M>,
200        Self::Item: 'a,
201    {
202        let start = out.len();
203        let mut state = Self::State::default();
204        for v in values {
205            state.append(out, v.maybe_packable());
206        }
207        if out.len() == start && state.is_empty() {
208            out.truncate(start);
209            return start..start;
210        }
211        state.flush(out);
212        let end = out.len();
213        start..end
214    }
215
216    fn empty() -> Self;
217
218    fn new(_: &Slab) -> Self {
219        Self::empty()
220    }
221
222    fn iter(slab: &[u8]) -> CursorIter<'_, Self> {
223        CursorIter {
224            slab,
225            cursor: Self::empty(),
226            run: None,
227        }
228    }
229
230    fn compute_min_max(_slabs: &mut [Slab]) {
231        for s in _slabs {
232            let (_run, c) = Self::seek(s.len(), s);
233            let _next = c.clone().next(s.as_slice());
234            assert!(_run.is_some());
235            assert!(_next.is_none());
236        }
237    }
238
239    fn is_empty(v: Option<Cow<'_, Self::Item>>) -> bool {
240        v.is_none()
241    }
242
243    fn contains_range(
244        &self,
245        run: &Run<'_, Self::Item>,
246        range: &Range<usize>,
247    ) -> Option<Range<usize>> {
248        let runval = <Self::Item>::maybe_agg(&run.value).as_usize();
249        if range.contains(&runval) {
250            Some(0..run.count)
251        } else {
252            None
253        }
254    }
255
256    fn contains(&self, run: &Run<'_, Self::Item>, target: Agg) -> Option<Range<usize>> {
257        let start = target.as_usize();
258        let range = start..(start + 1);
259        let old = self.contains_agg(run, target);
260        let new = self.contains_range(run, &range);
261        assert_eq!(old, new);
262        old
263    }
264
265    fn contains_agg(&self, run: &Run<'_, Self::Item>, agg: Agg) -> Option<Range<usize>> {
266        if agg == <Self::Item>::maybe_agg(&run.value) {
267            Some(0..run.count)
268        } else {
269            None
270        }
271    }
272
273    fn pop<'a>(&self, run: &mut Run<'a, Self::Item>) -> Option<Option<Cow<'a, Self::Item>>> {
274        run.next()
275    }
276
277    fn pop_n<'a>(
278        &self,
279        run: &mut Run<'a, Self::Item>,
280        n: usize,
281    ) -> Option<Option<Cow<'a, Self::Item>>> {
282        assert!(n > 0);
283        run.nth(n - 1)
284    }
285
286    // ENCODER
287    fn finalize_state<'a>(
288        slab: &'a Slab,
289        encoder: &mut Encoder<'a, Self>,
290        post: Self::PostState<'a>,
291        cursor: Self,
292    ) -> Option<Self>;
293
294    // ENCODER
295    fn finish<'a>(slab: &'a Slab, writer: &mut SlabWriter<'a, Self::Item>, cursor: Self);
296
297    fn copy_between<'a>(
298        slab: &'a [u8],
299        writer: &mut SlabWriter<'a, Self::Item>,
300        c0: Self,
301        c1: Self,
302        run: Run<'a, Self::Item>,
303        size: usize,
304    ) -> Self::State<'a>;
305
306    fn splice_encoder(index: usize, del: usize, slab: &Slab) -> SpliceEncoder<'_, Self>;
307
308    fn slab_size() -> usize;
309
310    fn try_next<'a>(&mut self, data: &'a [u8]) -> Result<Option<Run<'a, Self::Item>>, PackError>;
311
312    fn try_again<'a>(&self, data: &'a [u8]) -> Result<Option<Run<'a, Self::Item>>, PackError>;
313
314    fn export_splice<'a, I>(data: &mut Vec<Self::Export>, range: Range<usize>, values: I)
315    where
316        I: Iterator<Item = Option<Cow<'a, Self::Item>>>,
317        Self::Item: 'a;
318
319    fn next<'a>(&mut self, data: &'a [u8]) -> Option<Run<'a, Self::Item>> {
320        match self.try_next(data).unwrap() {
321            Some(run) if run.count == 0 => self.next(data),
322            result => result,
323        }
324    }
325
326    fn index(&self) -> usize;
327
328    fn offset(&self) -> usize;
329
330    fn acc(&self) -> Acc {
331        Acc::new()
332    }
333
334    fn min(&self) -> Agg {
335        Agg::default()
336    }
337
338    fn max(&self) -> Agg {
339        Agg::default()
340    }
341
342    fn seek(index: usize, slab: &Slab) -> (Option<Run<'_, Self::Item>>, Self) {
343        if index == 0 {
344            return (None, Self::new(slab));
345        } else {
346            let mut cursor = Self::new(slab);
347            while let Some(val) = cursor.next(slab.as_slice()) {
348                if cursor.index() >= index {
349                    return (Some(val), cursor);
350                }
351            }
352        }
353        panic!()
354    }
355
356    fn debug_scan<F>(data: &[u8], test: &F) -> Result<Self, PackError>
357    where
358        F: Fn(Option<&Self::Item>) -> Option<String>,
359    {
360        let mut cursor = Self::empty();
361        while let Some(val) = cursor.try_next(data)? {
362            Self::Item::validate(val.value.as_deref(), test)?;
363        }
364        Ok(cursor)
365    }
366
367    fn load_with<F>(data: &[u8], test: &F) -> Result<ColumnData<Self>, PackError>
368    where
369        F: Fn(Option<&Self::Item>) -> Option<String>;
370
371    fn load(data: &[u8]) -> Result<ColumnData<Self>, PackError> {
372        Self::load_with(data, &|_| None)
373    }
374
375    fn splice<'a, 'b, I, M>(
376        slab: &'a Slab,
377        index: usize,
378        del: usize,
379        values: I,
380        #[cfg(feature = "slow_path_assertions")] debug: (&mut Vec<Self::Export>, Range<usize>),
381    ) -> SpliceResult
382    where
383        M: MaybePackable<'b, Self::Item>,
384        I: Iterator<Item = M>,
385        Self::Item: 'b,
386    {
387        #[cfg(feature = "slow_path_assertions")]
388        let mut copy_of_values = vec![];
389        let mut encoder = Self::splice_encoder(index, del, slab);
390        let mut add = 0;
391        let mut value_acc = Acc::new();
392        for v in values {
393            value_acc += v.agg();
394            let opt_v = v.maybe_packable();
395            #[cfg(feature = "slow_path_assertions")]
396            copy_of_values.push(opt_v.clone());
397            add += encoder.append_item(opt_v);
398        }
399
400        #[cfg(feature = "slow_path_assertions")]
401        Self::export_splice(debug.0, debug.1, copy_of_values.into_iter());
402
403        let overflow = encoder.overflow;
404        let del = encoder.deleted;
405        let group = encoder.acc;
406        let slabs = encoder.finish();
407        if del == 0 {
408            assert_eq!(
409                slabs.iter().map(|s| s.acc()).sum::<Acc>(),
410                slab.acc() + value_acc
411            );
412            assert_eq!(
413                slabs.iter().map(|s| s.len()).sum::<usize>(),
414                slab.len() + add
415            );
416        }
417        SpliceResult {
418            add,
419            del,
420            overflow,
421            group,
422            slabs,
423        }
424    }
425
426    fn splice_delete<'a>(
427        _post: Option<Run<'a, Self::Item>>,
428        _cursor: Self,
429        _del: usize,
430        slab: &'a Slab,
431    ) -> SpliceDel<'a, Self> {
432        let mut cursor = _cursor;
433        let mut post = _post;
434        let mut del = _del;
435        let mut overflow = 0;
436        let mut deleted = 0;
437        while del > 0 {
438            match post {
439                // if del is less than the current run
440                Some(Run { count, value }) if del < count => {
441                    deleted += del;
442                    post = Some(Run {
443                        count: count - del,
444                        value,
445                    });
446                    del = 0;
447                }
448                // if del is greather than or equal the current run
449                Some(Run { count, .. }) => {
450                    del -= count;
451                    deleted += count;
452                    post = None;
453                }
454                None => {
455                    if let Some(p) = Self::next(&mut cursor, slab.as_slice()) {
456                        post = Some(p);
457                    } else {
458                        post = None;
459                        overflow = del;
460                        del = 0;
461                    }
462                }
463            }
464        }
465        assert!(_del == deleted + overflow);
466        SpliceDel {
467            deleted,
468            overflow,
469            cursor,
470            post,
471        }
472    }
473
474    fn init_empty(len: usize) -> Slab {
475        if len > 0 {
476            let mut writer = SlabWriter::<Self::Item>::new(usize::MAX, false);
477            writer.flush_null(len);
478            writer.finish().pop().unwrap_or_default()
479        } else {
480            Slab::default()
481        }
482    }
483}
484
485pub struct SpliceDel<'a, C: ColumnCursor> {
486    pub(crate) deleted: usize,
487    pub(crate) overflow: usize,
488    pub(crate) cursor: C,
489    pub(crate) post: Option<Run<'a, C::Item>>,
490}
491
492pub struct SpliceResult {
493    pub(crate) add: usize,
494    pub(crate) del: usize,
495    pub(crate) overflow: usize,
496    pub(crate) group: Acc,
497    pub(crate) slabs: Vec<Slab>,
498}
499
500/// A low-level iterator that decodes [`Run`]s from a raw byte slice using a [`ColumnCursor`].
501///
502/// Unlike [`ColumnDataIter`](crate::ColumnDataIter), `CursorIter` operates on a single
503/// flat `&[u8]` rather than a multi-slab tree, and returns `Result` to surface decode errors.
504/// It is most useful when reading directly from a serialized buffer.
505// TODO: needs tests
506#[derive(Debug)]
507pub struct CursorIter<'a, C: ColumnCursor> {
508    pub(crate) slab: &'a [u8],
509    pub(crate) cursor: C,
510    pub(crate) run: Option<Run<'a, C::Item>>,
511}
512
513impl<C: ColumnCursor> Clone for CursorIter<'_, C> {
514    fn clone(&self) -> Self {
515        CursorIter {
516            slab: self.slab,
517            cursor: self.cursor,
518            run: self.run.clone(),
519        }
520    }
521}
522
523impl<'a, C: ColumnCursor> CursorIter<'a, C> {
524    fn next_run(&mut self) -> Result<Option<Run<'a, C::Item>>, PackError> {
525        while let Some(run) = self.cursor.try_next(self.slab)? {
526            if run.count > 0 {
527                return Ok(Some(run));
528            }
529        }
530        Ok(None)
531    }
532}
533
534impl<'a, C: ColumnCursor> Iterator for CursorIter<'a, C>
535where
536    C::Item: 'a,
537{
538    type Item = Result<Option<Cow<'a, C::Item>>, PackError>;
539
540    fn next(&mut self) -> Option<Self::Item> {
541        match self.run.as_mut() {
542            Some(run) if run.count > 0 => Ok(self.cursor.pop(run)).transpose(),
543            _ => match self.next_run() {
544                Ok(Some(mut run)) if run.count > 0 => {
545                    let value = self.cursor.pop(&mut run);
546                    self.run = Some(run);
547                    Ok(value).transpose()
548                }
549                Ok(_) => None,
550                Err(e) => Some(Err(e)),
551            },
552        }
553    }
554}
555
556#[derive(Debug, Clone, Default, Copy)]
557pub struct RunIter<'a, C: ColumnCursor> {
558    pub(crate) slab: &'a [u8],
559    pub(crate) cursor: C,
560    pub(crate) pos_left: usize,
561    pub(crate) acc_left: Acc,
562}
563
564#[derive(Debug, Clone, Default, Copy)]
565pub(crate) struct RunIterState<C: ColumnCursor> {
566    pub(crate) cursor: C,
567    pub(crate) pos_left: usize,
568    pub(crate) acc_left: Acc,
569}
570
571#[derive(Debug, Clone, Default)]
572pub(crate) struct RunIterContaining1<'a, C: ColumnCursor>
573where
574    C::Item: 'a,
575{
576    pub(crate) iter: RunIter<'a, C>,
577    pub(crate) pos: usize,
578    pub(crate) target: Agg,
579    pub(crate) range: Range<usize>,
580}
581
582#[derive(Debug, Clone, Default)]
583pub(crate) struct RunIterContaining2<'a, C: ColumnCursor>
584where
585    C::Item: 'a,
586{
587    pub(crate) iter: RunIter<'a, C>,
588    pub(crate) pos: usize,
589    pub(crate) target: Range<usize>,
590    pub(crate) range: Range<usize>,
591}
592
593impl<C: ColumnCursor> Iterator for RunIterContaining1<'_, C> {
594    type Item = usize;
595
596    fn next(&mut self) -> Option<usize> {
597        while self.range.is_empty() {
598            let run = self.iter.next()?; // return None
599            if let Some(range) = self.iter.cursor.contains(&run, self.target) {
600                self.range.start = range.start + self.pos;
601                self.range.end = range.end + self.pos;
602            }
603            self.pos += run.count;
604        }
605        self.range.next()
606    }
607}
608
609impl<C: ColumnCursor> Iterator for RunIterContaining2<'_, C> {
610    type Item = usize;
611
612    fn next(&mut self) -> Option<usize> {
613        while self.range.is_empty() {
614            let run = self.iter.next()?; // return None
615            if let Some(range) = self.iter.cursor.contains_range(&run, &self.target) {
616                self.range.start = range.start + self.pos;
617                self.range.end = range.end + self.pos;
618            }
619            self.pos += run.count;
620        }
621        self.range.next()
622    }
623}
624
625impl<'a, C: ColumnCursor> RunIter<'a, C> {
626    pub fn empty() -> Self {
627        RunIter {
628            slab: &[],
629            cursor: C::empty(),
630            pos_left: 0,
631            acc_left: Acc::new(),
632        }
633    }
634
635    pub(crate) fn resume(slab: &'a [u8], state: RunIterState<C>) -> RunIter<'a, C> {
636        RunIter {
637            slab,
638            cursor: state.cursor,
639            pos_left: state.pos_left,
640            acc_left: state.acc_left,
641        }
642    }
643
644    pub(crate) fn suspend(&self) -> RunIterState<C> {
645        RunIterState {
646            cursor: self.cursor,
647            pos_left: self.pos_left,
648            acc_left: self.acc_left,
649        }
650    }
651
652    pub(crate) fn current(&self) -> Option<Run<'a, C::Item>> {
653        self.cursor.try_again(self.slab).unwrap()
654    }
655
656    pub(crate) fn pos_left(&self) -> usize {
657        self.pos_left
658    }
659    pub(crate) fn acc_left(&self) -> Acc {
660        self.acc_left
661    }
662
663    pub(crate) fn sub_advance_acc(&mut self, mut n: Acc) -> (usize, Option<Run<'a, C::Item>>) {
664        let mut pos = 0;
665        while let Some(mut run) = self.next() {
666            let agg = run.agg();
667            if agg * run.count <= n {
668                n -= agg * run.count;
669                pos += run.count;
670            } else {
671                assert!(agg.as_usize() > 0);
672                let advance = n / agg;
673                run.count -= advance;
674                pos += advance;
675                if run.count == 0 {
676                    let tmp = self.next();
677                    return (pos, tmp);
678                } else {
679                    return (pos, Some(run));
680                }
681            }
682        }
683        (pos, None)
684    }
685
686    pub(crate) fn sub_advance(&mut self, mut n: usize) -> Option<Run<'a, C::Item>> {
687        while let Some(mut run) = self.next() {
688            if run.count <= n {
689                n -= run.count;
690            } else {
691                run.count -= n;
692                if run.count == 0 {
693                    let tmp = self.next();
694                    return tmp;
695                } else {
696                    return Some(run);
697                }
698            }
699        }
700        None
701    }
702
703    pub(crate) fn with_cursor(self) -> RunIterWithCursor<'a, C> {
704        RunIterWithCursor(self)
705    }
706
707    pub(crate) fn containing_agg(self, pos: usize, target: Agg) -> RunIterContaining1<'a, C> {
708        RunIterContaining1 {
709            iter: self,
710            pos,
711            target,
712            range: 0..0,
713        }
714    }
715
716    pub(crate) fn containing_range(
717        self,
718        pos: usize,
719        target: Range<usize>,
720    ) -> RunIterContaining2<'a, C> {
721        RunIterContaining2 {
722            iter: self,
723            pos,
724            target,
725            range: 0..0,
726        }
727    }
728}
729
730impl<'a, C: ColumnCursor> Iterator for RunIter<'a, C>
731where
732    C::Item: 'a,
733{
734    type Item = Run<'a, C::Item>;
735
736    fn next(&mut self) -> Option<Self::Item> {
737        let run = self.cursor.next(self.slab)?;
738        self.pos_left -= run.count;
739        self.acc_left -= run.acc();
740        Some(run)
741    }
742}
743
744pub(crate) struct RunIterWithCursor<'a, C: ColumnCursor>(RunIter<'a, C>);
745
746impl<'a, C: ColumnCursor> Iterator for RunIterWithCursor<'a, C>
747where
748    C::Item: 'a,
749{
750    type Item = (Run<'a, C::Item>, C);
751
752    fn next(&mut self) -> Option<Self::Item> {
753        let run = self.0.next()?;
754        Some((run, self.0.cursor))
755    }
756}