b_tree/
group.rs

1use collate::Collate;
2use std::cmp::Ordering;
3use std::collections::VecDeque;
4use std::io;
5use std::pin::Pin;
6use std::task::{ready, Context, Poll};
7
8use futures::stream::{Fuse, Stream, StreamExt};
9use pin_project::pin_project;
10
11use crate::tree::Leaf;
12use crate::{Collator, Key, NODE_STACK_SIZE};
13
14#[pin_project]
15pub struct GroupBy<C, S, V> {
16    #[pin]
17    source: Fuse<S>,
18    collator: Collator<C>,
19    len: usize,
20    reverse: bool,
21    pending: VecDeque<Key<V>>,
22}
23
24impl<C, S, FE, V> GroupBy<C, S, V>
25where
26    S: Stream<Item = Result<Leaf<FE, V>, io::Error>>,
27{
28    pub fn new(collator: Collator<C>, source: S, n: usize, reverse: bool) -> Self {
29        Self {
30            source: source.fuse(),
31            collator,
32            len: n,
33            reverse,
34            pending: VecDeque::with_capacity(NODE_STACK_SIZE),
35        }
36    }
37}
38
39impl<C, S, V> GroupBy<C, S, V>
40where
41    C: Collate<Value = V>,
42    V: Clone,
43{
44    fn append_leaf<'a, I>(pending: &mut VecDeque<Key<V>>, collator: &Collator<C>, n: usize, leaf: I)
45    where
46        I: IntoIterator<Item = &'a Vec<V>>,
47        V: 'a,
48    {
49        for key in leaf {
50            let key = &key[..n];
51
52            let key: Option<Key<V>> = if let Some(back) = pending.back() {
53                if collator.cmp_slices(back, key) == Ordering::Equal {
54                    None
55                } else {
56                    Some(key.iter().cloned().collect())
57                }
58            } else {
59                Some(key.iter().cloned().collect())
60            };
61
62            if let Some(key) = key {
63                pending.push_back(key.iter().cloned().collect());
64            }
65        }
66    }
67}
68
69impl<C, S, FE, V> Stream for GroupBy<C, S, V>
70where
71    C: Collate<Value = V>,
72    S: Stream<Item = Result<Leaf<FE, V>, io::Error>> + Unpin,
73    V: Clone,
74{
75    type Item = Result<Key<V>, io::Error>;
76
77    fn poll_next(self: Pin<&mut Self>, cxt: &mut Context) -> Poll<Option<Self::Item>> {
78        let mut this = self.project();
79
80        Poll::Ready({
81            loop {
82                if let Some(key) = this.pending.pop_front() {
83                    break Some(Ok(key));
84                }
85
86                match ready!(this.source.as_mut().poll_next(cxt)) {
87                    Some(Ok(leaf)) => {
88                        if *this.reverse {
89                            Self::append_leaf(
90                                this.pending,
91                                this.collator,
92                                *this.len,
93                                leaf.as_ref().iter().rev(),
94                            )
95                        } else {
96                            Self::append_leaf(this.pending, this.collator, *this.len, leaf.as_ref())
97                        }
98                    }
99                    Some(Err(cause)) => break Some(Err(cause)),
100                    None => break None,
101                }
102            }
103        })
104    }
105}