asupersync 0.3.4

Spec-first, cancel-correct, capability-secure async runtime for Rust.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
#![allow(clippy::all)]
//! Test utilities for Asupersync.
//!
//! This module provides shared helpers for unit tests:
//! - Consistent tracing-based logging initialization
//! - Phase/section macros for readable test output
//! - Lab runtime constructors
//! - Async test runners
//! - Outcome assertion macros
//! - Test types for pool-style tests
//!
//! # Example
//! ```
//! use asupersync::test_utils::{init_test_logging, run_test};
//!
//! fn my_async_test() {
//!     init_test_logging();
//!     run_test(|| async {
//!         // async test code
//!     });
//! }
//! ```

use crate::cx::Cx;
use crate::lab::{LabConfig, LabRuntime};
use crate::runtime::RuntimeBuilder;
pub use crate::test_logging::{
    ARTIFACT_SCHEMA_VERSION, AllocatedPort, DockerFixtureService, EnvironmentMetadata,
    FixtureService, InProcessService, NoOpFixtureService, PortAllocator, ReproManifest,
    TempDirFixture, TestContext, TestEnvironment, derive_component_seed, derive_entropy_seed,
    derive_scenario_seed, wait_until_healthy,
};

pub use crate::test_ndjson::{
    NDJSON_SCHEMA_VERSION, NdjsonEvent, NdjsonLogger, artifact_base_dir, artifact_bundle_dir,
    ndjson_file_name, trace_file_name, write_artifact_bundle,
};
use crate::time::timeout;
use parking_lot::Mutex;
use std::future::Future;
use std::sync::{Arc, Once};
use std::time::Duration;
use tracing::Dispatch;
use tracing_subscriber::fmt::format::FmtSpan;

static GLOBAL_INIT_LOGGING: Once = Once::new();
#[allow(dead_code)] // Used by other modules' #[cfg(test)] blocks via test-internals feature
static ENV_LOCK: Mutex<()> = Mutex::new(());

/// Default seed used by test lab helpers.
pub const DEFAULT_TEST_SEED: u64 = 0xDEAD_BEEF;

/// Runtime-isolated subscriber handle for per-runtime tracing.
///
/// **CRITICAL**: This fixes the global subscriber conflict where multiple
/// runtimes in the same process would interfere with each other's tracing.
/// Each runtime gets its own isolated subscriber instead of sharing global state.
#[derive(Debug, Clone)]
pub struct RuntimeSubscriberHandle {
    _dispatch: Arc<Dispatch>,
    #[allow(dead_code)]
    runtime_id: String,
}

impl RuntimeSubscriberHandle {
    /// Create a per-runtime subscriber with isolation from other runtimes.
    ///
    /// **SECURITY FIX**: This prevents global subscriber state conflicts
    /// where the second runtime would lose tracing output due to the
    /// Once guard in the old implementation.
    pub fn new_isolated(runtime_id: String, level: tracing::Level) -> Self {
        let subscriber = tracing_subscriber::fmt()
            .with_max_level(level)
            .with_test_writer()
            .with_file(true)
            .with_line_number(true)
            .with_target(true)
            .with_thread_ids(true)
            .with_span_events(FmtSpan::CLOSE)
            .with_ansi(false)
            .finish();

        let dispatch = Arc::new(Dispatch::new(subscriber));

        Self {
            _dispatch: dispatch,
            runtime_id,
        }
    }

    /// Execute a closure with this runtime's subscriber as the default.
    ///
    /// **ISOLATION**: Tracing events within the closure use this runtime's
    /// subscriber, regardless of global subscriber state.
    pub fn with_subscriber<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        tracing::dispatcher::with_default(&*self._dispatch, f)
    }
}

/// Initialize test logging with trace-level output.
///
/// **DEPRECATED**: Use `init_runtime_logging()` for new code to get proper
/// per-runtime isolation. This function maintains global semantics for
/// backwards compatibility with existing tests.
///
/// Safe to call multiple times; only initializes once per process.
pub fn init_test_logging() {
    init_test_logging_with_level(tracing::Level::TRACE);
}

