asupersync-conformance 0.3.1

Conformance test suite for async runtime specifications
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
//! Asupersync Conformance Test Suite
//!
// Allow type complexity for trait method return types - these are intentionally
// verbose to be explicit about the async behavior and lifetimes
#![allow(clippy::type_complexity)]
//!
//! This crate provides a conformance test suite for async runtime implementations.
//! Tests are designed to verify that runtimes correctly implement the expected
//! semantics for spawning, channels, I/O, synchronization, and cancellation.
//!
//! # Architecture
//!
//! The test suite is runtime-agnostic. Each runtime must implement the
//! `RuntimeInterface` trait to provide the necessary primitives. Tests are
//! written against this interface, allowing the same tests to validate
//! different runtime implementations.
//!
//! # Test Categories
//!
//! - `Spawn`: Task spawning and join handles
//! - `Channels`: MPSC, oneshot, broadcast, and watch channels
//! - `IO`: File operations, TCP, and UDP networking
//! - `Sync`: Mutex, RwLock, Semaphore, Barrier, OnceCell
//! - `Time`: Sleep, timeout, interval
//! - `Cancel`: Cancellation token and cooperative cancellation

#![forbid(unsafe_code)]

use serde::{Deserialize, Serialize};
use std::fmt;
use std::future::Future;
use std::io::{self, SeekFrom};
use std::net::SocketAddr;
use std::path::Path;
use std::pin::Pin;
use std::time::Duration;

pub mod bench;
pub mod kafka_record_batch_v2;
pub mod lean_coverage_matrix;
pub mod lean_frontier;
pub mod logging;
pub mod raptorq_rfc6330;
pub mod report;
pub mod rfc6330_tests;
pub mod runner;
pub mod tests;
pub mod traceability;

pub use bench::{
    BenchAllocSnapshot, BenchAllocStats, BenchCategory, BenchComparisonResult,
    BenchComparisonSummary, BenchConfig, BenchOutput, BenchRunResult, BenchRunSummary, BenchRunner,
    BenchThresholds, Benchmark, Comparison, ComparisonConfidence, RegressionCheck,
    RegressionConfig, RegressionMetric, Stats, StatsError, default_benchmarks,
    run_benchmark_comparison,
};
pub use kafka_record_batch_v2::{
    ConformanceTestResult, Header, KafkaConformanceHarness, RecordAttribute, RecordBatchV2,
    RecordV2, TimestampType,
};
pub use lean_coverage_matrix::{
    BlockerCode, CoverageBlocker, CoverageEvidence, CoverageRow, CoverageRowType, CoverageStatus,
    LEAN_COVERAGE_SCHEMA_VERSION, LeanCoverageMatrix,
};
pub use lean_frontier::{
    LEAN_FRONTIER_SCHEMA_VERSION, LeanDiagnosticSeverity, LeanFrontierBucket,
    LeanFrontierDiagnostic, LeanFrontierReport, extract_frontier_report,
};
pub use logging::{
    ConformanceTestLogger, LogCollector, LogConfig, LogEntry, LogLevel, TestEvent, TestEventKind,
};
pub use report::{render_console_summary, write_json_report};
pub use runner::{
    ComparisonResult, ComparisonStatus, ComparisonSummary, RunConfig, RunSummary, SingleRunResult,
    SuiteResult, SuiteTestResult, TestRunner, compare_results, run_comparison,
    run_conformance_suite,
};
pub use traceability::{
    CiReport, CoverageStats, ScanWarning, SpecRequirement, TraceabilityEntry, TraceabilityMatrix,
    TraceabilityMatrixBuilder, TraceabilityScan, TraceabilityScanError, requirements_from_entries,
    scan_conformance_attributes,
};

// ============================================================================
// Test Result Types
// ============================================================================

/// Result of a conformance test execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TestResult {
    /// Whether the test passed.
    pub passed: bool,
    /// Optional failure message.
    pub message: Option<String>,
    /// Checkpoints recorded during test execution.
    pub checkpoints: Vec<Checkpoint>,
    /// Duration of test execution.
    pub duration_ms: Option<u64>,
}

impl TestResult {
    /// Create a passing test result.
    pub fn passed() -> Self {
        Self {
            passed: true,
            message: None,
            checkpoints: Vec::new(),
            duration_ms: None,
        }
    }

    /// Create a failing test result with a message.
    pub fn failed(message: impl Into<String>) -> Self {
        Self {
            passed: false,
            message: Some(message.into()),
            checkpoints: Vec::new(),
            duration_ms: None,
        }
    }

    /// Add a checkpoint to the result.
    pub fn with_checkpoint(mut self, checkpoint: Checkpoint) -> Self {
        self.checkpoints.push(checkpoint);
        self
    }

