1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
extern crate futures;
use std::collections::BinaryHeap;
use std::cmp::Reverse;
use futures::*;
pub trait Successor {
fn next(&self) -> Self;
}
pub struct OrderedNoGaps<S, F, K> where S: Stream {
stream: S,
key: F,
last_key_polled: K,
buffer: BinaryHeap<Reverse<S::Item>>,
}
impl<S, F, K> Stream for OrderedNoGaps<S, F, K> where
S: Stream,
S::Item: Ord,
F: Fn(&S::Item) -> K,
K: Successor + PartialEq {
type Item = S::Item;
type Error = S::Error;
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
if self.buffer_peek_is_next() {
match self.buffer.pop() {
Some(Reverse(v)) => {
self.last_key_polled = self.last_key_polled.next();
Ok(Async::Ready(Some(v)))
},
None => self.poll_from_stream()
}
} else {
self.poll_from_stream()
}
}
}
impl<S, F, K> OrderedNoGaps<S, F, K> where
S: Stream,
S::Item: Ord,
F: Fn(&S::Item) -> K,
K: Successor + PartialEq {
fn poll_from_stream(&mut self) -> Result<Async<Option<<Self as Stream>::Item>>, <Self as Stream>::Error> {
match self.stream.poll() {
Ok(Async::Ready(Some(v))) => {
if self.is_next(&v) {
self.last_key_polled = self.last_key_polled.next();
Ok(Async::Ready(Some(v)))
} else {
self.buffer.push(Reverse(v));
self.poll()
}
}
otherwise => otherwise
}
}
fn is_next(&self, v: &<Self as Stream>::Item) -> bool {
let key = &self.key;
self.last_key_polled.next() == key(v)
}
fn buffer_peek_is_next(&self) -> bool {
match self.buffer.peek() {
Some(Reverse(v)) => self.is_next(v),
None => false
}
}
}
pub fn ordered_no_gaps<S, F, K>(stream: S, zero: K, key: F) -> OrderedNoGaps<S, F, K> where
S: Stream,
S::Item: Ord,
F: Fn(&S::Item) -> K,
K: Successor + PartialEq {
OrderedNoGaps { stream, key, last_key_polled: zero, buffer: BinaryHeap::default() }
}
#[cfg(test)]
mod test {
extern crate rand;
use super::*;
use super::test::rand::Rng;
impl Successor for u8 {
fn next(&self) -> Self { self + 1 }
}
#[test]
fn orders_without_gaps() {
let mut ns: Vec<u8> = (1..100).collect();
rand::thread_rng().shuffle(&mut ns);
assert!(!is_ordered(ns.clone()));
let stream: Box<Stream<Item=u8, Error=()>> = Box::new(stream::iter_ok(ns.into_iter()));
let ordered: Vec<u8> = ordered_no_gaps(stream, 0, |&x| x).collect().wait().unwrap();
assert!(is_ordered(ordered.clone()));
assert!(ordered.len() == 99);
}
fn is_ordered(i: Vec<u8>) -> bool {
i.into_iter().fold((true, 0), |(b, z), x| (b && x >= z, x)).0
}
}