differential_dataflow/columnar/
batcher.rs1use std::collections::VecDeque;
10
11use timely::progress::frontier::AntichainRef;
12use timely::progress::{frontier::Antichain, Timestamp};
13use timely::container::PushInto;
14
15use crate::logging::Logger;
16use crate::trace::{Batcher, Description};
17
18use super::layout::ColumnarUpdate as Update;
19use super::updates::UpdatesTyped;
20use super::arrangement::trie_merger;
21use super::spill::{Entry, SpillPolicy};
22
23pub struct MergeBatcher<U: Update> {
25 chains: Vec<VecDeque<Entry<UpdatesTyped<U>>>>,
28 lower: Antichain<U::Time>,
30 frontier: Antichain<U::Time>,
32 policy: Option<Box<dyn SpillPolicy<UpdatesTyped<U>>>>,
35}
36
37impl<U: Update<Time: Timestamp>> Batcher for MergeBatcher<U> {
38 type Time = U::Time;
39 type Output = UpdatesTyped<U>;
40
41 fn new(_logger: Option<Logger>, _operator_id: usize) -> Self {
42 Self {
43 chains: Vec::new(),
44 frontier: Antichain::new(),
45 lower: Antichain::from_elem(U::Time::minimum()),
46 policy: None,
47 }
48 }
49
50 fn seal(&mut self, upper: Antichain<U::Time>) -> (Vec<Self::Output>, Description<U::Time>) {
55 while self.chains.len() > 1 {
57 let list1 = self.chains.pop().unwrap();
58 let list2 = self.chains.pop().unwrap();
59 let merged = self.merge_by(list1, list2);
60 self.push_chain(merged);
61 }
62 let merged = self.chains.pop().unwrap_or_default();
63
64 let mut readied: Vec<UpdatesTyped<U>> = Vec::new();
70 let mut kept_chain: VecDeque<Entry<UpdatesTyped<U>>> = VecDeque::new();
71 self.frontier.clear();
72 {
73 let policy = &mut self.policy;
74 let frontier = &mut self.frontier;
75 let ship = |chunk: UpdatesTyped<U>| readied.push(chunk);
76 let keep = |chunk: UpdatesTyped<U>| {
77 kept_chain.push_back(Entry::Typed(chunk));
78 if let Some(p) = policy.as_mut() {
79 p.apply(&mut kept_chain);
80 }
81 };
82 trie_merger::extract(
83 FetchIter::new(merged),
84 upper.borrow(),
85 frontier,
86 ship,
87 keep,
88 );
89 }
90
91 if !kept_chain.is_empty() {
92 self.push_chain(kept_chain);
93 }
94
95 let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(U::Time::minimum()));
96 self.lower = upper;
97 (readied, description)
98 }
99
100 #[inline]
102 fn frontier(&mut self) -> AntichainRef<'_, U::Time> {
103 self.frontier.borrow()
104 }
105}
106
107impl<U: Update> PushInto<UpdatesTyped<U>> for MergeBatcher<U> {
108 fn push_into(&mut self, chunk: UpdatesTyped<U>) {
109 self.insert_chain(VecDeque::from([Entry::Typed(chunk)]));
110 }
111}
112
113impl<U: Update> MergeBatcher<U> {
114 pub fn set_spill_policy(&mut self, policy: Box<dyn SpillPolicy<UpdatesTyped<U>>>) {
116 self.policy = Some(policy);
117 }
118
119 pub fn resident_records(&self) -> usize {
124 self.chains
125 .iter()
126 .flat_map(|c| c.iter())
127 .map(|e| match e {
128 Entry::Typed(c) => {
129 use timely::Accountable;
130 c.record_count() as usize
131 }
132 Entry::Paged(_) => 0,
133 })
134 .sum()
135 }
136
137 fn insert_chain(&mut self, chain: VecDeque<Entry<UpdatesTyped<U>>>) {
140 if !chain.is_empty() {
141 self.push_chain(chain);
142 while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) {
143 let list1 = self.chains.pop().unwrap();
144 let list2 = self.chains.pop().unwrap();
145 let merged = self.merge_by(list1, list2);
146 self.push_chain(merged);
147 }
148 }
149 }
150
151 fn push_chain(&mut self, chain: VecDeque<Entry<UpdatesTyped<U>>>) {
156 self.chains.push(chain);
157 if let Some(policy) = self.policy.as_mut() {
158 if let Some(top) = self.chains.last_mut() {
159 policy.apply(top);
160 }
161 }
162 }
163
164 fn merge_by(
170 &mut self,
171 list1: VecDeque<Entry<UpdatesTyped<U>>>,
172 list2: VecDeque<Entry<UpdatesTyped<U>>>,
173 ) -> VecDeque<Entry<UpdatesTyped<U>>> {
174 let mut output: VecDeque<Entry<UpdatesTyped<U>>> = VecDeque::new();
175 let policy = &mut self.policy;
176 let sink = |chunk: UpdatesTyped<U>| {
177 output.push_back(Entry::Typed(chunk));
178 if let Some(p) = policy.as_mut() {
179 p.apply(&mut output);
180 }
181 };
182 trie_merger::merge_batches(
183 FetchIter::new(list1),
184 FetchIter::new(list2),
185 sink,
186 );
187 output
188 }
189
190}
191
192struct FetchIter<U: Update> {
197 queue: VecDeque<Entry<UpdatesTyped<U>>>,
198 pending: VecDeque<UpdatesTyped<U>>,
199}
200
201impl<U: Update> FetchIter<U> {
202 fn new(queue: VecDeque<Entry<UpdatesTyped<U>>>) -> Self {
203 Self { queue, pending: VecDeque::new() }
204 }
205}
206
207impl<U: Update> Iterator for FetchIter<U> {
208 type Item = UpdatesTyped<U>;
209 fn next(&mut self) -> Option<UpdatesTyped<U>> {
210 loop {
211 if let Some(c) = self.pending.pop_front() {
212 return Some(c);
213 }
214 match self.queue.pop_front()? {
215 Entry::Typed(c) => return Some(c),
216 Entry::Paged(handle) => match handle.fetch() {
217 Ok(chunks) => self.pending.extend(chunks),
218 Err(_) => panic!("Fetch::fetch failed; retry path not yet wired"),
219 },
220 }
221 }
222 }
223}