debtmap/observability/parallel.rs
1//! Parallel context propagation for rayon iterators.
2//!
3//! This module provides utilities for propagating tracing spans and analysis context
4//! into rayon worker threads. Without explicit propagation, context is lost because
5//! rayon's thread pool doesn't inherit thread-local storage from the calling thread.
6//!
7//! ## Problem
8//!
9//! Rayon uses a thread pool. When you call `par_iter()`, the closure runs on
10//! arbitrary worker threads, not the calling thread. This means:
11//!
12//! 1. Thread-locals (like our AnalysisContext) aren't inherited
13//! 2. Tracing's current span isn't inherited
14//! 3. The worker has no knowledge of the parent's context
15//!
16//! ## Solution
17//!
18//! Capture context before entering parallel section, then explicitly enter it in each worker.
19//!
20//! ## Usage
21//!
22//! ```ignore
23//! use debtmap::observability::parallel::{ParallelContext, process_file_with_context};
24//!
25//! // Capture context before parallel execution
26//! let ctx = ParallelContext::capture();
27//!
28//! files
29//! .par_iter()
30//! .map(|path| {
31//! process_file_with_context(path, &ctx, || {
32//! // Context propagated from parent thread
33//! // If panic here, crash report shows file being processed
34//! analyze_file(path)
35//! })
36//! })
37//! .collect()
38//! ```
39//!
40//! ## Performance Considerations
41//!
42//! - Context capture is cheap (clone a few pointers)
43//! - Context entry is cheap (set thread-local, enter span)
44//! - Main overhead is the per-item closure call
45//! - For large items, overhead is negligible
46//! - For tiny items (e.g., summing numbers), use raw par_iter
47//!
48//! ## When to Use Context Propagation
49//!
50//! **Use context propagation** when:
51//! - Processing files or significant work items
52//! - Debugging/observability is important
53//! - Crash context is valuable
54//!
55//! **Use raw `map`** when:
56//! - Processing trivial items (numbers, small transforms)
57//! - Maximum performance is critical
58//! - No need for crash context
59
60use super::context::{get_current_context, increment_processed, set_current_file, AnalysisContext};
61use rayon::prelude::*;
62use std::path::Path;
63use tracing::{debug_span, Span};
64
65// Re-export thread-local for tests
66thread_local! {
67 /// Thread-local storage for the current analysis context.
68 /// This allows each rayon worker to have its own copy of the context.
69 pub(crate) static PARALLEL_CONTEXT: std::cell::RefCell<Option<AnalysisContext>> =
70 const { std::cell::RefCell::new(None) };
71}
72
73/// Combined context for parallel propagation.
74///
75/// Captures both the tracing span and analysis context for propagation
76/// into rayon worker threads.
77#[derive(Clone)]
78pub struct ParallelContext {
79 /// Tracing span to propagate
80 span: Span,
81 /// Analysis context to propagate
82 analysis_context: AnalysisContext,
83}
84
85impl ParallelContext {
86 /// Capture current context for propagation.
87 ///
88 /// Call this on the main thread before entering a parallel section.
89 /// The captured context can then be entered in each worker thread.
90 ///
91 /// # Example
92 ///
93 /// ```ignore
94 /// let _phase = set_phase(AnalysisPhase::Parsing);
95 /// let ctx = ParallelContext::capture();
96 ///
97 /// files.par_iter().for_each(|file| {
98 /// let _guard = ctx.enter();
99 /// // Context is now available in this worker
100 /// });
101 /// ```
102 #[must_use]
103 pub fn capture() -> Self {
104 Self {
105 span: Span::current(),
106 analysis_context: get_current_context(),
107 }
108 }
109
110 /// Enter this context in the current thread.
111 ///
112 /// Returns a guard that restores the previous context on drop.
113 /// Use this at the start of each parallel task.
114 ///
115 /// # Example
116 ///
117 /// ```ignore
118 /// let ctx = ParallelContext::capture();
119 ///
120 /// files.par_iter().for_each(|file| {
121 /// let _guard = ctx.enter();
122 /// // Work with context...
123 /// });
124 /// ```
125 #[must_use]
126 pub fn enter(&self) -> ParallelContextGuard {
127 let span_guard = self.span.clone().entered();
128
129 // Set thread-local analysis context
130 super::context::CURRENT_CONTEXT.with(|ctx| {
131 *ctx.borrow_mut() = self.analysis_context.clone();
132 });
133
134 ParallelContextGuard { _span: span_guard }
135 }
136
137 /// Get the captured analysis context.
138 ///
139 /// Useful for inspecting what context was captured.
140 #[must_use]
141 pub fn analysis_context(&self) -> &AnalysisContext {
142 &self.analysis_context
143 }
144
145 /// Get the captured span.
146 ///
147 /// Useful for creating child spans.
148 #[must_use]
149 pub fn span(&self) -> &Span {
150 &self.span
151 }
152}
153
154/// RAII guard for entered parallel context.
155///
156/// When dropped, the span exit is handled automatically.
157/// Thread-local context remains until next `enter()` call on this thread.
158pub struct ParallelContextGuard {
159 _span: tracing::span::EnteredSpan,
160}
161
162/// Execute closure with propagated context.
163///
164/// This is a convenience function for the common pattern of entering
165/// context and immediately executing a closure.
166///
167/// # Example
168///
169/// ```ignore
170/// let ctx = ParallelContext::capture();
171///
172/// files.par_iter().map(|file| {
173/// with_parallel_context(&ctx, || analyze_file(file))
174/// }).collect()
175/// ```
176#[inline]
177pub fn with_parallel_context<T, F>(ctx: &ParallelContext, f: F) -> T
178where
179 F: FnOnce() -> T,
180{
181 let _guard = ctx.enter();
182 f()
183}
184
185/// Process a file with full context setup.
186///
187/// This combines:
188/// 1. Entering the parent context (span + analysis context)
189/// 2. Setting the current file in context
190/// 3. Creating a debug span for the file
191/// 4. Incrementing the processed count
192///
193/// Use this for the common pattern of processing files in parallel.
194///
195/// # Arguments
196///
197/// * `path` - The file being processed
198/// * `parent_ctx` - The context captured from the parent thread
199/// * `f` - The closure to execute
200///
201/// # Example
202///
203/// ```ignore
204/// let ctx = ParallelContext::capture();
205///
206/// files.par_iter().map(|path| {
207/// process_file_with_context(path, &ctx, || {
208/// // If panic here, crash report shows:
209/// // - Phase from parent context
210/// // - File: /path/to/current/file.rs
211/// // - Span: parent_span > process_file
212/// analyze_file(path)
213/// })
214/// }).collect()
215/// ```
216pub fn process_file_with_context<T, F>(path: &Path, parent_ctx: &ParallelContext, f: F) -> T
217where
218 F: FnOnce() -> T,
219{
220 let _parent = parent_ctx.enter();
221 let _file = set_current_file(path);
222 let _span = debug_span!("process_file", path = %path.display()).entered();
223
224 increment_processed();
225
226 f()
227}
228
229/// Extension trait for parallel iterators with full context propagation.
230///
231/// This trait provides convenience methods for common parallel patterns
232/// with automatic context propagation.
233///
234/// # Example
235///
236/// ```ignore
237/// use debtmap::observability::parallel::ParallelContextExt;
238///
239/// files
240/// .par_iter()
241/// .map_with_context(|path| {
242/// // Context propagated from parent thread
243/// analyze_file(path)
244/// })
245/// .collect()
246/// ```
247pub trait ParallelContextExt<T>: ParallelIterator<Item = T> + Sized {
248 /// Map with context propagation.
249 ///
250 /// Each item is processed with the parent context propagated.
251 fn map_with_context<R, F>(self, f: F) -> impl ParallelIterator<Item = R>
252 where
253 F: Fn(T) -> R + Sync + Send,
254 R: Send,
255 {
256 let ctx = ParallelContext::capture();
257 self.map(move |item| with_parallel_context(&ctx, || f(item)))
258 }
259
260 /// Filter-map with context propagation.
261 ///
262 /// Each item is processed with the parent context propagated.
263 fn filter_map_with_context<R, F>(self, f: F) -> impl ParallelIterator<Item = R>
264 where
265 F: Fn(T) -> Option<R> + Sync + Send,
266 R: Send,
267 {
268 let ctx = ParallelContext::capture();
269 self.filter_map(move |item| with_parallel_context(&ctx, || f(item)))
270 }
271
272 /// For-each with context propagation.
273 ///
274 /// Each item is processed with the parent context propagated.
275 fn for_each_with_context<F>(self, f: F)
276 where
277 F: Fn(T) + Sync + Send,
278 {
279 let ctx = ParallelContext::capture();
280 self.for_each(move |item| with_parallel_context(&ctx, || f(item)));
281 }
282}
283
284impl<T, I: ParallelIterator<Item = T> + Sized> ParallelContextExt<T> for I {}
285
#[cfg(test)]
mod tests {
    use super::*;
    use crate::observability::context::{reset_context, reset_progress, set_phase, AnalysisPhase};
    use std::path::PathBuf;

