#![allow(clippy::type_complexity)]
#![forbid(unsafe_code)]
use serde::{Deserialize, Serialize};
use std::fmt;
use std::future::Future;
use std::io::{self, SeekFrom};
use std::net::SocketAddr;
use std::path::Path;
use std::pin::Pin;
use std::time::Duration;
pub mod bench;
pub mod kafka_record_batch_v2;
pub mod lean_coverage_matrix;
pub mod lean_frontier;
pub mod logging;
pub mod raptorq_rfc6330;
pub mod report;
pub mod rfc6330_tests;
pub mod runner;
pub mod tests;
pub mod traceability;
pub use bench::{
BenchAllocSnapshot, BenchAllocStats, BenchCategory, BenchComparisonResult,
BenchComparisonSummary, BenchConfig, BenchOutput, BenchRunResult, BenchRunSummary, BenchRunner,
BenchThresholds, Benchmark, Comparison, ComparisonConfidence, RegressionCheck,
RegressionConfig, RegressionMetric, Stats, StatsError, default_benchmarks,
run_benchmark_comparison,
};
pub use kafka_record_batch_v2::{
ConformanceTestResult, Header, KafkaConformanceHarness, RecordAttribute, RecordBatchV2,
RecordV2, TimestampType,
};
pub use lean_coverage_matrix::{
BlockerCode, CoverageBlocker, CoverageEvidence, CoverageRow, CoverageRowType, CoverageStatus,
LEAN_COVERAGE_SCHEMA_VERSION, LeanCoverageMatrix,
};
pub use lean_frontier::{
LEAN_FRONTIER_SCHEMA_VERSION, LeanDiagnosticSeverity, LeanFrontierBucket,
LeanFrontierDiagnostic, LeanFrontierReport, extract_frontier_report,
};
pub use logging::{
ConformanceTestLogger, LogCollector, LogConfig, LogEntry, LogLevel, TestEvent, TestEventKind,
};
pub use report::{render_console_summary, write_json_report};
pub use runner::{
ComparisonResult, ComparisonStatus, ComparisonSummary, RunConfig, RunSummary, SingleRunResult,
SuiteResult, SuiteTestResult, TestRunner, compare_results, run_comparison,
run_conformance_suite,
};
pub use traceability::{
CiReport, CoverageStats, ScanWarning, SpecRequirement, TraceabilityEntry, TraceabilityMatrix,
TraceabilityMatrixBuilder, TraceabilityScan, TraceabilityScanError, requirements_from_entries,
scan_conformance_attributes,
};
/// Outcome of a single test run.
///
/// Values are normally built with [`TestResult::passed`] or
/// [`TestResult::failed`] and then extended with
/// [`TestResult::with_checkpoint`] and [`TestResult::with_duration`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TestResult {
/// `true` for results created by `passed()`, `false` for results created by
/// `failed()`.
pub passed: bool,
/// Message given to `failed()`; `None` for results created by `passed()`.
pub message: Option<String>,
/// Checkpoints appended through `with_checkpoint`, in insertion order.
pub checkpoints: Vec<Checkpoint>,
/// Value stored by `with_duration`; `None` until it is called.
/// Presumably milliseconds, going by the field name.
pub duration_ms: Option<u64>,
}
impl TestResult {
    /// Creates a passing result with no message, no checkpoints and no
    /// recorded duration.
    pub fn passed() -> Self {
        Self {
            duration_ms: None,
            checkpoints: Vec::new(),
            message: None,
            passed: true,
        }
    }

    /// Creates a failing result that carries `message`.
    ///
    /// Checkpoints start empty and the duration is unset, exactly as for
    /// [`TestResult::passed`].
    pub fn failed(message: impl Into<String>) -> Self {
        let reason = message.into();
        Self {
            passed: false,
            message: Some(reason),
            ..Self::passed()
        }
    }