/// Initialize test logging with a custom level.
///
/// **DEPRECATED**: Use `init_runtime_logging_with_level()` for new code.
///
/// The first call wins; later calls are no-ops. This maintains the old
/// behavior for compatibility but is vulnerable to multi-runtime conflicts.
pub fn init_test_logging_with_level(level: tracing::Level) {
    GLOBAL_INIT_LOGGING.call_once(|| {
        let _ = tracing_subscriber::fmt()
            .with_max_level(level)
            .with_test_writer()
            .with_file(true)
            .with_line_number(true)
            .with_target(true)
            .with_thread_ids(true)
            .with_span_events(FmtSpan::CLOSE)
            .with_ansi(false)
            .try_init();
    });
}

/// Initialize per-runtime logging with trace-level output.
///
/// **RECOMMENDED**: Use this for new test code that needs runtime isolation.
/// Returns a handle that can be used to execute code with this runtime's
/// subscriber active.
///
/// **SAFETY**: Each runtime gets its own isolated subscriber, preventing
/// global subscriber conflicts that break tracing for subsequent runtimes.
pub fn init_runtime_logging(runtime_id: String) -> RuntimeSubscriberHandle {
    init_runtime_logging_with_level(runtime_id, tracing::Level::TRACE)
}

/// Initialize per-runtime logging with a custom level.
///
/// **ISOLATION**: Creates a completely isolated subscriber for this runtime.
/// Multiple runtimes can coexist without interfering with each other's
/// tracing output.
pub fn init_runtime_logging_with_level(
    runtime_id: String,
    level: tracing::Level,
) -> RuntimeSubscriberHandle {
    RuntimeSubscriberHandle::new_isolated(runtime_id, level)
}

/// Acquire the global environment lock for tests that mutate env vars.
#[allow(dead_code)] // Used by other modules' #[cfg(test)] blocks
pub(crate) fn env_lock() -> parking_lot::MutexGuard<'static, ()> {
    ENV_LOCK.lock()
}

/// Create a deterministic lab runtime for testing.
#[must_use]
pub fn test_lab() -> LabRuntime {
    LabRuntime::new(LabConfig::new(DEFAULT_TEST_SEED))
}

/// Create a lab runtime with a specific seed.
#[must_use]
pub fn test_lab_with_seed(seed: u64) -> LabRuntime {
    LabRuntime::new(LabConfig::new(seed))
}

/// Create a lab runtime with a larger trace buffer for debugging.
#[must_use]
pub fn test_lab_with_tracing() -> LabRuntime {
    LabRuntime::new(LabConfig::new(DEFAULT_TEST_SEED).trace_capacity(64 * 1024))
}

/// Create a lab runtime from a [`TestContext`], using the context's seed.
#[must_use]
pub fn test_lab_from_context(ctx: &TestContext) -> LabRuntime {
    LabRuntime::new(LabConfig::new(ctx.seed))
}

/// Create a lab runtime and hand it to a closure for deterministic execution.
///
/// This is the escape hatch for tests that need direct control over a [`LabRuntime`].
/// Callers can configure the runtime, drive it with
/// [`crate::conformance::LabRuntimeTarget::block_on`], or step it manually.
pub fn lab_with_config<F, R>(f: F) -> R
where
    F: FnOnce(&mut LabRuntime) -> R,
{
    init_test_logging();
    let mut lab = test_lab();
    f(&mut lab)
}

/// Create a [`TestContext`] for a unit test with the default seed.
#[must_use]
pub fn test_context(test_id: &str) -> TestContext {
    TestContext::new(test_id, DEFAULT_TEST_SEED)
}

/// Create a [`TestContext`] for a unit test with a specific seed.
#[must_use]
pub fn test_context_with_seed(test_id: &str, seed: u64) -> TestContext {
    TestContext::new(test_id, seed)
}