    /// Runs the `reset_context` / `reset_progress` helpers every test starts with.
    fn fresh_state() {
        reset_context();
        reset_progress();
    }

    /// `capture` records the phase that was set on the calling thread.
    #[test]
    fn test_context_capture() {
        fresh_state();

        let _phase = set_phase(AnalysisPhase::Parsing);
        let captured = ParallelContext::capture();

        assert_eq!(
            captured.analysis_context().phase,
            Some(AnalysisPhase::Parsing),
            "Captured context should have Parsing phase"
        );
    }

    /// Entering a captured context in a worker makes the phase visible there.
    #[test]
    fn test_context_propagates_to_workers() {
        fresh_state();

        let _phase = set_phase(AnalysisPhase::DebtScoring);
        let parent = ParallelContext::capture();

        let observed: Vec<_> = (0..10)
            .into_par_iter()
            .map(|i| {
                let _entered = parent.enter();
                // Context should be available
                let seen = get_current_context();
                (i, seen.phase)
            })
            .collect();

        observed.into_iter().for_each(|(_, phase)| {
            assert_eq!(
                phase,
                Some(AnalysisPhase::DebtScoring),
                "Phase should propagate to workers"
            );
        });
    }

    /// `process_file_with_context` exposes the path of the item being processed.
    #[test]
    fn test_file_context_per_item() {
        fresh_state();

        let parent = ParallelContext::capture();
        let files = vec![
            PathBuf::from("a.rs"),
            PathBuf::from("b.rs"),
            PathBuf::from("c.rs"),
        ];

        files.par_iter().for_each(|path| {
            process_file_with_context(path, &parent, || {
                let seen = get_current_context();
                assert_eq!(
                    seen.current_file.as_ref(),
                    Some(path),
                    "Current file should be set in context"
                );
            });
        });
    }