    /// Returns the result with `checkpoint` appended to its checkpoint list.
    pub fn with_checkpoint(self, checkpoint: Checkpoint) -> Self {
        let mut updated = self;
        updated.checkpoints.push(checkpoint);
        updated
    }

    /// Returns the result with its duration set to `duration_ms`, replacing
    /// any previously stored value.
    pub fn with_duration(self, duration_ms: u64) -> Self {
        Self {
            duration_ms: Some(duration_ms),
            ..self
        }
    }
}
/// A named JSON payload attached to a [`TestResult`] or recorded through
/// [`checkpoint`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Checkpoint {
/// Label identifying the checkpoint.
pub name: String,
/// Arbitrary JSON data associated with the checkpoint.
pub data: serde_json::Value,
}
impl Checkpoint {
pub fn new(name: impl Into<String>, data: serde_json::Value) -> Self {
Self {
name: name.into(),
data,
}
}
}
/// Records a named checkpoint with its JSON payload.
///
/// `name` and `data` are handed to `crate::logging::record_checkpoint`; `data`
/// is moved into that call, so the payload is never cloned here.
pub fn checkpoint(name: &str, data: serde_json::Value) {
    // A temporary `Checkpoint` built from a deep clone of `data` used to be
    // created and dropped here; it had no observable effect and was removed.
    crate::logging::record_checkpoint(name, data);
}
/// Category assigned to a test through [`TestMeta::category`].
///
/// The `Display` implementation renders each variant as a lowercase label.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum TestCategory {
/// Displayed as `spawn`.
Spawn,
/// Displayed as `channels`.
Channels,
/// Displayed as `io`.
IO,
/// Displayed as `sync`.
Sync,
/// Displayed as `time`.
Time,
/// Displayed as `cancel`.
Cancel,
}
impl fmt::Display for TestCategory {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TestCategory::Spawn => write!(f, "spawn"),
TestCategory::Channels => write!(f, "channels"),
TestCategory::IO => write!(f, "io"),
TestCategory::Sync => write!(f, "sync"),
TestCategory::Time => write!(f, "time"),
TestCategory::Cancel => write!(f, "cancel"),
}
}
}
/// Descriptive metadata attached to a [`ConformanceTest`].
///
/// The `conformance_test!` macro fills every field from its arguments.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TestMeta {
/// Identifier of the test (the macro's `id:` argument).
pub id: String,
/// Name of the test (the macro's `name:` argument).
pub name: String,
/// Longer description of the test (the macro's `description:` argument).
pub description: String,
/// Category the test is filed under.
pub category: TestCategory,
/// Free-form tags (the macro's `tags: [...]` argument).
pub tags: Vec<String>,
/// Text from the macro's `expected:` argument; presumably a description of
/// the expected behaviour.
pub expected: String,
}
/// Runtime abstraction that conformance tests are written against.
///
/// A [`ConformanceTest`] stores a plain `fn(&RT) -> TestResult` with
/// `RT: RuntimeInterface`, so everything a test needs from a runtime (tasks,
/// timers, channels, files, sockets) is reached through this trait.
/// Asynchronous operations are returned as boxed `Send` futures.
pub trait RuntimeInterface: Sized {
/// Handle returned by [`RuntimeInterface::spawn`]; itself a future that
/// resolves to the spawned future's output.
type JoinHandle<T: Send + 'static>: Future<Output = T> + Send + 'static;
/// Sending half returned by [`RuntimeInterface::mpsc_channel`].
type MpscSender<T: Send + 'static>: MpscSender<T> + 'static;
/// Receiving half returned by [`RuntimeInterface::mpsc_channel`].
type MpscReceiver<T: Send + 'static>: MpscReceiver<T> + 'static;
/// Sending half returned by [`RuntimeInterface::oneshot_channel`].
type OneshotSender<T: Send + 'static>: OneshotSender<T> + 'static;
/// Receiving half returned by [`RuntimeInterface::oneshot_channel`]; a
/// future resolving to the value or to [`OneshotRecvError`].
type OneshotReceiver<T: Send + 'static>: Future<Output = Result<T, OneshotRecvError>>
+ Send
+ 'static;
/// Sending half returned by [`RuntimeInterface::broadcast_channel`].
type BroadcastSender<T: Send + Clone + 'static>: BroadcastSender<T> + 'static;
/// Receiving half returned by [`RuntimeInterface::broadcast_channel`].
type BroadcastReceiver<T: Send + Clone + 'static>: BroadcastReceiver<T> + 'static;
/// Sending half returned by [`RuntimeInterface::watch_channel`].
type WatchSender<T: Send + Sync + 'static>: WatchSender<T> + 'static;
/// Receiving half returned by [`RuntimeInterface::watch_channel`].
type WatchReceiver<T: Send + Sync + Clone + 'static>: WatchReceiver<T> + 'static;
/// File type produced by [`RuntimeInterface::file_create`] and
/// [`RuntimeInterface::file_open`].
type File: AsyncFile + 'static;
/// Listener type produced by [`RuntimeInterface::tcp_listen`]; its accepted
/// streams are `Self::TcpStream`.
type TcpListener: TcpListener<Stream = Self::TcpStream> + 'static;
/// Stream type produced by [`RuntimeInterface::tcp_connect`].
type TcpStream: TcpStream + 'static;
/// Socket type produced by [`RuntimeInterface::udp_bind`].
type UdpSocket: UdpSocket + 'static;
/// Spawns `future` and returns a handle that resolves to its output.
fn spawn<F>(&self, future: F) -> Self::JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static;
/// Synchronously produces the output of `future`.
fn block_on<F: Future>(&self, future: F) -> F::Output;
/// Optional allocation snapshot for benchmarking.
///
/// The default implementation returns `None`; implementors may override it.
fn bench_alloc_snapshot(&self) -> Option<crate::bench::runner::BenchAllocSnapshot> {
None
}
/// Returns a boxed future of `()` borrowing `self`.
/// Presumably it resolves once `duration` has elapsed; the exact timing
/// semantics are left to the implementor.
fn sleep(&self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;
/// Wraps `future` in a boxed future that yields either the wrapped output
/// (`Ok`) or [`TimeoutError`] (`Err`).
/// Presumably `Err` is produced when `duration` elapses first.
fn timeout<'a, F: Future + Send + 'a>(
&'a self,
duration: Duration,
future: F,
) -> Pin<Box<dyn Future<Output = Result<F::Output, TimeoutError>> + Send + 'a>>
where
F::Output: Send;
/// Creates an mpsc channel and returns its `(sender, receiver)` pair.
/// `capacity` is presumably the buffer bound; its interpretation is up to
/// the implementor.
fn mpsc_channel<T: Send + 'static>(
&self,
capacity: usize,
) -> (Self::MpscSender<T>, Self::MpscReceiver<T>);
/// Creates a oneshot channel and returns its `(sender, receiver)` pair.
fn oneshot_channel<T: Send + 'static>(
&self,
) -> (Self::OneshotSender<T>, Self::OneshotReceiver<T>);
/// Creates a broadcast channel and returns its `(sender, receiver)` pair.
/// `capacity` is presumably the buffer bound; its interpretation is up to
/// the implementor.
fn broadcast_channel<T: Send + Clone + 'static>(
&self,
capacity: usize,
) -> (Self::BroadcastSender<T>, Self::BroadcastReceiver<T>);
/// Creates a watch channel seeded with `initial` and returns its
/// `(sender, receiver)` pair.
fn watch_channel<T: Send + Sync + Clone + 'static>(
&self,
initial: T,
) -> (Self::WatchSender<T>, Self::WatchReceiver<T>);
/// Returns a future resolving to a `Self::File` for `path`, or an I/O error.
/// Presumably creates the file; truncation behaviour is not specified here.
fn file_create<'a>(
&'a self,
path: &'a Path,
) -> Pin<Box<dyn Future<Output = io::Result<Self::File>> + Send + 'a>>;
/// Returns a future resolving to a `Self::File` for `path`, or an I/O error.
/// Presumably opens an existing file.
fn file_open<'a>(
&'a self,
path: &'a Path,
) -> Pin<Box<dyn Future<Output = io::Result<Self::File>> + Send + 'a>>;
/// Returns a future resolving to a listener for the textual address `addr`,
/// or an I/O error.
fn tcp_listen<'a>(
&'a self,
addr: &'a str,
) -> Pin<Box<dyn Future<Output = io::Result<Self::TcpListener>> + Send + 'a>>;
/// Returns a future resolving to a stream connected to `addr`, or an I/O
/// error.
fn tcp_connect<'a>(
&'a self,
addr: SocketAddr,
) -> Pin<Box<dyn Future<Output = io::Result<Self::TcpStream>> + Send + 'a>>;
/// Returns a future resolving to a UDP socket for the textual address
/// `addr`, or an I/O error.
fn udp_bind<'a>(
&'a self,
addr: &'a str,
) -> Pin<Box<dyn Future<Output = io::Result<Self::UdpSocket>> + Send + 'a>>;
}
/// Error produced by a [`RuntimeInterface::OneshotReceiver`] future when no
/// value is delivered.
///
/// Its `Display` text is `oneshot channel sender dropped`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OneshotRecvError;