/// Run async test code using a lightweight current-thread runtime.
pub fn run_test<F, Fut>(f: F)
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = ()>,
{
    init_test_logging();
    let runtime = RuntimeBuilder::current_thread()
        .build()
        .expect("failed to build test runtime");
    runtime.block_on(f());
}

/// Run async test code with a test `Cx`.
pub fn run_test_with_cx<F, Fut>(f: F)
where
    F: FnOnce(Cx) -> Fut,
    Fut: Future<Output = ()>,
{
    init_test_logging();
    let cx: Cx = Cx::for_testing();
    let runtime = RuntimeBuilder::current_thread()
        .build()
        .expect("failed to build test runtime");
    runtime.block_on(f(cx));
}

/// Assert that an async operation completes within a timeout.
pub async fn assert_completes_within<F, Fut, T>(
    timeout_duration: Duration,
    description: &str,
    f: F,
) -> T
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = T> + Unpin,
{
    // Keep standalone usage correct: `TimeoutFuture` uses `Sleep`, whose fallback clock is
    // `wall_now()`. Passing `Time::ZERO` here can cause immediate timeouts if `wall_now()`
    // has already advanced earlier in the process.
    let now = Cx::current()
        .and_then(|cx| cx.timer_driver())
        .map_or_else(crate::time::wall_now, |driver| driver.now());

    let Ok(value) = timeout(now, timeout_duration, f()).await else {
        unreachable!("operation '{description}' did not complete within {timeout_duration:?}");
    };
    tracing::debug!(
        description = %description,
        timeout_ms = timeout_duration.as_millis(),
        "operation completed within timeout"
    );
    value
}

#[cfg(test)]
mod tests {
    #![allow(
        clippy::pedantic,
        clippy::nursery,
        clippy::expect_fun_call,
        clippy::map_unwrap_or,
        clippy::cast_possible_wrap,
        clippy::future_not_send
    )]
    use super::*;
    use crate::conformance::{ConformanceTarget, LabRuntimeTarget};
    use futures_lite::future;

    #[test]
    fn assert_completes_within_uses_wall_time_when_no_runtime_is_active() {
        // Ensure the wall clock origin is initialized and has advanced beyond the timeout.
        let _t0 = crate::time::wall_now();
        std::thread::sleep(Duration::from_millis(50));

        // This should not spuriously time out in standalone mode.
        let value = future::block_on(assert_completes_within(
            Duration::from_millis(10),
            "standalone immediate future",
            || std::future::ready(7_u8),
        ));
        assert_eq!(value, 7);
    }

    #[test]
    fn lab_with_config_exposes_a_usable_lab_runtime() {
        let (seed, value) = lab_with_config(|runtime| {
            let seed = runtime.config().seed;
            let value = LabRuntimeTarget::block_on(runtime, async { 42_u8 });
            (seed, value)
        });

        assert_eq!(seed, DEFAULT_TEST_SEED);
        assert_eq!(value, 42);
    }
}

/// Log a test phase transition with a visual separator.
#[macro_export]
macro_rules! test_phase {
    ($name:expr) => {
        tracing::info!(phase = %$name, "========================================");
        tracing::info!(phase = %$name, "TEST PHASE: {}", $name);
        tracing::info!(phase = %$name, "========================================");
    };
}

/// Log a section within a test phase.
#[macro_export]
macro_rules! test_section {
    ($name:expr) => {
        tracing::debug!(section = %$name, "--- {} ---", $name);
    };
}

/// Log test completion with summary.
#[macro_export]
macro_rules! test_complete {
    ($name:expr) => {
        tracing::info!(test = %$name, "test completed successfully: {}", $name);
    };
    ($name:expr, $($key:ident = $value:expr),* $(,)?) => {
        tracing::info!(
            test = %$name,
            $($key = %$value,)*
            "test completed successfully: {}",
            $name
        );
    };
}

