use crate::util::DetRng;
use sha2::{Digest, Sha256};
/// Schema version of the lab model. It is copied into
/// `AtpLabReplayMetadata::schema_version` and is the first value hashed into
/// every replay fingerprint.
pub const ATP_LAB_MODEL_SCHEMA_VERSION: u32 = 1;
/// A named, seeded lab scenario: an ordered list of regimes plus oracle
/// settings. Combine it with a transfer via `compose` to obtain a plan.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct AtpLabScenario {
/// Scenario name; embedded in the minimization key and the replay command.
pub name: String,
/// Seed that is XOR-mixed with the transfer size and object count to seed
/// event generation.
pub seed: u64,
/// Regimes in insertion order; event generation emits one event per entry.
pub regimes: Vec<AtpLabRegime>,
/// Oracle toggles; cloned into every artifact produced from this scenario.
pub oracle_config: AtpLabOracleConfig,
}
impl AtpLabScenario {
#[must_use]
pub fn new(name: impl Into<String>, seed: u64) -> Self {
Self {
name: name.into(),
seed,
regimes: Vec::new(),
oracle_config: AtpLabOracleConfig::default(),
}
}
#[must_use]
pub fn with_regime(mut self, regime: AtpLabRegime) -> Self {
self.regimes.push(regime);
self
}
#[must_use]
pub fn with_futurelock_and_leak_oracles(mut self) -> Self {
self.oracle_config.futurelock = true;
self.oracle_config.task_leak = true;
self.oracle_config.obligation_leak = true;
self.oracle_config.region_leak = true;
self
}
#[must_use]
pub fn with_artifact_retention(mut self) -> Self {
self.oracle_config.preserve_failure_artifacts = true;
self
}
#[must_use]
pub fn compose(self, transfer: AtpLabTransferSpec) -> AtpTransferLabPlan {
let events = generate_events(&self, &transfer);
let replay = AtpLabReplayMetadata::from_parts(&self, &transfer, &events);
AtpTransferLabPlan {
scenario: self,
transfer,
events,
replay,
}
}
#[must_use]
pub fn required_matrix() -> Vec<Self> {
vec![
Self::new("easy-nat-direct", 0xA7F0_0001)
.with_regime(AtpLabRegime::LanMulticast)
.with_regime(AtpLabRegime::EasyNat)
.with_regime(AtpLabRegime::ExplicitPublicUdp)
.with_regime(AtpLabRegime::Ipv6Direct),
Self::new("hard-nat-relay", 0xA7F0_0002)
.with_regime(AtpLabRegime::HardNat)
.with_regime(AtpLabRegime::SymmetricNat)
.with_regime(AtpLabRegime::RelayOnly)
.with_regime(AtpLabRegime::RelayTcpTls443),
Self::new("udp-blocked-private-route", 0xA7F0_0003)
.with_regime(AtpLabRegime::UdpBlocked)
.with_regime(AtpLabRegime::TailscalePrivateRoute),
Self::new("enterprise-masque-connect-udp", 0xA7F0_0007)
.with_regime(AtpLabRegime::UdpBlocked)
.with_regime(AtpLabRegime::MasqueConnectUdpProxy),
Self::new("mailbox-only-store-forward", 0xA7F0_0008)
.with_regime(AtpLabRegime::UdpBlocked)
.with_regime(AtpLabRegime::OfflineMailbox),
Self::new("path-migration-loss", 0xA7F0_0004)
.with_regime(AtpLabRegime::PathMigration)
.with_regime(AtpLabRegime::PacketDuplication)
.with_regime(AtpLabRegime::DelayedAck)
.with_regime(AtpLabRegime::AckLoss)
.with_regime(AtpLabRegime::PtoStorm),
Self::new("disk-crash-replay", 0xA7F0_0005)
.with_regime(AtpLabRegime::DiskStall)
.with_regime(AtpLabRegime::Crash)
.with_artifact_retention(),
Self::new("adversarial-relay-repair", 0xA7F0_0006)
.with_regime(AtpLabRegime::PacketTruncation)
.with_regime(AtpLabRegime::MaliciousChunks)
.with_regime(AtpLabRegime::CorruptedRepairSymbols)
.with_regime(AtpLabRegime::LyingRelay)
.with_regime(AtpLabRegime::StalledReceiver)
.with_regime(AtpLabRegime::Cancellation)
.with_futurelock_and_leak_oracles()
.with_artifact_retention(),
]
}
}
/// Describes the transfer a scenario is composed with.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct AtpLabTransferSpec {
/// Sender identifier; hashed into the replay fingerprint.
pub source: String,
/// Receiver identifier; hashed into the replay fingerprint.
pub destination: String,
/// Transfer size; XOR-mixed into the event-generation seed and hashed into
/// the fingerprint.
pub bytes: u64,
/// Number of objects; XOR-mixed into the event-generation seed and hashed
/// into the fingerprint.
pub object_count: u64,
}
impl AtpLabTransferSpec {
    /// Builds a transfer description, converting both endpoint identifiers
    /// into owned strings.
    #[must_use]
    pub fn new(
        source: impl Into<String>,
        destination: impl Into<String>,
        bytes: u64,
        object_count: u64,
    ) -> Self {
        let source = source.into();
        let destination = destination.into();
        Self {
            source,
            destination,
            bytes,
            object_count,
        }
    }
}
/// A scenario bound to a transfer, together with the generated event
/// timeline and the replay metadata computed from it. Produced by
/// `AtpLabScenario::compose`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct AtpTransferLabPlan {
/// The scenario this plan was composed from.
pub scenario: AtpLabScenario,
/// The transfer this plan was composed with.
pub transfer: AtpLabTransferSpec,
/// Generated events, one per scenario regime, in regime order.
pub events: Vec<AtpLabEvent>,
/// Seed, fingerprint, and replay instructions for this plan.
pub replay: AtpLabReplayMetadata,
}
impl AtpTransferLabPlan {
    /// Evaluates the plan and packages the outcome as an artifact.
    ///
    /// The first event whose fault counts as a failure (if any) is reported
    /// as the run's failure. When there is a failure and the scenario asks to
    /// preserve failure artifacts, the artifact carries two text attachments:
    /// `events.txt` (one line per event) and `replay.txt` (the replay command).
    #[must_use]
    pub fn run_model(&self) -> AtpLabArtifact {
        // Only the earliest failing event is reported.
        let mut failure = None;
        for event in &self.events {
            if event.fault.is_failure() {
                let replay_hint = format!(
                    "replay {} from seed {} at step {}",
                    self.replay.minimization_key, self.replay.seed, event.step
                );
                failure = Some(AtpLabFailure {
                    step: event.step,
                    fault: event.fault.clone(),
                    replay_hint,
                });
                break;
            }
        }

        let keep_artifacts = self.scenario.oracle_config.preserve_failure_artifacts;
        let mut attachments = Vec::new();
        if keep_artifacts && failure.is_some() {
            let event_lines: Vec<String> = self
                .events
                .iter()
                .map(|event| event.artifact_line())
                .collect();
            attachments.push(AtpLabAttachment::from_text(
                "events.txt",
                event_lines.join("\n"),
            ));
            attachments.push(AtpLabAttachment::from_text(
                "replay.txt",
                self.replay.replay_command.clone(),
            ));
        }

        AtpLabArtifact {
            replay: self.replay.clone(),
            oracle_config: self.scenario.oracle_config.clone(),
            events: self.events.clone(),
            failure,
            attachments,
        }
    }
}
/// Conditions a scenario can exercise. Event generation translates each
/// regime into exactly one `AtpLabFault`.
///
/// The derived `PartialOrd`/`Ord` follow declaration order, so reordering
/// variants changes how regimes sort (for example inside a `BTreeSet`).
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
pub enum AtpLabRegime {
/// Generates `AtpLabFault::DirectPath`.
LanMulticast,
/// Generates `AtpLabFault::DirectPath`.
EasyNat,
/// Generates `AtpLabFault::ExplicitPublicUdpPath`.
ExplicitPublicUdp,
/// Generates `AtpLabFault::DirectPathBlocked`.
HardNat,
/// Generates `AtpLabFault::DirectPathBlocked`.
SymmetricNat,
/// Generates `AtpLabFault::DirectPathBlocked`.
UdpBlocked,
/// Generates `AtpLabFault::DirectPath`.
Ipv6Direct,
/// Generates `AtpLabFault::RelayPath`.
RelayOnly,
/// Generates `AtpLabFault::RelayTcpTls443Path`.
RelayTcpTls443,
/// Generates `AtpLabFault::PrivateRoute`.
TailscalePrivateRoute,
/// Generates `AtpLabFault::MasqueProxyPath`.
MasqueConnectUdpProxy,
/// Generates `AtpLabFault::OfflineMailboxPath`.
OfflineMailbox,
/// Generates `AtpLabFault::PathMigrated`.
PathMigration,
/// Generates `AtpLabFault::PacketDuplicated`.
PacketDuplication,
/// Generates `AtpLabFault::PacketTruncated`.
PacketTruncation,
/// Generates `AtpLabFault::AckDelayed` with a seeded delay.
DelayedAck,
/// Generates `AtpLabFault::AckLost`.
AckLoss,
/// Generates `AtpLabFault::PtoStorm` with a seeded burst count.
PtoStorm,
/// Generates `AtpLabFault::DiskStall` with a seeded duration.
DiskStall,
/// Generates `AtpLabFault::Crash` with a seeded restart delay.
Crash,
/// Generates `AtpLabFault::MaliciousChunk`.
MaliciousChunks,
/// Generates `AtpLabFault::CorruptedRepairSymbol`.
CorruptedRepairSymbols,
/// Generates `AtpLabFault::LyingRelay`.
LyingRelay,
/// Generates `AtpLabFault::StalledReceiver` with a seeded duration.
StalledReceiver,
/// Generates `AtpLabFault::CancellationRequested`.
Cancellation,
}
impl AtpLabRegime {
#[must_use]
pub const fn label(self) -> &'static str {
match self {
Self::LanMulticast => "lan_multicast",
Self::EasyNat => "easy_nat",
Self::ExplicitPublicUdp => "explicit_public_udp",
Self::HardNat => "hard_nat",
Self::SymmetricNat => "symmetric_nat",
Self::UdpBlocked => "udp_blocked",
Self::Ipv6Direct => "ipv6_direct",
Self::RelayOnly => "relay_only",
Self::RelayTcpTls443 => "relay_tcp_tls_443",
Self::TailscalePrivateRoute => "tailscale_private_route",
Self::MasqueConnectUdpProxy => "masque_connect_udp_proxy",
Self::OfflineMailbox => "offline_mailbox",
Self::PathMigration => "path_migration",
Self::PacketDuplication => "packet_duplication",
Self::PacketTruncation => "packet_truncation",
Self::DelayedAck => "delayed_ack",
Self::AckLoss => "ack_loss",
Self::PtoStorm => "pto_storm",
Self::DiskStall => "disk_stall",
Self::Crash => "crash",
Self::MaliciousChunks => "malicious_chunks",
Self::CorruptedRepairSymbols => "corrupted_repair_symbols",
Self::LyingRelay => "lying_relay",
Self::StalledReceiver => "stalled_receiver",
Self::Cancellation => "cancellation",
}
}
}
/// The outcome recorded for a regime in the event timeline. Some variants
/// are path selections rather than errors; `is_failure` decides which ones
/// make a model run fail.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum AtpLabFault {
/// Not a failure.
DirectPath,
/// Not a failure.
ExplicitPublicUdpPath,
/// Not a failure.
DirectPathBlocked,
/// Not a failure.
RelayPath,
/// Not a failure.
RelayTcpTls443Path,
/// Not a failure.
PrivateRoute,
/// Not a failure.
MasqueProxyPath,
/// Not a failure.
OfflineMailboxPath,
/// Not a failure.
PathMigrated,
/// Not a failure.
PacketDuplicated,
/// Counts as a failure.
PacketTruncated,
/// Not a failure. Generated with `micros` in `5_000..25_000`.
AckDelayed { micros: u64 },
/// Not a failure.
AckLost,
/// Not a failure. Generated with `bursts` in `1..=4`.
PtoStorm { bursts: u8 },
/// Not a failure. Generated with `micros` in `50_000..550_000`.
DiskStall { micros: u64 },
/// Counts as a failure. Generated with `restart_after_micros` in
/// `100_000..1_100_000`.
Crash { restart_after_micros: u64 },
/// Counts as a failure.
MaliciousChunk,
/// Counts as a failure.
CorruptedRepairSymbol,
/// Counts as a failure.
LyingRelay,
/// Counts as a failure. Generated with `micros` in `250_000..1_000_000`.
StalledReceiver { micros: u64 },
/// Counts as a failure.
CancellationRequested,
}
impl AtpLabFault {
fn is_failure(&self) -> bool {
matches!(
self,
Self::PacketTruncated
| Self::Crash { .. }
| Self::MaliciousChunk
| Self::CorruptedRepairSymbol
| Self::LyingRelay
| Self::StalledReceiver { .. }
| Self::CancellationRequested
)
}
fn label(&self) -> &'static str {
match self {
Self::DirectPath => "direct_path",
Self::ExplicitPublicUdpPath => "explicit_public_udp_path",
Self::DirectPathBlocked => "direct_path_blocked",
Self::RelayPath => "relay_path",
Self::RelayTcpTls443Path => "relay_tcp_tls_443_path",
Self::PrivateRoute => "private_route",
Self::MasqueProxyPath => "masque_proxy_path",
Self::OfflineMailboxPath => "offline_mailbox_path",
Self::PathMigrated => "path_migrated",
Self::PacketDuplicated => "packet_duplicated",
Self::PacketTruncated => "packet_truncated",
Self::AckDelayed { .. } => "ack_delayed",
Self::AckLost => "ack_lost",
Self::PtoStorm { .. } => "pto_storm",
Self::DiskStall { .. } => "disk_stall",
Self::Crash { .. } => "crash",
Self::MaliciousChunk => "malicious_chunk",
Self::CorruptedRepairSymbol => "corrupted_repair_symbol",
Self::LyingRelay => "lying_relay",
Self::StalledReceiver { .. } => "stalled_receiver",
Self::CancellationRequested => "cancellation_requested",
}
}
}
/// One entry in a plan's generated timeline.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct AtpLabEvent {
/// Zero-based position of the event in the timeline.
pub step: u64,
/// Generated as `step * 10_000` plus a seeded jitter below 997 (saturating).
pub tick_micros: u64,
/// The scenario regime that produced this event.
pub regime: AtpLabRegime,
/// The fault generated for `regime`.
pub fault: AtpLabFault,
}
impl AtpLabEvent {
    /// Renders the event as one `key=value` line for the `events.txt`
    /// attachment. Only the fault's label is written; its payload is omitted.
    fn artifact_line(&self) -> String {
        let Self {
            step,
            tick_micros,
            regime,
            fault,
        } = self;
        let regime_label = regime.label();
        let fault_label = fault.label();
        format!(
            "step={} tick={} regime={} fault={}",
            step, tick_micros, regime_label, fault_label
        )
    }
}
/// Oracle and artifact toggles carried by a scenario and copied into each
/// artifact. Within the code visible here only `preserve_failure_artifacts`
/// is read; the other flags are stored and passed through.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct AtpLabOracleConfig {
/// Off by default; enabled by `with_futurelock_and_leak_oracles`.
pub futurelock: bool,
/// On by default.
pub task_leak: bool,
/// On by default.
pub obligation_leak: bool,
/// On by default.
pub region_leak: bool,
/// On by default. When set, a failing model run attaches `events.txt` and
/// `replay.txt` to its artifact.
pub preserve_failure_artifacts: bool,
/// On by default. NOTE(review): not read by any code visible here;
/// presumably consumed by external tooling — confirm.
pub minimization: bool,
}
impl Default for AtpLabOracleConfig {
    /// Everything is enabled by default except the futurelock oracle.
    fn default() -> Self {
        let futurelock = false;
        let (task_leak, obligation_leak, region_leak) = (true, true, true);
        let (preserve_failure_artifacts, minimization) = (true, true);
        Self {
            futurelock,
            task_leak,
            obligation_leak,
            region_leak,
            preserve_failure_artifacts,
            minimization,
        }
    }
}
/// Identifies a composed plan well enough to name and re-run it.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct AtpLabReplayMetadata {
/// Always `ATP_LAB_MODEL_SCHEMA_VERSION` at the time of composition.
pub schema_version: u32,
/// Copy of the scenario seed.
pub seed: u64,
/// Copy of the scenario name.
pub scenario_name: String,
/// Hex-encoded SHA-256 over the scenario, transfer, and events.
pub fingerprint_hex: String,
/// `atp-lab-<scenario name>-<first 12 fingerprint characters>`.
pub minimization_key: String,
/// A `cargo test` command line ending in the scenario name.
pub replay_command: String,
}
impl AtpLabReplayMetadata {
    /// Derives the replay metadata for a scenario/transfer pair and the
    /// events generated from them.
    fn from_parts(
        scenario: &AtpLabScenario,
        transfer: &AtpLabTransferSpec,
        events: &[AtpLabEvent],
    ) -> Self {
        let digest = fingerprint_hex(scenario, transfer, events);
        let scenario_name = scenario.name.clone();
        // A 12-character digest prefix keeps the key short; the digest is
        // hex-encoded SHA-256 (64 characters), so the slice is in range.
        let minimization_key = format!("atp-lab-{}-{}", scenario_name, &digest[..12]);
        let replay_command = format!(
            "cargo test -p asupersync atp_lab -- --exact {}",
            scenario_name
        );
        Self {
            schema_version: ATP_LAB_MODEL_SCHEMA_VERSION,
            seed: scenario.seed,
            scenario_name,
            fingerprint_hex: digest,
            minimization_key,
            replay_command,
        }
    }
}
/// A text attachment stored inline in an artifact, with integrity metadata.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct AtpLabAttachment {
/// Attachment file name, e.g. `events.txt`.
pub name: String,
/// Length of `text` in bytes.
pub byte_len: u64,
/// Hex-encoded SHA-256 of the bytes of `text`.
pub sha256_hex: String,
/// The attachment contents.
pub text: String,
}
impl AtpLabAttachment {
    /// Wraps `text` as an attachment, recording its byte length and SHA-256.
    fn from_text(name: impl Into<String>, text: String) -> Self {
        let payload = text.as_bytes();
        // Saturate rather than panic if `usize` were ever wider than `u64`.
        let byte_len = u64::try_from(payload.len()).unwrap_or(u64::MAX);
        let sha256_hex = hex::encode(Sha256::digest(payload));
        Self {
            name: name.into(),
            byte_len,
            sha256_hex,
            text,
        }
    }
}
/// The first failing event of a model run.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct AtpLabFailure {
/// Step of the failing event.
pub step: u64,
/// Copy of the failing event's fault.
pub fault: AtpLabFault,
/// Human-readable hint naming the minimization key, seed, and step.
pub replay_hint: String,
}
/// Result of running a plan through the model.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct AtpLabArtifact {
/// Copy of the plan's replay metadata.
pub replay: AtpLabReplayMetadata,
/// Copy of the scenario's oracle configuration.
pub oracle_config: AtpLabOracleConfig,
/// Copy of the plan's event timeline.
pub events: Vec<AtpLabEvent>,
/// The first failing event, or `None` when no event is a failure.
pub failure: Option<AtpLabFailure>,
/// Text attachments; empty unless the run failed and the oracle
/// configuration preserves failure artifacts.
pub attachments: Vec<AtpLabAttachment>,
}
/// Produces one event per scenario regime, in regime order.
///
/// The generator is seeded with `seed ^ bytes ^ object_count`, so identical
/// inputs always yield identical timelines. Because XOR is commutative,
/// swapping `bytes` and `object_count` yields the same seed.
fn generate_events(scenario: &AtpLabScenario, transfer: &AtpLabTransferSpec) -> Vec<AtpLabEvent> {
    let rng_seed = scenario.seed ^ transfer.bytes ^ transfer.object_count;
    let mut rng = DetRng::new(rng_seed);
    scenario
        .regimes
        .iter()
        .copied()
        .enumerate()
        .map(|(index, regime)| {
            let step = u64::try_from(index).unwrap_or(u64::MAX);
            // Draw order matters for reproducibility: the tick jitter is
            // drawn first, then any draw made for the fault payload.
            let jitter = rng.next_u64() % 997;
            let tick_micros = step.saturating_mul(10_000).saturating_add(jitter);
            let fault = fault_for_regime(regime, &mut rng);
            AtpLabEvent {
                step,
                tick_micros,
                regime,
                fault,
            }
        })
        .collect()
}
fn fault_for_regime(regime: AtpLabRegime, rng: &mut DetRng) -> AtpLabFault {
match regime {
AtpLabRegime::LanMulticast | AtpLabRegime::EasyNat | AtpLabRegime::Ipv6Direct => {
AtpLabFault::DirectPath
}
AtpLabRegime::ExplicitPublicUdp => AtpLabFault::ExplicitPublicUdpPath,
AtpLabRegime::HardNat | AtpLabRegime::SymmetricNat | AtpLabRegime::UdpBlocked => {
AtpLabFault::DirectPathBlocked
}
AtpLabRegime::RelayOnly => AtpLabFault::RelayPath,
AtpLabRegime::RelayTcpTls443 => AtpLabFault::RelayTcpTls443Path,
AtpLabRegime::TailscalePrivateRoute => AtpLabFault::PrivateRoute,
AtpLabRegime::MasqueConnectUdpProxy => AtpLabFault::MasqueProxyPath,
AtpLabRegime::OfflineMailbox => AtpLabFault::OfflineMailboxPath,
AtpLabRegime::PathMigration => AtpLabFault::PathMigrated,
AtpLabRegime::PacketDuplication => AtpLabFault::PacketDuplicated,
AtpLabRegime::PacketTruncation => AtpLabFault::PacketTruncated,
AtpLabRegime::DelayedAck => AtpLabFault::AckDelayed {
micros: 5_000 + (rng.next_u64() % 20_000),
},
AtpLabRegime::AckLoss => AtpLabFault::AckLost,
AtpLabRegime::PtoStorm => AtpLabFault::PtoStorm {
bursts: 1 + u8::try_from(rng.next_u64() % 4).unwrap_or(0),
},
AtpLabRegime::DiskStall => AtpLabFault::DiskStall {
micros: 50_000 + (rng.next_u64() % 500_000),
},
AtpLabRegime::Crash => AtpLabFault::Crash {
restart_after_micros: 100_000 + (rng.next_u64() % 1_000_000),
},
AtpLabRegime::MaliciousChunks => AtpLabFault::MaliciousChunk,
AtpLabRegime::CorruptedRepairSymbols => AtpLabFault::CorruptedRepairSymbol,
AtpLabRegime::LyingRelay => AtpLabFault::LyingRelay,
AtpLabRegime::StalledReceiver => AtpLabFault::StalledReceiver {
micros: 250_000 + (rng.next_u64() % 750_000),
},
AtpLabRegime::Cancellation => AtpLabFault::CancellationRequested,
}
}
fn fingerprint_hex(
scenario: &AtpLabScenario,
transfer: &AtpLabTransferSpec,
events: &[AtpLabEvent],
) -> String {
let mut hasher = Sha256::new();
hasher.update(ATP_LAB_MODEL_SCHEMA_VERSION.to_be_bytes());
hasher.update(scenario.name.as_bytes());
hasher.update(scenario.seed.to_be_bytes());
hasher.update(transfer.source.as_bytes());
hasher.update(transfer.destination.as_bytes());
hasher.update(transfer.bytes.to_be_bytes());
hasher.update(transfer.object_count.to_be_bytes());
for event in events {
hasher.update(event.step.to_be_bytes());
hasher.update(event.tick_micros.to_be_bytes());
hasher.update(event.regime.label().as_bytes());
hasher.update(event.fault.label().as_bytes());
}
hex::encode(hasher.finalize())
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeSet;

    /// The transfer shared by every test: 8 MiB split over three objects.
    fn transfer() -> AtpLabTransferSpec {
        let eight_mib = 8 * 1024 * 1024;
        AtpLabTransferSpec::new("alice-laptop", "gpu-box", eight_mib, 3)
    }

    /// The built-in matrix must mention every regime, and nothing else.
    #[test]
    fn required_matrix_covers_acceptance_regimes() {
        let acceptance_regimes = [
            AtpLabRegime::LanMulticast,
            AtpLabRegime::EasyNat,
            AtpLabRegime::ExplicitPublicUdp,
            AtpLabRegime::HardNat,
            AtpLabRegime::SymmetricNat,
            AtpLabRegime::UdpBlocked,
            AtpLabRegime::Ipv6Direct,
            AtpLabRegime::RelayOnly,
            AtpLabRegime::RelayTcpTls443,
            AtpLabRegime::TailscalePrivateRoute,
            AtpLabRegime::MasqueConnectUdpProxy,
            AtpLabRegime::OfflineMailbox,
            AtpLabRegime::PathMigration,
            AtpLabRegime::PacketDuplication,
            AtpLabRegime::PacketTruncation,
            AtpLabRegime::DelayedAck,
            AtpLabRegime::AckLoss,
            AtpLabRegime::PtoStorm,
            AtpLabRegime::DiskStall,
            AtpLabRegime::Crash,
            AtpLabRegime::MaliciousChunks,
            AtpLabRegime::CorruptedRepairSymbols,
            AtpLabRegime::LyingRelay,
            AtpLabRegime::StalledReceiver,
            AtpLabRegime::Cancellation,
        ];
        let required: BTreeSet<AtpLabRegime> = acceptance_regimes.iter().copied().collect();

        let mut covered = BTreeSet::new();
        for scenario in AtpLabScenario::required_matrix() {
            covered.extend(scenario.regimes);
        }

        assert_eq!(covered, required);
    }

    /// Composing the same scenario twice must give identical results.
    #[test]
    fn composed_transfer_plan_is_deterministic() {
        let compose_once = || {
            AtpLabScenario::new("path-migration", 42)
                .with_regime(AtpLabRegime::PathMigration)
                .with_regime(AtpLabRegime::AckLoss)
                .compose(transfer())
        };
        let first = compose_once();
        let second = compose_once();

        assert_eq!(first.events, second.events);
        assert_eq!(first.replay.fingerprint_hex, second.replay.fingerprint_hex);
    }

    /// The oracle builder must leave all four oracles switched on.
    #[test]
    fn futurelock_and_leak_oracles_can_be_enabled() {
        let config = AtpLabScenario::new("oracles", 7)
            .with_futurelock_and_leak_oracles()
            .oracle_config;

        assert!(config.futurelock);
        assert!(config.task_leak);
        assert!(config.obligation_leak);
        assert!(config.region_leak);
    }

    /// A failing run keeps both attachments and a scenario-derived key.
    #[test]
    fn failure_preserves_replay_and_minimization_artifacts() {
        let scenario = AtpLabScenario::new("adversary", 99)
            .with_regime(AtpLabRegime::LyingRelay)
            .with_artifact_retention();
        let artifact = scenario.compose(transfer()).run_model();

        assert!(artifact.failure.is_some());
        assert_eq!(artifact.attachments.len(), 2);

        let key = &artifact.replay.minimization_key;
        assert!(key.starts_with("atp-lab-adversary"));

        let events_text = &artifact.attachments[0].text;
        assert!(events_text.contains("fault=lying_relay"));
    }

    /// The MASQUE proxy regime yields a single, non-failing path event.
    #[test]
    fn masque_proxy_regime_emits_non_failure_adapter_event() {
        let plan = AtpLabScenario::new("masque-proxy", 0xA7F0_0007)
            .with_regime(AtpLabRegime::MasqueConnectUdpProxy)
            .compose(transfer());

        assert_eq!(plan.events.len(), 1);
        let only_fault = &plan.events[0].fault;
        assert_eq!(*only_fault, AtpLabFault::MasqueProxyPath);
        assert_eq!(only_fault.label(), "masque_proxy_path");
        assert!(plan.run_model().failure.is_none());
    }
}