impl fmt::Display for OneshotRecvError {
    /// Writes the fixed error text; formatter width and fill are not applied.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        out.write_str("oneshot channel sender dropped")
    }
}

impl std::error::Error for OneshotRecvError {}
/// Sending half of an mpsc channel; cloneable and shareable across threads.
pub trait MpscSender<T: Send>: Clone + Send + Sync {
/// Returns a future that sends `value`.
/// The `Err` variant carries a `T`, presumably the value that could not be
/// delivered.
fn send(&self, value: T) -> Pin<Box<dyn Future<Output = Result<(), T>> + Send + '_>>;
}
/// Receiving half of an mpsc channel.
pub trait MpscReceiver<T: Send>: Send {
/// Returns a future resolving to the next value, or to `None`.
/// Presumably `None` means the channel is closed and drained.
fn recv(&mut self) -> Pin<Box<dyn Future<Output = Option<T>> + Send + '_>>;
}
/// Sending half of a oneshot channel.
pub trait OneshotSender<T: Send>: Send {
/// Consumes the sender and sends `value` synchronously.
/// The `Err` variant carries a `T`, presumably the value that could not be
/// delivered.
fn send(self, value: T) -> Result<(), T>;
}
/// Error returned by [`BroadcastReceiver::recv`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BroadcastRecvError {
    /// The receiver fell behind; the payload is the message count reported in
    /// the `Display` text.
    Lagged(u64),
    /// Displayed as `broadcast channel closed`.
    Closed,
}

