1use collate::Collate;
2use std::cmp::Ordering;
3use std::collections::VecDeque;
4use std::io;
5use std::pin::Pin;
6use std::task::{ready, Context, Poll};
7
8use futures::stream::{Fuse, Stream, StreamExt};
9use pin_project::pin_project;
10
11use crate::tree::Leaf;
12use crate::{Collator, Key, NODE_STACK_SIZE};
13
14#[pin_project]
15pub struct GroupBy<C, S, V> {
16 #[pin]
17 source: Fuse<S>,
18 collator: Collator<C>,
19 len: usize,
20 reverse: bool,
21 pending: VecDeque<Key<V>>,
22}
23
24impl<C, S, FE, V> GroupBy<C, S, V>
25where
26 S: Stream<Item = Result<Leaf<FE, V>, io::Error>>,
27{
28 pub fn new(collator: Collator<C>, source: S, n: usize, reverse: bool) -> Self {
29 Self {
30 source: source.fuse(),
31 collator,
32 len: n,
33 reverse,
34 pending: VecDeque::with_capacity(NODE_STACK_SIZE),
35 }
36 }
37}
38
39impl<C, S, V> GroupBy<C, S, V>
40where
41 C: Collate<Value = V>,
42 V: Clone,
43{
44 fn append_leaf<'a, I>(pending: &mut VecDeque<Key<V>>, collator: &Collator<C>, n: usize, leaf: I)
45 where
46 I: IntoIterator<Item = &'a Vec<V>>,
47 V: 'a,
48 {
49 for key in leaf {
50 let key = &key[..n];
51
52 let key: Option<Key<V>> = if let Some(back) = pending.back() {
53 if collator.cmp_slices(back, key) == Ordering::Equal {
54 None
55 } else {
56 Some(key.iter().cloned().collect())
57 }
58 } else {
59 Some(key.iter().cloned().collect())
60 };
61
62 if let Some(key) = key {
63 pending.push_back(key.iter().cloned().collect());
64 }
65 }
66 }
67}
68
69impl<C, S, FE, V> Stream for GroupBy<C, S, V>
70where
71 C: Collate<Value = V>,
72 S: Stream<Item = Result<Leaf<FE, V>, io::Error>> + Unpin,
73 V: Clone,
74{
75 type Item = Result<Key<V>, io::Error>;
76
77 fn poll_next(self: Pin<&mut Self>, cxt: &mut Context) -> Poll<Option<Self::Item>> {
78 let mut this = self.project();
79
80 Poll::Ready({
81 loop {
82 if let Some(key) = this.pending.pop_front() {
83 break Some(Ok(key));
84 }
85
86 match ready!(this.source.as_mut().poll_next(cxt)) {
87 Some(Ok(leaf)) => {
88 if *this.reverse {
89 Self::append_leaf(
90 this.pending,
91 this.collator,
92 *this.len,
93 leaf.as_ref().iter().rev(),
94 )
95 } else {
96 Self::append_leaf(this.pending, this.collator, *this.len, leaf.as_ref())
97 }
98 }
99 Some(Err(cause)) => break Some(Err(cause)),
100 None => break None,
101 }
102 }
103 })
104 }
105}