    /// Set the duration.
    pub fn with_duration(mut self, duration_ms: u64) -> Self {
        self.duration_ms = Some(duration_ms);
        self
    }
}

/// A checkpoint recorded during test execution.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Checkpoint {
    /// Name of the checkpoint.
    pub name: String,
    /// Data associated with the checkpoint.
    pub data: serde_json::Value,
}

impl Checkpoint {
    /// Create a new checkpoint.
    pub fn new(name: impl Into<String>, data: serde_json::Value) -> Self {
        Self {
            name: name.into(),
            data,
        }
    }
}

/// Helper function to record a checkpoint.
pub fn checkpoint(name: &str, data: serde_json::Value) {
    let _ = Checkpoint::new(name, data.clone());
    crate::logging::record_checkpoint(name, data);
}

// ============================================================================
// Test Categories
// ============================================================================

/// Categories of conformance tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum TestCategory {
    /// Task spawning and join handles.
    Spawn,
    /// Channel primitives (MPSC, oneshot, broadcast, watch).
    Channels,
    /// I/O operations (file, TCP, UDP).
    IO,
    /// Synchronization primitives (Mutex, RwLock, etc.).
    Sync,
    /// Time-related operations (sleep, timeout).
    Time,
    /// Cancellation mechanisms.
    Cancel,
}

impl fmt::Display for TestCategory {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            TestCategory::Spawn => write!(f, "spawn"),
            TestCategory::Channels => write!(f, "channels"),
            TestCategory::IO => write!(f, "io"),
            TestCategory::Sync => write!(f, "sync"),
            TestCategory::Time => write!(f, "time"),
            TestCategory::Cancel => write!(f, "cancel"),
        }
    }
}

// ============================================================================
// Test Metadata
// ============================================================================

/// Metadata for a conformance test.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TestMeta {
    /// Unique identifier for the test.
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// Description of what the test validates.
    pub description: String,
    /// Category of the test.
    pub category: TestCategory,
    /// Tags for filtering.
    pub tags: Vec<String>,
    /// Expected behavior description.
    pub expected: String,
}

// ============================================================================
// Runtime Interface
// ============================================================================

/// Trait that async runtimes must implement to run conformance tests.
///
/// This trait provides the common primitives that tests require. Each method
/// returns a concrete type that the runtime provides.
pub trait RuntimeInterface: Sized {
    // ---- Core Types ----
    /// Join handle for spawned tasks.
    type JoinHandle<T: Send + 'static>: Future<Output = T> + Send + 'static;

    /// MPSC sender.
    type MpscSender<T: Send + 'static>: MpscSender<T> + 'static;

    /// MPSC receiver.
    type MpscReceiver<T: Send + 'static>: MpscReceiver<T> + 'static;

    /// Oneshot sender.
    type OneshotSender<T: Send + 'static>: OneshotSender<T> + 'static;

    /// Oneshot receiver.
    type OneshotReceiver<T: Send + 'static>: Future<Output = Result<T, OneshotRecvError>>
        + Send
        + 'static;

    /// Broadcast sender.
    type BroadcastSender<T: Send + Clone + 'static>: BroadcastSender<T> + 'static;

    /// Broadcast receiver.
    type BroadcastReceiver<T: Send + Clone + 'static>: BroadcastReceiver<T> + 'static;

    /// Watch sender.
    type WatchSender<T: Send + Sync + 'static>: WatchSender<T> + 'static;

    /// Watch receiver.
    type WatchReceiver<T: Send + Sync + Clone + 'static>: WatchReceiver<T> + 'static;

    /// Async file handle.
    type File: AsyncFile + 'static;

    /// TCP listener.
    type TcpListener: TcpListener<Stream = Self::TcpStream> + 'static;

    /// TCP stream.
    type TcpStream: TcpStream + 'static;

    /// UDP socket.
    type UdpSocket: UdpSocket + 'static;

    // ---- Spawn ----
    /// Spawn an async task.
    fn spawn<F>(&self, future: F) -> Self::JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static;

    // ---- Block On ----
    /// Block on a future until it completes.
    fn block_on<F: Future>(&self, future: F) -> F::Output;

    /// Snapshot allocation counters for benchmarking (if supported).
    fn bench_alloc_snapshot(&self) -> Option<crate::bench::runner::BenchAllocSnapshot> {
        None
    }

    // ---- Time ----
    /// Sleep for a duration.
    fn sleep(&self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;

    /// Run a future with a timeout.
    fn timeout<'a, F: Future + Send + 'a>(
        &'a self,
        duration: Duration,
        future: F,
    ) -> Pin<Box<dyn Future<Output = Result<F::Output, TimeoutError>> + Send + 'a>>
    where
        F::Output: Send;

    // ---- Channels ----
    /// Create an MPSC channel with the given capacity.
    fn mpsc_channel<T: Send + 'static>(
        &self,
        capacity: usize,
    ) -> (Self::MpscSender<T>, Self::MpscReceiver<T>);