/// Log before assertions for context.
#[macro_export]
macro_rules! assert_with_log {
    ($cond:expr, $msg:expr, $expected:expr, $actual:expr) => {{
        tracing::debug!(
            expected = ?$expected,
            actual = ?$actual,
            "Asserting: {}",
            $msg
        );
        assert!($cond, "{}: expected {:?}, got {:?}", $msg, $expected, $actual);
    }};
}

/// Assert that an outcome is Ok with a specific value.
#[macro_export]
macro_rules! assert_outcome_ok {
    ($outcome:expr, $expected:expr) => {
        match $outcome {
            $crate::types::Outcome::Ok(v) => assert_eq!(v, $expected),
            other => unreachable!("expected Outcome::Ok({:?}), got {:?}", $expected, other),
        }
    };
}

/// Assert that an outcome is Cancelled.
#[macro_export]
macro_rules! assert_outcome_cancelled {
    ($outcome:expr) => {
        match $outcome {
            $crate::types::Outcome::Cancelled(_) => {}
            other => unreachable!("expected Outcome::Cancelled, got {:?}", other),
        }
    };
}

/// Assert that an outcome is Err.
#[macro_export]
macro_rules! assert_outcome_err {
    ($outcome:expr) => {
        match $outcome {
            $crate::types::Outcome::Err(_) => {}
            other => unreachable!("expected Outcome::Err, got {:?}", other),
        }
    };
}

/// Assert that an outcome is Panicked.
#[macro_export]
macro_rules! assert_outcome_panicked {
    ($outcome:expr) => {
        match $outcome {
            $crate::types::Outcome::Panicked(_) => {}
            other => unreachable!("expected Outcome::Panicked, got {:?}", other),
        }
    };
}

/// Deterministic in-memory connection for pool testing.
#[derive(Debug)]
pub struct TestConnection {
    id: usize,
    query_count: std::sync::atomic::AtomicUsize,
}

impl TestConnection {
    /// Create a new test connection with a stable ID.
    #[must_use]
    pub fn new(id: usize) -> Self {
        Self {
            id,
            query_count: std::sync::atomic::AtomicUsize::new(0),
        }
    }

    /// Returns the connection ID.
    #[must_use]
    pub const fn id(&self) -> usize {
        self.id
    }

    /// Returns how many queries were issued.
    #[must_use]
    pub fn query_count(&self) -> usize {
        self.query_count.load(std::sync::atomic::Ordering::SeqCst)
    }

    /// Simulate a query.
    pub fn query(&self, _sql: &str) -> Result<(), TestError> {
        self.query_count
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        Ok(())
    }
}

/// Test error for pool testing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TestError(pub String);

impl std::error::Error for TestError {}

impl std::fmt::Display for TestError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "TestError: {}", self.0)
    }
}

// ============================================================================
// Evidence Logging for Structured Test Analysis
// ============================================================================

use crate::test_logging::{TestEvent, TestLogLevel};
use std::path::PathBuf;

/// Evidence sink for capturing structured JSON events during test execution.
///
/// Automatically writes test events to `tests/_evidence/<test_name>.jsonl`
/// for post-hoc analysis, flake pattern detection, and regression tracking.
///
/// # Example
/// ```
/// use asupersync::test_utils::EvidenceSink;
///
/// let evidence = EvidenceSink::for_test("my_test");
/// evidence.phase("setup");
/// evidence.event("task_spawn", &[("task_id", "1"), ("name", "worker")]);
/// evidence.outcome("passed");
/// evidence.save().unwrap();
/// ```
pub struct EvidenceSink {
    logger: NdjsonLogger,
    test_name: String,
    current_phase: String,
}

impl EvidenceSink {
    /// Create a new evidence sink for the given test.
    ///
    /// Uses a default seed and subsystem. Call `with_context()` for custom configuration.
    pub fn for_test(test_name: &str) -> Self {
        let ctx = TestContext::new(test_name, DEFAULT_TEST_SEED);
        let logger = NdjsonLogger::enabled(TestLogLevel::Info, Some(ctx));

        Self {
            logger,
            test_name: test_name.to_string(),
            current_phase: "init".to_string(),
        }
    }

