// differential_dataflow/trace/implementations/chunker.rs
use std::collections::VecDeque;
4
5use timely::Container;
6use timely::container::{ContainerBuilder, DrainContainer, PushInto, SizableContainer};
7
8use crate::consolidation::Consolidate;
9
/// Accumulates pushed items into containers of type `Output` and hands them out
/// as consolidated chunks.
pub struct ContainerChunker<Output> {
    /// Container currently being filled by `push_into`.
    pending: Output,
    /// Completed chunks in FIFO order: appended at the back, handed out from the front.
    ready: VecDeque<Output>,
    /// Scratch container: used as the target of `consolidate_into`, and as the slot
    /// that holds the chunk most recently returned by `extract` / `finish`.
    empty: Output,
}
16
17impl<Output: Default> Default for ContainerChunker<Output> {
18 fn default() -> Self {
19 Self {
20 pending: Output::default(),
21 ready: VecDeque::default(),
22 empty: Output::default(),
23 }
24 }
25}
26
27impl<'a, Input, Output> PushInto<&'a mut Input> for ContainerChunker<Output>
28where
29 Input: DrainContainer,
30 Output: Default
31 + SizableContainer
32 + Consolidate
33 + PushInto<Input::Item<'a>>,
34{
35 fn push_into(&mut self, container: &'a mut Input) {
36 self.pending.ensure_capacity(&mut None);
37
38 for item in container.drain() {
39 self.pending.push_into(item);
40 if self.pending.at_capacity() {
41 let starting_len = self.pending.len();
42 self.pending.consolidate_into(&mut self.empty);
43 std::mem::swap(&mut self.pending, &mut self.empty);
44 self.empty.clear();
45 if self.pending.len() > starting_len / 2 {
46 self.ready.push_back(std::mem::take(&mut self.pending));
50 }
51 }
52 }
53 }
54}
55
56impl<Output> ContainerBuilder for ContainerChunker<Output>
57where
58 Output: SizableContainer + Consolidate + Container,
59{
60 type Container = Output;
61
62 fn extract(&mut self) -> Option<&mut Self::Container> {
63 if let Some(ready) = self.ready.pop_front() {
64 self.empty = ready;
65 Some(&mut self.empty)
66 } else {
67 None
68 }
69 }
70
71 fn finish(&mut self) -> Option<&mut Self::Container> {
72 if !self.pending.is_empty() {
73 self.pending.consolidate_into(&mut self.empty);
74 std::mem::swap(&mut self.pending, &mut self.empty);
75 self.empty.clear();
76 if !self.pending.is_empty() {
77 self.ready.push_back(std::mem::take(&mut self.pending));
78 }
79 }
80 if let Some(ready) = self.ready.pop_front() {
81 self.empty = ready;
82 Some(&mut self.empty)
83 } else {
84 None
85 }
86 }
87}