tanu_core/runner.rs
1//! # Test Runner Module
2//!
3//! The core test execution engine for tanu. This module provides the `Runner` struct
4//! that orchestrates test discovery, execution, filtering, reporting, and event publishing.
5//! It supports concurrent test execution with retry capabilities and comprehensive
6//! event-driven reporting.
7//!
8//! ## Key Components
9//!
10//! - **`Runner`**: Main test execution engine
11//! - **Event System**: Real-time test execution events via channels
12//! - **Filtering**: Project, module, and test name filtering
13//! - **Reporting**: Pluggable reporter system for test output
14//! - **Retry Logic**: Configurable retry with exponential backoff
15//!
16//! ## Execution Flow (block diagram)
17//!
18//! ```text
19//! +-------------------+ +-------------------+ +---------------------+
20//! | Test registry | --> | Filter chain | --> | Semaphore |
21//! | add_test() | | project/module | | (concurrency ctrl) |
22//! +-------------------+ | test name/ignore | +---------------------+
23//! +-------------------+ |
24//! v
25//! +---------------------+
26//! | Tokio task spawn |
27//! | + task-local ctx |
28//! +---------------------+
29//! |
30//! v
31//! +---------------------+
32//! | Test execution |
33//! | + panic recovery |
34//! | + retry/backoff |
35//! +---------------------+
36//! |
37//! +----------------------------------------------------+
38//! v
39//! +-------------------+ +-------------------+ +-------------------+
40//! | Event channel | --> | Broadcast to all | --> | Reporter(s) |
41//! | Start/Check/HTTP | | subscribers | | List/Table/Null |
42//! | Retry/End/Summary | | | | (format output) |
43//! +-------------------+ +-------------------+ +-------------------+
44//! ```
45//!
46//! ## Basic Usage
47//!
48//! ```rust,ignore
49//! use tanu_core::Runner;
50//!
51//! let mut runner = Runner::new();
52//! runner.add_test("my_test", "my_module", None, test_factory);
53//! runner.run(&[], &[], &[]).await?;
54//! ```
55use backon::Retryable;
56use eyre::WrapErr;
57use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
58use itertools::Itertools;
59use once_cell::sync::Lazy;
60use std::{
61 collections::HashMap,
62 ops::Deref,
63 pin::Pin,
64 sync::{
65 atomic::{AtomicUsize, Ordering},
66 Arc, Mutex,
67 },
68 time::{Duration, SystemTime},
69};
70use tokio::sync::broadcast;
71use tracing::*;
72
73use crate::{
74 config::{self, get_tanu_config, ProjectConfig},
75 http,
76 reporter::Reporter,
77 Config, ModuleName, ProjectName,
78};
79
80tokio::task_local! {
81 pub(crate) static TEST_INFO: Arc<TestInfo>;
82}
83
84pub(crate) fn get_test_info() -> Arc<TestInfo> {
85 TEST_INFO.with(Arc::clone)
86}
87
88/// Runs a future in the current tanu test context (project + test info), if any.
89///
90/// This is useful when spawning additional Tokio tasks (e.g. via `tokio::spawn`/`JoinSet`)
91/// from inside a `#[tanu::test]`, because Tokio task-locals are not propagated
92/// automatically.
93pub fn scope_current<F>(fut: F) -> impl std::future::Future<Output = F::Output> + Send
94where
95 F: std::future::Future + Send,
96 F::Output: Send,
97{
98 let project = crate::config::PROJECT.try_with(Arc::clone).ok();
99 let test_info = TEST_INFO.try_with(Arc::clone).ok();
100
101 async move {
102 match (project, test_info) {
103 (Some(project), Some(test_info)) => {
104 crate::config::PROJECT
105 .scope(project, TEST_INFO.scope(test_info, fut))
106 .await
107 }
108 (Some(project), None) => crate::config::PROJECT.scope(project, fut).await,
109 (None, Some(test_info)) => TEST_INFO.scope(test_info, fut).await,
110 (None, None) => fut.await,
111 }
112 }
113}
114
115// NOTE: Keep the runner receiver alive here so that sender never fails to send.
116#[allow(clippy::type_complexity)]
117pub(crate) static CHANNEL: Lazy<
118 Mutex<Option<(broadcast::Sender<Event>, broadcast::Receiver<Event>)>>,
119> = Lazy::new(|| Mutex::new(Some(broadcast::channel(1000))));
120
121/// Barrier to synchronize reporter subscription before test execution starts.
122/// This prevents the race condition where tests publish events before reporters subscribe.
123pub(crate) static REPORTER_BARRIER: Lazy<Mutex<Option<Arc<tokio::sync::Barrier>>>> =
124 Lazy::new(|| Mutex::new(None));
125
126/// Publishes an event to the runner's event channel.
127///
128/// This function is used throughout the test execution pipeline to broadcast
129/// real-time events including test starts, check results, HTTP logs, retries,
130/// and test completions. All events are timestamped and include test context.
131///
132/// # Examples
133///
134/// ```rust,ignore
135/// use tanu_core::runner::{publish, EventBody, Check};
136///
137/// // Publish a successful check
138/// let check = Check::success("response.status() == 200");
139/// publish(EventBody::Check(Box::new(check)))?;
140///
141/// // Publish test start
142/// publish(EventBody::Start)?;
143/// ```
144///
145/// # Errors
146///
147/// Returns an error if:
148/// - The channel lock cannot be acquired
149/// - The channel has been closed
150/// - The send operation fails
151pub fn publish(e: impl Into<Event>) -> eyre::Result<()> {
152 let Ok(guard) = CHANNEL.lock() else {
153 eyre::bail!("failed to acquire runner channel lock");
154 };
155 let Some((tx, _)) = guard.deref() else {
156 eyre::bail!("runner channel has been already closed");
157 };
158
159 tx.send(e.into())
160 .wrap_err("failed to publish message to the runner channel")?;
161
162 Ok(())
163}
164
165/// Subscribe to the channel to see the real-time test execution events.
166pub fn subscribe() -> eyre::Result<broadcast::Receiver<Event>> {
167 let Ok(guard) = CHANNEL.lock() else {
168 eyre::bail!("failed to acquire runner channel lock");
169 };
170 let Some((tx, _)) = guard.deref() else {
171 eyre::bail!("runner channel has been already closed");
172 };
173
174 Ok(tx.subscribe())
175}
176
177/// Set up barrier for N reporters (called before spawning reporters).
178///
179/// This ensures all reporters subscribe before tests start executing,
180/// preventing the race condition where Start events are published before
181/// reporters are ready to receive them.
182pub(crate) fn setup_reporter_barrier(count: usize) -> eyre::Result<()> {
183 let Ok(mut barrier) = REPORTER_BARRIER.lock() else {
184 eyre::bail!("failed to acquire reporter barrier lock");
185 };
186 *barrier = Some(Arc::new(tokio::sync::Barrier::new(count + 1)));
187 Ok(())
188}
189
190/// Wait on barrier (called by reporters after subscribing, and by runner before tests).
191///
192/// If no barrier is set (standalone reporter use), this is a no-op.
193pub(crate) async fn wait_reporter_barrier() {
194 let barrier = match REPORTER_BARRIER.lock() {
195 Ok(guard) => guard.clone(),
196 Err(e) => {
197 error!("failed to acquire reporter barrier lock (poisoned): {e}");
198 return;
199 }
200 };
201
202 if let Some(b) = barrier {
203 b.wait().await;
204 }
205}
206
207async fn execute_test(
208 project: Arc<ProjectConfig>,
209 info: Arc<TestInfo>,
210 factory: TestCaseFactory,
211 serial_mutex: Option<Arc<tokio::sync::Mutex<()>>>,
212 worker_id: isize,
213) -> eyre::Result<Test> {
214 let project_for_scope = Arc::clone(&project);
215 let info_for_scope = Arc::clone(&info);
216 config::PROJECT
217 .scope(project_for_scope, async {
218 TEST_INFO
219 .scope(info_for_scope, async {
220 let test_name = info.name.clone();
221 publish(EventBody::Start)?;
222
223 let retry_count = AtomicUsize::new(project.retry.count.unwrap_or(0));
224 let serial_mutex_clone = serial_mutex.clone();
225 let f = || async {
226 // Acquire serial guard just before test execution
227 let _serial_guard = if let Some(ref mutex) = serial_mutex_clone {
228 Some(mutex.lock().await)
229 } else {
230 None
231 };
232
233 let started_at = SystemTime::now();
234 let request_started = std::time::Instant::now();
235 let res = factory().await;
236 let ended_at = SystemTime::now();
237
238 if res.is_err() && retry_count.load(Ordering::SeqCst) > 0 {
239 let test_result = match &res {
240 Ok(_) => Ok(()),
241 Err(e) => Err(Error::ErrorReturned(format!("{e:?}"))),
242 };
243 let test = Test {
244 result: test_result,
245 info: Arc::clone(&info),
246 worker_id,
247 started_at,
248 ended_at,
249 request_time: request_started.elapsed(),
250 };
251 publish(EventBody::Retry(test))?;
252 retry_count.fetch_sub(1, Ordering::SeqCst);
253 };
254 res
255 };
256 let started_at = SystemTime::now();
257 let started = std::time::Instant::now();
258 let fut = f.retry(project.retry.backoff());
259 let fut = std::panic::AssertUnwindSafe(fut).catch_unwind();
260 let res = fut.await;
261 let request_time = started.elapsed();
262 let ended_at = SystemTime::now();
263
264 let result = match res {
265 Ok(Ok(_)) => {
266 debug!("{test_name} ok");
267 Ok(())
268 }
269 Ok(Err(e)) => {
270 debug!("{test_name} failed: {e:#}");
271 Err(Error::ErrorReturned(format!("{e:?}")))
272 }
273 Err(e) => {
274 let panic_message =
275 if let Some(panic_message) = e.downcast_ref::<&str>() {
276 format!("{test_name} failed with message: {panic_message}")
277 } else if let Some(panic_message) = e.downcast_ref::<String>() {
278 format!("{test_name} failed with message: {panic_message}")
279 } else {
280 format!("{test_name} failed with unknown message")
281 };
282 let e = eyre::eyre!(panic_message);
283 Err(Error::Panicked(format!("{e:?}")))
284 }
285 };
286
287 let test = Test {
288 result,
289 info: Arc::clone(&info),
290 worker_id,
291 started_at,
292 ended_at,
293 request_time,
294 };
295
296 publish(EventBody::End(test.clone()))?;
297
298 eyre::Ok(test)
299 })
300 .await
301 })
302 .await
303}
304
305/// Clear barrier after use.
306pub(crate) fn clear_reporter_barrier() {
307 match REPORTER_BARRIER.lock() {
308 Ok(mut barrier) => {
309 *barrier = None;
310 }
311 Err(e) => {
312 error!("failed to clear reporter barrier (poisoned lock): {e}");
313 }
314 }
315}
316
317/// Test execution errors.
318///
319/// Represents the different ways a test can fail during execution.
320/// These errors are captured and reported by the runner system.
321#[derive(Debug, Clone, thiserror::Error)]
322pub enum Error {
323 #[error("panic: {0}")]
324 Panicked(String),
325 #[error("error: {0}")]
326 ErrorReturned(String),
327}
328
329/// Represents the result of a check/assertion within a test.
330///
331/// Checks are created by assertion macros (`check!`, `check_eq!`, etc.) and
332/// track both the success/failure status and the original expression that
333/// was evaluated. This information is used for detailed test reporting.
334///
335/// # Examples
336///
337/// ```rust,ignore
338/// use tanu_core::runner::Check;
339///
340/// // Create a successful check
341/// let check = Check::success("response.status() == 200");
342/// assert!(check.result);
343///
344/// // Create a failed check
345/// let check = Check::error("user_count != 0");
346/// assert!(!check.result);
347/// ```
348#[derive(Debug, Clone)]
349pub struct Check {
350 pub result: bool,
351 pub expr: String,
352}
353
354impl Check {
355 pub fn success(expr: impl Into<String>) -> Check {
356 Check {
357 result: true,
358 expr: expr.into(),
359 }
360 }
361
362 pub fn error(expr: impl Into<String>) -> Check {
363 Check {
364 result: false,
365 expr: expr.into(),
366 }
367 }
368}
369
370/// A test execution event with full context.
371///
372/// Events are published throughout test execution and include the project,
373/// module, and test name for complete traceability. The event body contains
374/// the specific event data (start, check, HTTP, retry, or end).
375///
376/// # Event Flow
377///
378/// 1. `Start` - Test begins execution
379/// 2. `Check` - Assertion results (can be multiple per test)
380/// 3. `Http` - HTTP request/response logs (can be multiple per test)
381/// 4. `Retry` - Test retry attempts (if configured)
382/// 5. `End` - Test completion with final result
383#[derive(Debug, Clone)]
384pub struct Event {
385 pub project: ProjectName,
386 pub module: ModuleName,
387 pub test: ModuleName,
388 pub body: EventBody,
389}
390
391/// The specific event data published during test execution.
392///
393/// Each event type carries different information:
394/// - `Start`: Signals test execution beginning
395/// - `Check`: Contains assertion results with expression details
396/// - `Call`: HTTP/gRPC request/response logs for debugging
397/// - `Retry`: Indicates a test retry attempt
398/// - `End`: Final test result with timing and outcome
399/// - `Summary`: Overall test execution summary with counts and timing
400///
401/// A log from a call (HTTP, gRPC, etc.)
402#[derive(Debug, Clone)]
403pub enum CallLog {
404 Http(Box<http::Log>),
405 #[cfg(feature = "grpc")]
406 Grpc(Box<crate::grpc::Log>),
407}
408
409#[derive(Debug, Clone)]
410pub enum EventBody {
411 Start,
412 Check(Box<Check>),
413 Call(CallLog),
414 Retry(Test),
415 End(Test),
416 Summary(TestSummary),
417}
418
419impl From<EventBody> for Event {
420 fn from(body: EventBody) -> Self {
421 let project = crate::config::get_config();
422 let test_info = crate::runner::get_test_info();
423 Event {
424 project: project.name.clone(),
425 module: test_info.module.clone(),
426 test: test_info.name.clone(),
427 body,
428 }
429 }
430}
431
432/// Final test execution result.
433///
434/// Contains the complete outcome of a test execution including metadata,
435/// execution time, and the final result (success or specific error type).
436/// This is published in the `End` event when a test completes.
437#[derive(Debug, Clone)]
438pub struct Test {
439 pub info: Arc<TestInfo>,
440 pub worker_id: isize,
441 pub started_at: SystemTime,
442 pub ended_at: SystemTime,
443 pub request_time: Duration,
444 pub result: Result<(), Error>,
445}
446
447/// Overall test execution summary.
448///
449/// Contains aggregate information about the entire test run including
450/// total counts, timing, and success/failure statistics.
451/// This is published in the `Summary` event when all tests complete.
452#[derive(Debug, Clone)]
453pub struct TestSummary {
454 pub total_tests: usize,
455 pub passed_tests: usize,
456 pub failed_tests: usize,
457 pub total_time: Duration,
458 pub test_prep_time: Duration,
459}
460
461/// Test metadata and identification.
462///
463/// Contains the module and test name for a test case. This information
464/// is used for test filtering, reporting, and event context throughout
465/// the test execution pipeline.
466#[derive(Debug, Clone, Default)]
467pub struct TestInfo {
468 pub module: String,
469 pub name: String,
470 pub serial_group: Option<String>,
471 pub line: u32,
472 pub ordered: bool,
473}
474
475impl TestInfo {
476 /// Full test name including module
477 pub fn full_name(&self) -> String {
478 format!("{}::{}", self.module, self.name)
479 }
480
481 /// Unique test name including project and module names
482 pub fn unique_name(&self, project: &str) -> String {
483 format!("{project}::{}::{}", self.module, self.name)
484 }
485}
486
487/// Pool of reusable worker IDs for timeline visualization.
488///
489/// Worker IDs are assigned to tests when they start executing and returned
490/// to the pool when they complete. This allows timeline visualization tools
491/// to display tests in lanes based on which worker executed them.
492#[derive(Debug)]
493pub struct WorkerIds {
494 enabled: bool,
495 ids: Mutex<Vec<isize>>,
496}
497
498impl WorkerIds {
499 /// Creates a new worker ID pool with IDs from 0 to concurrency-1.
500 ///
501 /// If `concurrency` is `None`, the pool is disabled and `acquire()` always returns -1.
502 pub fn new(concurrency: Option<usize>) -> Self {
503 match concurrency {
504 Some(c) => Self {
505 enabled: true,
506 ids: Mutex::new((0..c as isize).collect()),
507 },
508 None => Self {
509 enabled: false,
510 ids: Mutex::new(Vec::new()),
511 },
512 }
513 }
514
515 /// Acquires a worker ID from the pool.
516 ///
517 /// Returns -1 if the pool is disabled, empty, or the mutex is poisoned.
518 pub fn acquire(&self) -> isize {
519 if !self.enabled {
520 return -1;
521 }
522 self.ids
523 .lock()
524 .ok()
525 .and_then(|mut guard| guard.pop())
526 .unwrap_or(-1)
527 }
528
529 /// Returns a worker ID to the pool.
530 ///
531 /// Does nothing if the pool is disabled, the mutex is poisoned, or id is negative.
532 pub fn release(&self, id: isize) {
533 if !self.enabled || id < 0 {
534 return;
535 }
536 if let Ok(mut guard) = self.ids.lock() {
537 guard.push(id);
538 }
539 }
540}
541
542type TestCaseFactory = Arc<
543 dyn Fn() -> Pin<Box<dyn futures::Future<Output = eyre::Result<()>> + Send + 'static>>
544 + Sync
545 + Send
546 + 'static,
547>;
548
549/// Configuration options for test runner behavior.
550///
551/// Controls various aspects of test execution including logging,
552/// concurrency, and channel management. These options can be set
553/// via the builder pattern on the `Runner`.
554///
555/// # Examples
556///
557/// ```rust,ignore
558/// use tanu_core::Runner;
559///
560/// let mut runner = Runner::new();
561/// runner.capture_http(); // Enable HTTP logging
562/// runner.set_concurrency(4); // Limit to 4 concurrent tests
563/// ```
564#[derive(Debug, Clone)]
565pub struct Options {
566 pub debug: bool,
567 pub capture_http: bool,
568 pub capture_rust: bool,
569 pub terminate_channel: bool,
570 pub concurrency: Option<usize>,
571 /// Whether to mask sensitive data (API keys, tokens) in HTTP logs.
572 /// Defaults to `true` (masked). Set to `false` with `--show-sensitive` flag.
573 pub mask_sensitive: bool,
574}
575
576impl Default for Options {
577 fn default() -> Self {
578 Self {
579 debug: false,
580 capture_http: false,
581 capture_rust: false,
582 terminate_channel: false,
583 concurrency: None,
584 mask_sensitive: true, // Masked by default for security
585 }
586 }
587}
588
589/// Trait for filtering test cases during execution.
590///
591/// Filters allow selective test execution based on project configuration
592/// and test metadata. Multiple filters can be applied simultaneously,
593/// and a test must pass all filters to be executed.
594///
595/// # Examples
596///
597/// ```rust,ignore
598/// use tanu_core::runner::{Filter, TestInfo, ProjectConfig};
599///
600/// struct CustomFilter;
601///
602/// impl Filter for CustomFilter {
603/// fn filter(&self, project: &ProjectConfig, info: &TestInfo) -> bool {
604/// // Only run tests with "integration" in the name
605/// info.name.contains("integration")
606/// }
607/// }
608/// ```
609pub trait Filter {
610 fn filter(&self, project: &ProjectConfig, info: &TestInfo) -> bool;
611}
612
613/// Filters tests to only run from specified projects.
614///
615/// When project names are provided, only tests from those projects
616/// will be executed. If the list is empty, all projects are included.
617///
618/// # Examples
619///
620/// ```rust,ignore
621/// use tanu_core::runner::ProjectFilter;
622///
623/// let filter = ProjectFilter { project_names: &["staging".to_string()] };
624/// // Only tests from "staging" project will run
625/// ```
626pub struct ProjectFilter<'a> {
627 project_names: &'a [String],
628}
629
630impl Filter for ProjectFilter<'_> {
631 fn filter(&self, project: &ProjectConfig, _info: &TestInfo) -> bool {
632 if self.project_names.is_empty() {
633 return true;
634 }
635
636 self.project_names
637 .iter()
638 .any(|project_name| &project.name == project_name)
639 }
640}
641
642/// Filters tests to only run from specified modules.
643///
644/// When module names are provided, only tests from those modules
645/// will be executed. If the list is empty, all modules are included.
646/// Module names correspond to Rust module paths.
647///
648/// # Examples
649///
650/// ```rust,ignore
651/// use tanu_core::runner::ModuleFilter;
652///
653/// let filter = ModuleFilter { module_names: &["api".to_string(), "auth".to_string()] };
654/// // Only tests from "api" and "auth" modules will run
655/// ```
656pub struct ModuleFilter<'a> {
657 module_names: &'a [String],
658}
659
660impl Filter for ModuleFilter<'_> {
661 fn filter(&self, _project: &ProjectConfig, info: &TestInfo) -> bool {
662 if self.module_names.is_empty() {
663 return true;
664 }
665
666 self.module_names
667 .iter()
668 .any(|module_name| &info.module == module_name)
669 }
670}
671
672/// Filters tests to only run specific named tests.
673///
674/// When test names are provided, only those exact tests will be executed.
675/// Test names should include the module (e.g., "api::health_check").
676/// If the list is empty, all tests are included.
677///
678/// # Examples
679///
680/// ```rust,ignore
681/// use tanu_core::runner::TestNameFilter;
682///
683/// let filter = TestNameFilter {
684/// test_names: &["api::health_check".to_string(), "auth::login".to_string()]
685/// };
686/// // Only the specified tests will run
687/// ```
688pub struct TestNameFilter<'a> {
689 test_names: &'a [String],
690}
691
692impl Filter for TestNameFilter<'_> {
693 fn filter(&self, _project: &ProjectConfig, info: &TestInfo) -> bool {
694 if self.test_names.is_empty() {
695 return true;
696 }
697
698 self.test_names
699 .iter()
700 .any(|test_name| &info.full_name() == test_name)
701 }
702}
703
704/// Filters out tests that are configured to be ignored.
705///
706/// This filter reads the `test_ignore` configuration from each project
707/// and excludes those tests from execution. Tests are matched by their
708/// full name (module::test_name).
709///
710/// # Configuration
711///
712/// In `tanu.toml`:
713/// ```toml
714/// [[projects]]
715/// name = "staging"
716/// test_ignore = ["flaky_test", "slow_integration_test"]
717/// ```
718///
719/// # Examples
720///
721/// ```rust,ignore
722/// use tanu_core::runner::TestIgnoreFilter;
723///
724/// let filter = TestIgnoreFilter::default();
725/// // Tests listed in test_ignore config will be skipped
726/// ```
727pub struct TestIgnoreFilter {
728 test_ignores: HashMap<String, Vec<String>>,
729}
730
731impl Default for TestIgnoreFilter {
732 fn default() -> TestIgnoreFilter {
733 TestIgnoreFilter {
734 test_ignores: get_tanu_config()
735 .projects
736 .iter()
737 .map(|proj| (proj.name.clone(), proj.test_ignore.clone()))
738 .collect(),
739 }
740 }
741}
742
743impl Filter for TestIgnoreFilter {
744 fn filter(&self, project: &ProjectConfig, info: &TestInfo) -> bool {
745 let Some(test_ignore) = self.test_ignores.get(&project.name) else {
746 return true;
747 };
748
749 test_ignore
750 .iter()
751 .all(|test_name| &info.full_name() != test_name)
752 }
753}
754
755/// The main test execution engine for tanu.
756///
757/// `Runner` is responsible for orchestrating the entire test execution pipeline:
758/// test discovery, filtering, concurrent execution, retry handling, event publishing,
759/// and result reporting. It supports multiple projects, configurable concurrency,
760/// and pluggable reporters.
761///
762/// # Features
763///
764/// - **Concurrent Execution**: Tests run in parallel with configurable limits
765/// - **Retry Logic**: Automatic retry with exponential backoff for flaky tests
766/// - **Event System**: Real-time event publishing for UI integration
767/// - **Filtering**: Filter tests by project, module, or test name
768/// - **Reporting**: Support for multiple output formats via reporters
769/// - **HTTP Logging**: Capture and log all HTTP requests/responses
770///
771/// # Examples
772///
773/// ```rust,ignore
774/// use tanu_core::{Runner, reporter::TableReporter};
775///
776/// let mut runner = Runner::new();
777/// runner.capture_http();
778/// runner.set_concurrency(8);
779/// runner.add_reporter(TableReporter::new());
780///
781/// // Add tests (typically done by procedural macros)
782/// runner.add_test("health_check", "api", None, test_factory);
783///
784/// // Run all tests
785/// runner.run(&[], &[], &[]).await?;
786/// ```
787///
788/// # Architecture
789///
790/// Tests are executed in separate tokio tasks with:
791/// - Project-scoped configuration
792/// - Test-scoped context for event publishing
793/// - Semaphore-based concurrency control
794/// - Panic recovery and error handling
795/// - Automatic retry with configurable backoff
796#[derive(Default)]
797pub struct Runner {
798 cfg: Config,
799 options: Options,
800 test_cases: Vec<(Arc<TestInfo>, TestCaseFactory)>,
801 reporters: Vec<Box<dyn Reporter + Send>>,
802}
803
804impl Runner {
805 /// Creates a new runner with the global tanu configuration.
806 ///
807 /// This loads the configuration from `tanu.toml` and sets up
808 /// default options. Use `with_config()` for custom configuration.
809 ///
810 /// # Examples
811 ///
812 /// ```rust,ignore
813 /// use tanu_core::Runner;
814 ///
815 /// let runner = Runner::new();
816 /// ```
817 pub fn new() -> Runner {
818 Runner::with_config(get_tanu_config().clone())
819 }
820
821 /// Creates a new runner with the specified configuration.
822 ///
823 /// This allows for custom configuration beyond what's in `tanu.toml`,
824 /// useful for testing or programmatic setup.
825 ///
826 /// # Examples
827 ///
828 /// ```rust,ignore
829 /// use tanu_core::{Runner, Config};
830 ///
831 /// let config = Config::default();
832 /// let runner = Runner::with_config(config);
833 /// ```
834 pub fn with_config(cfg: Config) -> Runner {
835 Runner {
836 cfg,
837 options: Options::default(),
838 test_cases: Vec::new(),
839 reporters: Vec::new(),
840 }
841 }
842
843 /// Enables HTTP request/response logging.
844 ///
845 /// When enabled, all HTTP requests made via tanu's HTTP client
846 /// will be logged and included in test reports. This is useful
847 /// for debugging API tests and understanding request/response flow.
848 ///
849 /// # Examples
850 ///
851 /// ```rust,ignore
852 /// let mut runner = Runner::new();
853 /// runner.capture_http();
854 /// ```
855 pub fn capture_http(&mut self) {
856 self.options.capture_http = true;
857 }
858
859 /// Enables Rust logging output during test execution.
860 ///
861 /// This initializes the tracing subscriber to capture debug, info,
862 /// warn, and error logs from tests and the framework itself.
863 /// Useful for debugging test execution issues.
864 ///
865 /// # Examples
866 ///
867 /// ```rust,ignore
868 /// let mut runner = Runner::new();
869 /// runner.capture_rust();
870 /// ```
871 pub fn capture_rust(&mut self) {
872 self.options.capture_rust = true;
873 }
874
875 /// Configures the runner to close the event channel after test execution.
876 ///
877 /// By default, the event channel remains open for continued monitoring.
878 /// This option closes the channel when all tests complete, signaling
879 /// that no more events will be published.
880 ///
881 /// # Examples
882 ///
883 /// ```rust,ignore
884 /// let mut runner = Runner::new();
885 /// runner.terminate_channel();
886 /// ```
887 pub fn terminate_channel(&mut self) {
888 self.options.terminate_channel = true;
889 }
890
891 /// Adds a reporter for test output formatting.
892 ///
893 /// Reporters receive test events and format them for different output
894 /// destinations (console, files, etc.). Multiple reporters can be added
895 /// to generate multiple output formats simultaneously.
896 ///
897 /// # Examples
898 ///
899 /// ```rust,ignore
900 /// use tanu_core::{Runner, reporter::TableReporter};
901 ///
902 /// let mut runner = Runner::new();
903 /// runner.add_reporter(TableReporter::new());
904 /// ```
905 pub fn add_reporter(&mut self, reporter: impl Reporter + 'static + Send) {
906 self.reporters.push(Box::new(reporter));
907 }
908
909 /// Adds a boxed reporter for test output formatting.
910 ///
911 /// Similar to `add_reporter()` but accepts an already-boxed reporter.
912 /// Useful when working with dynamic reporter selection.
913 ///
914 /// # Examples
915 ///
916 /// ```rust,ignore
917 /// use tanu_core::{Runner, reporter::ListReporter};
918 ///
919 /// let mut runner = Runner::new();
920 /// let reporter: Box<dyn Reporter + Send> = Box::new(ListReporter::new());
921 /// runner.add_boxed_reporter(reporter);
922 /// ```
923 pub fn add_boxed_reporter(&mut self, reporter: Box<dyn Reporter + 'static + Send>) {
924 self.reporters.push(reporter);
925 }
926
927 /// Add a test case to the runner.
928 pub fn add_test(
929 &mut self,
930 name: &str,
931 module: &str,
932 serial_group: Option<&str>,
933 line: u32,
934 ordered: bool,
935 factory: TestCaseFactory,
936 ) {
937 self.test_cases.push((
938 Arc::new(TestInfo {
939 name: name.into(),
940 module: module.into(),
941 serial_group: serial_group.map(|s| s.to_string()),
942 line,
943 ordered,
944 }),
945 factory,
946 ));
947 }
948
949 /// Sets the maximum number of tests to run concurrently.
950 ///
951 /// By default, tests run with unlimited concurrency. This setting
952 /// allows you to limit concurrent execution to reduce resource usage
953 /// or avoid overwhelming external services.
954 ///
955 /// # Examples
956 ///
957 /// ```rust,ignore
958 /// let mut runner = Runner::new();
959 /// runner.set_concurrency(4); // Max 4 tests at once
960 /// ```
961 pub fn set_concurrency(&mut self, concurrency: usize) {
962 self.options.concurrency = Some(concurrency);
963 }
964
965 /// Disables sensitive data masking in HTTP logs.
966 ///
967 /// By default, sensitive data (Authorization headers, API keys in URLs, etc.)
968 /// is masked with `*****` when HTTP logging is enabled. Call this method
969 /// to show the actual values instead.
970 ///
971 /// # Examples
972 ///
973 /// ```rust,ignore
974 /// let mut runner = Runner::new();
975 /// runner.capture_http(); // Enable HTTP logging
976 /// runner.show_sensitive(); // Show actual values instead of *****
977 /// ```
978 pub fn show_sensitive(&mut self) {
979 self.options.mask_sensitive = false;
980 }
981
982 /// Executes all registered tests with optional filtering.
983 ///
984 /// Runs tests concurrently according to the configured options and filters.
985 /// Tests can be filtered by project name, module name, or specific test names.
986 /// Empty filter arrays mean "include all".
987 ///
988 /// # Parameters
989 ///
990 /// - `project_names`: Only run tests from these projects (empty = all projects)
991 /// - `module_names`: Only run tests from these modules (empty = all modules)
992 /// - `test_names`: Only run these specific tests (empty = all tests)
993 ///
994 /// # Examples
995 ///
996 /// ```rust,ignore
997 /// let mut runner = Runner::new();
998 ///
999 /// // Run all tests
1000 /// runner.run(&[], &[], &[]).await?;
1001 ///
1002 /// // Run only "staging" project tests
1003 /// runner.run(&["staging".to_string()], &[], &[]).await?;
1004 ///
1005 /// // Run specific test
1006 /// runner.run(&[], &[], &["api::health_check".to_string()]).await?;
1007 /// ```
1008 ///
1009 /// # Errors
1010 ///
1011 /// Returns an error if:
1012 /// - Any test fails (unless configured to continue on failure)
1013 /// - A test panics and cannot be recovered
1014 /// - Reporter setup or execution fails
1015 /// - Event channel operations fail
1016 #[allow(clippy::too_many_lines)]
1017 pub async fn run(
1018 &mut self,
1019 project_names: &[String],
1020 module_names: &[String],
1021 test_names: &[String],
1022 ) -> eyre::Result<()> {
1023 // Set masking configuration for HTTP logs
1024 crate::masking::set_mask_sensitive(self.options.mask_sensitive);
1025
1026 if self.options.capture_rust {
1027 tracing_subscriber::fmt::init();
1028 }
1029
1030 let reporters = std::mem::take(&mut self.reporters);
1031
1032 // Set up barrier for all reporters + runner
1033 // This ensures all reporters subscribe before tests start
1034 setup_reporter_barrier(reporters.len())?;
1035
1036 let reporter_handles: Vec<_> = reporters
1037 .into_iter()
1038 .map(|mut reporter| tokio::spawn(async move { reporter.run().await }))
1039 .collect();
1040
1041 // Wait for all reporters to subscribe before starting tests
1042 wait_reporter_barrier().await;
1043
1044 let project_filter = ProjectFilter { project_names };
1045 let module_filter = ModuleFilter { module_names };
1046 let test_name_filter = TestNameFilter { test_names };
1047 let test_ignore_filter = TestIgnoreFilter::default();
1048
1049 let start = std::time::Instant::now();
1050 let handles: FuturesUnordered<_> = {
1051 // Create a semaphore to limit concurrency
1052 let concurrency = self.options.concurrency;
1053 let semaphore = Arc::new(tokio::sync::Semaphore::new(
1054 concurrency.unwrap_or(tokio::sync::Semaphore::MAX_PERMITS),
1055 ));
1056
1057 // Worker ID pool for timeline visualization (only when concurrency is specified)
1058 let worker_ids = Arc::new(WorkerIds::new(concurrency));
1059
1060 // Per-group mutexes for serial execution (project-scoped)
1061 // Key format: "project_name::group_name"
1062 let serial_groups: Arc<
1063 tokio::sync::RwLock<std::collections::HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
1064 > = Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new()));
1065
1066 let projects = self.cfg.projects.clone();
1067 let projects = if projects.is_empty() {
1068 vec![Arc::new(ProjectConfig {
1069 name: "default".into(),
1070 ..Default::default()
1071 })]
1072 } else {
1073 projects
1074 };
1075
1076 // Collect all tests and apply filters
1077 let mut all_tests: Vec<_> = self
1078 .test_cases
1079 .iter()
1080 .cartesian_product(projects.into_iter())
1081 .map(|((info, factory), project)| (project, Arc::clone(info), factory.clone()))
1082 .filter(move |(project, info, _)| test_name_filter.filter(project, info))
1083 .filter(move |(project, info, _)| module_filter.filter(project, info))
1084 .filter(move |(project, info, _)| project_filter.filter(project, info))
1085 .filter(move |(project, info, _)| test_ignore_filter.filter(project, info))
1086 .collect();
1087
1088 // Separate ordered and non-ordered tests
1089 let (mut ordered_tests, non_ordered_tests): (Vec<_>, Vec<_>) =
1090 all_tests.drain(..).partition(|(_, info, _)| info.ordered);
1091
1092 // Sort ordered tests by (serial_group, line) to ensure source order within groups
1093 ordered_tests
1094 .sort_by_key(|(_project, info, _factory)| (info.serial_group.clone(), info.line));
1095
1096 // Group ordered tests by (project_name, serial_group) for sequential execution
1097 let mut ordered_groups: std::collections::HashMap<String, Vec<_>> =
1098 std::collections::HashMap::new();
1099 for (project, info, factory) in ordered_tests {
1100 let key = format!(
1101 "{}::{}",
1102 project.name,
1103 info.serial_group.as_deref().unwrap_or("")
1104 );
1105 ordered_groups
1106 .entry(key)
1107 .or_default()
1108 .push((project, info, factory));
1109 }
1110
1111 // Create futures for ordered test groups (each group runs sequentially)
1112 let ordered_handles = ordered_groups.into_iter().map(|(group_key, tests)| {
1113 let semaphore = semaphore.clone();
1114 let worker_ids = worker_ids.clone();
1115 let serial_groups = serial_groups.clone();
1116
1117 tokio::spawn(async move {
1118 // Get serial mutex for this group once
1119 let serial_mutex = {
1120 let mut write_lock = serial_groups.write().await;
1121 write_lock
1122 .entry(group_key.clone())
1123 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
1124 .clone()
1125 };
1126
1127 // Run all tests in this group sequentially (await each before starting next)
1128 let mut group_failed = false;
1129 let mut group_error: Option<eyre::Report> = None;
1130 for (project, info, factory) in tests {
1131 // Acquire semaphore for this test
1132 let _permit = semaphore
1133 .acquire()
1134 .await
1135 .map_err(|e| eyre::eyre!("failed to acquire semaphore: {e}"));
1136
1137 if _permit.is_err() {
1138 continue;
1139 }
1140
1141 // Acquire worker ID
1142 let worker_id = worker_ids.acquire();
1143
1144 let result = execute_test(
1145 project,
1146 info,
1147 factory,
1148 Some(serial_mutex.clone()),
1149 worker_id,
1150 )
1151 .await;
1152 worker_ids.release(worker_id);
1153
1154 match result {
1155 Ok(test) => {
1156 if test.result.is_err() {
1157 group_failed = true;
1158 }
1159 }
1160 Err(e) => {
1161 group_failed = true;
1162 if group_error.is_none() {
1163 group_error = Some(e);
1164 }
1165 }
1166 }
1167 }
1168 if group_failed {
1169 if let Some(e) = group_error {
1170 return Err(e);
1171 }
1172 eyre::bail!("one or more tests failed");
1173 }
1174 eyre::Ok(())
1175 })
1176 });
1177
1178 // Create futures for non-ordered tests (parallel execution as before)
1179 let non_ordered_handles =
1180 non_ordered_tests
1181 .into_iter()
1182 .map(|(project, info, factory)| {
1183 let semaphore = semaphore.clone();
1184 let worker_ids = worker_ids.clone();
1185 let serial_groups = serial_groups.clone();
1186 tokio::spawn(async move {
1187 // Step 1: Acquire serial group mutex FIRST (if needed) - project-scoped
1188 // This ensures tests in the same group don't hold semaphore permits unnecessarily
1189 let serial_mutex = match &info.serial_group {
1190 Some(group_name) => {
1191 // Create project-scoped key: "project_name::group_name"
1192 let key = format!("{}::{}", project.name, group_name);
1193
1194 // Get or create mutex for this project+group
1195 let read_lock = serial_groups.read().await;
1196 if let Some(mutex) = read_lock.get(&key) {
1197 Some(Arc::clone(mutex))
1198 } else {
1199 drop(read_lock);
1200 let mut write_lock = serial_groups.write().await;
1201 Some(
1202 write_lock
1203 .entry(key)
1204 .or_insert_with(|| {
1205 Arc::new(tokio::sync::Mutex::new(()))
1206 })
1207 .clone(),
1208 )
1209 }
1210 }
1211 None => None,
1212 };
1213
1214 // Step 2: Acquire global semaphore AFTER serial mutex
1215 // This prevents blocking other tests while waiting for serial group
1216 let _permit = semaphore
1217 .acquire()
1218 .await
1219 .map_err(|e| eyre::eyre!("failed to acquire semaphore: {e}"))?;
1220
1221 // Acquire worker ID from pool
1222 let worker_id = worker_ids.acquire();
1223
1224 let result = execute_test(
1225 project,
1226 info,
1227 factory,
1228 serial_mutex.clone(),
1229 worker_id,
1230 )
1231 .await
1232 .and_then(|test| {
1233 let is_err = test.result.is_err();
1234 eyre::ensure!(!is_err);
1235 eyre::Ok(())
1236 });
1237
1238 // Return worker ID to pool
1239 worker_ids.release(worker_id);
1240
1241 result
1242 })
1243 });
1244
1245 // Combine both ordered and non-ordered handles
1246 let all_handles = FuturesUnordered::new();
1247 for handle in ordered_handles {
1248 all_handles.push(handle);
1249 }
1250 for handle in non_ordered_handles {
1251 all_handles.push(handle);
1252 }
1253 all_handles
1254 };
1255 let test_prep_time = start.elapsed();
1256 debug!(
1257 "created handles for {} test cases",
1258 test_prep_time.as_secs_f32()
1259 );
1260
1261 let mut has_any_error = false;
1262 let total_tests = handles.len();
1263 let options = self.options.clone();
1264 let runner = async move {
1265 let results = handles.collect::<Vec<_>>().await;
1266 if results.is_empty() {
1267 console::Term::stdout().write_line("no test cases found")?;
1268 }
1269
1270 let mut failed_tests = 0;
1271 for result in results {
1272 match result {
1273 Ok(res) => {
1274 if let Err(e) = res {
1275 debug!("test case failed: {e:#}");
1276 has_any_error = true;
1277 failed_tests += 1;
1278 }
1279 }
1280 Err(e) => {
1281 if e.is_panic() {
1282 // Resume the panic on the main task
1283 error!("{e}");
1284 has_any_error = true;
1285 failed_tests += 1;
1286 }
1287 }
1288 }
1289 }
1290
1291 let passed_tests = total_tests - failed_tests;
1292 let total_time = start.elapsed();
1293
1294 // Publish summary event
1295 let summary = TestSummary {
1296 total_tests,
1297 passed_tests,
1298 failed_tests,
1299 total_time,
1300 test_prep_time,
1301 };
1302
1303 // Create a dummy event for summary (since it doesn't belong to a specific test)
1304 let summary_event = Event {
1305 project: "".to_string(),
1306 module: "".to_string(),
1307 test: "".to_string(),
1308 body: EventBody::Summary(summary),
1309 };
1310
1311 if let Ok(guard) = CHANNEL.lock() {
1312 if let Some((tx, _)) = guard.as_ref() {
1313 let _ = tx.send(summary_event);
1314 }
1315 }
1316 debug!("all test finished. sending stop signal to the background tasks.");
1317
1318 if options.terminate_channel {
1319 let Ok(mut guard) = CHANNEL.lock() else {
1320 eyre::bail!("failed to acquire runner channel lock");
1321 };
1322 guard.take(); // closing the runner channel.
1323 }
1324
1325 if has_any_error {
1326 eyre::bail!("one or more tests failed");
1327 }
1328
1329 eyre::Ok(())
1330 };
1331
1332 let runner_result = runner.await;
1333
1334 for handle in reporter_handles {
1335 match handle.await {
1336 Ok(Ok(())) => {}
1337 Ok(Err(e)) => error!("reporter failed: {e:#}"),
1338 Err(e) => error!("reporter task panicked: {e:#}"),
1339 }
1340 }
1341
1342 // Clean up barrier
1343 clear_reporter_barrier();
1344
1345 debug!("runner stopped");
1346
1347 runner_result
1348 }
1349
1350 /// Returns a list of all registered test metadata.
1351 ///
1352 /// This provides access to test information without executing the tests.
1353 /// Useful for building test UIs, generating reports, or implementing
1354 /// custom filtering logic.
1355 ///
1356 /// # Examples
1357 ///
1358 /// ```rust,ignore
1359 /// let runner = Runner::new();
1360 /// let tests = runner.list();
1361 ///
1362 /// for test in tests {
1363 /// println!("Test: {}", test.full_name());
1364 /// }
1365 /// ```
1366 pub fn list(&self) -> Vec<&TestInfo> {
1367 self.test_cases
1368 .iter()
1369 .map(|(meta, _test)| meta.as_ref())
1370 .collect::<Vec<_>>()
1371 }
1372}
1373
1374#[cfg(test)]
1375mod test {
1376 use super::*;
1377 use crate::config::RetryConfig;
1378 use crate::ProjectConfig;
1379 use std::sync::Arc;
1380
1381 fn create_config() -> Config {
1382 Config {
1383 projects: vec![Arc::new(ProjectConfig {
1384 name: "default".into(),
1385 ..Default::default()
1386 })],
1387 ..Default::default()
1388 }
1389 }
1390
1391 fn create_config_with_retry() -> Config {
1392 Config {
1393 projects: vec![Arc::new(ProjectConfig {
1394 name: "default".into(),
1395 retry: RetryConfig {
1396 count: Some(1),
1397 ..Default::default()
1398 },
1399 ..Default::default()
1400 })],
1401 ..Default::default()
1402 }
1403 }
1404
1405 #[tokio::test]
1406 async fn runner_fail_because_no_retry_configured() -> eyre::Result<()> {
1407 let mut server = mockito::Server::new_async().await;
1408 let m1 = server
1409 .mock("GET", "/")
1410 .with_status(500)
1411 .expect(1)
1412 .create_async()
1413 .await;
1414 let m2 = server
1415 .mock("GET", "/")
1416 .with_status(200)
1417 .expect(0)
1418 .create_async()
1419 .await;
1420
1421 let factory: TestCaseFactory = Arc::new(move || {
1422 let url = server.url();
1423 Box::pin(async move {
1424 let client = crate::http::Client::new();
1425 let res = client.get(&url).send().await?;
1426 if res.status().is_success() {
1427 Ok(())
1428 } else {
1429 eyre::bail!("request failed")
1430 }
1431 })
1432 });
1433
1434 let _runner_rx = subscribe()?;
1435 let mut runner = Runner::with_config(create_config());
1436 runner.add_test("retry_test", "module", None, 0, false, factory);
1437
1438 let result = runner.run(&[], &[], &[]).await;
1439 m1.assert_async().await;
1440 m2.assert_async().await;
1441
1442 assert!(result.is_err());
1443 Ok(())
1444 }
1445
1446 #[tokio::test]
1447 async fn runner_retry_successful_after_failure() -> eyre::Result<()> {
1448 let mut server = mockito::Server::new_async().await;
1449 let m1 = server
1450 .mock("GET", "/")
1451 .with_status(500)
1452 .expect(1)
1453 .create_async()
1454 .await;
1455 let m2 = server
1456 .mock("GET", "/")
1457 .with_status(200)
1458 .expect(1)
1459 .create_async()
1460 .await;
1461
1462 let factory: TestCaseFactory = Arc::new(move || {
1463 let url = server.url();
1464 Box::pin(async move {
1465 let client = crate::http::Client::new();
1466 let res = client.get(&url).send().await?;
1467 if res.status().is_success() {
1468 Ok(())
1469 } else {
1470 eyre::bail!("request failed")
1471 }
1472 })
1473 });
1474
1475 let _runner_rx = subscribe()?;
1476 let mut runner = Runner::with_config(create_config_with_retry());
1477 runner.add_test("retry_test", "module", None, 0, false, factory);
1478
1479 let result = runner.run(&[], &[], &[]).await;
1480 m1.assert_async().await;
1481 m2.assert_async().await;
1482
1483 assert!(result.is_ok());
1484
1485 Ok(())
1486 }
1487
1488 #[tokio::test]
1489 async fn spawned_task_panics_without_task_local_context() {
1490 let project = Arc::new(ProjectConfig {
1491 name: "default".to_string(),
1492 ..Default::default()
1493 });
1494 let test_info = Arc::new(TestInfo {
1495 module: "mod".to_string(),
1496 name: "test".to_string(),
1497 serial_group: None,
1498 line: 0,
1499 ordered: false,
1500 });
1501
1502 crate::config::PROJECT
1503 .scope(
1504 project,
1505 TEST_INFO.scope(test_info, async move {
1506 let handle = tokio::spawn(async move {
1507 let _ = crate::config::get_config();
1508 });
1509
1510 let join_err = handle.await.expect_err("spawned task should panic");
1511 assert!(join_err.is_panic());
1512 }),
1513 )
1514 .await;
1515 }
1516
1517 #[tokio::test]
1518 async fn scope_current_propagates_task_local_context_into_spawned_task() {
1519 let project = Arc::new(ProjectConfig {
1520 name: "default".to_string(),
1521 ..Default::default()
1522 });
1523 let test_info = Arc::new(TestInfo {
1524 module: "mod".to_string(),
1525 name: "test".to_string(),
1526 serial_group: None,
1527 line: 0,
1528 ordered: false,
1529 });
1530
1531 crate::config::PROJECT
1532 .scope(
1533 project,
1534 TEST_INFO.scope(test_info, async move {
1535 let handle = tokio::spawn(super::scope_current(async move {
1536 let _ = crate::config::get_config();
1537 let _ = super::get_test_info();
1538 }));
1539
1540 handle.await.expect("spawned task should not panic");
1541 }),
1542 )
1543 .await;
1544 }
1545
1546 #[tokio::test]
1547 #[serial_test::serial]
1548 async fn masking_masks_sensitive_query_params_in_http_logs() -> eyre::Result<()> {
1549 use crate::masking;
1550
1551 // Ensure masking is enabled
1552 masking::set_mask_sensitive(true);
1553
1554 let mut server = mockito::Server::new_async().await;
1555 let _mock = server
1556 .mock("GET", mockito::Matcher::Any)
1557 .with_status(200)
1558 .create_async()
1559 .await;
1560
1561 let factory: TestCaseFactory = Arc::new(move || {
1562 let url = server.url();
1563 Box::pin(async move {
1564 let client = crate::http::Client::new();
1565 // Make request with sensitive query param embedded in URL
1566 let _res = client
1567 .get(format!("{url}?access_token=secret_token_123&user=john"))
1568 .send()
1569 .await?;
1570 Ok(())
1571 })
1572 });
1573
1574 let mut rx = subscribe()?;
1575 let mut runner = Runner::with_config(create_config());
1576 runner.add_test(
1577 "masking_query_test",
1578 "masking_module",
1579 None,
1580 0,
1581 false,
1582 factory,
1583 );
1584
1585 runner.run(&[], &[], &[]).await?;
1586
1587 // Collect HTTP events for this specific test
1588 let mut found_http_event = false;
1589 while let Ok(event) = rx.try_recv() {
1590 // Filter to only our test's events
1591 if event.test != "masking_query_test" {
1592 continue;
1593 }
1594 if let EventBody::Call(CallLog::Http(log)) = event.body {
1595 found_http_event = true;
1596 let url_str = log.request.url.to_string();
1597
1598 // Verify sensitive param is masked
1599 assert!(
1600 url_str.contains("access_token=*****"),
1601 "access_token should be masked, got: {url_str}"
1602 );
1603 // Non-sensitive params should not be masked
1604 assert!(
1605 url_str.contains("user=john"),
1606 "user should not be masked, got: {url_str}"
1607 );
1608 }
1609 }
1610
1611 assert!(found_http_event, "Should have received HTTP event");
1612 Ok(())
1613 }
1614
1615 #[tokio::test]
1616 #[serial_test::serial]
1617 async fn masking_masks_sensitive_headers_in_http_logs() -> eyre::Result<()> {
1618 use crate::masking;
1619
1620 // Ensure masking is enabled
1621 masking::set_mask_sensitive(true);
1622
1623 let mut server = mockito::Server::new_async().await;
1624 let _mock = server
1625 .mock("GET", "/")
1626 .with_status(200)
1627 .create_async()
1628 .await;
1629
1630 let factory: TestCaseFactory = Arc::new(move || {
1631 let url = server.url();
1632 Box::pin(async move {
1633 let client = crate::http::Client::new();
1634 // Make request with sensitive headers
1635 let _res = client
1636 .get(&url)
1637 .header("authorization", "Bearer secret_bearer_token")
1638 .header("x-api-key", "my_secret_api_key")
1639 .header("content-type", "application/json")
1640 .send()
1641 .await?;
1642 Ok(())
1643 })
1644 });
1645
1646 let mut rx = subscribe()?;
1647 let mut runner = Runner::with_config(create_config());
1648 runner.add_test(
1649 "masking_headers_test",
1650 "masking_module",
1651 None,
1652 0,
1653 false,
1654 factory,
1655 );
1656
1657 runner.run(&[], &[], &[]).await?;
1658
1659 // Collect HTTP events for this specific test
1660 let mut found_http_event = false;
1661 while let Ok(event) = rx.try_recv() {
1662 // Filter to only our test's events
1663 if event.test != "masking_headers_test" {
1664 continue;
1665 }
1666 if let EventBody::Call(CallLog::Http(log)) = event.body {
1667 found_http_event = true;
1668
1669 // Verify sensitive headers are masked
1670 if let Some(auth) = log.request.headers.get("authorization") {
1671 assert_eq!(
1672 auth.to_str().unwrap(),
1673 "*****",
1674 "authorization header should be masked"
1675 );
1676 }
1677 if let Some(api_key) = log.request.headers.get("x-api-key") {
1678 assert_eq!(
1679 api_key.to_str().unwrap(),
1680 "*****",
1681 "x-api-key header should be masked"
1682 );
1683 }
1684 // Non-sensitive headers should not be masked
1685 if let Some(content_type) = log.request.headers.get("content-type") {
1686 assert_eq!(
1687 content_type.to_str().unwrap(),
1688 "application/json",
1689 "content-type header should not be masked"
1690 );
1691 }
1692 }
1693 }
1694
1695 assert!(found_http_event, "Should have received HTTP event");
1696 Ok(())
1697 }
1698
1699 #[tokio::test]
1700 #[serial_test::serial]
1701 async fn masking_show_sensitive_disables_masking_in_http_logs() -> eyre::Result<()> {
1702 use crate::masking;
1703
1704 masking::set_mask_sensitive(true);
1705
1706 let mut server = mockito::Server::new_async().await;
1707 let _mock = server
1708 .mock("GET", "/")
1709 .with_status(200)
1710 .create_async()
1711 .await;
1712
1713 let factory: TestCaseFactory = Arc::new(move || {
1714 let url = server.url();
1715 Box::pin(async move {
1716 let client = crate::http::Client::new();
1717 let _res = client
1718 .get(format!("{url}?access_token=secret_token_123"))
1719 .header("authorization", "Bearer secret_bearer_token")
1720 .send()
1721 .await?;
1722 Ok(())
1723 })
1724 });
1725
1726 let mut rx = subscribe()?;
1727 let mut runner = Runner::with_config(create_config());
1728 runner.capture_http();
1729 runner.show_sensitive();
1730 runner.add_test(
1731 "show_sensitive_test",
1732 "masking_module",
1733 None,
1734 0,
1735 false,
1736 factory,
1737 );
1738
1739 runner.run(&[], &[], &[]).await?;
1740
1741 let mut found_http_event = false;
1742 while let Ok(event) = rx.try_recv() {
1743 if event.test != "show_sensitive_test" {
1744 continue;
1745 }
1746 if let EventBody::Call(CallLog::Http(log)) = event.body {
1747 found_http_event = true;
1748 let url_str = log.request.url.to_string();
1749 assert!(
1750 url_str.contains("access_token=secret_token_123"),
1751 "access_token should not be masked when show_sensitive is enabled"
1752 );
1753 if let Some(auth) = log.request.headers.get("authorization") {
1754 assert_eq!(
1755 auth.to_str().unwrap(),
1756 "Bearer secret_bearer_token",
1757 "authorization header should not be masked when show_sensitive is enabled"
1758 );
1759 }
1760 }
1761 }
1762
1763 assert!(found_http_event, "Should have received HTTP event");
1764 Ok(())
1765 }
1766}