impl fmt::Display for BroadcastRecvError {
    /// Writes a human-readable description of the error.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            Self::Closed => out.write_str("broadcast channel closed"),
            Self::Lagged(missed) => write!(out, "receiver lagged by {missed} messages"),
        }
    }
}

impl std::error::Error for BroadcastRecvError {}
/// Sending half of a broadcast channel; cloneable and shareable across threads.
pub trait BroadcastSender<T: Send + Clone>: Clone + Send + Sync {
/// Sends `value` synchronously.
/// On success returns a `usize`, presumably the number of receivers reached;
/// the `Err` variant carries a `T`, presumably the undelivered value.
fn send(&self, value: T) -> Result<usize, T>;
/// Creates an additional receiver, returned as a boxed trait object rather
/// than the runtime's concrete receiver type.
fn subscribe(&self) -> Box<dyn BroadcastReceiver<T>>;
}
/// Receiving half of a broadcast channel.
pub trait BroadcastReceiver<T: Send + Clone>: Send {
/// Returns a future resolving to the next value or to a
/// [`BroadcastRecvError`].
fn recv(&mut self) -> Pin<Box<dyn Future<Output = Result<T, BroadcastRecvError>> + Send + '_>>;
}
/// Error returned by [`WatchReceiver::changed`].
///
/// Its `Display` text is `watch channel closed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WatchRecvError;

