1use super::aggregate::{Acc, Agg};
2use super::columndata::ColumnData;
3use super::encoder::{Encoder, EncoderState, SpliceEncoder, Writer};
4use super::pack::{MaybePackable, PackError, Packable};
5use super::slab::{Slab, SlabWeight, SlabWriter, SpanWeight};
6use super::Cow;
7
8use std::fmt::Debug;
9use std::ops::Range;
10
11pub trait HasMinMax {
12 fn min(&self) -> Agg;
13 fn max(&self) -> Agg;
14 fn intersects(&self, a: Range<usize>) -> bool {
15 let b = self.min().as_usize()..self.max().as_usize();
16 a.start <= b.end && b.start <= a.end
17 }
18}
19
/// Implemented by index/weight types that carry a position count.
pub trait HasPos {
    /// Returns the position stored in this value.
    fn pos(&self) -> usize;
}
24
/// Implemented by index/weight types that carry an accumulated [`Acc`] value.
pub trait HasAcc {
    /// Returns the accumulator stored in this value.
    fn acc(&self) -> Acc;
}
29
impl HasPos for SlabWeight {
    /// Returns the `pos` field of the weight.
    fn pos(&self) -> usize {
        self.pos
    }
}
35
impl HasMinMax for SlabWeight {
    /// Returns the `min` field of the weight.
    fn min(&self) -> Agg {
        self.min
    }
    /// Returns the `max` field of the weight.
    fn max(&self) -> Agg {
        self.max
    }
}
44
impl HasAcc for SlabWeight {
    /// Returns the `acc` field of the weight.
    fn acc(&self) -> Acc {
        self.acc
    }
}
50
/// A value together with the number of times it repeats.
///
/// `Run` is also an iterator: each step yields a clone of `value` and
/// decrements `count` until it reaches zero.
#[derive(Debug, PartialEq, Default)]
pub struct Run<'a, P: Packable + ?Sized> {
    /// Number of repetitions still held by this run.
    pub count: usize,
    /// The repeated value; `None` is a run that carries no value.
    pub value: Option<Cow<'a, P>>,
}
68
// `Run` is `Copy` only for item types whose `Cow` is itself `Copy`.
impl<'a, P: Packable + ?Sized> Copy for Run<'a, P> where Cow<'a, P>: Copy {}
70
71impl<P: Packable + ?Sized> Clone for Run<'_, P> {
72 fn clone(&self) -> Self {
73 Run {
74 count: self.count,
75 value: self.value.clone(),
76 }
77 }
78}
79
80impl<'a, P: Packable + ?Sized> Iterator for Run<'a, P> {
81 type Item = Option<Cow<'a, P>>;
82
83 fn next(&mut self) -> Option<Self::Item> {
84 if self.count >= 1 {
85 self.count -= 1;
86 Some(self.value.clone())
87 } else {
88 None
89 }
90 }
91
92 fn nth(&mut self, n: usize) -> Option<Self::Item> {
93 if self.count > n {
94 self.count -= n + 1;
95 Some(self.value.clone())
96 } else {
97 self.count = 0;
98 None
99 }
100 }
101}
102
103impl<'a, T: Packable + ?Sized> Run<'a, T> {
104 pub(crate) fn pop_n(&self, n: usize) -> Option<Run<'a, T>> {
105 if self.count <= n {
106 None
107 } else {
108 let count = self.count - n;
109 let value = self.value.clone();
110 Some(Run { count, value })
111 }
112 }
113
114 pub(crate) fn pop(&self) -> Option<Run<'a, T>> {
115 self.pop_n(1)
116 }
117
118 pub fn agg(&self) -> Agg {
119 self.value.as_ref().map(|i| T::agg(i)).unwrap_or_default()
120 }
121 pub fn acc(&self) -> Acc {
122 self.agg() * self.count
123 }
124}
125
126impl<T: Packable<Owned = T>> Run<'_, T> {
127 pub fn init(count: usize, value: T) -> Self {
128 Self {
129 count,
130 value: Some(Cow::Owned(value)),
131 }
132 }
133}
134
135impl Run<'_, i64> {
136 pub fn delta(&self) -> i64 {
137 self.count as i64 * self.value.as_deref().cloned().unwrap_or(0)
138 }
139}
140
141impl<'a, T: Packable + ?Sized> Run<'a, T> {
142 pub fn new(count: usize, value: Option<Cow<'a, T>>) -> Self {
143 Run { count, value }
144 }
145
146 pub fn plus(mut self, num: usize) -> Self {
147 self.count += num;
148 self
149 }
150}
151
152pub trait ColumnCursor: Debug + Clone + Copy + PartialEq + Default {
168 type Item: Packable + ?Sized;
169 type State<'a>: EncoderState<'a, Self::Item>
170 where
171 <Self as ColumnCursor>::Item: 'a;
172 type PostState<'a>: Debug
173 where
174 Self::Item: 'a;
175 type Export: Debug + PartialEq + Clone;
176 type SlabIndex: Debug + Clone + HasPos + HasAcc + SpanWeight<Slab>;
177
178 #[inline(never)]
180 fn encode<'a, M, I>(out: &mut Vec<u8>, values: I) -> Range<usize>
181 where
182 M: MaybePackable<'a, Self::Item>,
183 I: IntoIterator<Item = M>,
184 Self::Item: 'a,
185 {
186 let start = out.len();
187 let mut state = Self::State::default();
188 for v in values {
189 state.append(out, v.maybe_packable());
190 }
191 state.flush(out);
192 let end = out.len();
193 start..end
194 }
195
196 fn encode_unless_empty<'a, M, I>(out: &mut Vec<u8>, values: I) -> Range<usize>
197 where
198 M: MaybePackable<'a, Self::Item>,
199 I: IntoIterator<Item = M>,
200 Self::Item: 'a,
201 {
202 let start = out.len();
203 let mut state = Self::State::default();
204 for v in values {
205 state.append(out, v.maybe_packable());
206 }
207 if out.len() == start && state.is_empty() {
208 out.truncate(start);
209 return start..start;
210 }
211 state.flush(out);
212 let end = out.len();
213 start..end
214 }
215
216 fn empty() -> Self;
217
218 fn new(_: &Slab) -> Self {
219 Self::empty()
220 }
221
222 fn iter(slab: &[u8]) -> CursorIter<'_, Self> {
223 CursorIter {
224 slab,
225 cursor: Self::empty(),
226 run: None,
227 }
228 }
229
230 fn compute_min_max(_slabs: &mut [Slab]) {
231 for s in _slabs {
232 let (_run, c) = Self::seek(s.len(), s);
233 let _next = c.clone().next(s.as_slice());
234 assert!(_run.is_some());
235 assert!(_next.is_none());
236 }
237 }
238
239 fn is_empty(v: Option<Cow<'_, Self::Item>>) -> bool {
240 v.is_none()
241 }
242
243 fn contains_range(
244 &self,
245 run: &Run<'_, Self::Item>,
246 range: &Range<usize>,
247 ) -> Option<Range<usize>> {
248 let runval = <Self::Item>::maybe_agg(&run.value).as_usize();
249 if range.contains(&runval) {
250 Some(0..run.count)
251 } else {
252 None
253 }
254 }
255
256 fn contains(&self, run: &Run<'_, Self::Item>, target: Agg) -> Option<Range<usize>> {
257 let start = target.as_usize();
258 let range = start..(start + 1);
259 let old = self.contains_agg(run, target);
260 let new = self.contains_range(run, &range);
261 assert_eq!(old, new);
262 old
263 }
264
265 fn contains_agg(&self, run: &Run<'_, Self::Item>, agg: Agg) -> Option<Range<usize>> {
266 if agg == <Self::Item>::maybe_agg(&run.value) {
267 Some(0..run.count)
268 } else {
269 None
270 }
271 }
272
273 fn pop<'a>(&self, run: &mut Run<'a, Self::Item>) -> Option<Option<Cow<'a, Self::Item>>> {
274 run.next()
275 }
276
277 fn pop_n<'a>(
278 &self,
279 run: &mut Run<'a, Self::Item>,
280 n: usize,
281 ) -> Option<Option<Cow<'a, Self::Item>>> {
282 assert!(n > 0);
283 run.nth(n - 1)
284 }
285
286 fn finalize_state<'a>(
288 slab: &'a Slab,
289 encoder: &mut Encoder<'a, Self>,
290 post: Self::PostState<'a>,
291 cursor: Self,
292 ) -> Option<Self>;
293
294 fn finish<'a>(slab: &'a Slab, writer: &mut SlabWriter<'a, Self::Item>, cursor: Self);
296
297 fn copy_between<'a>(
298 slab: &'a [u8],
299 writer: &mut SlabWriter<'a, Self::Item>,
300 c0: Self,
301 c1: Self,
302 run: Run<'a, Self::Item>,
303 size: usize,
304 ) -> Self::State<'a>;
305
306 fn splice_encoder(index: usize, del: usize, slab: &Slab) -> SpliceEncoder<'_, Self>;
307
308 fn slab_size() -> usize;
309
310 fn try_next<'a>(&mut self, data: &'a [u8]) -> Result<Option<Run<'a, Self::Item>>, PackError>;
311
312 fn try_again<'a>(&self, data: &'a [u8]) -> Result<Option<Run<'a, Self::Item>>, PackError>;
313
314 fn export_splice<'a, I>(data: &mut Vec<Self::Export>, range: Range<usize>, values: I)
315 where
316 I: Iterator<Item = Option<Cow<'a, Self::Item>>>,
317 Self::Item: 'a;
318
319 fn next<'a>(&mut self, data: &'a [u8]) -> Option<Run<'a, Self::Item>> {
320 match self.try_next(data).unwrap() {
321 Some(run) if run.count == 0 => self.next(data),
322 result => result,
323 }
324 }
325
326 fn index(&self) -> usize;
327
328 fn offset(&self) -> usize;
329
330 fn acc(&self) -> Acc {
331 Acc::new()
332 }
333
334 fn min(&self) -> Agg {
335 Agg::default()
336 }
337
338 fn max(&self) -> Agg {
339 Agg::default()
340 }
341
342 fn seek(index: usize, slab: &Slab) -> (Option<Run<'_, Self::Item>>, Self) {
343 if index == 0 {
344 return (None, Self::new(slab));
345 } else {
346 let mut cursor = Self::new(slab);
347 while let Some(val) = cursor.next(slab.as_slice()) {
348 if cursor.index() >= index {
349 return (Some(val), cursor);
350 }
351 }
352 }
353 panic!()
354 }
355
356 fn debug_scan<F>(data: &[u8], test: &F) -> Result<Self, PackError>
357 where
358 F: Fn(Option<&Self::Item>) -> Option<String>,
359 {
360 let mut cursor = Self::empty();
361 while let Some(val) = cursor.try_next(data)? {
362 Self::Item::validate(val.value.as_deref(), test)?;
363 }
364 Ok(cursor)
365 }
366
367 fn load_with<F>(data: &[u8], test: &F) -> Result<ColumnData<Self>, PackError>
368 where
369 F: Fn(Option<&Self::Item>) -> Option<String>;
370
371 fn load(data: &[u8]) -> Result<ColumnData<Self>, PackError> {
372 Self::load_with(data, &|_| None)
373 }
374
375 fn splice<'a, 'b, I, M>(
376 slab: &'a Slab,
377 index: usize,
378 del: usize,
379 values: I,
380 #[cfg(feature = "slow_path_assertions")] debug: (&mut Vec<Self::Export>, Range<usize>),
381 ) -> SpliceResult
382 where
383 M: MaybePackable<'b, Self::Item>,
384 I: Iterator<Item = M>,
385 Self::Item: 'b,
386 {
387 #[cfg(feature = "slow_path_assertions")]
388 let mut copy_of_values = vec![];
389 let mut encoder = Self::splice_encoder(index, del, slab);
390 let mut add = 0;
391 let mut value_acc = Acc::new();
392 for v in values {
393 value_acc += v.agg();
394 let opt_v = v.maybe_packable();
395 #[cfg(feature = "slow_path_assertions")]
396 copy_of_values.push(opt_v.clone());
397 add += encoder.append_item(opt_v);
398 }
399
400 #[cfg(feature = "slow_path_assertions")]
401 Self::export_splice(debug.0, debug.1, copy_of_values.into_iter());
402
403 let overflow = encoder.overflow;
404 let del = encoder.deleted;
405 let group = encoder.acc;
406 let slabs = encoder.finish();
407 if del == 0 {
408 assert_eq!(
409 slabs.iter().map(|s| s.acc()).sum::<Acc>(),
410 slab.acc() + value_acc
411 );
412 assert_eq!(
413 slabs.iter().map(|s| s.len()).sum::<usize>(),
414 slab.len() + add
415 );
416 }
417 SpliceResult {
418 add,
419 del,
420 overflow,
421 group,
422 slabs,
423 }
424 }
425
426 fn splice_delete<'a>(
427 _post: Option<Run<'a, Self::Item>>,
428 _cursor: Self,
429 _del: usize,
430 slab: &'a Slab,
431 ) -> SpliceDel<'a, Self> {
432 let mut cursor = _cursor;
433 let mut post = _post;
434 let mut del = _del;
435 let mut overflow = 0;
436 let mut deleted = 0;
437 while del > 0 {
438 match post {
439 Some(Run { count, value }) if del < count => {
441 deleted += del;
442 post = Some(Run {
443 count: count - del,
444 value,
445 });
446 del = 0;
447 }
448 Some(Run { count, .. }) => {
450 del -= count;
451 deleted += count;
452 post = None;
453 }
454 None => {
455 if let Some(p) = Self::next(&mut cursor, slab.as_slice()) {
456 post = Some(p);
457 } else {
458 post = None;
459 overflow = del;
460 del = 0;
461 }
462 }
463 }
464 }
465 assert!(_del == deleted + overflow);
466 SpliceDel {
467 deleted,
468 overflow,
469 cursor,
470 post,
471 }
472 }
473
474 fn init_empty(len: usize) -> Slab {
475 if len > 0 {
476 let mut writer = SlabWriter::<Self::Item>::new(usize::MAX, false);
477 writer.flush_null(len);
478 writer.finish().pop().unwrap_or_default()
479 } else {
480 Slab::default()
481 }
482 }
483}
484
/// Outcome of `ColumnCursor::splice_delete`.
pub struct SpliceDel<'a, C: ColumnCursor> {
    /// Number of items that were actually deleted.
    pub(crate) deleted: usize,
    /// Requested deletions left over when the slab ran out of data.
    pub(crate) overflow: usize,
    /// Cursor after the deletion.
    pub(crate) cursor: C,
    /// Undeleted remainder of the last run touched, if any.
    pub(crate) post: Option<Run<'a, C::Item>>,
}
491
/// Outcome of `ColumnCursor::splice`.
pub struct SpliceResult {
    /// Sum of the values returned by the encoder for each appended item.
    pub(crate) add: usize,
    /// The encoder's `deleted` count.
    pub(crate) del: usize,
    /// The encoder's `overflow` count.
    pub(crate) overflow: usize,
    /// The encoder's `acc` value.
    pub(crate) group: Acc,
    /// Slabs produced by finishing the encoder.
    pub(crate) slabs: Vec<Slab>,
}
499
/// Item-by-item iterator over encoded slab bytes, driven by a cursor.
#[derive(Debug)]
pub struct CursorIter<'a, C: ColumnCursor> {
    /// Encoded bytes being read.
    pub(crate) slab: &'a [u8],
    /// Cursor tracking the read position in `slab`.
    pub(crate) cursor: C,
    /// Run currently being handed out item by item, if any.
    pub(crate) run: Option<Run<'a, C::Item>>,
}
512
513impl<C: ColumnCursor> Clone for CursorIter<'_, C> {
514 fn clone(&self) -> Self {
515 CursorIter {
516 slab: self.slab,
517 cursor: self.cursor,
518 run: self.run.clone(),
519 }
520 }
521}
522
523impl<'a, C: ColumnCursor> CursorIter<'a, C> {
524 fn next_run(&mut self) -> Result<Option<Run<'a, C::Item>>, PackError> {
525 while let Some(run) = self.cursor.try_next(self.slab)? {
526 if run.count > 0 {
527 return Ok(Some(run));
528 }
529 }
530 Ok(None)
531 }
532}
533
534impl<'a, C: ColumnCursor> Iterator for CursorIter<'a, C>
535where
536 C::Item: 'a,
537{
538 type Item = Result<Option<Cow<'a, C::Item>>, PackError>;
539
540 fn next(&mut self) -> Option<Self::Item> {
541 match self.run.as_mut() {
542 Some(run) if run.count > 0 => Ok(self.cursor.pop(run)).transpose(),
543 _ => match self.next_run() {
544 Ok(Some(mut run)) if run.count > 0 => {
545 let value = self.cursor.pop(&mut run);
546 self.run = Some(run);
547 Ok(value).transpose()
548 }
549 Ok(_) => None,
550 Err(e) => Some(Err(e)),
551 },
552 }
553 }
554}
555
/// Run-by-run iterator over encoded slab bytes that also tracks running
/// position and accumulator counters.
#[derive(Debug, Clone, Default, Copy)]
pub struct RunIter<'a, C: ColumnCursor> {
    /// Encoded bytes being read.
    pub(crate) slab: &'a [u8],
    /// Cursor tracking the read position in `slab`.
    pub(crate) cursor: C,
    /// Counter reduced by each yielded run's `count`.
    pub(crate) pos_left: usize,
    /// Counter reduced by each yielded run's `acc()`.
    pub(crate) acc_left: Acc,
}
563
/// State of a `RunIter` without its slab reference, as produced by
/// `RunIter::suspend` and consumed by `RunIter::resume`.
#[derive(Debug, Clone, Default, Copy)]
pub(crate) struct RunIterState<C: ColumnCursor> {
    /// Saved cursor.
    pub(crate) cursor: C,
    /// Saved `pos_left` counter.
    pub(crate) pos_left: usize,
    /// Saved `acc_left` counter.
    pub(crate) acc_left: Acc,
}
570
/// Iterator over the positions of items whose run matches a target `Agg`.
#[derive(Debug, Clone, Default)]
pub(crate) struct RunIterContaining1<'a, C: ColumnCursor>
where
    C::Item: 'a,
{
    /// Underlying run iterator.
    pub(crate) iter: RunIter<'a, C>,
    /// Position of the start of the next run to be read.
    pub(crate) pos: usize,
    /// Aggregate being searched for.
    pub(crate) target: Agg,
    /// Matching positions from the current run that are still to be yielded.
    pub(crate) range: Range<usize>,
}
581
/// Iterator over the positions of items whose run falls inside a target range.
#[derive(Debug, Clone, Default)]
pub(crate) struct RunIterContaining2<'a, C: ColumnCursor>
where
    C::Item: 'a,
{
    /// Underlying run iterator.
    pub(crate) iter: RunIter<'a, C>,
    /// Position of the start of the next run to be read.
    pub(crate) pos: usize,
    /// Range being searched for.
    pub(crate) target: Range<usize>,
    /// Matching positions from the current run that are still to be yielded.
    pub(crate) range: Range<usize>,
}
592
593impl<C: ColumnCursor> Iterator for RunIterContaining1<'_, C> {
594 type Item = usize;
595
596 fn next(&mut self) -> Option<usize> {
597 while self.range.is_empty() {
598 let run = self.iter.next()?; if let Some(range) = self.iter.cursor.contains(&run, self.target) {
600 self.range.start = range.start + self.pos;
601 self.range.end = range.end + self.pos;
602 }
603 self.pos += run.count;
604 }
605 self.range.next()
606 }
607}
608
609impl<C: ColumnCursor> Iterator for RunIterContaining2<'_, C> {
610 type Item = usize;
611
612 fn next(&mut self) -> Option<usize> {
613 while self.range.is_empty() {
614 let run = self.iter.next()?; if let Some(range) = self.iter.cursor.contains_range(&run, &self.target) {
616 self.range.start = range.start + self.pos;
617 self.range.end = range.end + self.pos;
618 }
619 self.pos += run.count;
620 }
621 self.range.next()
622 }
623}
624
625impl<'a, C: ColumnCursor> RunIter<'a, C> {
626 pub fn empty() -> Self {
627 RunIter {
628 slab: &[],
629 cursor: C::empty(),
630 pos_left: 0,
631 acc_left: Acc::new(),
632 }
633 }
634
635 pub(crate) fn resume(slab: &'a [u8], state: RunIterState<C>) -> RunIter<'a, C> {
636 RunIter {
637 slab,
638 cursor: state.cursor,
639 pos_left: state.pos_left,
640 acc_left: state.acc_left,
641 }
642 }
643
644 pub(crate) fn suspend(&self) -> RunIterState<C> {
645 RunIterState {
646 cursor: self.cursor,
647 pos_left: self.pos_left,
648 acc_left: self.acc_left,
649 }
650 }
651
652 pub(crate) fn current(&self) -> Option<Run<'a, C::Item>> {
653 self.cursor.try_again(self.slab).unwrap()
654 }
655
656 pub(crate) fn pos_left(&self) -> usize {
657 self.pos_left
658 }
659 pub(crate) fn acc_left(&self) -> Acc {
660 self.acc_left
661 }
662
663 pub(crate) fn sub_advance_acc(&mut self, mut n: Acc) -> (usize, Option<Run<'a, C::Item>>) {
664 let mut pos = 0;
665 while let Some(mut run) = self.next() {
666 let agg = run.agg();
667 if agg * run.count <= n {
668 n -= agg * run.count;
669 pos += run.count;
670 } else {
671 assert!(agg.as_usize() > 0);
672 let advance = n / agg;
673 run.count -= advance;
674 pos += advance;
675 if run.count == 0 {
676 let tmp = self.next();
677 return (pos, tmp);
678 } else {
679 return (pos, Some(run));
680 }
681 }
682 }
683 (pos, None)
684 }
685
686 pub(crate) fn sub_advance(&mut self, mut n: usize) -> Option<Run<'a, C::Item>> {
687 while let Some(mut run) = self.next() {
688 if run.count <= n {
689 n -= run.count;
690 } else {
691 run.count -= n;
692 if run.count == 0 {
693 let tmp = self.next();
694 return tmp;
695 } else {
696 return Some(run);
697 }
698 }
699 }
700 None
701 }
702
703 pub(crate) fn with_cursor(self) -> RunIterWithCursor<'a, C> {
704 RunIterWithCursor(self)
705 }
706
707 pub(crate) fn containing_agg(self, pos: usize, target: Agg) -> RunIterContaining1<'a, C> {
708 RunIterContaining1 {
709 iter: self,
710 pos,
711 target,
712 range: 0..0,
713 }
714 }
715
716 pub(crate) fn containing_range(
717 self,
718 pos: usize,
719 target: Range<usize>,
720 ) -> RunIterContaining2<'a, C> {
721 RunIterContaining2 {
722 iter: self,
723 pos,
724 target,
725 range: 0..0,
726 }
727 }
728}
729
730impl<'a, C: ColumnCursor> Iterator for RunIter<'a, C>
731where
732 C::Item: 'a,
733{
734 type Item = Run<'a, C::Item>;
735
736 fn next(&mut self) -> Option<Self::Item> {
737 let run = self.cursor.next(self.slab)?;
738 self.pos_left -= run.count;
739 self.acc_left -= run.acc();
740 Some(run)
741 }
742}
743
/// Adapter over a `RunIter` that yields each run paired with the cursor.
pub(crate) struct RunIterWithCursor<'a, C: ColumnCursor>(RunIter<'a, C>);
745
746impl<'a, C: ColumnCursor> Iterator for RunIterWithCursor<'a, C>
747where
748 C::Item: 'a,
749{
750 type Item = (Run<'a, C::Item>, C);
751
752 fn next(&mut self) -> Option<Self::Item> {
753 let run = self.0.next()?;
754 Some((run, self.0.cursor))
755 }
756}