    /// Create evidence sink with custom test context.
    pub fn with_context(test_name: &str, ctx: TestContext) -> Self {
        let logger = NdjsonLogger::enabled(TestLogLevel::Info, Some(ctx));

        Self {
            logger,
            test_name: test_name.to_string(),
            current_phase: "init".to_string(),
        }
    }

    /// Record a test phase transition.
    ///
    /// Phase examples: "setup", "execution", "teardown", "validation"
    pub fn phase(&mut self, phase: &str) {
        self.current_phase = phase.to_string();
        self.logger.log(TestEvent::Custom {
            category: "test",
            message: format!(
                "phase_transition: phase={} test_name={}",
                phase, self.test_name
            ),
        });
    }

    /// Record a structured event with key-value data.
    ///
    /// Event examples: "task_spawn", "region_close", "obligation_leak", "cancel_request"
    pub fn event(&self, event: &str, data: &[(&str, &str)]) {
        let data_str = data
            .iter()
            .map(|(k, v)| format!("{}={}", k, v))
            .chain(std::iter::once(format!("phase={}", self.current_phase)))
            .chain(std::iter::once(format!("test_name={}", self.test_name)))
            .collect::<Vec<_>>()
            .join(" ");

        self.logger.log(TestEvent::Custom {
            category: "evidence",
            message: format!("{}: {}", event, data_str),
        });
    }

    /// Record test outcome: "passed", "failed", "skipped", or "error".
    pub fn outcome(&self, outcome: &str) {
        self.logger.log(TestEvent::Custom {
            category: "test",
            message: format!(
                "outcome: outcome={} test_name={} final_phase={}",
                outcome, self.test_name, self.current_phase
            ),
        });
    }

    /// Record a context ID from the async runtime.
    ///
    /// Useful for correlating events with specific execution contexts.
    pub fn cx_id(&self, cx_id: &str) {
        self.logger.log(TestEvent::Custom {
            category: "runtime",
            message: format!(
                "cx_active: cx_id={} phase={} test_name={}",
                cx_id, self.current_phase, self.test_name
            ),
        });
    }

    /// Save evidence to `tests/_evidence/<test_name>.jsonl`.
    ///
    /// Creates the evidence directory if it doesn't exist.
    pub fn save(&self) -> std::io::Result<PathBuf> {
        let evidence_dir = std::path::Path::new("tests/_evidence");
        std::fs::create_dir_all(evidence_dir)?;

        let file_path = evidence_dir.join(format!("{}.jsonl", self.test_name));
        self.logger.write_ndjson_file(&file_path)?;
        Ok(file_path)
    }

    /// Access the underlying NDJSON logger for advanced usage.
    pub fn logger(&self) -> &NdjsonLogger {
        &self.logger
    }
}

/// Enhanced test phase macro that automatically logs to evidence.
///
/// Usage: `evidence_phase!(evidence_sink, "setup");`
#[macro_export]
macro_rules! evidence_phase {
    ($sink:expr, $phase:expr) => {
        $sink.phase($phase);
        tracing::info!(phase = %$phase, "TEST PHASE: {}", $phase);
    };
}

/// Helper to create and configure evidence sink for LabRuntime tests.
///
/// Integrates with the existing lab runtime helpers while adding structured logging.
pub fn lab_with_evidence<F, T>(test_name: &str, f: F) -> (T, EvidenceSink)
where
    F: FnOnce(&LabRuntime, &mut EvidenceSink) -> T,
{
    let mut evidence = EvidenceSink::for_test(test_name);
    evidence.phase("lab_setup");

    let result = lab_with_config(|runtime| {
        evidence.event(
            "lab_start",
            &[
                ("seed", &runtime.config().seed.to_string()),
                ("deterministic", "true"),
            ],
        );

        let result = f(runtime, &mut evidence);

        evidence.phase("lab_complete");
        result
    });

    evidence.outcome("passed");
    (result, evidence)
}