impl fmt::Display for WatchRecvError {
    /// Writes the fixed error text; formatter width and fill are not applied.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        out.write_str("watch channel closed")
    }
}

impl std::error::Error for WatchRecvError {}
/// Sending half of a watch channel.
pub trait WatchSender<T: Send + Sync>: Send + Sync {
/// Sends `value` synchronously.
/// The `Err` variant carries a `T`, presumably the value that could not be
/// published.
fn send(&self, value: T) -> Result<(), T>;
}
/// Receiving half of a watch channel; cloneable and shareable across threads.
pub trait WatchReceiver<T: Send + Sync>: Clone + Send + Sync {
/// Returns a future resolving to `Ok(())` or to [`WatchRecvError`].
/// Presumably `Ok` signals that a new value is available.
fn changed(&mut self) -> Pin<Box<dyn Future<Output = Result<(), WatchRecvError>> + Send + '_>>;
/// Returns an owned `T`; going by the name, a clone of the current value.
fn borrow_and_clone(&self) -> T;
}
/// Asynchronous file operations, each returned as a boxed `Send` future.
///
/// Method names match `std::io` / `std::fs::File` counterparts; presumably the
/// semantics do too, but that is up to the implementor.
pub trait AsyncFile: Send {
/// Returns a future that writes `buf`, resolving to `()` or an I/O error.
fn write_all<'a>(
&'a mut self,
buf: &'a [u8],
) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>;
/// Returns a future that reads into `buf`, resolving to `()` or an I/O error.
fn read_exact<'a>(
&'a mut self,
buf: &'a mut [u8],
) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>;
/// Returns a future that reads into the growable `buf`, resolving to a
/// `usize` (presumably the number of bytes read) or an I/O error.
fn read_to_end<'a>(
&'a mut self,
buf: &'a mut Vec<u8>,
) -> Pin<Box<dyn Future<Output = io::Result<usize>> + Send + 'a>>;
/// Returns a future that seeks to `pos`, resolving to a `u64` (presumably
/// the new position) or an I/O error.
fn seek<'a>(
&'a mut self,
pos: SeekFrom,
) -> Pin<Box<dyn Future<Output = io::Result<u64>> + Send + 'a>>;
/// Returns a future resolving to `()` or an I/O error; takes `&self`.
fn sync_all(&self) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + '_>>;
/// Returns a future resolving to `()` or an I/O error; takes `&mut self`.
fn shutdown(&mut self) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + '_>>;
}
/// Error variant of the future returned by [`RuntimeInterface::timeout`].
///
/// Its `Display` text is `operation timed out`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TimeoutError;

impl fmt::Display for TimeoutError {
    /// Writes the fixed error text; formatter width and fill are not applied.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        out.write_str("operation timed out")
    }
}

