int_interval_stack/int_co_stack/impls_for_construction.rs
1use super::*;
2
3use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelIterator};
4
5const BATCH_SIZE: usize = 128;
6
7impl<I> Default for IntCOStack<I>
8where
9 I: IntCO,
10{
11 #[inline]
12 fn default() -> Self {
13 Self {
14 change_points: Arc::from([]),
15 covered: OnceLock::default(),
16 height_stats: HeightStats::default(),
17 }
18 }
19}
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22enum EndpointKind {
23 Enter,
24 Leave,
25}
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28struct Endpoint<C> {
29 at: C,
30 kind: EndpointKind,
31}
32
33#[derive(Debug, Default)]
34struct StackParts<C>
35where
36 C: Default,
37{
38 points: Vec<ChangePoint<C>>,
39 height_stats: HeightStats,
40}
41
42/// Builds a canonical stack-height function from raw interval endpoint events.
43///
44/// Each half-open interval contributes two endpoint events:
45///
46/// ```text
47/// [start, end) -> Enter at start, Leave at end
48/// ```
49///
50/// Events at the same coordinate are applied together because a half-open
51/// boundary may contain both intervals ending and intervals starting. Only
52/// the resulting net height matters for the canonical representation.
53///
54/// The returned change points describe a piecewise-constant height function:
55/// after each `at`, the active interval count becomes `height_after`.
56///
57/// # Input assumptions
58///
59/// The endpoint events must originate from valid finite half-open intervals.
60/// Consequently:
61///
62/// - the accumulated height never becomes negative;
63/// - after all events are consumed, the accumulated height returns to zero.
64///
65/// # Canonical output
66///
67/// The returned points satisfy:
68///
69/// - coordinates are strictly increasing;
70/// - adjacent points always have different `height_after` values;
71/// - redundant coordinates whose events cause no net height change are omitted.
72///
73/// # Complexity
74///
75/// Sorting dominates the computation: `O(n log n)` time for `n` endpoints.
76/// The output allocates at most `n` change points.
77fn build_parts_from_endpoints<C>(mut endpoints: Vec<Endpoint<C>>) -> StackParts<C>
78where
79 C: Default + Copy + Ord,
80{
81 // Ordered endpoints allow all events at the same coordinate to be
82 // consumed together and emitted as at most one canonical change point.
83 endpoints.sort_unstable_by_key(|endpoint| endpoint.at);
84
85 let mut points = Vec::with_capacity(endpoints.len());
86 let mut height_stats = HeightStats::default();
87
88 // Height of the piecewise-constant function immediately before the next
89 // unprocessed coordinate. The height before the first endpoint is zero.
90 let mut height_after = 0usize;
91 let mut cursor = 0usize;
92
93 while cursor < endpoints.len() {
94 let at = endpoints[cursor].at;
95 let mut enters = 0usize;
96 let mut leaves = 0usize;
97
98 // Apply every boundary event at `at` atomically. For half-open
99 // intervals, intervals leaving at `at` are already inactive there,
100 // while intervals entering at `at` are active from there onward.
101 while cursor < endpoints.len() && endpoints[cursor].at == at {
102 match endpoints[cursor].kind {
103 EndpointKind::Enter => enters += 1,
104 EndpointKind::Leave => leaves += 1,
105 }
106 cursor += 1;
107 }
108
109 // Compute the height on the segment beginning at `at`. The split
110 // between addition and subtraction keeps the stored height unsigned
111 // while still detecting malformed events that would go below zero.
112 let next_height = if enters >= leaves {
113 height_after.checked_add(enters - leaves)
114 } else {
115 height_after.checked_sub(leaves - enters)
116 }
117 .expect("valid intervals must never produce a negative stack height");
118
119 height_stats.observe(next_height);
120
121 // Emit only real changes of the height function. Equal numbers of
122 // entering and leaving layers at the same coordinate cancel out and
123 // must not create a redundant canonical boundary.
124 if next_height != height_after {
125 points.push(ChangePoint {
126 at,
127 height_after: next_height,
128 });
129 }
130
131 height_after = next_height;
132 }
133
134 // Every finite half-open interval contributes one enter and one leave, so
135 // a complete valid event stream must end outside all intervals.
136 debug_assert_eq!(
137 height_after, 0,
138 "all finite half-open intervals must eventually close"
139 );
140
141 StackParts {
142 points,
143 height_stats,
144 }
145}
146
147/// Merges two canonical change-point sequences by adding their stack heights.
148///
149/// Each input slice represents a piecewise-constant stack-height function:
150/// after a change point at `at`, the function takes the value
151/// `height_after` until the next change point.
152///
153/// The merged sequence represents the pointwise sum of the two functions:
154///
155/// ```text
156/// merged_height(x) = lhs_height(x) + rhs_height(x)
157/// ```
158///
159/// Change points that would not change the merged height are omitted. This
160/// preserves the canonical representation, including cases where a boundary
161/// in one input is exactly cancelled by a boundary in the other input.
162///
163/// # Input invariants
164///
165/// Both input slices must be canonical:
166///
167/// - change points are ordered by strictly increasing coordinates;
168/// - adjacent change points have different `height_after` values;
169/// - the height before the first change point is zero;
170/// - the final change point, when present, restores the height to zero.
171///
172/// # Panics
173///
174/// Panics if the sum of the two active heights exceeds [`usize::MAX`].
175///
176/// # Complexity
177///
178/// Runs in `O(lhs.len() + rhs.len())` time and allocates at most
179/// `lhs.len() + rhs.len()` output change points.
180fn merge_parts<C>(lhs: &StackParts<C>, rhs: &StackParts<C>) -> StackParts<C>
181where
182 C: Default + Copy + Ord,
183{
184 let lhs_points_len = lhs.points.len();
185 let rhs_points_len = rhs.points.len();
186
187 let mut out_points = Vec::with_capacity(lhs_points_len + rhs_points_len);
188 let mut out_stats = HeightStats::default();
189
190 let mut lhs_height = 0usize;
191 let mut rhs_height = 0usize;
192 let mut merged_height = 0usize;
193
194 let mut lhs_cursor = 0usize;
195 let mut rhs_cursor = 0usize;
196
197 while lhs_cursor < lhs_points_len || rhs_cursor < rhs_points_len {
198 let at = match (lhs.points.get(lhs_cursor), rhs.points.get(rhs_cursor)) {
199 (Some(l), Some(r)) => match l.at.cmp(&r.at) {
200 // Only `lhs` changes at this coordinate; keep the current
201 // height contributed by `rhs`.
202 std::cmp::Ordering::Less => {
203 lhs_height = l.height_after;
204 lhs_cursor += 1;
205 l.at
206 }
207
208 // Only `rhs` changes at this coordinate; keep the current
209 // height contributed by `lhs`.
210 std::cmp::Ordering::Greater => {
211 rhs_height = r.height_after;
212 rhs_cursor += 1;
213 r.at
214 }
215
216 // Both functions change at the same coordinate. Apply both
217 // changes before computing the merged height after `at`.
218 std::cmp::Ordering::Equal => {
219 lhs_height = l.height_after;
220 rhs_height = r.height_after;
221 lhs_cursor += 1;
222 rhs_cursor += 1;
223 l.at
224 }
225 },
226
227 // `rhs` has been exhausted; append the remaining changes from
228 // `lhs` while preserving the final height contributed by `rhs`.
229 (Some(l), None) => {
230 lhs_height = l.height_after;
231 lhs_cursor += 1;
232 l.at
233 }
234
235 // `lhs` has been exhausted; append the remaining changes from
236 // `rhs` while preserving the final height contributed by `lhs`.
237 (None, Some(r)) => {
238 rhs_height = r.height_after;
239 rhs_cursor += 1;
240 r.at
241 }
242
243 // The loop condition guarantees that at least one input still
244 // contains an unprocessed change point.
245 (None, None) => unreachable!(),
246 };
247
248 let next_merged_height = lhs_height
249 .checked_add(rhs_height)
250 .expect("stack height overflow");
251
252 out_stats.observe(next_merged_height);
253
254 // A coordinate belongs to the canonical output exactly when the
255 // pointwise sum changes its value at that coordinate.
256 if next_merged_height != merged_height {
257 out_points.push(ChangePoint {
258 at,
259 height_after: next_merged_height,
260 });
261 merged_height = next_merged_height;
262 }
263 }
264
265 debug_assert_eq!(merged_height, 0);
266
267 StackParts {
268 points: out_points,
269 height_stats: out_stats,
270 }
271}
272
273#[derive(Debug)]
274struct StackBuildAcc<C>
275where
276 C: Default + Copy + Ord,
277{
278 endpoints: Vec<Endpoint<C>>,
279 levels: Vec<Option<StackParts<C>>>,
280}
281
282impl<C> StackBuildAcc<C>
283where
284 C: Default + Copy + Ord,
285{
286 #[inline]
287 fn new() -> Self {
288 Self {
289 // Buffer one bounded batch of raw interval boundary events.
290 //
291 // Each interval contributes two endpoints, so the endpoint
292 // capacity is twice the interval batch size.
293 endpoints: Vec::with_capacity(BATCH_SIZE.saturating_mul(2)),
294
295 // Balanced partial canonical change-point sequences.
296 //
297 // This acts like a binary carry chain: inserting into an occupied
298 // level merges two equal-rank partial results and carries the
299 // merged sequence upward.
300 levels: Vec::new(),
301 }
302 }
303
304 #[inline]
305 fn push_interval<I>(&mut self, interval: I)
306 where
307 I: IntCO<CoordType = C>,
308 {
309 // Convert one half-open interval into raw boundary events:
310 // [start, end) -> Enter at start, Leave at end.
311 self.endpoints.push(Endpoint {
312 at: interval.start(),
313 kind: EndpointKind::Enter,
314 });
315 self.endpoints.push(Endpoint {
316 at: interval.end_excl(),
317 kind: EndpointKind::Leave,
318 });
319
320 // Once the local endpoint buffer reaches one full batch, compact it
321 // into canonical change points and insert it into the level chain.
322 if self.endpoints.len() >= BATCH_SIZE.saturating_mul(2) {
323 self.flush();
324 }
325 }
326
327 #[inline]
328 fn finish(mut self) -> StackParts<C> {
329 // Make sure the final partial batch is included.
330 self.flush();
331
332 // Merge all remaining partial sequences into one canonical stack.
333 // Empty input naturally produces an empty change-point sequence.
334 self.levels
335 .into_iter()
336 .flatten()
337 .reduce(|lhs, rhs| merge_parts(&lhs, &rhs))
338 .unwrap_or_default()
339 }
340
341 #[inline]
342 fn flush(&mut self) {
343 if self.endpoints.is_empty() {
344 return;
345 }
346
347 // Move out the current endpoint batch while leaving a fresh buffer for
348 // subsequent pushes. The old buffer is consumed by sorting and
349 // canonicalization.
350 let endpoints = core::mem::replace(
351 &mut self.endpoints,
352 Vec::with_capacity(BATCH_SIZE.saturating_mul(2)),
353 );
354
355 // Canonicalize this endpoint batch, then insert the resulting
356 // stack-height function into the balanced level chain.
357 self.push_points(build_parts_from_endpoints(endpoints));
358 }
359
360 fn push_points(&mut self, mut carry: StackParts<C>) {
361 let mut level = 0usize;
362
363 loop {
364 if level == self.levels.len() {
365 // No level exists yet, so append a new top level.
366 self.levels.push(Some(carry));
367 break;
368 }
369
370 match self.levels[level].take() {
371 // Empty level: store the current carry here.
372 None => {
373 self.levels[level] = Some(carry);
374 break;
375 }
376
377 // Occupied level: merge two equal-rank canonical sequences and
378 // carry the combined result into the next level.
379 Some(parts) => {
380 carry = merge_parts(&parts, &carry);
381 level += 1;
382 }
383 }
384 }
385 }
386}
387
388impl<I> FromIterator<I> for IntCOStack<I>
389where
390 I: IntCO + Copy,
391{
392 #[inline]
393 fn from_iter<T>(iter: T) -> Self
394 where
395 T: IntoIterator<Item = I>,
396 {
397 let mut acc = StackBuildAcc::new();
398
399 for interval in iter {
400 acc.push_interval(interval);
401 }
402
403 let StackParts {
404 points,
405 height_stats,
406 } = acc.finish();
407
408 Self {
409 change_points: points.into(),
410 covered: OnceLock::new(),
411 height_stats,
412 }
413 }
414}
415
416impl<I> FromParallelIterator<I> for IntCOStack<I>
417where
418 I: IntCO + Copy + Send,
419{
420 /// Builds a stack from intervals in parallel.
421 ///
422 /// Each worker accumulates endpoint-derived stack parts locally. The final
423 /// reduction merges those parts into one canonical height function.
424 ///
425 /// The covered set is not built during construction. It is a derived cache
426 /// and is initialized lazily when the covered view is first requested.
427 #[inline]
428 fn from_par_iter<T>(par_iter: T) -> Self
429 where
430 T: IntoParallelIterator<Item = I>,
431 {
432 let StackParts {
433 points,
434 height_stats,
435 } = par_iter
436 .into_par_iter()
437 .fold(StackBuildAcc::new, |mut acc, interval| {
438 acc.push_interval(interval);
439 acc
440 })
441 .map(StackBuildAcc::finish)
442 .reduce(StackParts::default, |lhs, rhs| merge_parts(&lhs, &rhs));
443
444 Self {
445 change_points: points.into(),
446 covered: OnceLock::new(),
447 height_stats,
448 }
449 }
450}
451
452#[cfg(test)]
453pub(crate) mod test_support;
454
455#[cfg(test)]
456mod tests_for_build_parts_from_endpoints;
457
458#[cfg(test)]
459mod tests_for_merge_parts;
460
461#[cfg(test)]
462mod tests_for_stack_build_acc;
463
464#[cfg(test)]
465mod tests_for_from_iter_and_from_par_iter;