Skip to main content

int_interval_stack/int_co_stack/
impls_for_construction.rs

1use super::*;
2
3use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelIterator};
4
5const BATCH_SIZE: usize = 128;
6
7impl<I> Default for IntCOStack<I>
8where
9    I: IntCO,
10{
11    #[inline]
12    fn default() -> Self {
13        Self {
14            points: Arc::from([]),
15        }
16    }
17}
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq)]
20enum EndpointKind {
21    Enter,
22    Leave,
23}
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26struct Endpoint<C> {
27    at: C,
28    kind: EndpointKind,
29}
30
31/// Builds a canonical stack-height function from raw interval endpoint events.
32///
33/// Each half-open interval contributes two endpoint events:
34///
35/// ```text
36/// [start, end) -> Enter at start, Leave at end
37/// ```
38///
39/// Events at the same coordinate are applied together because a half-open
40/// boundary may contain both intervals ending and intervals starting. Only
41/// the resulting net height matters for the canonical representation.
42///
43/// The returned change points describe a piecewise-constant height function:
44/// after each `at`, the active interval count becomes `height_after`.
45///
46/// # Input assumptions
47///
48/// The endpoint events must originate from valid finite half-open intervals.
49/// Consequently:
50///
51/// - the accumulated height never becomes negative;
52/// - after all events are consumed, the accumulated height returns to zero.
53///
54/// # Canonical output
55///
56/// The returned points satisfy:
57///
58/// - coordinates are strictly increasing;
59/// - adjacent points always have different `height_after` values;
60/// - redundant coordinates whose events cause no net height change are omitted.
61///
62/// # Complexity
63///
64/// Sorting dominates the computation: `O(n log n)` time for `n` endpoints.
65/// The output allocates at most `n` change points.
66fn build_points_from_endpoints<C>(mut endpoints: Vec<Endpoint<C>>) -> Vec<ChangePoint<C>>
67where
68    C: Copy + Ord,
69{
70    // Ordered endpoints allow all events at the same coordinate to be
71    // consumed together and emitted as at most one canonical change point.
72    endpoints.sort_unstable_by_key(|endpoint| endpoint.at);
73
74    let mut points = Vec::with_capacity(endpoints.len());
75
76    // Height of the piecewise-constant function immediately before the next
77    // unprocessed coordinate. The height before the first endpoint is zero.
78    let mut height_after = 0usize;
79    let mut cursor = 0usize;
80
81    while cursor < endpoints.len() {
82        let at = endpoints[cursor].at;
83        let mut enters = 0usize;
84        let mut leaves = 0usize;
85
86        // Apply every boundary event at `at` atomically. For half-open
87        // intervals, intervals leaving at `at` are already inactive there,
88        // while intervals entering at `at` are active from there onward.
89        while cursor < endpoints.len() && endpoints[cursor].at == at {
90            match endpoints[cursor].kind {
91                EndpointKind::Enter => enters += 1,
92                EndpointKind::Leave => leaves += 1,
93            }
94            cursor += 1;
95        }
96
97        // Compute the height on the segment beginning at `at`. The split
98        // between addition and subtraction keeps the stored height unsigned
99        // while still detecting malformed events that would go below zero.
100        let next_height = if enters >= leaves {
101            height_after.checked_add(enters - leaves)
102        } else {
103            height_after.checked_sub(leaves - enters)
104        }
105        .expect("valid intervals must never produce a negative stack height");
106
107        // Emit only real changes of the height function. Equal numbers of
108        // entering and leaving layers at the same coordinate cancel out and
109        // must not create a redundant canonical boundary.
110        if next_height != height_after {
111            points.push(ChangePoint {
112                at,
113                height_after: next_height,
114            });
115        }
116
117        height_after = next_height;
118    }
119
120    // Every finite half-open interval contributes one enter and one leave, so
121    // a complete valid event stream must end outside all intervals.
122    debug_assert_eq!(
123        height_after, 0,
124        "all finite half-open intervals must eventually close"
125    );
126
127    points
128}
129
130#[cfg(test)]
131mod tests_for_build_points_from_endpoints;
132
133/// Merges two canonical change-point sequences by adding their stack heights.
134///
135/// Each input slice represents a piecewise-constant stack-height function:
136/// after a change point at `at`, the function takes the value
137/// `height_after` until the next change point.
138///
139/// The merged sequence represents the pointwise sum of the two functions:
140///
141/// ```text
142/// merged_height(x) = lhs_height(x) + rhs_height(x)
143/// ```
144///
145/// Change points that would not change the merged height are omitted. This
146/// preserves the canonical representation, including cases where a boundary
147/// in one input is exactly cancelled by a boundary in the other input.
148///
149/// # Input invariants
150///
151/// Both input slices must be canonical:
152///
153/// - change points are ordered by strictly increasing coordinates;
154/// - adjacent change points have different `height_after` values;
155/// - the height before the first change point is zero;
156/// - the final change point, when present, restores the height to zero.
157///
158/// # Panics
159///
160/// Panics if the sum of the two active heights exceeds [`usize::MAX`].
161///
162/// # Complexity
163///
164/// Runs in `O(lhs.len() + rhs.len())` time and allocates at most
165/// `lhs.len() + rhs.len()` output change points.
166fn merge_points<C>(lhs: &[ChangePoint<C>], rhs: &[ChangePoint<C>]) -> Vec<ChangePoint<C>>
167where
168    C: Copy + Ord,
169{
170    let mut out = Vec::with_capacity(lhs.len() + rhs.len());
171    let mut lhs_height = 0usize;
172    let mut rhs_height = 0usize;
173    let mut merged_height = 0usize;
174    let mut lhs_cursor = 0usize;
175    let mut rhs_cursor = 0usize;
176
177    while lhs_cursor < lhs.len() || rhs_cursor < rhs.len() {
178        let at = match (lhs.get(lhs_cursor), rhs.get(rhs_cursor)) {
179            // Only `lhs` changes at this coordinate; keep the current
180            // height contributed by `rhs`.
181            (Some(l), Some(r)) if l.at < r.at => {
182                lhs_height = l.height_after;
183                lhs_cursor += 1;
184                l.at
185            }
186
187            // Only `rhs` changes at this coordinate; keep the current
188            // height contributed by `lhs`.
189            (Some(l), Some(r)) if r.at < l.at => {
190                rhs_height = r.height_after;
191                rhs_cursor += 1;
192                r.at
193            }
194
195            // Both functions change at the same coordinate. Apply both
196            // changes before computing the merged height after `at`.
197            (Some(l), Some(r)) => {
198                lhs_height = l.height_after;
199                rhs_height = r.height_after;
200                lhs_cursor += 1;
201                rhs_cursor += 1;
202                l.at
203            }
204
205            // `rhs` has been exhausted; append the remaining changes from
206            // `lhs` while preserving the final height contributed by `rhs`.
207            (Some(l), None) => {
208                lhs_height = l.height_after;
209                lhs_cursor += 1;
210                l.at
211            }
212
213            // `lhs` has been exhausted; append the remaining changes from
214            // `rhs` while preserving the final height contributed by `lhs`.
215            (None, Some(r)) => {
216                rhs_height = r.height_after;
217                rhs_cursor += 1;
218                r.at
219            }
220
221            // The loop condition guarantees that at least one input still
222            // contains an unprocessed change point.
223            (None, None) => unreachable!(),
224        };
225
226        let next_merged_height = lhs_height
227            .checked_add(rhs_height)
228            .expect("stack height overflow");
229
230        // A coordinate belongs to the canonical output exactly when the
231        // pointwise sum changes its value at that coordinate.
232        if next_merged_height != merged_height {
233            out.push(ChangePoint {
234                at,
235                height_after: next_merged_height,
236            });
237            merged_height = next_merged_height;
238        }
239    }
240
241    debug_assert_eq!(merged_height, 0);
242
243    out
244}
245
246#[cfg(test)]
247mod tests_for_merge_points;
248
249#[derive(Debug)]
250struct StackBuildAcc<C>
251where
252    C: Copy + Ord,
253{
254    endpoints: Vec<Endpoint<C>>,
255    levels: Vec<Option<Vec<ChangePoint<C>>>>,
256}
257
258impl<C> StackBuildAcc<C>
259where
260    C: Copy + Ord,
261{
262    #[inline]
263    fn new() -> Self {
264        Self {
265            // Buffer one bounded batch of raw interval boundary events.
266            //
267            // Each interval contributes two endpoints, so the endpoint
268            // capacity is twice the interval batch size.
269            endpoints: Vec::with_capacity(BATCH_SIZE.saturating_mul(2)),
270
271            // Balanced partial canonical change-point sequences.
272            //
273            // This acts like a binary carry chain: inserting into an occupied
274            // level merges two equal-rank partial results and carries the
275            // merged sequence upward.
276            levels: Vec::new(),
277        }
278    }
279
280    #[inline]
281    fn push_interval<I>(&mut self, interval: I)
282    where
283        I: IntCO<CoordType = C>,
284    {
285        // Convert one half-open interval into raw boundary events:
286        // [start, end) -> Enter at start, Leave at end.
287        self.endpoints.push(Endpoint {
288            at: interval.start(),
289            kind: EndpointKind::Enter,
290        });
291        self.endpoints.push(Endpoint {
292            at: interval.end_excl(),
293            kind: EndpointKind::Leave,
294        });
295
296        // Once the local endpoint buffer reaches one full batch, compact it
297        // into canonical change points and insert it into the level chain.
298        if self.endpoints.len() >= BATCH_SIZE.saturating_mul(2) {
299            self.flush();
300        }
301    }
302
303    #[inline]
304    fn finish(mut self) -> Vec<ChangePoint<C>> {
305        // Make sure the final partial batch is included.
306        self.flush();
307
308        // Merge all remaining partial sequences into one canonical stack.
309        // Empty input naturally produces an empty change-point sequence.
310        self.levels
311            .into_iter()
312            .flatten()
313            .reduce(|lhs, rhs| merge_points(&lhs, &rhs))
314            .unwrap_or_default()
315    }
316
317    #[inline]
318    fn flush(&mut self) {
319        if self.endpoints.is_empty() {
320            return;
321        }
322
323        // Move out the current endpoint batch while leaving a fresh buffer for
324        // subsequent pushes. The old buffer is consumed by sorting and
325        // canonicalization.
326        let endpoints = core::mem::replace(
327            &mut self.endpoints,
328            Vec::with_capacity(BATCH_SIZE.saturating_mul(2)),
329        );
330
331        // Canonicalize this endpoint batch, then insert the resulting
332        // stack-height function into the balanced level chain.
333        self.push_points(build_points_from_endpoints(endpoints));
334    }
335
336    fn push_points(&mut self, mut carry: Vec<ChangePoint<C>>) {
337        let mut level = 0usize;
338
339        loop {
340            if level == self.levels.len() {
341                // No level exists yet, so append a new top level.
342                self.levels.push(Some(carry));
343                break;
344            }
345
346            match self.levels[level].take() {
347                // Empty level: store the current carry here.
348                None => {
349                    self.levels[level] = Some(carry);
350                    break;
351                }
352
353                // Occupied level: merge two equal-rank canonical sequences and
354                // carry the combined result into the next level.
355                Some(points) => {
356                    carry = merge_points(&points, &carry);
357                    level += 1;
358                }
359            }
360        }
361    }
362}
363
364#[cfg(test)]
365mod tests_for_stack_build_acc;
366
367impl<I> FromIterator<I> for IntCOStack<I>
368where
369    I: IntCO,
370{
371    #[inline]
372    fn from_iter<T>(iter: T) -> Self
373    where
374        T: IntoIterator<Item = I>,
375    {
376        let mut acc = StackBuildAcc::new();
377
378        for interval in iter {
379            acc.push_interval(interval);
380        }
381
382        Self {
383            points: acc.finish().into(),
384        }
385    }
386}
387
388impl<I> FromParallelIterator<I> for IntCOStack<I>
389where
390    I: IntCO + Send,
391    I::CoordType: Send,
392{
393    #[inline]
394    fn from_par_iter<T>(par_iter: T) -> Self
395    where
396        T: IntoParallelIterator<Item = I>,
397    {
398        let points = par_iter
399            .into_par_iter()
400            // Each Rayon worker builds a local canonical stack from its own
401            // input shard. This keeps endpoint sorting and batch compaction
402            // embarrassingly parallel.
403            .fold(StackBuildAcc::new, |mut acc, interval| {
404                acc.push_interval(interval);
405                acc
406            })
407            // Finalize each local builder before global reduction.
408            .map(StackBuildAcc::finish)
409            // Merge partial stack-height functions by pointwise addition.
410            .reduce(Vec::new, |lhs, rhs| merge_points(&lhs, &rhs));
411
412        Self {
413            points: points.into(),
414        }
415    }
416}
417#[cfg(test)]
418mod tests_for_from_iter_and_from_par_iter;