impl std::error::Error for TimeoutError {}
/// A TCP listener that yields incoming connections.
pub trait TcpListener: Send {
/// Stream type produced by [`TcpListener::accept`].
type Stream: TcpStream;
/// Returns a socket address for this listener (by name, the address it is
/// bound to), or an I/O error.
fn local_addr(&self) -> io::Result<SocketAddr>;
/// Returns a future resolving to a `(stream, address)` pair, or an I/O
/// error. The address is presumably the peer's.
fn accept(
&mut self,
) -> Pin<Box<dyn Future<Output = io::Result<(Self::Stream, SocketAddr)>> + Send + '_>>;
}
/// Asynchronous TCP stream operations, each returned as a boxed `Send` future.
pub trait TcpStream: Send {
/// Returns a future that reads into `buf`, resolving to a `usize`
/// (presumably the number of bytes read) or an I/O error.
fn read<'a>(
&'a mut self,
buf: &'a mut [u8],
) -> Pin<Box<dyn Future<Output = io::Result<usize>> + Send + 'a>>;
/// Returns a future that reads into `buf`, resolving to `()` or an I/O error.
fn read_exact<'a>(
&'a mut self,
buf: &'a mut [u8],
) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>;
/// Returns a future that writes `buf`, resolving to `()` or an I/O error.
fn write_all<'a>(
&'a mut self,
buf: &'a [u8],
) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>;
/// Returns a future resolving to `()` or an I/O error.
fn shutdown(&mut self) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + '_>>;
}
/// Asynchronous UDP socket operations; both I/O methods take `&self`.
pub trait UdpSocket: Send {
/// Returns a socket address for this socket (by name, the address it is
/// bound to), or an I/O error.
fn local_addr(&self) -> io::Result<SocketAddr>;
/// Returns a future that sends `buf` to `addr`, resolving to a `usize`
/// (presumably the number of bytes sent) or an I/O error.
fn send_to<'a>(
&'a self,
buf: &'a [u8],
addr: SocketAddr,
) -> Pin<Box<dyn Future<Output = io::Result<usize>> + Send + 'a>>;
/// Returns a future that receives into `buf`, resolving to a
/// `(usize, SocketAddr)` pair (presumably byte count and sender address) or
/// an I/O error.
fn recv_from<'a>(
&'a self,
buf: &'a mut [u8],
) -> Pin<Box<dyn Future<Output = io::Result<(usize, SocketAddr)>> + Send + 'a>>;
}
/// A test: metadata plus the function that executes it against a runtime.
pub struct ConformanceTest<RT: RuntimeInterface> {
/// Descriptive metadata for the test.
pub meta: TestMeta,
/// Plain function pointer invoked by [`ConformanceTest::run`].
pub test_fn: fn(&RT) -> TestResult,
}
impl<RT: RuntimeInterface> ConformanceTest<RT> {
pub const fn new(meta: TestMeta, test_fn: fn(&RT) -> TestResult) -> Self {
Self { meta, test_fn }
}
pub fn run(&self, runtime: &RT) -> TestResult {
(self.test_fn)(runtime)
}
}
/// Builds a [`ConformanceTest`] from inline metadata and a test body.
///
/// The arguments must appear in exactly this order, each introduced by its
/// keyword: `id`, `name`, `description`, `category`, `tags`, `expected`,
/// `test`. All but `category` (an expression) and `test` take literals.
/// `tags` accepts a trailing comma inside the brackets; the macro invocation
/// itself does not accept one after the test body.
///
/// `test: |rt| body` looks like a closure but expands to a nested generic
/// function `fn test_fn<RT: RuntimeInterface>(rt: &RT) -> TestResult`, so
/// `body` cannot capture local variables from the call site and must evaluate
/// to a [`TestResult`].
#[macro_export]
macro_rules! conformance_test {
(
id: $id:literal,
name: $name:literal,
description: $desc:literal,
category: $cat:expr,
tags: [$($tag:literal),* $(,)?],
expected: $expected:literal,
test: |$rt:ident| $body:expr
) => {
{
// A nested fn item (not a closure) so it coerces to the plain
// `fn(&RT) -> TestResult` pointer stored in `ConformanceTest`.
fn test_fn<RT: $crate::RuntimeInterface>($rt: &RT) -> $crate::TestResult {
$body
}
$crate::ConformanceTest::new(
$crate::TestMeta {
// Every literal is converted to an owned `String` via `to_string()`.
id: $id.to_string(),
name: $name.to_string(),
description: $desc.to_string(),
category: $cat,
tags: vec![$($tag.to_string()),*],
expected: $expected.to_string(),
},
test_fn,
)
}
};
}