    /// Create a oneshot channel.
    fn oneshot_channel<T: Send + 'static>(
        &self,
    ) -> (Self::OneshotSender<T>, Self::OneshotReceiver<T>);

    /// Create a broadcast channel.
    fn broadcast_channel<T: Send + Clone + 'static>(
        &self,
        capacity: usize,
    ) -> (Self::BroadcastSender<T>, Self::BroadcastReceiver<T>);

    /// Create a watch channel.
    fn watch_channel<T: Send + Sync + Clone + 'static>(
        &self,
        initial: T,
    ) -> (Self::WatchSender<T>, Self::WatchReceiver<T>);

    // ---- File I/O ----
    /// Create a file for writing.
    fn file_create<'a>(
        &'a self,
        path: &'a Path,
    ) -> Pin<Box<dyn Future<Output = io::Result<Self::File>> + Send + 'a>>;

    /// Open a file for reading.
    fn file_open<'a>(
        &'a self,
        path: &'a Path,
    ) -> Pin<Box<dyn Future<Output = io::Result<Self::File>> + Send + 'a>>;

    // ---- Network ----
    /// Bind a TCP listener to an address.
    fn tcp_listen<'a>(
        &'a self,
        addr: &'a str,
    ) -> Pin<Box<dyn Future<Output = io::Result<Self::TcpListener>> + Send + 'a>>;

    /// Connect to a TCP address.
    fn tcp_connect<'a>(
        &'a self,
        addr: SocketAddr,
    ) -> Pin<Box<dyn Future<Output = io::Result<Self::TcpStream>> + Send + 'a>>;

    /// Bind a UDP socket to an address.
    fn udp_bind<'a>(
        &'a self,
        addr: &'a str,
    ) -> Pin<Box<dyn Future<Output = io::Result<Self::UdpSocket>> + Send + 'a>>;
}

// ============================================================================
// Channel Traits
// ============================================================================

/// Error when receiving from a closed oneshot channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OneshotRecvError;

impl fmt::Display for OneshotRecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "oneshot channel sender dropped")
    }
}

impl std::error::Error for OneshotRecvError {}

/// MPSC sender trait.
pub trait MpscSender<T: Send>: Clone + Send + Sync {
    /// Send a value, waiting if the channel is full.
    fn send(&self, value: T) -> Pin<Box<dyn Future<Output = Result<(), T>> + Send + '_>>;
}

/// MPSC receiver trait.
pub trait MpscReceiver<T: Send>: Send {
    /// Receive a value, returning None if the channel is closed.
    fn recv(&mut self) -> Pin<Box<dyn Future<Output = Option<T>> + Send + '_>>;
}

/// Oneshot sender trait.
pub trait OneshotSender<T: Send>: Send {
    /// Send a value. Can only be called once.
    fn send(self, value: T) -> Result<(), T>;
}

/// Error when receiving from a closed broadcast channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BroadcastRecvError {
    /// The receiver lagged too far behind.
    Lagged(u64),
    /// The sender was dropped.
    Closed,
}

impl fmt::Display for BroadcastRecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Lagged(n) => write!(f, "receiver lagged by {n} messages"),
            Self::Closed => write!(f, "broadcast channel closed"),
        }
    }
}

impl std::error::Error for BroadcastRecvError {}

/// Broadcast sender trait.
pub trait BroadcastSender<T: Send + Clone>: Clone + Send + Sync {
    /// Send a value to all receivers.
    fn send(&self, value: T) -> Result<usize, T>;

    /// Create a new receiver.
    fn subscribe(&self) -> Box<dyn BroadcastReceiver<T>>;
}

/// Broadcast receiver trait.
pub trait BroadcastReceiver<T: Send + Clone>: Send {
    /// Receive a value.
    fn recv(&mut self) -> Pin<Box<dyn Future<Output = Result<T, BroadcastRecvError>> + Send + '_>>;
}

/// Error when receiving from a closed watch channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WatchRecvError;

impl fmt::Display for WatchRecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "watch channel closed")
    }
}

impl std::error::Error for WatchRecvError {}

/// Watch sender trait.
pub trait WatchSender<T: Send + Sync>: Send + Sync {
    /// Send a new value.
    fn send(&self, value: T) -> Result<(), T>;
}

/// Watch receiver trait.
pub trait WatchReceiver<T: Send + Sync>: Clone + Send + Sync {
    /// Wait for a change.
    fn changed(&mut self) -> Pin<Box<dyn Future<Output = Result<(), WatchRecvError>> + Send + '_>>;

    /// Get the current value.
    fn borrow_and_clone(&self) -> T;
}

// ============================================================================
// File I/O Traits
// ============================================================================