    /// Wrapping a computation in `with_parallel_context` does not change its result.
    #[test]
    fn test_with_parallel_context_helper() {
        fresh_state();

        let _phase = set_phase(AnalysisPhase::Parsing);
        let parent = ParallelContext::capture();

        let doubled_total: i32 = (0..100)
            .into_par_iter()
            .map(|x| with_parallel_context(&parent, || x * 2))
            .sum();

        assert_eq!(
            doubled_total, 9900,
            "Computation should work correctly with context"
        );
    }

    /// `map_with_context` captures and propagates the phase on its own.
    #[test]
    fn test_map_with_context_extension() {
        fresh_state();

        let _phase = set_phase(AnalysisPhase::PurityAnalysis);

        let observed: Vec<_> = (0..10)
            .into_par_iter()
            .map_with_context(|i| {
                let seen = get_current_context();
                (i, seen.phase)
            })
            .collect();

        observed.into_iter().for_each(|(_, phase)| {
            assert_eq!(
                phase,
                Some(AnalysisPhase::PurityAnalysis),
                "map_with_context should propagate phase"
            );
        });
    }

    /// `filter_map_with_context` filters as usual and propagates the phase.
    #[test]
    fn test_filter_map_with_context_extension() {
        fresh_state();

        let _phase = set_phase(AnalysisPhase::CoverageLoading);

        let kept: Vec<_> = (0..20)
            .into_par_iter()
            .filter_map_with_context(|i| {
                let seen = get_current_context();
                if i % 2 != 0 {
                    return None;
                }
                Some((i, seen.phase))
            })
            .collect();

        assert_eq!(kept.len(), 10, "Should filter half the items");
        kept.into_iter().for_each(|(_, phase)| {
            assert_eq!(
                phase,
                Some(AnalysisPhase::CoverageLoading),
                "filter_map_with_context should propagate phase"
            );
        });
    }

    /// `for_each_with_context` runs every item with the phase available.
    #[test]
    fn test_for_each_with_context_extension() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        fresh_state();

        let _phase = set_phase(AnalysisPhase::OutputGeneration);
        let matching = AtomicUsize::new(0);

        (0..10).into_par_iter().for_each_with_context(|_| {
            let seen = get_current_context();
            if seen.phase == Some(AnalysisPhase::OutputGeneration) {
                matching.fetch_add(1, Ordering::Relaxed);
            }
        });

        assert_eq!(
            matching.load(Ordering::Relaxed),
            10,
            "All items should have correct phase"
        );
    }

    /// A per-item file set after `enter` coexists with the propagated phase.
    #[test]
    fn test_nested_context_in_parallel() {
        fresh_state();

        let _phase = set_phase(AnalysisPhase::Parsing);
        let parent = ParallelContext::capture();

        let observed: Vec<_> = (0..5)
            .into_par_iter()
            .map(|i| {
                let _entered = parent.enter();
                let _file = set_current_file(format!("file_{}.rs", i));

                let seen = get_current_context();
                let file_name = seen
                    .current_file
                    .map(|p| p.to_string_lossy().into_owned());
                (i, seen.phase, file_name)
            })
            .collect();

        for (i, phase, file) in observed {
            assert_eq!(phase, Some(AnalysisPhase::Parsing));
            assert_eq!(file, Some(format!("file_{}.rs", i)));
        }
    }
}