Skip to main content

invalidation/
drain.rs

1// Copyright 2025 the Invalidation Authors
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Topologically sorted drain iterator.
5//!
6//! For more advanced drain workflows (determinism, targeted drains, tracing,
7//! scratch reuse), prefer the builder-based API via
8//! [`InvalidationTracker::drain`](crate::InvalidationTracker::drain) / [`DrainBuilder`].
9
10use alloc::collections::BinaryHeap;
11use alloc::collections::VecDeque;
12use alloc::vec::Vec;
13use core::cmp::Reverse;
14use core::hash::Hash;
15
16use hashbrown::HashMap;
17use hashbrown::hash_map::Entry;
18
19use crate::DrainBuilder;
20use crate::channel::Channel;
21use crate::graph::InvalidationGraph;
22use crate::scratch::TraversalScratch;
23use crate::trace::InvalidationTrace;
24
25/// Trait for keys that can be used as dense `Vec` indices.
26///
27/// This is used by [`DrainSortedDeterministic`] to replace `HashMap`-based
28/// in-degree tracking with a `Vec` indexed by key, eliminating hashing from
29/// the hot path.
30///
31/// Keys must map to compact sequential `usize` indices (typically starting
32/// from 0). Sparse integer key spaces are not supported; very large indices
33/// panic if the required dense storage would exceed addressable capacity or
34/// the available memory budget. Use
35/// [`intern::Interner`](crate::intern::Interner) when your natural key space
36/// is not already compact.
37///
38/// [`InternId`](crate::InternId) and `u32` implement this trait.
39pub trait DenseKey: Copy {
40    /// Returns this key as a `usize` index for dense `Vec` storage.
41    fn index(self) -> usize;
42}
43
44impl DenseKey for u32 {
45    #[inline]
46    fn index(self) -> usize {
47        self as usize
48    }
49}
50
51impl DenseKey for usize {
52    #[inline]
53    fn index(self) -> usize {
54        self
55    }
56}
57
58/// Sentinel value indicating a key is not in the invalidation set.
59const DENSE_SENTINEL: u32 = u32::MAX;
60
61#[inline]
62pub(crate) fn prepare_dense_growth<T>(vec: &mut Vec<T>, idx: usize, storage: &str) -> usize {
63    let target_len = idx.checked_add(1).unwrap_or_else(|| {
64        panic!("DenseKey index {idx} overflows addressable capacity for {storage}")
65    });
66
67    if target_len > vec.len() {
68        vec.try_reserve_exact(target_len - vec.len()).unwrap_or_else(|err| {
69            panic!(
70                "DenseKey index {idx} requires growing {storage} to length {target_len}: {err:?}; use a compact dense key space or intern::Interner"
71            )
72        });
73    }
74
75    target_len
76}
77
78/// Indicates whether a drain finished normally or stalled due to a cycle.
79#[derive(Copy, Clone, Debug, PartialEq, Eq)]
80pub enum DrainCompletion {
81    /// All reachable keys were yielded.
82    Complete,
83    /// The drain stalled: some keys remained with non-zero in-degree (cycle).
84    Stalled {
85        /// Number of keys that could not be yielded.
86        remaining: usize,
87    },
88}
89
90/// Iterator that yields invalidated keys in topological order.
91///
92/// Uses Kahn's algorithm to yield dependencies before their dependents.
93/// This ensures that when processing the invalidation set, a node is only processed
94/// after all of its dependencies have been processed.
95///
96/// # Algorithm
97///
98/// 1. Collect all invalidated keys and their in-degrees (within the invalidated subset).
99/// 2. Initialize a queue with keys that have no invalidated dependencies.
100/// 3. Repeatedly dequeue a key, yield it, and decrement in-degrees of its
101///    invalidated dependents. When a dependent's in-degree reaches zero, enqueue it.
102///
103/// # Performance
104///
105/// - Time complexity: O(V + E) where V is the number of invalidated keys and E is
106///   the number of edges between them.
107/// - Space complexity: O(V) for the in-degree map and queue.
108///
109/// # Important Notes
110///
111/// - **Duplicates**: If `invalidated_keys` contains duplicates, they are deduplicated
112///   internally. Each key is yielded at most once.
113/// - **Cycles**: This iterator assumes the dependency subgraph induced by the
114///   invalidated keys is acyclic (a DAG). If cycles exist, keys involved in cycles
115///   will never have their in-degree reach zero and will not be yielded.
116///   You can detect this by exhausting the iterator and then checking
117///   [`is_stalled`](Self::is_stalled) / [`completion`](Self::completion), or by
118///   using [`collect_with_completion`](Self::collect_with_completion).
119///   Use [`CycleHandling::Error`](crate::CycleHandling::Error) when building the graph
120///   to prevent cycles, or ensure cycles are intentional.
121/// - **Nondeterminism**: When multiple keys have in-degree zero simultaneously,
122///   the order among them depends on hash iteration order and is not guaranteed
123///   to be deterministic across runs or platforms.
124///
125/// # Example
126///
127/// ```
128/// use invalidation::{drain_sorted, Channel, CycleHandling, InvalidationGraph, InvalidationSet};
129///
130/// const LAYOUT: Channel = Channel::new(0);
131///
132/// let mut graph = InvalidationGraph::<u32>::new();
133/// // 1 <- 2 <- 3
134/// graph.add_dependency(2, 1, LAYOUT, CycleHandling::Error).unwrap();
135/// graph.add_dependency(3, 2, LAYOUT, CycleHandling::Error).unwrap();
136///
137/// let mut invalidated = InvalidationSet::new();
138/// invalidated.mark(1, LAYOUT);
139/// invalidated.mark(2, LAYOUT);
140/// invalidated.mark(3, LAYOUT);
141///
142/// let sorted: Vec<_> = drain_sorted(&mut invalidated, &graph, LAYOUT).collect();
143/// assert_eq!(sorted, vec![1, 2, 3]); // Dependencies before dependents
144/// ```
145#[derive(Debug)]
146pub struct DrainSorted<'a, K>
147where
148    K: Copy + Eq + Hash + DenseKey,
149{
150    graph: &'a InvalidationGraph<K>,
151    channel: Channel,
152    /// Keys with zero in-degree, ready to yield.
153    queue: VecDeque<K>,
154    /// Remaining in-degree for each invalidated key (within the invalidated subset).
155    in_degree: HashMap<K, usize>,
156    stalled: bool,
157}
158
159/// Deterministic variant of [`DrainSorted`].
160///
161/// When multiple keys are simultaneously ready, this drain yields the smallest
162/// key first (according to `Ord`).
163///
164/// Uses a dense `Vec<u32>` indexed by [`DenseKey::index`] instead of a
165/// `HashMap` for in-degree tracking, eliminating all hashing from the hot path.
166#[derive(Debug)]
167pub struct DrainSortedDeterministic<'a, K>
168where
169    K: Copy + Eq + Hash + Ord + DenseKey,
170{
171    graph: &'a InvalidationGraph<K>,
172    channel: Channel,
173    /// Keys with zero in-degree, ready to yield (min-heap via `Reverse`).
174    ready: BinaryHeap<Reverse<K>>,
175    /// In-degree for each key, indexed by `key.index()`.
176    /// `DENSE_SENTINEL` means the key is not in the invalidation set.
177    in_degree: Vec<u32>,
178    /// Number of keys remaining (replaces `HashMap::len()`).
179    remaining: usize,
180    stalled: bool,
181}
182
183impl<'a, K> DrainSorted<'a, K>
184where
185    K: Copy + Eq + Hash + DenseKey,
186{
187    pub(crate) fn from_iter_with_capacity<I>(
188        invalidated_keys: I,
189        cap: usize,
190        graph: &'a InvalidationGraph<K>,
191        channel: Channel,
192    ) -> Self
193    where
194        I: Iterator<Item = K>,
195    {
196        // Deduplicate input keys via the in-degree map keys.
197        let mut in_degree: HashMap<K, usize> = HashMap::with_capacity(cap);
198        let mut unique_keys = Vec::with_capacity(cap);
199        for key in invalidated_keys {
200            if let Entry::Vacant(e) = in_degree.entry(key) {
201                e.insert(0);
202                unique_keys.push(key);
203            }
204        }
205
206        // Compute in-degrees: for each invalidated key, count how many of its
207        // dependencies are also invalidated.
208        for &key in &unique_keys {
209            for dep in graph.dependencies(key, channel) {
210                if in_degree.contains_key(&dep) {
211                    *in_degree.get_mut(&key).expect("key is in in_degree") += 1;
212                }
213            }
214        }
215
216        // Initialize queue with keys that have no invalidated dependencies
217        let mut queue = VecDeque::with_capacity(in_degree.len());
218        queue.extend(
219            unique_keys
220                .into_iter()
221                .filter(|&k| in_degree.get(&k).is_some_and(|&deg| deg == 0)),
222        );
223
224        Self {
225            graph,
226            channel,
227            queue,
228            in_degree,
229            stalled: false,
230        }
231    }
232
233    /// Returns `true` if there are no more keys to yield.
234    ///
235    /// Note: if this returns `true` while [`remaining`](Self::remaining) is
236    /// non-zero, the drain has stalled due to a cycle; see
237    /// [`is_stalled`](Self::is_stalled).
238    #[must_use]
239    pub fn is_empty(&self) -> bool {
240        self.queue.is_empty()
241    }
242
243    /// Returns an upper bound on the remaining keys.
244    #[must_use]
245    pub fn remaining(&self) -> usize {
246        self.in_degree.len()
247    }
248
249    /// Returns `true` if the drain has stalled due to a cycle.
250    ///
251    /// This becomes `true` once `next()` has returned `None` while there were
252    /// still keys remaining.
253    ///
254    /// This is only meaningful after the iterator has been exhausted.
255    #[must_use]
256    pub fn is_stalled(&self) -> bool {
257        self.stalled
258    }
259
260    /// Returns whether the drain completed or stalled due to a cycle.
261    ///
262    /// If the drain stalled, this returns how many keys could not be yielded.
263    ///
264    /// This is only meaningful after the iterator has been exhausted.
265    /// Prefer [`collect_with_completion`](Self::collect_with_completion) to
266    /// avoid accidental early checks.
267    #[must_use]
268    pub fn completion(&self) -> DrainCompletion {
269        if self.stalled {
270            DrainCompletion::Stalled {
271                remaining: self.remaining(),
272            }
273        } else {
274            DrainCompletion::Complete
275        }
276    }
277
278    /// Collects all yielded keys and returns completion status.
279    #[must_use]
280    pub fn collect_with_completion(mut self) -> (Vec<K>, DrainCompletion) {
281        let mut out = Vec::with_capacity(self.in_degree.len());
282        out.extend(&mut self);
283        let completion = self.completion();
284        (out, completion)
285    }
286}
287
288impl<'a, K> DrainSortedDeterministic<'a, K>
289where
290    K: Copy + Eq + Hash + Ord + DenseKey,
291{
292    pub(crate) fn from_iter_with_capacity<I>(
293        invalidated_keys: I,
294        cap: usize,
295        graph: &'a InvalidationGraph<K>,
296        channel: Channel,
297    ) -> Self
298    where
299        I: Iterator<Item = K>,
300    {
301        // Deduplicate input keys via the dense in-degree vec.
302        let mut in_degree: Vec<u32> = Vec::new();
303        let mut unique_keys = Vec::with_capacity(cap);
304        for key in invalidated_keys {
305            let idx = key.index();
306            if idx >= in_degree.len() {
307                let target_len =
308                    prepare_dense_growth(&mut in_degree, idx, "deterministic drain in-degree");
309                in_degree.resize(target_len, DENSE_SENTINEL);
310            }
311            if in_degree[idx] == DENSE_SENTINEL {
312                in_degree[idx] = 0;
313                unique_keys.push(key);
314            }
315        }
316
317        // Compute in-degrees within the invalidated subset.
318        for &key in &unique_keys {
319            for dep in graph.dependencies(key, channel) {
320                let dep_idx = dep.index();
321                if dep_idx < in_degree.len() && in_degree[dep_idx] != DENSE_SENTINEL {
322                    in_degree[key.index()] += 1;
323                }
324            }
325        }
326
327        let remaining = unique_keys.len();
328
329        // Initialize ready set with zero in-degree keys.
330        let mut ready = BinaryHeap::with_capacity(remaining);
331        for key in unique_keys {
332            if in_degree[key.index()] == 0 {
333                ready.push(Reverse(key));
334            }
335        }
336
337        Self {
338            graph,
339            channel,
340            ready,
341            in_degree,
342            remaining,
343            stalled: false,
344        }
345    }
346
347    /// Returns `true` if there are no more keys to yield.
348    ///
349    /// Note: if this returns `true` while [`remaining`](Self::remaining) is
350    /// non-zero, the drain has stalled due to a cycle.
351    #[must_use]
352    pub fn is_empty(&self) -> bool {
353        self.ready.is_empty()
354    }
355
356    /// Returns an upper bound on the remaining keys.
357    #[must_use]
358    pub fn remaining(&self) -> usize {
359        self.remaining
360    }
361
362    /// Returns `true` if the drain has stalled due to a cycle.
363    ///
364    /// This becomes `true` once `next()` has returned `None` while there were
365    /// still keys remaining.
366    ///
367    /// This is only meaningful after the iterator has been exhausted.
368    #[must_use]
369    pub fn is_stalled(&self) -> bool {
370        self.stalled
371    }
372
373    /// Returns whether the drain completed or stalled due to a cycle.
374    #[must_use]
375    pub fn completion(&self) -> DrainCompletion {
376        if self.stalled {
377            DrainCompletion::Stalled {
378                remaining: self.remaining(),
379            }
380        } else {
381            DrainCompletion::Complete
382        }
383    }
384
385    /// Collects all yielded keys and returns completion status.
386    #[must_use]
387    pub fn collect_with_completion(mut self) -> (Vec<K>, DrainCompletion) {
388        let mut out = Vec::with_capacity(self.remaining);
389        out.extend(&mut self);
390        let completion = self.completion();
391        (out, completion)
392    }
393}
394
395impl<K> Iterator for DrainSorted<'_, K>
396where
397    K: Copy + Eq + Hash + DenseKey,
398{
399    type Item = K;
400
401    fn next(&mut self) -> Option<Self::Item> {
402        let Some(key) = self.queue.pop_front() else {
403            if !self.in_degree.is_empty() {
404                self.stalled = true;
405            }
406            return None;
407        };
408
409        // Remove from in_degree to mark as processed
410        self.in_degree.remove(&key);
411
412        // Decrement in-degree of invalidated dependents
413        for dependent in self.graph.dependents(key, self.channel) {
414            if let Some(deg) = self.in_degree.get_mut(&dependent) {
415                *deg -= 1;
416                if *deg == 0 {
417                    self.queue.push_back(dependent);
418                }
419            }
420        }
421
422        Some(key)
423    }
424
425    fn size_hint(&self) -> (usize, Option<usize>) {
426        let remaining = self.in_degree.len();
427        (remaining, Some(remaining))
428    }
429}
430
431impl<K> Iterator for DrainSortedDeterministic<'_, K>
432where
433    K: Copy + Eq + Hash + Ord + DenseKey,
434{
435    type Item = K;
436
437    fn next(&mut self) -> Option<Self::Item> {
438        let Some(Reverse(key)) = self.ready.pop() else {
439            if self.remaining > 0 {
440                self.stalled = true;
441            }
442            return None;
443        };
444
445        self.in_degree[key.index()] = DENSE_SENTINEL;
446        self.remaining -= 1;
447
448        for dependent in self.graph.dependents(key, self.channel) {
449            let idx = dependent.index();
450            if idx < self.in_degree.len() && self.in_degree[idx] != DENSE_SENTINEL {
451                self.in_degree[idx] -= 1;
452                if self.in_degree[idx] == 0 {
453                    self.ready.push(Reverse(dependent));
454                }
455            }
456        }
457
458        Some(key)
459    }
460
461    fn size_hint(&self) -> (usize, Option<usize>) {
462        (self.remaining, Some(self.remaining))
463    }
464}
465
466/// Creates a topologically sorted drain from a invalidation set.
467///
468/// This is a convenience function that collects the invalidated keys from a
469/// [`InvalidationSet`](crate::InvalidationSet), clears the channel, and returns a
470/// [`DrainSorted`] iterator.
471///
472/// See [`DrainSorted`] for notes on cycles and nondeterministic ordering.
473///
474/// # Example
475///
476/// ```
477/// use invalidation::{
478///     Channel, CycleHandling, InvalidationGraph, InvalidationSet, drain_sorted,
479/// };
480///
481/// const LAYOUT: Channel = Channel::new(0);
482///
483/// let mut graph = InvalidationGraph::<u32>::new();
484/// graph.add_dependency(2, 1, LAYOUT, CycleHandling::Error).unwrap();
485///
486/// let mut invalidated = InvalidationSet::new();
487/// invalidated.mark(1, LAYOUT);
488/// invalidated.mark(2, LAYOUT);
489///
490/// // Drain in topological order
491/// let sorted: Vec<_> = drain_sorted(&mut invalidated, &graph, LAYOUT).collect();
492/// assert_eq!(sorted, vec![1, 2]);
493///
494/// // Channel is now empty
495/// assert!(!invalidated.has_invalidated(LAYOUT));
496/// ```
497pub fn drain_sorted<'a, K>(
498    invalidated: &mut crate::InvalidationSet<K>,
499    graph: &'a InvalidationGraph<K>,
500    channel: Channel,
501) -> DrainSorted<'a, K>
502where
503    K: Copy + Eq + Hash + DenseKey,
504{
505    DrainBuilder::new(invalidated, graph, channel)
506        .invalidated_only()
507        .run()
508}
509
510/// Creates a deterministic, topologically sorted drain from a invalidation set.
511///
512/// This is equivalent to [`drain_sorted`], but when multiple keys are ready
513/// simultaneously it yields them in ascending key order (`Ord`).
514pub fn drain_sorted_deterministic<'a, K>(
515    invalidated: &mut crate::InvalidationSet<K>,
516    graph: &'a InvalidationGraph<K>,
517    channel: Channel,
518) -> DrainSortedDeterministic<'a, K>
519where
520    K: Copy + Eq + Hash + Ord + DenseKey,
521{
522    DrainBuilder::new(invalidated, graph, channel)
523        .invalidated_only()
524        .deterministic()
525        .run()
526}
527
528/// Creates a topologically sorted drain that includes all affected keys.
529///
530/// Unlike [`drain_sorted`], this function expands the invalidation set to include
531/// all transitive dependents of the marked keys before sorting. This is the
532/// correct drain function to use with [`LazyPolicy`](crate::LazyPolicy),
533/// which only marks roots at mark-time.
534///
535/// # Algorithm
536///
537/// 1. Collect all keys currently marked invalidated in the channel.
538/// 2. For each invalidated key, compute all transitive dependents.
539/// 3. Build the affected set: invalidated keys ∪ all transitive dependents.
540/// 4. Return a topologically sorted drain over the affected set.
541///
542/// See [`DrainSorted`] for notes on cycles and nondeterministic ordering.
543///
544/// # Example
545///
546/// ```
547/// use invalidation::{
548///     Channel, CycleHandling, InvalidationGraph, InvalidationSet, drain_affected_sorted,
549/// };
550///
551/// const LAYOUT: Channel = Channel::new(0);
552///
553/// let mut graph = InvalidationGraph::<u32>::new();
554/// // 1 <- 2 <- 3
555/// graph.add_dependency(2, 1, LAYOUT, CycleHandling::Error).unwrap();
556/// graph.add_dependency(3, 2, LAYOUT, CycleHandling::Error).unwrap();
557///
558/// let mut invalidated = InvalidationSet::new();
559/// // Only mark the root
560/// invalidated.mark(1, LAYOUT);
561///
562/// // drain_affected_sorted expands to include all dependents
563/// let sorted: Vec<_> = drain_affected_sorted(&mut invalidated, &graph, LAYOUT).collect();
564/// assert_eq!(sorted, vec![1, 2, 3]);
565///
566/// // Channel is now empty
567/// assert!(!invalidated.has_invalidated(LAYOUT));
568/// ```
569pub fn drain_affected_sorted<'a, K>(
570    invalidated: &mut crate::InvalidationSet<K>,
571    graph: &'a InvalidationGraph<K>,
572    channel: Channel,
573) -> DrainSorted<'a, K>
574where
575    K: Copy + Eq + Hash + DenseKey,
576{
577    DrainBuilder::new(invalidated, graph, channel)
578        .affected()
579        .run()
580}
581
582/// Creates a topologically sorted drain of all affected keys, while recording a trace.
583///
584/// This is a lazy-friendly explainability helper: it records one plausible
585/// cause edge for each affected key **during** the drain-time expansion step.
586///
587/// Like [`drain_affected_sorted`], this clears the channel and returns a drain
588/// over: roots ∪ transitive dependents.
589///
590/// The trace is best-effort: when a key is reachable from multiple roots or via
591/// multiple paths, this records the first path discovered by the traversal.
592pub fn drain_affected_sorted_with_trace<'a, K, T>(
593    invalidated: &mut crate::InvalidationSet<K>,
594    graph: &'a InvalidationGraph<K>,
595    channel: Channel,
596    scratch: &mut TraversalScratch<K>,
597    trace: &mut T,
598) -> DrainSorted<'a, K>
599where
600    K: Copy + Eq + Hash + DenseKey,
601    T: InvalidationTrace<K>,
602{
603    DrainBuilder::new(invalidated, graph, channel)
604        .affected()
605        .trace(scratch, trace)
606        .run()
607}
608
609/// Creates a deterministic, topologically sorted drain that includes all
610/// affected keys.
611///
612/// This is equivalent to [`drain_affected_sorted`], but when multiple keys are
613/// ready simultaneously it yields them in ascending key order (`Ord`).
614pub fn drain_affected_sorted_deterministic<'a, K>(
615    invalidated: &mut crate::InvalidationSet<K>,
616    graph: &'a InvalidationGraph<K>,
617    channel: Channel,
618) -> DrainSortedDeterministic<'a, K>
619where
620    K: Copy + Eq + Hash + Ord + DenseKey,
621{
622    DrainBuilder::new(invalidated, graph, channel)
623        .affected()
624        .deterministic()
625        .run()
626}
627
628#[cfg(test)]
629mod tests {
630    use super::*;
631    use alloc::vec;
632    use alloc::vec::Vec;
633
634    use crate::TraversalScratch;
635    use crate::graph::CycleHandling;
636    use crate::set::InvalidationSet;
637    use crate::trace::OneParentRecorder;
638
639    const LAYOUT: Channel = Channel::new(0);
640
641    #[test]
642    fn topological_order_chain() {
643        let mut graph = InvalidationGraph::<u32>::new();
644        // 1 <- 2 <- 3 <- 4
645        graph
646            .add_dependency(2, 1, LAYOUT, CycleHandling::Error)
647            .unwrap();
648        graph
649            .add_dependency(3, 2, LAYOUT, CycleHandling::Error)
650            .unwrap();
651        graph
652            .add_dependency(4, 3, LAYOUT, CycleHandling::Error)
653            .unwrap();
654
655        let invalidated_keys = vec![4, 2, 1, 3]; // Out of order
656        let cap = invalidated_keys.len();
657        let sorted: Vec<_> =
658            DrainSorted::from_iter_with_capacity(invalidated_keys.into_iter(), cap, &graph, LAYOUT)
659                .collect();
660
661        // Must be in topological order
662        assert_eq!(sorted, vec![1, 2, 3, 4]);
663    }
664
665    #[test]
666    fn topological_order_diamond() {
667        let mut graph = InvalidationGraph::<u32>::new();
668        // 1 <- 2, 1 <- 3, 2 <- 4, 3 <- 4
669        graph
670            .add_dependency(2, 1, LAYOUT, CycleHandling::Error)
671            .unwrap();
672        graph
673            .add_dependency(3, 1, LAYOUT, CycleHandling::Error)
674            .unwrap();
675        graph
676            .add_dependency(4, 2, LAYOUT, CycleHandling::Error)
677            .unwrap();
678        graph
679            .add_dependency(4, 3, LAYOUT, CycleHandling::Error)
680            .unwrap();
681
682        let invalidated_keys = vec![4, 3, 2, 1];
683        let cap = invalidated_keys.len();
684        let sorted: Vec<_> =
685            DrainSorted::from_iter_with_capacity(invalidated_keys.into_iter(), cap, &graph, LAYOUT)
686                .collect();
687
688        // 1 must come first, 4 must come last, 2 and 3 can be in either order
689        assert_eq!(sorted[0], 1);
690        assert_eq!(sorted[3], 4);
691        assert!(sorted[1] == 2 || sorted[1] == 3);
692        assert!(sorted[2] == 2 || sorted[2] == 3);
693    }
694
695    #[test]
696    fn partial_invalidated_set() {
697        let mut graph = InvalidationGraph::<u32>::new();
698        // 1 <- 2 <- 3
699        graph
700            .add_dependency(2, 1, LAYOUT, CycleHandling::Error)
701            .unwrap();
702        graph
703            .add_dependency(3, 2, LAYOUT, CycleHandling::Error)
704            .unwrap();
705
706        // Only 2 and 3 are invalidated (not 1)
707        let invalidated_keys = vec![3, 2];
708        let cap = invalidated_keys.len();
709        let sorted: Vec<_> =
710            DrainSorted::from_iter_with_capacity(invalidated_keys.into_iter(), cap, &graph, LAYOUT)
711                .collect();
712
713        // 2 should come before 3, but 1 is not in the invalidation set
714        assert_eq!(sorted, vec![2, 3]);
715    }
716
717    #[test]
718    fn no_dependencies() {
719        let graph = InvalidationGraph::<u32>::new();
720        let invalidated_keys = vec![3, 1, 2];
721        let cap = invalidated_keys.len();
722        let sorted: Vec<_> =
723            DrainSorted::from_iter_with_capacity(invalidated_keys.into_iter(), cap, &graph, LAYOUT)
724                .collect();
725
726        // All have in-degree 0, so order is based on iteration order (set-dependent)
727        assert_eq!(sorted.len(), 3);
728    }
729
730    #[test]
731    fn drain_sorted_function() {
732        let mut graph = InvalidationGraph::<u32>::new();
733        graph
734            .add_dependency(2, 1, LAYOUT, CycleHandling::Error)
735            .unwrap();
736
737        let mut invalidated = InvalidationSet::new();
738        invalidated.mark(1, LAYOUT);
739        invalidated.mark(2, LAYOUT);
740
741        let sorted: Vec<_> = drain_sorted(&mut invalidated, &graph, LAYOUT).collect();
742        assert_eq!(sorted, vec![1, 2]);
743
744        // Channel should be empty
745        assert!(!invalidated.has_invalidated(LAYOUT));
746    }
747
748    #[test]
749    fn empty_invalidated_set() {
750        let graph = InvalidationGraph::<u32>::new();
751        let invalidated_keys: Vec<u32> = vec![];
752        let cap = invalidated_keys.len();
753        let sorted: Vec<_> =
754            DrainSorted::from_iter_with_capacity(invalidated_keys.into_iter(), cap, &graph, LAYOUT)
755                .collect();
756        assert!(sorted.is_empty());
757    }
758
759    #[test]
760    fn size_hint_accurate() {
761        let mut graph = InvalidationGraph::<u32>::new();
762        graph
763            .add_dependency(2, 1, LAYOUT, CycleHandling::Error)
764            .unwrap();
765
766        let invalidated_keys = vec![1, 2];
767        let cap = invalidated_keys.len();
768        let mut drain =
769            DrainSorted::from_iter_with_capacity(invalidated_keys.into_iter(), cap, &graph, LAYOUT);
770
771        assert_eq!(drain.size_hint(), (2, Some(2)));
772        assert_eq!(drain.remaining(), 2);
773
774        let _ = drain.next();
775        assert_eq!(drain.size_hint(), (1, Some(1)));
776
777        let _ = drain.next();
778        assert_eq!(drain.size_hint(), (0, Some(0)));
779        assert!(drain.is_empty());
780    }
781
782    #[test]
783    fn duplicate_keys_deduplicated() {
784        let mut graph = InvalidationGraph::<u32>::new();
785        graph
786            .add_dependency(2, 1, LAYOUT, CycleHandling::Error)
787            .unwrap();
788
789        // Duplicates in input
790        let invalidated_keys = vec![1, 2, 1, 2, 1];
791        let cap = invalidated_keys.len();
792        let sorted: Vec<_> =
793            DrainSorted::from_iter_with_capacity(invalidated_keys.into_iter(), cap, &graph, LAYOUT)
794                .collect();
795
796        // Should deduplicate to just [1, 2]
797        assert_eq!(sorted.len(), 2);
798        assert_eq!(sorted, vec![1, 2]);
799    }
800
801    #[test]
802    fn cycles_stall_drain() {
803        let mut graph = InvalidationGraph::<u32>::new();
804        // Create a cycle: 1 <- 2 <- 3 <- 1 (with Allow)
805        graph
806            .add_dependency(2, 1, LAYOUT, CycleHandling::Allow)
807            .unwrap();
808        graph
809            .add_dependency(3, 2, LAYOUT, CycleHandling::Allow)
810            .unwrap();
811        graph
812            .add_dependency(1, 3, LAYOUT, CycleHandling::Allow)
813            .unwrap();
814
815        // All three are invalidated
816        let invalidated_keys = vec![1, 2, 3];
817        let cap = invalidated_keys.len();
818        let mut drain =
819            DrainSorted::from_iter_with_capacity(invalidated_keys.into_iter(), cap, &graph, LAYOUT);
820        let sorted: Vec<_> = drain.by_ref().collect();
821
822        // All keys are in a cycle, so no key has in-degree 0, nothing is yielded
823        assert!(
824            sorted.is_empty(),
825            "cycle should prevent any keys from being yielded"
826        );
827        assert!(drain.is_stalled());
828        assert_eq!(
829            drain.completion(),
830            DrainCompletion::Stalled { remaining: 3 }
831        );
832    }
833
834    #[test]
835    fn cycles_stall_drain_collect_with_completion() {
836        let mut graph = InvalidationGraph::<u32>::new();
837        graph
838            .add_dependency(2, 1, LAYOUT, CycleHandling::Allow)
839            .unwrap();
840        graph
841            .add_dependency(3, 2, LAYOUT, CycleHandling::Allow)
842            .unwrap();
843        graph
844            .add_dependency(1, 3, LAYOUT, CycleHandling::Allow)
845            .unwrap();
846
847        let mut invalidated = InvalidationSet::new();
848        invalidated.mark(1, LAYOUT);
849        invalidated.mark(2, LAYOUT);
850        invalidated.mark(3, LAYOUT);
851
852        let (sorted, completion) =
853            drain_sorted(&mut invalidated, &graph, LAYOUT).collect_with_completion();
854        assert!(sorted.is_empty());
855        assert_eq!(completion, DrainCompletion::Stalled { remaining: 3 });
856    }
857
858    #[test]
859    fn drain_affected_sorted_expands_dependents() {
860        let mut graph = InvalidationGraph::<u32>::new();
861        // 1 <- 2 <- 3 <- 4
862        graph
863            .add_dependency(2, 1, LAYOUT, CycleHandling::Error)
864            .unwrap();
865        graph
866            .add_dependency(3, 2, LAYOUT, CycleHandling::Error)
867            .unwrap();
868        graph
869            .add_dependency(4, 3, LAYOUT, CycleHandling::Error)
870            .unwrap();
871
872        let mut invalidated = InvalidationSet::new();
873        // Only mark the root
874        invalidated.mark(1, LAYOUT);
875
876        // drain_affected_sorted should expand to include all dependents
877        let sorted: Vec<_> = drain_affected_sorted(&mut invalidated, &graph, LAYOUT).collect();
878        assert_eq!(sorted, vec![1, 2, 3, 4]);
879
880        // Channel should be empty
881        assert!(!invalidated.has_invalidated(LAYOUT));
882    }
883
884    #[test]
885    fn drain_affected_sorted_multiple_roots() {
886        let mut graph = InvalidationGraph::<u32>::new();
887        // Two chains: 1 <- 2, 3 <- 4
888        graph
889            .add_dependency(2, 1, LAYOUT, CycleHandling::Error)
890            .unwrap();
891        graph
892            .add_dependency(4, 3, LAYOUT, CycleHandling::Error)
893            .unwrap();
894
895        let mut invalidated = InvalidationSet::new();
896        invalidated.mark(1, LAYOUT);
897        invalidated.mark(3, LAYOUT);
898
899        let sorted: Vec<_> = drain_affected_sorted(&mut invalidated, &graph, LAYOUT).collect();
900        // Should include all 4 keys (order between chains is nondeterministic)
901        assert_eq!(sorted.len(), 4);
902    }
903
904    #[test]
905    fn deterministic_topological_order_diamond_is_total() {
906        let mut graph = InvalidationGraph::<u32>::new();
907        // 1 <- 2, 1 <- 3, 2 <- 4, 3 <- 4
908        graph
909            .add_dependency(2, 1, LAYOUT, CycleHandling::Error)
910            .unwrap();
911        graph
912            .add_dependency(3, 1, LAYOUT, CycleHandling::Error)
913            .unwrap();
914        graph
915            .add_dependency(4, 2, LAYOUT, CycleHandling::Error)
916            .unwrap();
917        graph
918            .add_dependency(4, 3, LAYOUT, CycleHandling::Error)
919            .unwrap();
920
921        let invalidated_keys: Vec<u32> = vec![4, 3, 2, 1];
922        let cap = invalidated_keys.len();
923        let sorted: Vec<_> = DrainSortedDeterministic::from_iter_with_capacity(
924            invalidated_keys.into_iter(),
925            cap,
926            &graph,
927            LAYOUT,
928        )
929        .collect();
930
931        // Deterministic tie-breaker yields 2 before 3.
932        assert_eq!(sorted, vec![1, 2, 3, 4]);
933    }
934
935    #[test]
936    #[should_panic(expected = "DenseKey index")]
937    fn deterministic_drain_rejects_sparse_key_space() {
938        let graph = InvalidationGraph::<usize>::new();
939        let mut invalidated = InvalidationSet::new();
940        invalidated.mark(usize::MAX, LAYOUT);
941
942        let _: Vec<_> = drain_sorted_deterministic(&mut invalidated, &graph, LAYOUT).collect();
943    }
944
945    #[test]
946    fn affected_sorted_with_trace_records_one_path() {
947        let mut graph = InvalidationGraph::<u32>::new();
948        // 1 <- 2 <- 3
949        graph
950            .add_dependency(2, 1, LAYOUT, CycleHandling::Error)
951            .unwrap();
952        graph
953            .add_dependency(3, 2, LAYOUT, CycleHandling::Error)
954            .unwrap();
955
956        let mut invalidated = InvalidationSet::new();
957        invalidated.mark(1, LAYOUT);
958
959        let mut scratch = TraversalScratch::new();
960        let mut rec = OneParentRecorder::new();
961        let sorted: Vec<_> = drain_affected_sorted_with_trace(
962            &mut invalidated,
963            &graph,
964            LAYOUT,
965            &mut scratch,
966            &mut rec,
967        )
968        .collect();
969
970        assert_eq!(sorted, vec![1, 2, 3]);
971        assert_eq!(rec.explain_path(3, LAYOUT).unwrap(), vec![1, 2, 3]);
972    }
973}