Skip to main content

debtmap/observability/
parallel.rs

1//! Parallel context propagation for rayon iterators.
2//!
3//! This module provides utilities for propagating tracing spans and analysis context
4//! into rayon worker threads. Without explicit propagation, context is lost because
5//! rayon's thread pool doesn't inherit thread-local storage from the calling thread.
6//!
7//! ## Problem
8//!
9//! Rayon uses a thread pool. When you call `par_iter()`, the closure runs on
10//! arbitrary worker threads, not the calling thread. This means:
11//!
12//! 1. Thread-locals (like our AnalysisContext) aren't inherited
13//! 2. Tracing's current span isn't inherited
14//! 3. The worker has no knowledge of the parent's context
15//!
16//! ## Solution
17//!
//! Capture the context before entering a parallel section, then explicitly enter it in each worker.
19//!
20//! ## Usage
21//!
22//! ```ignore
23//! use debtmap::observability::parallel::{ParallelContext, process_file_with_context};
24//!
25//! // Capture context before parallel execution
26//! let ctx = ParallelContext::capture();
27//!
28//! files
29//!     .par_iter()
30//!     .map(|path| {
31//!         process_file_with_context(path, &ctx, || {
32//!             // Context propagated from parent thread
33//!             // If panic here, crash report shows file being processed
34//!             analyze_file(path)
35//!         })
36//!     })
37//!     .collect()
38//! ```
39//!
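//! ## Performance Considerations
//!
//! - Context capture is cheap: it clones the current span handle and takes an owned
//!   copy of the current `AnalysisContext` once, on the calling thread
//! - Context entry clones the span and the `AnalysisContext` again for every item and
//!   writes the thread-local; together with the extra closure call, this is the main
//!   per-item overhead
//! - For large items (e.g. whole files), that overhead is negligible
//! - For tiny items (e.g., summing numbers), use raw `par_iter`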
47//!
48//! ## When to Use Context Propagation
49//!
50//! **Use context propagation** when:
51//! - Processing files or significant work items
52//! - Debugging/observability is important
53//! - Crash context is valuable
54//!
55//! **Use raw `map`** when:
56//! - Processing trivial items (numbers, small transforms)
57//! - Maximum performance is critical
58//! - No need for crash context
59
60use super::context::{get_current_context, increment_processed, set_current_file, AnalysisContext};
61use rayon::prelude::*;
62use std::path::Path;
63use tracing::{debug_span, Span};
64
65// Re-export thread-local for tests
66thread_local! {
67    /// Thread-local storage for the current analysis context.
68    /// This allows each rayon worker to have its own copy of the context.
69    pub(crate) static PARALLEL_CONTEXT: std::cell::RefCell<Option<AnalysisContext>> =
70        const { std::cell::RefCell::new(None) };
71}
72
73/// Combined context for parallel propagation.
74///
75/// Captures both the tracing span and analysis context for propagation
76/// into rayon worker threads.
77#[derive(Clone)]
78pub struct ParallelContext {
79    /// Tracing span to propagate
80    span: Span,
81    /// Analysis context to propagate
82    analysis_context: AnalysisContext,
83}
84
85impl ParallelContext {
86    /// Capture current context for propagation.
87    ///
88    /// Call this on the main thread before entering a parallel section.
89    /// The captured context can then be entered in each worker thread.
90    ///
91    /// # Example
92    ///
93    /// ```ignore
94    /// let _phase = set_phase(AnalysisPhase::Parsing);
95    /// let ctx = ParallelContext::capture();
96    ///
97    /// files.par_iter().for_each(|file| {
98    ///     let _guard = ctx.enter();
99    ///     // Context is now available in this worker
100    /// });
101    /// ```
102    #[must_use]
103    pub fn capture() -> Self {
104        Self {
105            span: Span::current(),
106            analysis_context: get_current_context(),
107        }
108    }
109
110    /// Enter this context in the current thread.
111    ///
112    /// Returns a guard that restores the previous context on drop.
113    /// Use this at the start of each parallel task.
114    ///
115    /// # Example
116    ///
117    /// ```ignore
118    /// let ctx = ParallelContext::capture();
119    ///
120    /// files.par_iter().for_each(|file| {
121    ///     let _guard = ctx.enter();
122    ///     // Work with context...
123    /// });
124    /// ```
125    #[must_use]
126    pub fn enter(&self) -> ParallelContextGuard {
127        let span_guard = self.span.clone().entered();
128
129        // Set thread-local analysis context
130        super::context::CURRENT_CONTEXT.with(|ctx| {
131            *ctx.borrow_mut() = self.analysis_context.clone();
132        });
133
134        ParallelContextGuard { _span: span_guard }
135    }
136
137    /// Get the captured analysis context.
138    ///
139    /// Useful for inspecting what context was captured.
140    #[must_use]
141    pub fn analysis_context(&self) -> &AnalysisContext {
142        &self.analysis_context
143    }
144
145    /// Get the captured span.
146    ///
147    /// Useful for creating child spans.
148    #[must_use]
149    pub fn span(&self) -> &Span {
150        &self.span
151    }
152}
153
154/// RAII guard for entered parallel context.
155///
156/// When dropped, the span exit is handled automatically.
157/// Thread-local context remains until next `enter()` call on this thread.
158pub struct ParallelContextGuard {
159    _span: tracing::span::EnteredSpan,
160}
161
162/// Execute closure with propagated context.
163///
164/// This is a convenience function for the common pattern of entering
165/// context and immediately executing a closure.
166///
167/// # Example
168///
169/// ```ignore
170/// let ctx = ParallelContext::capture();
171///
172/// files.par_iter().map(|file| {
173///     with_parallel_context(&ctx, || analyze_file(file))
174/// }).collect()
175/// ```
176#[inline]
177pub fn with_parallel_context<T, F>(ctx: &ParallelContext, f: F) -> T
178where
179    F: FnOnce() -> T,
180{
181    let _guard = ctx.enter();
182    f()
183}
184
185/// Process a file with full context setup.
186///
187/// This combines:
188/// 1. Entering the parent context (span + analysis context)
189/// 2. Setting the current file in context
190/// 3. Creating a debug span for the file
191/// 4. Incrementing the processed count
192///
193/// Use this for the common pattern of processing files in parallel.
194///
195/// # Arguments
196///
197/// * `path` - The file being processed
198/// * `parent_ctx` - The context captured from the parent thread
199/// * `f` - The closure to execute
200///
201/// # Example
202///
203/// ```ignore
204/// let ctx = ParallelContext::capture();
205///
206/// files.par_iter().map(|path| {
207///     process_file_with_context(path, &ctx, || {
208///         // If panic here, crash report shows:
209///         // - Phase from parent context
210///         // - File: /path/to/current/file.rs
211///         // - Span: parent_span > process_file
212///         analyze_file(path)
213///     })
214/// }).collect()
215/// ```
216pub fn process_file_with_context<T, F>(path: &Path, parent_ctx: &ParallelContext, f: F) -> T
217where
218    F: FnOnce() -> T,
219{
220    let _parent = parent_ctx.enter();
221    let _file = set_current_file(path);
222    let _span = debug_span!("process_file", path = %path.display()).entered();
223
224    increment_processed();
225
226    f()
227}
228
229/// Extension trait for parallel iterators with full context propagation.
230///
231/// This trait provides convenience methods for common parallel patterns
232/// with automatic context propagation.
233///
234/// # Example
235///
236/// ```ignore
237/// use debtmap::observability::parallel::ParallelContextExt;
238///
239/// files
240///     .par_iter()
241///     .map_with_context(|path| {
242///         // Context propagated from parent thread
243///         analyze_file(path)
244///     })
245///     .collect()
246/// ```
247pub trait ParallelContextExt<T>: ParallelIterator<Item = T> + Sized {
248    /// Map with context propagation.
249    ///
250    /// Each item is processed with the parent context propagated.
251    fn map_with_context<R, F>(self, f: F) -> impl ParallelIterator<Item = R>
252    where
253        F: Fn(T) -> R + Sync + Send,
254        R: Send,
255    {
256        let ctx = ParallelContext::capture();
257        self.map(move |item| with_parallel_context(&ctx, || f(item)))
258    }
259
260    /// Filter-map with context propagation.
261    ///
262    /// Each item is processed with the parent context propagated.
263    fn filter_map_with_context<R, F>(self, f: F) -> impl ParallelIterator<Item = R>
264    where
265        F: Fn(T) -> Option<R> + Sync + Send,
266        R: Send,
267    {
268        let ctx = ParallelContext::capture();
269        self.filter_map(move |item| with_parallel_context(&ctx, || f(item)))
270    }
271
272    /// For-each with context propagation.
273    ///
274    /// Each item is processed with the parent context propagated.
275    fn for_each_with_context<F>(self, f: F)
276    where
277        F: Fn(T) + Sync + Send,
278    {
279        let ctx = ParallelContext::capture();
280        self.for_each(move |item| with_parallel_context(&ctx, || f(item)));
281    }
282}
283
284impl<T, I: ParallelIterator<Item = T> + Sized> ParallelContextExt<T> for I {}
285
#[cfg(test)]
mod tests {
    use super::*;
    use crate::observability::context::{reset_context, reset_progress, set_phase, AnalysisPhase};
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Clear the thread-local analysis context and the progress counters so
    /// every test starts from the same state.
    fn fresh_state() {
        reset_context();
        reset_progress();
    }

    #[test]
    fn test_context_capture() {
        fresh_state();

        let _phase = set_phase(AnalysisPhase::Parsing);
        let captured = ParallelContext::capture();

        assert_eq!(
            captured.analysis_context().phase,
            Some(AnalysisPhase::Parsing),
            "Captured context should have Parsing phase"
        );
    }

    #[test]
    fn test_context_propagates_to_workers() {
        fresh_state();

        let _phase = set_phase(AnalysisPhase::DebtScoring);
        let captured = ParallelContext::capture();

        let observed_phases: Vec<_> = (0..10)
            .into_par_iter()
            .map(|_| {
                let _guard = captured.enter();
                // Context should be available in the worker.
                get_current_context().phase
            })
            .collect();

        for phase in observed_phases {
            assert_eq!(
                phase,
                Some(AnalysisPhase::DebtScoring),
                "Phase should propagate to workers"
            );
        }
    }

    #[test]
    fn test_file_context_per_item() {
        fresh_state();

        let captured = ParallelContext::capture();
        let files: Vec<PathBuf> = ["a.rs", "b.rs", "c.rs"].iter().map(PathBuf::from).collect();

        files.par_iter().for_each(|path| {
            process_file_with_context(path, &captured, || {
                let current = get_current_context();
                assert_eq!(
                    current.current_file.as_ref(),
                    Some(path),
                    "Current file should be set in context"
                );
            });
        });
    }

    #[test]
    fn test_with_parallel_context_helper() {
        fresh_state();

        let _phase = set_phase(AnalysisPhase::Parsing);
        let captured = ParallelContext::capture();

        let doubled_sum: i32 = (0..100)
            .into_par_iter()
            .map(|x| with_parallel_context(&captured, || x * 2))
            .sum();

        assert_eq!(
            doubled_sum, 9900,
            "Computation should work correctly with context"
        );
    }

    #[test]
    fn test_map_with_context_extension() {
        fresh_state();

        let _phase = set_phase(AnalysisPhase::PurityAnalysis);

        let observed_phases: Vec<_> = (0..10)
            .into_par_iter()
            .map_with_context(|_| get_current_context().phase)
            .collect();

        for phase in observed_phases {
            assert_eq!(
                phase,
                Some(AnalysisPhase::PurityAnalysis),
                "map_with_context should propagate phase"
            );
        }
    }

    #[test]
    fn test_filter_map_with_context_extension() {
        fresh_state();

        let _phase = set_phase(AnalysisPhase::CoverageLoading);

        let kept: Vec<_> = (0..20)
            .into_par_iter()
            .filter_map_with_context(|i| {
                let phase = get_current_context().phase;
                (i % 2 == 0).then_some((i, phase))
            })
            .collect();

        assert_eq!(kept.len(), 10, "Should filter half the items");
        for (_, phase) in kept {
            assert_eq!(
                phase,
                Some(AnalysisPhase::CoverageLoading),
                "filter_map_with_context should propagate phase"
            );
        }
    }

    #[test]
    fn test_for_each_with_context_extension() {
        fresh_state();

        let _phase = set_phase(AnalysisPhase::OutputGeneration);
        let matching = AtomicUsize::new(0);

        (0..10).into_par_iter().for_each_with_context(|_| {
            if get_current_context().phase == Some(AnalysisPhase::OutputGeneration) {
                matching.fetch_add(1, Ordering::Relaxed);
            }
        });

        assert_eq!(
            matching.load(Ordering::Relaxed),
            10,
            "All items should have correct phase"
        );
    }

    #[test]
    fn test_nested_context_in_parallel() {
        fresh_state();

        let _phase = set_phase(AnalysisPhase::Parsing);
        let captured = ParallelContext::capture();

        let observed: Vec<_> = (0..5)
            .into_par_iter()
            .map(|i| {
                let _guard = captured.enter();
                let _file = set_current_file(format!("file_{}.rs", i));

                let inner = get_current_context();
                let file_name = inner
                    .current_file
                    .map(|p| p.to_string_lossy().to_string());
                (i, inner.phase, file_name)
            })
            .collect();

        for (i, phase, file) in observed {
            assert_eq!(phase, Some(AnalysisPhase::Parsing));
            assert_eq!(file, Some(format!("file_{}.rs", i)));
        }
    }
}