Skip to main content

int_interval_stack/int_co_stack/
impls_for_construction.rs

1use super::*;
2
3use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelIterator};
4
5const BATCH_SIZE: usize = 128;
6
7impl<I> Default for IntCOStack<I>
8where
9    I: IntCO,
10{
11    #[inline]
12    fn default() -> Self {
13        Self {
14            change_points: Arc::from([]),
15            covered: IntCOSet::default(),
16            height_stats: StackHeightStats::default(),
17        }
18    }
19}
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22enum EndpointKind {
23    Enter,
24    Leave,
25}
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28struct Endpoint<C> {
29    at: C,
30    kind: EndpointKind,
31}
32
33#[derive(Debug, Default)]
34struct StackParts<C>
35where
36    C: Default,
37{
38    points: Vec<ChangePoint<C>>,
39    height_stats: StackHeightStats,
40}
41
42/// Builds a canonical stack-height function from raw interval endpoint events.
43///
44/// Each half-open interval contributes two endpoint events:
45///
46/// ```text
47/// [start, end) -> Enter at start, Leave at end
48/// ```
49///
50/// Events at the same coordinate are applied together because a half-open
51/// boundary may contain both intervals ending and intervals starting. Only
52/// the resulting net height matters for the canonical representation.
53///
54/// The returned change points describe a piecewise-constant height function:
55/// after each `at`, the active interval count becomes `height_after`.
56///
57/// # Input assumptions
58///
59/// The endpoint events must originate from valid finite half-open intervals.
60/// Consequently:
61///
62/// - the accumulated height never becomes negative;
63/// - after all events are consumed, the accumulated height returns to zero.
64///
65/// # Canonical output
66///
67/// The returned points satisfy:
68///
69/// - coordinates are strictly increasing;
70/// - adjacent points always have different `height_after` values;
71/// - redundant coordinates whose events cause no net height change are omitted.
72///
73/// # Complexity
74///
75/// Sorting dominates the computation: `O(n log n)` time for `n` endpoints.
76/// The output allocates at most `n` change points.
77fn build_parts_from_endpoints<C>(mut endpoints: Vec<Endpoint<C>>) -> StackParts<C>
78where
79    C: Default + Copy + Ord,
80{
81    // Ordered endpoints allow all events at the same coordinate to be
82    // consumed together and emitted as at most one canonical change point.
83    endpoints.sort_unstable_by_key(|endpoint| endpoint.at);
84
85    let mut points = Vec::with_capacity(endpoints.len());
86    let mut height_stats = StackHeightStats::default();
87
88    // Height of the piecewise-constant function immediately before the next
89    // unprocessed coordinate. The height before the first endpoint is zero.
90    let mut height_after = 0usize;
91    let mut cursor = 0usize;
92
93    while cursor < endpoints.len() {
94        let at = endpoints[cursor].at;
95        let mut enters = 0usize;
96        let mut leaves = 0usize;
97
98        // Apply every boundary event at `at` atomically. For half-open
99        // intervals, intervals leaving at `at` are already inactive there,
100        // while intervals entering at `at` are active from there onward.
101        while cursor < endpoints.len() && endpoints[cursor].at == at {
102            match endpoints[cursor].kind {
103                EndpointKind::Enter => enters += 1,
104                EndpointKind::Leave => leaves += 1,
105            }
106            cursor += 1;
107        }
108
109        // Compute the height on the segment beginning at `at`. The split
110        // between addition and subtraction keeps the stored height unsigned
111        // while still detecting malformed events that would go below zero.
112        let next_height = if enters >= leaves {
113            height_after.checked_add(enters - leaves)
114        } else {
115            height_after.checked_sub(leaves - enters)
116        }
117        .expect("valid intervals must never produce a negative stack height");
118
119        height_stats.observe(next_height);
120
121        // Emit only real changes of the height function. Equal numbers of
122        // entering and leaving layers at the same coordinate cancel out and
123        // must not create a redundant canonical boundary.
124        if next_height != height_after {
125            points.push(ChangePoint {
126                at,
127                height_after: next_height,
128            });
129        }
130
131        height_after = next_height;
132    }
133
134    // Every finite half-open interval contributes one enter and one leave, so
135    // a complete valid event stream must end outside all intervals.
136    debug_assert_eq!(
137        height_after, 0,
138        "all finite half-open intervals must eventually close"
139    );
140
141    StackParts {
142        points,
143        height_stats,
144    }
145}
146
147/// Merges two canonical change-point sequences by adding their stack heights.
148///
149/// Each input slice represents a piecewise-constant stack-height function:
150/// after a change point at `at`, the function takes the value
151/// `height_after` until the next change point.
152///
153/// The merged sequence represents the pointwise sum of the two functions:
154///
155/// ```text
156/// merged_height(x) = lhs_height(x) + rhs_height(x)
157/// ```
158///
159/// Change points that would not change the merged height are omitted. This
160/// preserves the canonical representation, including cases where a boundary
161/// in one input is exactly cancelled by a boundary in the other input.
162///
163/// # Input invariants
164///
165/// Both input slices must be canonical:
166///
167/// - change points are ordered by strictly increasing coordinates;
168/// - adjacent change points have different `height_after` values;
169/// - the height before the first change point is zero;
170/// - the final change point, when present, restores the height to zero.
171///
172/// # Panics
173///
174/// Panics if the sum of the two active heights exceeds [`usize::MAX`].
175///
176/// # Complexity
177///
178/// Runs in `O(lhs.len() + rhs.len())` time and allocates at most
179/// `lhs.len() + rhs.len()` output change points.
180fn merge_parts<C>(lhs: &StackParts<C>, rhs: &StackParts<C>) -> StackParts<C>
181where
182    C: Default + Copy + Ord,
183{
184    let lhs_points_len = lhs.points.len();
185    let rhs_points_len = rhs.points.len();
186
187    let mut out_points = Vec::with_capacity(lhs_points_len + rhs_points_len);
188    let mut out_stats = StackHeightStats::default();
189
190    let mut lhs_height = 0usize;
191    let mut rhs_height = 0usize;
192    let mut merged_height = 0usize;
193
194    let mut lhs_cursor = 0usize;
195    let mut rhs_cursor = 0usize;
196
197    while lhs_cursor < lhs_points_len || rhs_cursor < rhs_points_len {
198        let at = match (lhs.points.get(lhs_cursor), rhs.points.get(rhs_cursor)) {
199            // Only `lhs` changes at this coordinate; keep the current
200            // height contributed by `rhs`.
201            (Some(l), Some(r)) if l.at < r.at => {
202                lhs_height = l.height_after;
203                lhs_cursor += 1;
204                l.at
205            }
206
207            // Only `rhs` changes at this coordinate; keep the current
208            // height contributed by `lhs`.
209            (Some(l), Some(r)) if r.at < l.at => {
210                rhs_height = r.height_after;
211                rhs_cursor += 1;
212                r.at
213            }
214
215            // Both functions change at the same coordinate. Apply both
216            // changes before computing the merged height after `at`.
217            (Some(l), Some(r)) => {
218                lhs_height = l.height_after;
219                rhs_height = r.height_after;
220                lhs_cursor += 1;
221                rhs_cursor += 1;
222                l.at
223            }
224
225            // `rhs` has been exhausted; append the remaining changes from
226            // `lhs` while preserving the final height contributed by `rhs`.
227            (Some(l), None) => {
228                lhs_height = l.height_after;
229                lhs_cursor += 1;
230                l.at
231            }
232
233            // `lhs` has been exhausted; append the remaining changes from
234            // `rhs` while preserving the final height contributed by `lhs`.
235            (None, Some(r)) => {
236                rhs_height = r.height_after;
237                rhs_cursor += 1;
238                r.at
239            }
240
241            // The loop condition guarantees that at least one input still
242            // contains an unprocessed change point.
243            (None, None) => unreachable!(),
244        };
245
246        let next_merged_height = lhs_height
247            .checked_add(rhs_height)
248            .expect("stack height overflow");
249
250        out_stats.observe(next_merged_height);
251
252        // A coordinate belongs to the canonical output exactly when the
253        // pointwise sum changes its value at that coordinate.
254        if next_merged_height != merged_height {
255            out_points.push(ChangePoint {
256                at,
257                height_after: next_merged_height,
258            });
259            merged_height = next_merged_height;
260        }
261    }
262
263    debug_assert_eq!(merged_height, 0);
264
265    StackParts {
266        points: out_points,
267        height_stats: out_stats,
268    }
269}
270
271#[derive(Debug)]
272struct StackBuildAcc<C>
273where
274    C: Default + Copy + Ord,
275{
276    endpoints: Vec<Endpoint<C>>,
277    levels: Vec<Option<StackParts<C>>>,
278}
279
280impl<C> StackBuildAcc<C>
281where
282    C: Default + Copy + Ord,
283{
284    #[inline]
285    fn new() -> Self {
286        Self {
287            // Buffer one bounded batch of raw interval boundary events.
288            //
289            // Each interval contributes two endpoints, so the endpoint
290            // capacity is twice the interval batch size.
291            endpoints: Vec::with_capacity(BATCH_SIZE.saturating_mul(2)),
292
293            // Balanced partial canonical change-point sequences.
294            //
295            // This acts like a binary carry chain: inserting into an occupied
296            // level merges two equal-rank partial results and carries the
297            // merged sequence upward.
298            levels: Vec::new(),
299        }
300    }
301
302    #[inline]
303    fn push_interval<I>(&mut self, interval: I)
304    where
305        I: IntCO<CoordType = C>,
306    {
307        // Convert one half-open interval into raw boundary events:
308        // [start, end) -> Enter at start, Leave at end.
309        self.endpoints.push(Endpoint {
310            at: interval.start(),
311            kind: EndpointKind::Enter,
312        });
313        self.endpoints.push(Endpoint {
314            at: interval.end_excl(),
315            kind: EndpointKind::Leave,
316        });
317
318        // Once the local endpoint buffer reaches one full batch, compact it
319        // into canonical change points and insert it into the level chain.
320        if self.endpoints.len() >= BATCH_SIZE.saturating_mul(2) {
321            self.flush();
322        }
323    }
324
325    #[inline]
326    fn finish(mut self) -> StackParts<C> {
327        // Make sure the final partial batch is included.
328        self.flush();
329
330        // Merge all remaining partial sequences into one canonical stack.
331        // Empty input naturally produces an empty change-point sequence.
332        self.levels
333            .into_iter()
334            .flatten()
335            .reduce(|lhs, rhs| merge_parts(&lhs, &rhs))
336            .unwrap_or_default()
337    }
338
339    #[inline]
340    fn flush(&mut self) {
341        if self.endpoints.is_empty() {
342            return;
343        }
344
345        // Move out the current endpoint batch while leaving a fresh buffer for
346        // subsequent pushes. The old buffer is consumed by sorting and
347        // canonicalization.
348        let endpoints = core::mem::replace(
349            &mut self.endpoints,
350            Vec::with_capacity(BATCH_SIZE.saturating_mul(2)),
351        );
352
353        // Canonicalize this endpoint batch, then insert the resulting
354        // stack-height function into the balanced level chain.
355        self.push_points(build_parts_from_endpoints(endpoints));
356    }
357
358    fn push_points(&mut self, mut carry: StackParts<C>) {
359        let mut level = 0usize;
360
361        loop {
362            if level == self.levels.len() {
363                // No level exists yet, so append a new top level.
364                self.levels.push(Some(carry));
365                break;
366            }
367
368            match self.levels[level].take() {
369                // Empty level: store the current carry here.
370                None => {
371                    self.levels[level] = Some(carry);
372                    break;
373                }
374
375                // Occupied level: merge two equal-rank canonical sequences and
376                // carry the combined result into the next level.
377                Some(parts) => {
378                    carry = merge_parts(&parts, &carry);
379                    level += 1;
380                }
381            }
382        }
383    }
384}
385
386impl<I> FromIterator<I> for IntCOStack<I>
387where
388    I: IntCO + Copy,
389{
390    #[inline]
391    fn from_iter<T>(iter: T) -> Self
392    where
393        T: IntoIterator<Item = I>,
394    {
395        let mut acc = StackBuildAcc::new();
396
397        let covered = iter
398            .into_iter()
399            .inspect(|interval| acc.push_interval(*interval))
400            .collect::<IntCOSet<I>>();
401
402        let StackParts {
403            points,
404            height_stats,
405        } = acc.finish();
406
407        Self {
408            change_points: points.into(),
409            covered,
410            height_stats,
411        }
412    }
413}
414
415/// Projects the covered interval set from canonical stack change points.
416///
417/// A stack change-point sequence describes a piecewise-constant height
418/// function. The covered set is exactly the union of all coordinate ranges
419/// whose height is positive.
420///
421/// This function scans positive-height runs:
422///
423/// ```text
424/// height: 0 -> positive    opens a covered interval
425/// height: positive -> 0    closes a covered interval
426/// positive -> positive     keeps the current covered interval open
427/// ```
428///
429/// # Input invariants
430///
431/// `points` must be the canonical output of stack construction:
432///
433/// - coordinates are strictly increasing;
434/// - adjacent points have different `height_after` values;
435/// - the final height, when non-empty, is zero.
436///
437/// # Output
438///
439/// The returned set is canonical. Positive-height runs are emitted in
440/// ascending order and are separated by zero-height gaps.
441#[inline]
442fn covered_from_change_points<I>(points: &[ChangePoint<I::CoordType>]) -> IntCOSet<I>
443where
444    I: IntCO,
445{
446    let mut out = Vec::new();
447    let mut start = None;
448
449    for p in points {
450        match (start, p.height_after) {
451            (None, h) if h > 0 => {
452                start = Some(p.at);
453            }
454            (Some(s), 0) => {
455                // SAFETY:
456                // Canonical change points are strictly increasing, and a
457                // positive-height run can only close at a later coordinate.
458                out.push(unsafe { I::new_unchecked(s, p.at) });
459                start = None;
460            }
461            _ => {}
462        }
463    }
464
465    debug_assert!(
466        start.is_none(),
467        "canonical stack change points must end at zero height"
468    );
469
470    // SAFETY:
471    // Positive-height runs are emitted in ascending order and are separated by
472    // zero-height gaps, so the result is canonical.
473    unsafe { IntCOSet::new_unchecked(out) }
474}
475
476impl<I> FromParallelIterator<I> for IntCOStack<I>
477where
478    I: IntCO + Copy + Send,
479{
480    /// Builds a stack from intervals in parallel.
481    ///
482    /// Each worker accumulates endpoint-derived stack parts locally. The final
483    /// reduction merges those parts into one canonical height function.
484    ///
485    /// The covered set is then projected from the final change points instead
486    /// of being built from a second raw interval collection. This keeps the
487    /// stack and its covered view derived from the same canonical height
488    /// function and avoids retaining all input intervals during reduction.
489    #[inline]
490    fn from_par_iter<T>(par_iter: T) -> Self
491    where
492        T: IntoParallelIterator<Item = I>,
493    {
494        let StackParts {
495            points,
496            height_stats,
497        } = par_iter
498            .into_par_iter()
499            .fold(StackBuildAcc::new, |mut acc, interval| {
500                acc.push_interval(interval);
501                acc
502            })
503            .map(StackBuildAcc::finish)
504            .reduce(StackParts::default, |lhs, rhs| merge_parts(&lhs, &rhs));
505
506        let covered = covered_from_change_points::<I>(&points);
507
508        Self {
509            change_points: points.into(),
510            covered,
511            height_stats,
512        }
513    }
514}
515
516#[cfg(test)]
517pub(crate) mod test_support;
518
519#[cfg(test)]
520mod tests_for_build_parts_from_endpoints;
521
522#[cfg(test)]
523mod tests_for_merge_parts;
524
525#[cfg(test)]
526mod tests_for_stack_build_acc;
527
528#[cfg(test)]
529mod tests_for_from_iter_and_from_par_iter;
530
531#[cfg(test)]
532mod tests_for_covered_from_change_points;