/// Async file trait.
pub trait AsyncFile: Send {
    /// Write all bytes to the file.
    fn write_all<'a>(
        &'a mut self,
        buf: &'a [u8],
    ) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>;

    /// Read to fill the buffer exactly.
    fn read_exact<'a>(
        &'a mut self,
        buf: &'a mut [u8],
    ) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>;

    /// Read all bytes into a vector.
    fn read_to_end<'a>(
        &'a mut self,
        buf: &'a mut Vec<u8>,
    ) -> Pin<Box<dyn Future<Output = io::Result<usize>> + Send + 'a>>;

    /// Seek to a position.
    fn seek<'a>(
        &'a mut self,
        pos: SeekFrom,
    ) -> Pin<Box<dyn Future<Output = io::Result<u64>> + Send + 'a>>;

    /// Sync all data to disk.
    fn sync_all(&self) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + '_>>;

    /// Shutdown the file (for sockets).
    fn shutdown(&mut self) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + '_>>;
}

// ============================================================================
// Network Traits
// ============================================================================

/// Timeout error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TimeoutError;

impl fmt::Display for TimeoutError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "operation timed out")
    }
}

impl std::error::Error for TimeoutError {}

/// TCP listener trait.
pub trait TcpListener: Send {
    /// The stream type returned by accept.
    type Stream: TcpStream;

    /// Get the local address.
    fn local_addr(&self) -> io::Result<SocketAddr>;

    /// Accept a connection.
    fn accept(
        &mut self,
    ) -> Pin<Box<dyn Future<Output = io::Result<(Self::Stream, SocketAddr)>> + Send + '_>>;
}

/// TCP stream trait.
pub trait TcpStream: Send {
    /// Read into a buffer.
    fn read<'a>(
        &'a mut self,
        buf: &'a mut [u8],
    ) -> Pin<Box<dyn Future<Output = io::Result<usize>> + Send + 'a>>;

    /// Read to fill the buffer exactly.
    fn read_exact<'a>(
        &'a mut self,
        buf: &'a mut [u8],
    ) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>;

    /// Write all bytes.
    fn write_all<'a>(
        &'a mut self,
        buf: &'a [u8],
    ) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>;

    /// Shutdown the stream.
    fn shutdown(&mut self) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + '_>>;
}

/// UDP socket trait.
pub trait UdpSocket: Send {
    /// Get the local address.
    fn local_addr(&self) -> io::Result<SocketAddr>;

    /// Send to an address.
    fn send_to<'a>(
        &'a self,
        buf: &'a [u8],
        addr: SocketAddr,
    ) -> Pin<Box<dyn Future<Output = io::Result<usize>> + Send + 'a>>;

    /// Receive from any address.
    fn recv_from<'a>(
        &'a self,
        buf: &'a mut [u8],
    ) -> Pin<Box<dyn Future<Output = io::Result<(usize, SocketAddr)>> + Send + 'a>>;
}

// ============================================================================
// Test Registration
// ============================================================================

/// A registered conformance test.
pub struct ConformanceTest<RT: RuntimeInterface> {
    /// Test metadata.
    pub meta: TestMeta,
    /// The test function.
    pub test_fn: fn(&RT) -> TestResult,
}

impl<RT: RuntimeInterface> ConformanceTest<RT> {
    /// Create a new conformance test.
    pub const fn new(meta: TestMeta, test_fn: fn(&RT) -> TestResult) -> Self {
        Self { meta, test_fn }
    }

    /// Run the test.
    pub fn run(&self, runtime: &RT) -> TestResult {
        (self.test_fn)(runtime)
    }
}

/// Macro for defining conformance tests.
///
/// # Example
///
/// ```ignore
/// conformance_test! {
///     id: "io-001",
///     name: "File write and read",
///     description: "Write data to file, read it back",
///     category: TestCategory::IO,
///     tags: ["file", "basic"],
///     expected: "Read data matches written data",
///     test: |rt| {
///         rt.block_on(async {
///             // test implementation
///             TestResult::passed()
///         })
///     }
/// }
/// ```
#[macro_export]
macro_rules! conformance_test {
    (
        id: $id:literal,
        name: $name:literal,
        description: $desc:literal,
        category: $cat:expr,
        tags: [$($tag:literal),* $(,)?],
        expected: $expected:literal,
        test: |$rt:ident| $body:expr
    ) => {
        {
            fn test_fn<RT: $crate::RuntimeInterface>($rt: &RT) -> $crate::TestResult {
                $body
            }

            $crate::ConformanceTest::new(
                $crate::TestMeta {
                    id: $id.to_string(),
                    name: $name.to_string(),
                    description: $desc.to_string(),
                    category: $cat,
                    tags: vec![$($tag.to_string()),*],
                    expected: $expected.to_string(),
                },
                test_fn,
            )
        }
    };
}