Skip to main content

differential_dataflow/columnar/
batcher.rs

//! A `Batcher` for columnar streams that merges sorted chains via the free
//! functions in `trie_merger`.
//!
//! Callers feed already-chunked, sorted, and consolidated `UpdatesTyped<U>`
//! chunks into the batcher via [`PushInto`]; forming such chunks from
//! `RecordedUpdates<U>` is the responsibility of the surrounding dataflow
//! operator's chunker (`TrieChunker`).
8
9use std::collections::VecDeque;
10
11use timely::progress::frontier::AntichainRef;
12use timely::progress::{frontier::Antichain, Timestamp};
13use timely::container::PushInto;
14
15use crate::logging::Logger;
16use crate::trace::{Batcher, Description};
17
18use super::layout::ColumnarUpdate as Update;
19use super::updates::UpdatesTyped;
20use super::arrangement::trie_merger;
21use super::spill::{Entry, SpillPolicy};
22
23/// Creates batches from chunks of sorted, consolidated columnar updates.
24pub struct MergeBatcher<U: Update> {
25    /// A sequence of power-of-two length chains of sorted, consolidated entries.
26    /// Each entry is either an in-memory chunk or a handle to a paged-out chunk.
27    chains: Vec<VecDeque<Entry<UpdatesTyped<U>>>>,
28    /// Current lower frontier, we sealed up to here.
29    lower: Antichain<U::Time>,
30    /// The lower-bound frontier of the data, after the last call to seal.
31    frontier: Antichain<U::Time>,
32    /// Optional spill policy, consulted after each chain insert. `None` keeps
33    /// everything resident.
34    policy: Option<Box<dyn SpillPolicy<UpdatesTyped<U>>>>,
35}
36
37impl<U: Update<Time: Timestamp>> Batcher for MergeBatcher<U> {
38    type Time = U::Time;
39    type Output = UpdatesTyped<U>;
40
41    fn new(_logger: Option<Logger>, _operator_id: usize) -> Self {
42        Self {
43            chains: Vec::new(),
44            frontier: Antichain::new(),
45            lower: Antichain::from_elem(U::Time::minimum()),
46            policy: None,
47        }
48    }
49
50    // Sealing a batch means finding those updates with times not greater or equal to any time
51    // in `upper`. All updates must have time greater or equal to the previously used `upper`,
52    // which we call `lower`, by assumption that after sealing a batcher we receive no more
53    // updates with times not greater or equal to `upper`.
54    fn seal(&mut self, upper: Antichain<U::Time>) -> (Vec<Self::Output>, Description<U::Time>) {
55        // Merge all remaining chains into a single chain.
56        while self.chains.len() > 1 {
57            let list1 = self.chains.pop().unwrap();
58            let list2 = self.chains.pop().unwrap();
59            let merged = self.merge_by(list1, list2);
60            self.push_chain(merged);
61        }
62        let merged = self.chains.pop().unwrap_or_default();
63
64        // Extract readied data, streaming. `merged` is consumed lazily via
65        // `FetchIter`; ship-side chunks flow into `readied` for the
66        // builder; kept-side chunks flow into a fresh chain that is offered
67        // to the spill policy as each chunk lands, so kept never accumulates
68        // resident in full.
69        let mut readied: Vec<UpdatesTyped<U>> = Vec::new();
70        let mut kept_chain: VecDeque<Entry<UpdatesTyped<U>>> = VecDeque::new();
71        self.frontier.clear();
72        {
73            let policy = &mut self.policy;
74            let frontier = &mut self.frontier;
75            let ship = |chunk: UpdatesTyped<U>| readied.push(chunk);
76            let keep = |chunk: UpdatesTyped<U>| {
77                kept_chain.push_back(Entry::Typed(chunk));
78                if let Some(p) = policy.as_mut() {
79                    p.apply(&mut kept_chain);
80                }
81            };
82            trie_merger::extract(
83                FetchIter::new(merged),
84                upper.borrow(),
85                frontier,
86                ship,
87                keep,
88            );
89        }
90
91        if !kept_chain.is_empty() {
92            self.push_chain(kept_chain);
93        }
94
95        let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(U::Time::minimum()));
96        self.lower = upper;
97        (readied, description)
98    }
99
100    /// The frontier of elements remaining after the most recent call to `self.seal`.
101    #[inline]
102    fn frontier(&mut self) -> AntichainRef<'_, U::Time> {
103        self.frontier.borrow()
104    }
105}
106
107impl<U: Update> PushInto<UpdatesTyped<U>> for MergeBatcher<U> {
108    fn push_into(&mut self, chunk: UpdatesTyped<U>) {
109        self.insert_chain(VecDeque::from([Entry::Typed(chunk)]));
110    }
111}
112
113impl<U: Update> MergeBatcher<U> {
114    /// Install a spill policy. Consulted after each chain insert.
115    pub fn set_spill_policy(&mut self, policy: Box<dyn SpillPolicy<UpdatesTyped<U>>>) {
116        self.policy = Some(policy);
117    }
118
119    /// Sum of records currently held in `Entry::Typed` chunks across all
120    /// chains. `Entry::Paged` entries are excluded — they live on backing
121    /// storage, not in the process heap. Spill policies bound this quantity;
122    /// RSS may still grow due to materialize-on-merge.
123    pub fn resident_records(&self) -> usize {
124        self.chains
125            .iter()
126            .flat_map(|c| c.iter())
127            .map(|e| match e {
128                Entry::Typed(c) => {
129                    use timely::Accountable;
130                    c.record_count() as usize
131                }
132                Entry::Paged(_) => 0,
133            })
134            .sum()
135    }
136
137    /// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered
138    /// by decreasing length.
139    fn insert_chain(&mut self, chain: VecDeque<Entry<UpdatesTyped<U>>>) {
140        if !chain.is_empty() {
141            self.push_chain(chain);
142            while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) {
143                let list1 = self.chains.pop().unwrap();
144                let list2 = self.chains.pop().unwrap();
145                let merged = self.merge_by(list1, list2);
146                self.push_chain(merged);
147            }
148        }
149    }
150
151    /// Push a chain onto `chains` and consult the spill policy on the result.
152    /// Following TD's `MergeQueue::extend`, which calls `policy.apply` after
153    /// each queue extension. Applied to inputs and to merge / extract results
154    /// alike, so threshold-style policies see multi-chunk chains.
155    fn push_chain(&mut self, chain: VecDeque<Entry<UpdatesTyped<U>>>) {
156        self.chains.push(chain);
157        if let Some(policy) = self.policy.as_mut() {
158            if let Some(top) = self.chains.last_mut() {
159                policy.apply(top);
160            }
161        }
162    }
163
164    /// Merge two sorted chains. Inputs are streamed lazily through
165    /// `FetchIter` so paged entries are fetched one group at a time.
166    /// Output chunks flow through a sink that pushes into a fresh chain and
167    /// invokes the spill policy after each emission, so the merge result can
168    /// be paged out as it's produced rather than buffered in full.
169    fn merge_by(
170        &mut self,
171        list1: VecDeque<Entry<UpdatesTyped<U>>>,
172        list2: VecDeque<Entry<UpdatesTyped<U>>>,
173    ) -> VecDeque<Entry<UpdatesTyped<U>>> {
174        let mut output: VecDeque<Entry<UpdatesTyped<U>>> = VecDeque::new();
175        let policy = &mut self.policy;
176        let sink = |chunk: UpdatesTyped<U>| {
177            output.push_back(Entry::Typed(chunk));
178            if let Some(p) = policy.as_mut() {
179                p.apply(&mut output);
180            }
181        };
182        trie_merger::merge_batches(
183            FetchIter::new(list1),
184            FetchIter::new(list2),
185            sink,
186        );
187        output
188    }
189
190}
191
192/// Streaming iterator over a chain's chunks. Yields `Entry::Typed` chunks
193/// directly; for `Entry::Paged`, calls `Fetch::fetch` on demand and yields
194/// the resulting chunks one by one. Bounds materialized chunks to one fetch
195/// group at a time (plus whatever the consumer is holding).
196struct FetchIter<U: Update> {
197    queue: VecDeque<Entry<UpdatesTyped<U>>>,
198    pending: VecDeque<UpdatesTyped<U>>,
199}
200
201impl<U: Update> FetchIter<U> {
202    fn new(queue: VecDeque<Entry<UpdatesTyped<U>>>) -> Self {
203        Self { queue, pending: VecDeque::new() }
204    }
205}
206
207impl<U: Update> Iterator for FetchIter<U> {
208    type Item = UpdatesTyped<U>;
209    fn next(&mut self) -> Option<UpdatesTyped<U>> {
210        loop {
211            if let Some(c) = self.pending.pop_front() {
212                return Some(c);
213            }
214            match self.queue.pop_front()? {
215                Entry::Typed(c) => return Some(c),
216                Entry::Paged(handle) => match handle.fetch() {
217                    Ok(chunks) => self.pending.extend(chunks),
218                    Err(_) => panic!("Fetch::fetch failed; retry path not yet wired"),
219                },
220            }
221        }
222    }
223}