pub mod integration_tests;
pub mod masque;
pub mod tcptls;
pub mod webtransport;
use crate::atp::object::ObjectId;
use crate::error::{Error, ErrorKind, Result};
use crate::types::TraceId;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::time::{Duration, SystemTime};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum AdapterType {
#[serde(rename = "native-quic")]
NativeQuic,
#[serde(rename = "webtransport")]
WebTransport,
#[serde(rename = "masque-connect-udp")]
MasqueConnectUdp,
#[serde(rename = "tcp-tls-443")]
TcpTlsFallback,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdapterParity {
pub object_support: FeatureSupport,
pub stream_support: FeatureSupport,
pub proof_support: FeatureSupport,
pub path_support: FeatureSupport,
pub repair_support: FeatureSupport,
pub datagram_support: FeatureSupport,
pub mailbox_support: FeatureSupport,
pub swarm_support: FeatureSupport,
pub diagnostic_support: FeatureSupport,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum FeatureSupport {
Full,
Partial,
Downgraded,
Unsupported,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdapterConfig {
pub preferred_adapters: Vec<AdapterType>,
pub downgrade_policy: DowngradePolicy,
pub required_features: Vec<RequiredFeature>,
pub adapter_configs: HashMap<AdapterType, AdapterSpecificConfig>,
pub caveat_reporting: CaveatReporting,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DowngradePolicy {
Strict,
AllowDowngrade,
AllowSpecific(Vec<AdapterType>),
FallbackOnly(AdapterType),
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum RequiredFeature {
ObjectVerification,
StreamProtocol,
ProofGeneration,
RepairCapabilities,
SwarmCoordination,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdapterSpecificConfig {
pub connection_timeout: Duration,
pub max_concurrent: usize,
pub feature_flags: HashMap<String, bool>,
pub performance_config: PerformanceConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceConfig {
pub buffer_sizes: BufferSizes,
pub retry_policy: RetryPolicy,
pub keep_alive: KeepAliveConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BufferSizes {
pub send_buffer: usize,
pub recv_buffer: usize,
pub max_frame_size: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
pub max_attempts: usize,
pub base_delay: Duration,
pub max_delay: Duration,
pub backoff_factor: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KeepAliveConfig {
pub interval: Duration,
pub timeout: Duration,
pub enabled: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CaveatReporting {
pub report_performance: bool,
pub report_hol_blocking: bool,
pub report_diagnostic_limits: bool,
pub include_timing: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdapterNegotiation {
pub selected_adapter: AdapterType,
pub feature_parity: AdapterParity,
pub downgrade_reasons: Vec<DowngradeReason>,
pub performance_caveats: Vec<PerformanceCaveat>,
pub adapter_metadata: AdapterMetadata,
pub negotiated_at: SystemTime,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum DowngradeReason {
AdapterUnavailable,
FeatureRequirementsNotMet(Vec<RequiredFeature>),
ConnectionFailed(String),
PerformanceBelowThreshold,
PolicyEnforced,
NetworkConditions,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PerformanceCaveat {
HeadOfLineBlocking,
IncreasedLatency(Duration),
ReducedThroughput(f64),
NoMultiplexing,
LimitedConcurrency(usize),
NestedTransportOverhead,
ReliabilityConcerns(String),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdapterMetadata {
pub version: String,
pub path_info: TransportPath,
pub relay_info: Option<RelayInfo>,
pub security_params: SecurityParams,
pub replay_pointer: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransportPath {
pub local_endpoint: String,
pub remote_endpoint: String,
pub intermediate_hops: Vec<String>,
pub path_mtu: Option<usize>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RelayInfo {
pub relay_address: String,
pub relay_type: String,
pub relay_capabilities: Vec<String>,
pub relay_path_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityParams {
pub tls_version: Option<String>,
pub cipher_suite: Option<String>,
pub cert_validation: CertValidationMode,
pub security_flags: Vec<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum CertValidationMode {
Full,
Relaxed,
Custom,
}
#[derive(Debug)]
pub struct AdapterManager {
config: AdapterConfig,
parity_matrix: HashMap<AdapterType, AdapterParity>,
active_sessions: HashMap<String, AdapterSession>,
metrics: AdapterMetrics,
}
#[derive(Debug, Clone)]
pub struct AdapterSession {
pub session_id: String,
pub negotiation: AdapterNegotiation,
pub started_at: SystemTime,
pub last_activity: SystemTime,
pub stats: SessionStats,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionStats {
pub bytes_sent: u64,
pub bytes_received: u64,
pub objects_transferred: u64,
pub streams_created: u64,
pub connection_errors: u64,
pub avg_latency: Duration,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdapterMetrics {
pub sessions_by_adapter: HashMap<AdapterType, u64>,
pub downgrade_frequency: HashMap<DowngradeReason, u64>,
pub caveat_frequency: HashMap<String, u64>,
pub avg_session_duration: HashMap<AdapterType, Duration>,
pub success_rates: HashMap<AdapterType, f64>,
pub last_updated: SystemTime,
}
impl AdapterManager {
pub fn new(config: AdapterConfig) -> Self {
let parity_matrix = Self::build_parity_matrix();
Self {
config,
parity_matrix,
active_sessions: HashMap::new(),
metrics: AdapterMetrics::new(),
}
}
pub async fn negotiate_adapter(
&mut self,
requirements: &[RequiredFeature],
trace_id: TraceId,
) -> Result<AdapterNegotiation> {
for &adapter_type in &self.config.preferred_adapters {
if let Ok(negotiation) = self.try_adapter(adapter_type, requirements, trace_id).await {
self.record_successful_negotiation(&negotiation);
return Ok(negotiation);
}
}
match &self.config.downgrade_policy {
DowngradePolicy::Strict => Err(Error::new(ErrorKind::ConnectionRefused)),
DowngradePolicy::AllowDowngrade => {
self.try_fallback_adapters(requirements, trace_id).await
}
DowngradePolicy::AllowSpecific(allowed) => {
self.try_specific_adapters(allowed, requirements, trace_id)
.await
}
DowngradePolicy::FallbackOnly(fallback) => {
self.try_adapter(*fallback, requirements, trace_id).await
}
}
}
pub async fn start_session(
&mut self,
negotiation: AdapterNegotiation,
_object_id: ObjectId,
) -> Result<String> {
let session_id = format!(
"adapter-{:?}-{}",
negotiation.selected_adapter,
SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
);
let session = AdapterSession {
session_id: session_id.clone(),
negotiation,
started_at: SystemTime::now(),
last_activity: SystemTime::now(),
stats: SessionStats::new(),
};
self.active_sessions.insert(session_id.clone(), session);
self.update_metrics();
Ok(session_id)
}
pub fn get_adapter_parity(&self, adapter_type: AdapterType) -> Option<&AdapterParity> {
self.parity_matrix.get(&adapter_type)
}
pub fn metrics(&self) -> &AdapterMetrics {
&self.metrics
}
fn build_parity_matrix() -> HashMap<AdapterType, AdapterParity> {
let mut matrix = HashMap::new();
matrix.insert(
AdapterType::NativeQuic,
AdapterParity {
object_support: FeatureSupport::Full,
stream_support: FeatureSupport::Full,
proof_support: FeatureSupport::Full,
path_support: FeatureSupport::Full,
repair_support: FeatureSupport::Full,
datagram_support: FeatureSupport::Full,
mailbox_support: FeatureSupport::Full,
swarm_support: FeatureSupport::Full,
diagnostic_support: FeatureSupport::Full,
},
);
matrix.insert(
AdapterType::WebTransport,
AdapterParity {
object_support: FeatureSupport::Full,
stream_support: FeatureSupport::Full,
proof_support: FeatureSupport::Partial, path_support: FeatureSupport::Partial, repair_support: FeatureSupport::Partial, datagram_support: FeatureSupport::Full,
mailbox_support: FeatureSupport::Full,
swarm_support: FeatureSupport::Partial, diagnostic_support: FeatureSupport::Partial, },
);
matrix.insert(
AdapterType::MasqueConnectUdp,
AdapterParity {
object_support: FeatureSupport::Full,
stream_support: FeatureSupport::Downgraded, proof_support: FeatureSupport::Full,
path_support: FeatureSupport::Downgraded, repair_support: FeatureSupport::Partial, datagram_support: FeatureSupport::Full,
mailbox_support: FeatureSupport::Full,
swarm_support: FeatureSupport::Downgraded, diagnostic_support: FeatureSupport::Partial, },
);
matrix.insert(
AdapterType::TcpTlsFallback,
AdapterParity {
object_support: FeatureSupport::Full,
stream_support: FeatureSupport::Downgraded, proof_support: FeatureSupport::Full,
path_support: FeatureSupport::Downgraded, repair_support: FeatureSupport::Downgraded, datagram_support: FeatureSupport::Unsupported, mailbox_support: FeatureSupport::Downgraded, swarm_support: FeatureSupport::Downgraded, diagnostic_support: FeatureSupport::Partial, },
);
matrix
}
async fn try_adapter(
&self,
adapter_type: AdapterType,
requirements: &[RequiredFeature],
trace_id: TraceId,
) -> Result<AdapterNegotiation> {
let parity = self
.parity_matrix
.get(&adapter_type)
.ok_or_else(|| Error::new(ErrorKind::ConfigError))?;
let mut downgrade_reasons = Vec::new();
let mut performance_caveats = Vec::new();
for requirement in requirements {
if !self.check_feature_requirement(parity, requirement) {
downgrade_reasons.push(DowngradeReason::FeatureRequirementsNotMet(vec![
requirement.clone(),
]));
}
}
if !downgrade_reasons.is_empty() {
return Err(Error::new(ErrorKind::ConnectionRefused).with_message(format!(
"{adapter_type} does not satisfy required ATP adapter features: {downgrade_reasons:?}"
)));
}
self.add_adapter_caveats(adapter_type, &mut performance_caveats);
let negotiation = AdapterNegotiation {
selected_adapter: adapter_type,
feature_parity: parity.clone(),
downgrade_reasons,
performance_caveats,
adapter_metadata: self.build_adapter_metadata(adapter_type, trace_id),
negotiated_at: SystemTime::now(),
};
Ok(negotiation)
}
async fn try_fallback_adapters(
&self,
requirements: &[RequiredFeature],
trace_id: TraceId,
) -> Result<AdapterNegotiation> {
let fallback_order = [
AdapterType::WebTransport,
AdapterType::MasqueConnectUdp,
AdapterType::TcpTlsFallback,
];
for &adapter_type in &fallback_order {
if let Ok(negotiation) = self.try_adapter(adapter_type, requirements, trace_id).await {
return Ok(negotiation);
}
}
Err(Error::new(ErrorKind::ConnectionRefused))
}
async fn try_specific_adapters(
&self,
allowed: &[AdapterType],
requirements: &[RequiredFeature],
trace_id: TraceId,
) -> Result<AdapterNegotiation> {
for &adapter_type in allowed {
if let Ok(negotiation) = self.try_adapter(adapter_type, requirements, trace_id).await {
return Ok(negotiation);
}
}
Err(Error::new(ErrorKind::ConnectionRefused))
}
fn check_feature_requirement(
&self,
parity: &AdapterParity,
requirement: &RequiredFeature,
) -> bool {
match requirement {
RequiredFeature::ObjectVerification => matches!(
parity.object_support,
FeatureSupport::Full | FeatureSupport::Partial
),
RequiredFeature::StreamProtocol => matches!(
parity.stream_support,
FeatureSupport::Full | FeatureSupport::Partial | FeatureSupport::Downgraded
),
RequiredFeature::ProofGeneration => matches!(
parity.proof_support,
FeatureSupport::Full | FeatureSupport::Partial
),
RequiredFeature::RepairCapabilities => matches!(
parity.repair_support,
FeatureSupport::Full | FeatureSupport::Partial | FeatureSupport::Downgraded
),
RequiredFeature::SwarmCoordination => matches!(
parity.swarm_support,
FeatureSupport::Full | FeatureSupport::Partial | FeatureSupport::Downgraded
),
}
}
fn add_adapter_caveats(&self, adapter_type: AdapterType, caveats: &mut Vec<PerformanceCaveat>) {
match adapter_type {
AdapterType::NativeQuic => {
}
AdapterType::WebTransport => {
caveats.push(PerformanceCaveat::NestedTransportOverhead);
}
AdapterType::MasqueConnectUdp => {
caveats.push(PerformanceCaveat::NestedTransportOverhead);
caveats.push(PerformanceCaveat::IncreasedLatency(Duration::from_millis(
50,
)));
}
AdapterType::TcpTlsFallback => {
caveats.push(PerformanceCaveat::HeadOfLineBlocking);
caveats.push(PerformanceCaveat::NoMultiplexing);
caveats.push(PerformanceCaveat::IncreasedLatency(Duration::from_millis(
100,
)));
caveats.push(PerformanceCaveat::ReducedThroughput(0.7)); }
}
}
fn build_adapter_metadata(
&self,
_adapter_type: AdapterType,
trace_id: TraceId,
) -> AdapterMetadata {
AdapterMetadata {
version: "1.0.0".to_string(),
path_info: TransportPath {
local_endpoint: "0.0.0.0:0".to_string(),
remote_endpoint: "example.com:443".to_string(),
intermediate_hops: Vec::new(),
path_mtu: Some(1500),
},
relay_info: None, security_params: SecurityParams {
tls_version: Some("TLS 1.3".to_string()),
cipher_suite: Some("TLS_AES_128_GCM_SHA256".to_string()),
cert_validation: CertValidationMode::Full,
security_flags: vec!["ALPN".to_string()],
},
replay_pointer: Some(format!("trace-{}", trace_id.as_u128())),
}
}
fn record_successful_negotiation(&mut self, negotiation: &AdapterNegotiation) {
*self
.metrics
.sessions_by_adapter
.entry(negotiation.selected_adapter)
.or_insert(0) += 1;
self.metrics.last_updated = SystemTime::now();
}
fn update_metrics(&mut self) {
self.metrics.last_updated = SystemTime::now();
}
}
impl Default for AdapterConfig {
fn default() -> Self {
Self {
preferred_adapters: vec![
AdapterType::NativeQuic,
AdapterType::WebTransport,
AdapterType::MasqueConnectUdp,
AdapterType::TcpTlsFallback,
],
downgrade_policy: DowngradePolicy::AllowDowngrade,
required_features: vec![RequiredFeature::ObjectVerification],
adapter_configs: HashMap::new(),
caveat_reporting: CaveatReporting {
report_performance: true,
report_hol_blocking: true,
report_diagnostic_limits: true,
include_timing: true,
},
}
}
}
impl SessionStats {
fn new() -> Self {
Self {
bytes_sent: 0,
bytes_received: 0,
objects_transferred: 0,
streams_created: 0,
connection_errors: 0,
avg_latency: Duration::from_millis(0),
}
}
}
impl AdapterMetrics {
fn new() -> Self {
Self {
sessions_by_adapter: HashMap::new(),
downgrade_frequency: HashMap::new(),
caveat_frequency: HashMap::new(),
avg_session_duration: HashMap::new(),
success_rates: HashMap::new(),
last_updated: SystemTime::now(),
}
}
}
impl fmt::Display for AdapterType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AdapterType::NativeQuic => write!(f, "native-quic"),
AdapterType::WebTransport => write!(f, "webtransport"),
AdapterType::MasqueConnectUdp => write!(f, "masque-connect-udp"),
AdapterType::TcpTlsFallback => write!(f, "tcp-tls-443"),
}
}
}
impl fmt::Display for FeatureSupport {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
FeatureSupport::Full => write!(f, "full"),
FeatureSupport::Partial => write!(f, "partial"),
FeatureSupport::Downgraded => write!(f, "downgraded"),
FeatureSupport::Unsupported => write!(f, "unsupported"),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures_lite::future::block_on;
#[test]
fn test_adapter_parity_matrix() {
let manager = AdapterManager::new(AdapterConfig::default());
let native_parity = manager.get_adapter_parity(AdapterType::NativeQuic).unwrap();
assert_eq!(native_parity.object_support, FeatureSupport::Full);
assert_eq!(native_parity.stream_support, FeatureSupport::Full);
let tcp_parity = manager
.get_adapter_parity(AdapterType::TcpTlsFallback)
.unwrap();
assert_eq!(tcp_parity.datagram_support, FeatureSupport::Unsupported);
assert_eq!(tcp_parity.stream_support, FeatureSupport::Downgraded);
}
#[test]
fn test_feature_requirements() {
let manager = AdapterManager::new(AdapterConfig::default());
let tcp_parity = manager
.get_adapter_parity(AdapterType::TcpTlsFallback)
.unwrap();
assert!(
manager.check_feature_requirement(tcp_parity, &RequiredFeature::ObjectVerification)
);
assert!(manager.check_feature_requirement(tcp_parity, &RequiredFeature::StreamProtocol));
}
#[test]
fn test_adapter_negotiation() {
block_on(async {
let mut manager = AdapterManager::new(AdapterConfig::default());
let requirements = vec![RequiredFeature::ObjectVerification];
let trace_id = TraceId::from_parts(1, 1);
let negotiation = manager
.negotiate_adapter(&requirements, trace_id)
.await
.unwrap();
assert_eq!(negotiation.selected_adapter, AdapterType::NativeQuic);
});
}
#[test]
fn adapter_negotiation_skips_unmet_feature_requirements() {
block_on(async {
let mut manager = AdapterManager::new(AdapterConfig::default());
manager.config.preferred_adapters =
vec![AdapterType::NativeQuic, AdapterType::WebTransport];
manager
.parity_matrix
.get_mut(&AdapterType::NativeQuic)
.expect("native parity")
.object_support = FeatureSupport::Unsupported;
let negotiation = manager
.negotiate_adapter(
&[RequiredFeature::ObjectVerification],
TraceId::from_parts(1, 1),
)
.await
.expect("compatible fallback should be selected");
assert_eq!(negotiation.selected_adapter, AdapterType::WebTransport);
assert!(
negotiation.downgrade_reasons.is_empty(),
"selected adapter should satisfy the requested features"
);
});
}
#[test]
fn test_adapter_display() {
assert_eq!(format!("{}", AdapterType::NativeQuic), "native-quic");
assert_eq!(format!("{}", AdapterType::WebTransport), "webtransport");
assert_eq!(
format!("{}", AdapterType::MasqueConnectUdp),
"masque-connect-udp"
);
assert_eq!(format!("{}", AdapterType::TcpTlsFallback), "tcp-tls-443");
}
#[test]
fn test_feature_support_display() {
assert_eq!(format!("{}", FeatureSupport::Full), "full");
assert_eq!(format!("{}", FeatureSupport::Partial), "partial");
assert_eq!(format!("{}", FeatureSupport::Downgraded), "downgraded");
assert_eq!(format!("{}", FeatureSupport::Unsupported), "unsupported");
}
}