use std::{
alloc::{GlobalAlloc, Layout, System},
hint::black_box,
net::SocketAddr,
sync::{
Arc, Mutex, OnceLock,
atomic::{AtomicU64, Ordering},
mpsc as std_mpsc,
},
time::{Duration, Instant},
};
use datum::{
Keep, Materializer, NotUsed, Sink, Source, StreamCompletion, StreamRefId, StreamRefSettings,
StreamRefs,
};
use datum_net::Datagram;
use datum_net::quic::{
crypto::rustls::{QuicClientConfig, QuicServerConfig},
quinn,
rustls::{
ClientConfig as QuicRustlsClientConfig, RootCertStore as QuicRootCertStore,
ServerConfig as QuicRustlsServerConfig,
pki_types::{
CertificateDer as QuicCertificateDer, PrivateKeyDer as QuicPrivateKeyDer,
PrivatePkcs8KeyDer as QuicPrivatePkcs8KeyDer,
},
},
};
use datum_net::tls::rustls::{
ClientConfig, RootCertStore, ServerConfig,
pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer, ServerName},
};
use datum_net::{
QuicConnection, QuicIncomingConnection, TlsIncomingConnection, TokioQuic, TokioTls, TokioUdp,
serve_source_ref_over_quic, source_ref_over_quic,
};
// ---- Scenario tuning constants -------------------------------------------

/// Chunk-size argument passed to `TokioTls::bind` and `TokioTls::outgoing_connection`.
const TLS_CHUNK_SIZE: usize = 8192;
/// Length in bytes of the `b'p'` payload sent by the TLS echo scenario.
const TLS_PAYLOAD_BYTES: usize = 64;
/// Number of datagrams sent, and then awaited, per UDP scenario run.
const UDP_DATAGRAMS: usize = 128;
/// Length in bytes of each datagram payload sent by the UDP scenario.
const UDP_PAYLOAD_BYTES: usize = 64;
/// Second argument to `TokioUdp::bind`.
/// NOTE(review): presumably the maximum datagram size accepted by the receiver — confirm against `TokioUdp::bind`.
const UDP_DATAGRAM_SIZE: usize = 2048;
/// Third argument to `TokioUdp::bind`.
/// NOTE(review): presumably a receive-side buffer capacity — confirm against `TokioUdp::bind`.
const UDP_RECEIVE_BUFFER: usize = 256;
/// Number of `u64` elements (`0..STREAMREF_N`) the QUIC servers stream per stream-ref run.
const STREAMREF_N: u64 = 1024;
/// Chunk-size argument passed to the `TokioQuic` bind/connect calls and the bi-stream open/accept calls.
const STREAMREF_CHUNK_SIZE: usize = 8192;
/// Stream-ref identifier used by both the serving side and the consuming side.
const STREAMREF_ID: StreamRefId = StreamRefId::from_u128(1);
/// Base64 text that `tls_configs` decodes into the DER certificate presented by the TLS echo
/// server; the same certificate is also added to the client's root store.
/// The matching private key is embedded below, so this credential is only suitable for local benchmarking.
const CERT_DER_B64: &str = "MIIDSTCCAjGgAwIBAgIUN1YxPtPF67raIV0xnXhIFpgYFMcwDQYJKoZIhvcNAQELBQAwFDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTI2MDYxNjE1NDE1OFoXDTM2MDYxMzE1NDE1OFowFDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoUfl8KwUMwSJOEmRjQoqm+A3Qx+trcIU7JkUiYaVDMO/om/5R1wrRAngeOdxNiX2CQbtodN7GGZBOOXIhBxrW97ZDji+JZSxIOEkX/u6dAopbRsGWf60ueSkPBVOAXpSVylCocUeoD4THNpW63VTYXeREQqWhvZiHm0TgApSeA755Jsokk8ka9+Uvn/7V4QRzmhQEarcJQQgRedWIi+SnrHlnT5SYkxqIx/1TKud3yfvxIInZZregis6a7psJmyXZmzwclcZp7k0cVP9YAm/P/mY/J19WckDQiK3DgvjFE5BOYCJT5q1O05pHYNQEOiJ40UG+8Ah3wZVbYCCTsGzQwIDAQABo4GSMIGPMB0GA1UdDgQWBBTQCPPq+yHQwkFUFa4HEVorPT4pDTAfBgNVHSMEGDAWgBTQCPPq+yHQwkFUFa4HEVorPT4pDTAMBgNVHRMBAf8EAjAAMA4GA1UdDwEB/wQEAwIFoDATBgNVHSUEDDAKBggrBgEFBQcDATAaBgNVHREEEzARgglsb2NhbGhvc3SHBH8AAAEwDQYJKoZIhvcNAQELBQADggEBAD+ruR1cUL836JZmoEl5s8KhRFgm97J+1GzbOyRVtkK8iVc6NLYgzNr2bFjTxQfmaDasziPyd4xxycXYesxF2iPr6ZAu/FMg+Wk6rfpthe2nutYLKEnMznBQmISeN5iB9r1/NpR72h2+aRzOobKUjrdNIxU2G6GRI9GDmajhyo2MPSPHz00zj+WubDBR4mCKAguv1ABft4M/RB+nKudB4bKCukFPbDLQAlKMW+KmbeWvzujrHq6y9EcgSexlEB3sNxJtk2P+imy3R8OdsvrF9BSc+feuz75sNN0MdllxUZRcnoo2BfvA2Io0t/SkUdMDBZpnqVeG99uRQFBkkcD0/RM=";
/// Base64 text that `tls_configs` decodes and wraps as the PKCS#8 private key for the TLS echo server.
const KEY_DER_B64: &str = "MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQChR+XwrBQzBIk4SZGNCiqb4DdDH62twhTsmRSJhpUMw7+ib/lHXCtECeB453E2JfYJBu2h03sYZkE45ciEHGtb3tkOOL4llLEg4SRf+7p0CiltGwZZ/rS55KQ8FU4BelJXKUKhxR6gPhMc2lbrdVNhd5ERCpaG9mIebROAClJ4DvnkmyiSTyRr35S+f/tXhBHOaFARqtwlBCBF51YiL5KeseWdPlJiTGojH/VMq53fJ+/Egidlmt6CKzprumwmbJdmbPByVxmnuTRxU/1gCb8/+Zj8nX1ZyQNCIrcOC+MUTkE5gIlPmrU7Tmkdg1AQ6InjRQb7wCHfBlVtgIJOwbNDAgMBAAECggEAFt2WLJKBBKCqgbfwclIUK2hU/WfC3GUtSUQExOC3RgmEpFeCvIrFOY666/lINDiakroQO8a7Z4Cr05DEZnKPCAXmuRzzXGmh2TsCSLdsPmJocCcKNf0mC0ruu/P7TLtWbDQV1dCYIHjchdUwyI4oRaPJheR95eGNlKm7P4Z3Hb0r4dncp/P47DjBGI3KiIzV1lk39sljT7G1+4LpIvj4uxTGnwvL0dnl66Dd8s6Xu3Sh4ciofPV3zA2mG1y86OKHUIh0cdXEFbOZjwCWuGlMatK4UgiXi3s/5YyokGDSWZlz/lIx6UgNAbwRBRWvwHVIEiuxv0NDqtEGEgeLVsHfVQKBgQDhxw6TqaSeWIL40PhB1VTXcQbEtt4uWfS/dIUMiyPDBYmwjTDl64c7gvF2lsdKsyvoBZaKTo5H6Z7n79s4qY1SMn5SeauRfxNtOOsvwhCjM0Lfd5aAm0CiabajJSyDzF9vN10dV3YrtC3mYlK+fKGHbzEUZqywL2r9fftMZHMKhQKBgQC23qsqDaMvPW3K9uot9jIxZDnm8AaMnkYJYBQ6d5esYqGfpxvg4ZHfaaz7/6zZBLZ3PM382BD9UruRz95d+Dz7sTe6IqdRkLTlqZ3jz22zatdz3GzDL5ywAg1Em18qiORuP6EfJBpj3r2T9frqYCJZy3c71KbrHpTNkVSZPvSFJwKBgH9ttvd2jvfkIv/GCeg2PK1gGJ7cS2hD01RfslxTB7shnAXumoOaPHhFPn1dOlwZ5hUNWlrzXXCPFeX4RXWklKOo5g0sqL30yAUkU/Ffib5ix0KTH3m0pnVETxWjfI1iwxZlXAOcgpDGIAXgxqzGcPCcvEm0bZD/IlayTSxfoWJ1AoGBALO46S2eU4eE7tevjX14XeIO550gvr8KejiSla7UhFsmYEPA/3QnHK8ec6UCpURwDkfnmWZqj1/4JGHsklV2N7EBfYxQrHJMLo0cAxG2ddEEKxUySC4YJ/0PNyz7Ki9qdF4p6NbrK4iFi3nUITKhIoFvpxJFL3saBhJ2R1eI+NW3AoGACQsZiQS4at0hHYb+Mo835AorlfbayWZB2mggSs71DXEFNq+/LJvJUbqDaOPPTW3yZG6znCuQgOVFoN23eEUps9KTDFAahsBbwu2PlIPUG7qIRciNbl0ArpoLq7x0hpNucsYqTd+nFBXABCnEKDkRKPtxbGpZKl/VFnnZ3A1TaQQ=";
/// Base64 text that `quic_configs` decodes into the DER certificate presented by the QUIC servers.
/// The matching private key is embedded below, so this credential is only suitable for local benchmarking.
const QUIC_CERT_DER_B64: &str = "MIIDFjCCAf6gAwIBAgIUf6vScDq/VI97ByepXNchgNpefNowDQYJKoZIhvcNAQELBQAwFjEUMBIGA1UEAwwLRGF0dW1UZXN0Q0EwHhcNMjYwNjE4MDcxNjE3WhcNMzYwNjE1MDcxNjE3WjAUMRIwEAYDVQQDDAlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC6tXmPtOi05d1Gc1MG+mcl/8kzBuBtV9KTDuE7lauvoM7gDeDzv+j86oMpX5qVZNLFVR7lxF4+4Q7y3TxQA6FqPKTXkUoZrIjAmpbFArGYzu61314okg0ggkZaGjenPlM98MHhVozVmycjPsR8icnDMmWC+HYl7aQGss4Pp4GaS8WUDUr0oRcxUTvkffhRPj+jFvpA6cH3W2G1h+Rf3KLmcffIxIYgiWlqiAZXwHQbqdpd/hlgSEKFs4MhwcnNspqfxxD9kV0kGgTpDbgE8Pjr2/7QNHk347Zks3bzy6PDE2NVsUjHUjeCINJqd2xPu+pwSdtXUSy2vOnAQeNbglk/AgMBAAGjXjBcMBoGA1UdEQQTMBGCCWxvY2FsaG9zdIcEfwAAATAdBgNVHQ4EFgQUP5J9Y1t1AQlEZU7w4LB9KAv2avQwHwYDVR0jBBgwFoAUFF954UOKjNuQkkA/L7cF1ndSFTQwDQYJKoZIhvcNAQELBQADggEBALKxpKTyd/0O0cu8Hxaz8Ys8OqVjsboHEMmk149LgYnVPfNyTYj8fmDDgjoPykwrH/GlOt5VerKZxiQ9n1t0oh7gTkUAJHIK0dtW/GEJ3SgnhDOlmOx9tdAdJHfnMonqlL1EMIF9e3Z/fc67PVK9pqQgrWOp4zP2yRIKEnnogB11bxbHZMd4FtuXdvEuPWR2rWAQUnpbvwkpkukkxuUsKahLPTDxR29DycPKDKH6J+5UZa6EV0eYF0GXZQJiMnRJej8RvRKmPlMv+gg/XM8GfAEELi7cixQ2Fy0FqySzYe2FmhRGEZgG353l2m6P8GUSmlfVwr4aLEq6PfI5e8lS/jk=";
/// Base64 text that `quic_configs` decodes and wraps as the PKCS#8 private key for the QUIC servers.
const QUIC_KEY_DER_B64: &str = "MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC6tXmPtOi05d1Gc1MG+mcl/8kzBuBtV9KTDuE7lauvoM7gDeDzv+j86oMpX5qVZNLFVR7lxF4+4Q7y3TxQA6FqPKTXkUoZrIjAmpbFArGYzu61314okg0ggkZaGjenPlM98MHhVozVmycjPsR8icnDMmWC+HYl7aQGss4Pp4GaS8WUDUr0oRcxUTvkffhRPj+jFvpA6cH3W2G1h+Rf3KLmcffIxIYgiWlqiAZXwHQbqdpd/hlgSEKFs4MhwcnNspqfxxD9kV0kGgTpDbgE8Pjr2/7QNHk347Zks3bzy6PDE2NVsUjHUjeCINJqd2xPu+pwSdtXUSy2vOnAQeNbglk/AgMBAAECggEADgj/Sti4F4UiBZ48sw0WvOoF92KAc2fDKg/XyZFeNxUZCAmzC8+01XJRisYniixqla/o4y/IP+TOKV6NHpyvQRuu7o966QQpwYS9abDipdCEbEKOwvD7sKJqZHKNFXLfcgwLnwNREeuYXRNmVNBMzPIYZGiGBSLgjipZyB545R7E+QbxttO41xAZTjCPSBIdMp2UtXtjzXlyC9y/QSZhB2faKvCoPWADiiMliLuTNDZ5WASfAfTzykYmo/jIxMMfs++QuV4TmcnSKVUOKz2Lai14mWaQbRqj9TNBwyPes4J/VPC0yKCBT5DVBeDRfcrVumDWvFcyy1MHUy+zAZ8A6QKBgQD0KEow8DRDMUGijJBzgf+jknE1Dy17V6k3JjtYb7aBH+ear9FQIbH25JDZeHqBctX/BKcC/ogxpKabL40FsQ0V+JTGezrbXlV1yq6Ahb90bzkwFIGO+yknp1xaulwI1CyQLe+x6EpGa/eQrwfrjt/B2nH/J2mSDcnqmyjcxoIEGQKBgQDDw9heSKGJatLpe5T/9QcZJ/b/unJ/lVD3fUCWwH271WiIB8j7HmV+uhoCFhA/G/8JLnjI36kpu2fY5W1KTmGuYIL7jbPPSJ3tMPa3pn41SEnOFx7uTsG5si7QcL6f4EN1amyDSh0Re3u4BFlfhEd2fntibSxsW+Vf1qgzv3szFwKBgF9S95xhu9Xb/kcrYB5oy9keDKrtkRFN6VgJyjO98eA6B5+iMzN45FWDaXYujo3JuvkE5PvLSnsvUIjYKNPnCzffxJ41cIZDL1R83GJ7VYNtBUPbTbX4I1pDAlC9377mr7te4+1uxsj10EMhwxLZxkyWL9mxlosabRH2oB6KNirJAoGAGVAHeBl4GaagYuRB3d627j/GpkZba7fI4ldltJhrKZBslbo7+IxznzE9D6v+qcpa7br+jyqsWuMKiHcR7Ju58aw1A4XNDC5djT/ZhuGWTBluwaEntLSmQ/aQKw+I9m8vCc2a/lbk70/CLYcSb1FRVDcCiTWImvI+9GdzFE3cBj0CgYEAv2QyLhmWNQnu0HT1M8de64zOdQQGbFCUyJdHhY1PY9QOW1A7Ew36g2KdwOf8EP6ygYwF4JUm0FtKo6o/85YL6QMpyRHNahLqDEF3l9Mjeu38sq9CqQ+8yYHgZSrBDnSEDlM5INIvBhz+j02nWO3IYP8DwBwSQ/tFWWyjmRkpX0s=";
/// Base64 text that `quic_configs` decodes into the DER certificate added to the QUIC client's root store.
const QUIC_CA_DER_B64: &str = "MIIDDTCCAfWgAwIBAgIUInMUFEEO3+4Uu17Jo+TlS+wVqB4wDQYJKoZIhvcNAQELBQAwFjEUMBIGA1UEAwwLRGF0dW1UZXN0Q0EwHhcNMjYwNjE4MDcxNjE3WhcNMzYwNjE1MDcxNjE3WjAWMRQwEgYDVQQDDAtEYXR1bVRlc3RDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAM+duiVpggGSTs4wjI1N0fzBW2mdTudSfTaqY/VcOdDrJXTIz/IV5T0B6LbbadkRU0tpA3uSpRIsTzg8tAc/Cg8k7AMh7JwqfqA7D2TcryLwf16TaqHS7xqpajMitudo1raW+UBKb8J5XzQeewgzPA7B1Wizfjj2V/LZRWDcKlaiQuunjZJztS8tKf4kyKxPvrg0fE2wMFD8UNd+9QcK1wgUidJz2g3WafYWlaYTWN+QqyIitHESVYg4QZZJRPWaviepkY27Hqe8CkqT5NnmdPstZpBN2fFZzRsa+31s2vWggPg7l+WO/B4fcooXNM5ra3rg1TLJKWYKb5m3aeVh0bcCAwEAAaNTMFEwHQYDVR0OBBYEFBRfeeFDiozbkJJAPy+3BdZ3UhU0MB8GA1UdIwQYMBaAFBRfeeFDiozbkJJAPy+3BdZ3UhU0MA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAJkAQIquWpKKhdq1obRxDxjkOL1aYnDjbq3B24TJ6wZj98vXXzQvFNgwfSxI/8jSx3X5CStkukHAKRvq4J/u+NDZGOPeScTbz4AsEkOUdj4w4q5QK/VXucXsjgJYXFaUpqXHM3ZxvCAWi0HR50nYlErMwskLDULGJN5fDP6kwmB1xC36WxuALCf1dSjDQy0ikB4If07CDLmjcwfMKjjNPWcb9OyfUgmggRCaIO1APesOz5+FQ8fRyekOrxQUQCX/fNT3wYQIJ/hFc+I2BnrKsbI+crbufeJ70CVUnI6yuR6Wx9VVtaUQ6tVPJjO11lEuAEBYDIC/VYqwN9fG34c0sxs=";
/// Global allocator that forwards every request to [`System`] and adds the size of each
/// successful allocation to [`ALLOCATED_BYTES`].
struct CountingAllocator;

/// Running total of bytes handed out through [`CountingAllocator`].
///
/// The allocator only ever adds to this counter; `dealloc` leaves it untouched, so the value
/// is "bytes requested so far", not "bytes currently live".
static ALLOCATED_BYTES: AtomicU64 = AtomicU64::new(0);

impl CountingAllocator {
    /// Adds `bytes` to [`ALLOCATED_BYTES`]. Must not allocate, since it runs inside the allocator.
    fn record(bytes: usize) {
        ALLOCATED_BYTES.fetch_add(bytes as u64, Ordering::Relaxed);
    }
}

unsafe impl GlobalAlloc for CountingAllocator {
    /// Allocates via [`System`] and counts `layout.size()` bytes when the allocation succeeds.
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds the `GlobalAlloc::alloc` contract; `layout` is forwarded unchanged.
        let block = unsafe { System.alloc(layout) };
        if block.is_null() {
            return block;
        }
        Self::record(layout.size());
        block
    }

    /// Allocates zeroed memory via [`System`] and counts `layout.size()` bytes on success.
    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds the `GlobalAlloc::alloc_zeroed` contract; `layout` is forwarded unchanged.
        let block = unsafe { System.alloc_zeroed(layout) };
        if block.is_null() {
            return block;
        }
        Self::record(layout.size());
        block
    }

    /// Releases memory via [`System`]; the counter is intentionally not decremented.
    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: the caller guarantees `ptr` was allocated by this allocator with `layout`,
        // and every allocation made here comes from `System`.
        unsafe { System.dealloc(ptr, layout) };
    }

    /// Reallocates via [`System`].
    ///
    /// On success the counter grows by `new_size` when the block moved, or by the size
    /// increase when it stayed in place; an in-place shrink adds nothing.
    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        let old_size = layout.size();
        // SAFETY: the caller upholds the `GlobalAlloc::realloc` contract; all arguments are forwarded unchanged.
        let resized = unsafe { System.realloc(ptr, layout, new_size) };
        if resized.is_null() {
            return resized;
        }
        if resized != ptr {
            // The block moved: account for it like a fresh allocation of the new size.
            Self::record(new_size);
        } else if new_size > old_size {
            // Grown in place: only the additional bytes are new.
            Self::record(new_size - old_size);
        }
        resized
    }
}

#[global_allocator]
static GLOBAL: CountingAllocator = CountingAllocator;
/// One benchmark case executed by `main`.
struct Scenario {
/// Label printed in the first column of the scenario's result row.
name: &'static str,
/// Number of timed invocations of `run`; the warm-up invocations are extra.
iterations: u64,
/// Scenario body. Its `u64` return value is accumulated into a checksum that is passed to `black_box`.
run: fn() -> u64,
}
/// Runs every benchmark scenario and prints one tab-separated result row per scenario.
///
/// For each scenario the body is invoked three times as a warm-up, then `iterations` times
/// while wall-clock time, bytes counted by the global allocator, and process CPU time are
/// measured. The per-iteration averages of those three measurements are printed after a
/// header row naming the columns.
fn main() {
    let scenarios = [
        Scenario {
            name: "tls_echo_roundtrip_64b",
            iterations: 100,
            run: tls_echo_roundtrip_64b,
        },
        Scenario {
            name: "udp_loopback_send_receive_128x64b",
            iterations: 100,
            run: udp_loopback_send_receive_128x64b,
        },
        Scenario {
            name: "streamref_remote_1024",
            iterations: 50,
            run: streamref_remote_1024,
        },
        Scenario {
            name: "streamref_remote_reuse_1024",
            iterations: 50,
            run: streamref_remote_reuse_1024,
        },
    ];

    println!("scenario\titerations\tns_per_op\tallocated_bytes_per_op\tcpu_ns_per_op");

    for Scenario {
        name,
        iterations,
        run,
    } in scenarios
    {
        // Warm-up: also triggers the lazy one-time setup each scenario performs on first use,
        // so that setup is excluded from the measurements below.
        (0..3).for_each(|_| {
            black_box(run());
        });

        // The CPU baseline is taken before the allocation counter is reset, so whatever
        // `process_cpu_ns` allocates is not attributed to the scenario.
        let cpu_before = process_cpu_ns();
        ALLOCATED_BYTES.store(0, Ordering::Relaxed);
        let wall_clock = Instant::now();

        let checksum = (0..iterations).fold(0_u64, |sum, _| sum.wrapping_add(black_box(run())));

        let wall_time = wall_clock.elapsed();
        // Read the counter before sampling CPU time again, for the same reason as above.
        let total_allocated = ALLOCATED_BYTES.load(Ordering::Relaxed);
        let cpu_spent = process_cpu_ns().saturating_sub(cpu_before);
        black_box(checksum);

        let runs = iterations as f64;
        let ns_per_op = wall_time.as_nanos() as f64 / runs;
        let allocated_bytes_per_op = total_allocated as f64 / runs;
        let cpu_ns_per_op = cpu_spent as f64 / runs;
        println!(
            "{}\t{}\t{ns_per_op:.2}\t{allocated_bytes_per_op:.2}\t{cpu_ns_per_op:.2}",
            name, iterations
        );
    }
}
/// Returns the process-wide [`Materializer`] shared by all scenarios.
///
/// The instance is created with `Materializer::new` on the first call and reused afterwards.
fn materializer() -> &'static Materializer {
    static SHARED: OnceLock<Materializer> = OnceLock::new();
    SHARED.get_or_init(Materializer::new)
}
/// Returns the CPU time consumed by this process so far, in nanoseconds, as read from
/// `/proc/self/stat`.
///
/// The result is the sum of the `utime` and `stime` fields multiplied by a fixed
/// 10 ms per tick, i.e. the code assumes 100 clock ticks per second.
///
/// Returns `0` when the file cannot be read, contains no `)`, or has too few fields
/// (for example on platforms without procfs). A field that fails to parse counts as `0`.
fn process_cpu_ns() -> u128 {
    /// Nanoseconds per clock tick under the hard-coded 100 Hz assumption.
    const NS_PER_TICK: u128 = 10_000_000;

    let Ok(stat_line) = std::fs::read_to_string("/proc/self/stat") else {
        return 0;
    };
    // Split after the LAST `)`: the parenthesised command name may itself contain spaces
    // or parentheses, so only the text after it can be split on whitespace safely.
    let Some(comm_end) = stat_line.rfind(')') else {
        return 0;
    };
    // After the command name, `utime` and `stime` are the 12th and 13th fields.
    let mut tail = stat_line[comm_end + 1..].split_whitespace().skip(11);
    let (Some(utime_field), Some(stime_field)) = (tail.next(), tail.next()) else {
        return 0;
    };
    let user_ticks = utime_field.parse::<u64>().unwrap_or(0);
    let system_ticks = stime_field.parse::<u64>().unwrap_or(0);
    (u128::from(user_ticks) + u128::from(system_ticks)) * NS_PER_TICK
}
/// State of the shared TLS echo server created by `tls_echo_server`.
struct TlsEchoServer {
/// Local address reported by the server's binding.
addr: SocketAddr,
/// Client-side TLS configuration produced by `tls_configs` alongside the server configuration.
client_config: Arc<ClientConfig>,
/// Server name that clients pass to `TokioTls::outgoing_connection`.
server_name: ServerName<'static>,
/// Completion handle of the running server graph; stored but never read.
/// NOTE(review): presumably retained so the server is not torn down — confirm against `StreamCompletion`'s drop behavior.
_completion: StreamCompletion<NotUsed>,
}
/// Returns the lazily started TLS echo server shared by every TLS scenario run.
///
/// The first call builds the TLS configuration, binds a listener on `127.0.0.1:0`, and
/// waits for the binding; later calls return the same instance.
///
/// # Panics
///
/// Panics on the first call if the server graph fails to materialize or the bind fails.
fn tls_echo_server() -> &'static TlsEchoServer {
    static INSTANCE: OnceLock<TlsEchoServer> = OnceLock::new();

    /// Starts the echo server and blocks until its local address is known.
    fn start() -> TlsEchoServer {
        let (server_config, client_config, server_name) = tls_configs();

        // Echo handler: feed each connection's inbound half into its own outbound half and
        // block inside the callback until that per-connection stream has finished.
        let echo_back = Sink::foreach(|incoming: TlsIncomingConnection| {
            let (inbound, outbound) = incoming.into_parts();
            inbound
                .run_with(outbound)
                .expect("TLS echo stream materializes")
                .wait()
                .expect("TLS echo stream completes");
        });

        let (pending_binding, completion) =
            TokioTls::bind("127.0.0.1:0", server_config, TLS_CHUNK_SIZE)
                .to_mat(echo_back, Keep::both)
                .run_with_materializer(materializer())
                .expect("TLS echo server materializes");
        let bound = pending_binding.wait().expect("TLS echo server binds");

        TlsEchoServer {
            addr: bound.local_addr(),
            client_config,
            server_name,
            _completion: completion,
        }
    }

    INSTANCE.get_or_init(start)
}
/// Builds the TLS server configuration, the matching client configuration, and the server
/// name clients should use.
///
/// The server presents the certificate decoded from `CERT_DER_B64` with the key decoded from
/// `KEY_DER_B64` and requires no client authentication. The client's root store contains
/// exactly that same certificate. The returned server name is `"localhost"`.
///
/// # Panics
///
/// Panics if the embedded certificate or key is rejected by the TLS library.
fn tls_configs() -> (Arc<ServerConfig>, Arc<ClientConfig>, ServerName<'static>) {
    let certificate = CertificateDer::from(decode_base64(CERT_DER_B64));
    let private_key = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(decode_base64(KEY_DER_B64)));

    let server = ServerConfig::builder()
        .with_no_client_auth()
        .with_single_cert(vec![certificate.clone()], private_key)
        .expect("TLS server config");

    // The server certificate doubles as the client's only trust root.
    let mut trust_roots = RootCertStore::empty();
    trust_roots
        .add(certificate)
        .expect("trust self-signed TLS cert");
    let client = ClientConfig::builder()
        .with_root_certificates(trust_roots)
        .with_no_client_auth();

    let server = Arc::new(server);
    let client = Arc::new(client);
    let server_name: ServerName<'static> = ServerName::try_from("localhost")
        .expect("TLS server name")
        .to_owned();

    (server, client, server_name)
}
/// Scenario body: opens a new TLS connection to the shared echo server, sends one payload of
/// `TLS_PAYLOAD_BYTES` bytes, and returns the length of the first element received back.
///
/// # Panics
///
/// Panics if the client stream fails to materialize or the round trip fails.
fn tls_echo_roundtrip_64b() -> u64 {
    let server = tls_echo_server();

    let payload = vec![b'p'; TLS_PAYLOAD_BYTES];
    let tls_connection = TokioTls::outgoing_connection(
        server.addr,
        server.server_name.clone(),
        Arc::clone(&server.client_config),
        TLS_CHUNK_SIZE,
    );

    let echoed = Source::single(payload)
        .via(tls_connection)
        .run_with_materializer(Sink::head(), materializer())
        .expect("TLS client stream materializes")
        .wait()
        .expect("TLS echo round trip completes");

    echoed.len() as u64
}
/// State of the shared UDP receiver created by `udp_receive_server`.
struct UdpReceiveServer {
/// Local address reported by the receiver's binding; datagrams are sent here.
addr: SocketAddr,
/// Receiving end of the channel on which the server reports the payload length of every datagram it gets.
receiver: Mutex<std_mpsc::Receiver<usize>>,
/// Completion handle of the running receiver graph; stored but never read.
/// NOTE(review): presumably retained so the receiver is not torn down — confirm against `StreamCompletion`'s drop behavior.
_completion: StreamCompletion<NotUsed>,
}
/// Returns the lazily started UDP receiver shared by every UDP scenario run.
///
/// The first call binds on `127.0.0.1:0` and installs a sink that reports each received
/// datagram's payload length over a channel; later calls return the same instance.
///
/// # Panics
///
/// Panics on the first call if the receiver graph fails to materialize or the bind fails.
fn udp_receive_server() -> &'static UdpReceiveServer {
    static INSTANCE: OnceLock<UdpReceiveServer> = OnceLock::new();

    /// Starts the receiver and blocks until its local address is known.
    fn start() -> UdpReceiveServer {
        let (length_tx, length_rx) = std_mpsc::channel();

        // A failed send (receiving end gone) is deliberately ignored.
        let report_length = Sink::foreach(move |packet: Datagram| {
            let _ = length_tx.send(packet.payload().len());
        });

        let (pending_binding, completion) =
            TokioUdp::bind("127.0.0.1:0", UDP_DATAGRAM_SIZE, UDP_RECEIVE_BUFFER)
                .to_mat(report_length, Keep::both)
                .run_with_materializer(materializer())
                .expect("UDP receive server materializes");
        let bound = pending_binding.wait().expect("UDP receive server binds");

        UdpReceiveServer {
            addr: bound.local_addr(),
            receiver: Mutex::new(length_rx),
            _completion: completion,
        }
    }

    INSTANCE.get_or_init(start)
}
/// Scenario body: sends `UDP_DATAGRAMS` datagrams of `UDP_PAYLOAD_BYTES` bytes each to the
/// shared receiver and returns the total payload length the receiver reported.
///
/// Lengths still queued from an earlier run are discarded first.
///
/// # Panics
///
/// Panics if the receiver lock is poisoned, the sender stream fails, or any of the expected
/// datagrams is not reported within 30 seconds.
fn udp_loopback_send_receive_128x64b() -> u64 {
    let server = udp_receive_server();

    // Drain leftovers from a previous run so they are not counted towards this one.
    let stale = server.receiver.lock().expect("UDP receiver lock");
    loop {
        if stale.try_recv().is_err() {
            break;
        }
    }
    drop(stale);

    let datagrams =
        (0..UDP_DATAGRAMS).map(|_| Datagram::new(vec![b'p'; UDP_PAYLOAD_BYTES], server.addr));
    Source::from_iterable(datagrams)
        .run_with_materializer(TokioUdp::send_sink("127.0.0.1:0"), materializer())
        .expect("UDP sender materializes")
        .wait()
        .expect("UDP sender completes");

    let inbox = server.receiver.lock().expect("UDP receiver lock");
    let total: u64 = (0..UDP_DATAGRAMS)
        .map(|_| {
            inbox
                .recv_timeout(Duration::from_secs(30))
                .expect("UDP receiver observes datagram") as u64
        })
        .sum();
    total
}
/// Builds the QUIC server configuration and the matching client configuration.
///
/// The server presents the certificate decoded from `QUIC_CERT_DER_B64` with the key decoded
/// from `QUIC_KEY_DER_B64` and requires no client authentication. The client's root store
/// contains the certificate decoded from `QUIC_CA_DER_B64`.
///
/// # Panics
///
/// Panics if the embedded credentials are rejected or the TLS configurations cannot be
/// converted into QUIC crypto configurations.
fn quic_configs() -> (quinn::ServerConfig, quinn::ClientConfig) {
    let certificate = QuicCertificateDer::from(decode_base64(QUIC_CERT_DER_B64));
    let private_key = QuicPrivateKeyDer::Pkcs8(QuicPrivatePkcs8KeyDer::from(decode_base64(
        QUIC_KEY_DER_B64,
    )));
    let server_tls = QuicRustlsServerConfig::builder()
        .with_no_client_auth()
        .with_single_cert(vec![certificate], private_key)
        .expect("QUIC server config");

    let ca_certificate = QuicCertificateDer::from(decode_base64(QUIC_CA_DER_B64));
    let mut trust_roots = QuicRootCertStore::empty();
    trust_roots.add(ca_certificate).expect("trust QUIC CA cert");
    let client_tls = QuicRustlsClientConfig::builder()
        .with_root_certificates(trust_roots)
        .with_no_client_auth();

    let server_crypto =
        QuicServerConfig::try_from(server_tls).expect("QUIC server crypto config");
    let server = quinn::ServerConfig::with_crypto(Arc::new(server_crypto));

    let client_crypto =
        QuicClientConfig::try_from(client_tls).expect("QUIC client crypto config");
    let client = quinn::ClientConfig::new(Arc::new(client_crypto));

    (server, client)
}
/// State of the shared QUIC stream-ref server created by `streamref_quic_server`.
struct StreamRefQuicServer {
/// Local address reported by the server's binding.
addr: SocketAddr,
/// Client configuration produced by `quic_configs`; cloned for every new client connection.
client_config: quinn::ClientConfig,
/// Completion handle of the running server graph; stored but never read.
/// NOTE(review): presumably retained so the server is not torn down — confirm against `StreamCompletion`'s drop behavior.
_completion: StreamCompletion<NotUsed>,
}
/// Returns the lazily started QUIC server used by `streamref_remote_1024`.
///
/// For every incoming connection the server accepts one bidirectional stream and serves a
/// source ref over it that emits `0..STREAMREF_N`. The result of waiting on the serve handle
/// is ignored.
///
/// # Panics
///
/// Panics on the first call if the server graph fails to materialize or the bind fails.
/// The per-connection callback panics if accepting the stream, materializing the source ref,
/// or starting to serve fails.
fn streamref_quic_server() -> &'static StreamRefQuicServer {
    static INSTANCE: OnceLock<StreamRefQuicServer> = OnceLock::new();

    /// Starts the server and blocks until its local address is known.
    fn start() -> StreamRefQuicServer {
        let (server_config, client_config) = quic_configs();

        let serve_one_stream = Sink::foreach(|incoming: QuicIncomingConnection| {
            let bi_stream = incoming
                .accept_bi_available(STREAMREF_CHUNK_SIZE)
                .run_with(Sink::head())
                .expect("accept bi materializes")
                .wait()
                .expect("accept bi stream");
            let numbers = Source::from_iter(0_u64..STREAMREF_N)
                .run_with(StreamRefs::source_ref())
                .expect("source ref materializes");
            let settings = StreamRefSettings::default();
            let serving = serve_source_ref_over_quic(bi_stream, numbers, STREAMREF_ID, settings)
                .expect("serve source ref over quic");
            // Block until serving finishes; its outcome is deliberately ignored.
            let _ = serving.wait();
        });

        let (pending_binding, completion) =
            TokioQuic::bind("127.0.0.1:0", server_config, STREAMREF_CHUNK_SIZE)
                .to_mat(serve_one_stream, Keep::both)
                .run_with_materializer(materializer())
                .expect("QUIC streamref server materializes");
        let bound = pending_binding
            .wait()
            .expect("QUIC streamref server binds");

        StreamRefQuicServer {
            addr: bound.local_addr(),
            client_config,
            _completion: completion,
        }
    }

    INSTANCE.get_or_init(start)
}
/// Scenario body: opens a new QUIC connection and bidirectional stream to the shared
/// stream-ref server, collects the remote source, and returns the number of elements received.
///
/// # Panics
///
/// Panics if connecting, opening the stream, collecting, or waiting on the stream-ref handle
/// fails, or if the sum of the received values differs from the sum of `0..STREAMREF_N`.
fn streamref_remote_1024() -> u64 {
    let server = streamref_quic_server();

    let client_config = server.client_config.clone();
    let connection = TokioQuic::connect(
        server.addr,
        "localhost",
        client_config,
        STREAMREF_CHUNK_SIZE,
    )
    .run_with(Sink::head())
    .expect("QUIC client connection materializes")
    .wait()
    .expect("QUIC client connects");

    let bi_stream = connection
        .open_bi_stream_available(STREAMREF_CHUNK_SIZE)
        .run_with(Sink::head())
        .expect("client bi stream materializes")
        .wait()
        .expect("client opens bi stream");

    let settings = StreamRefSettings::default();
    let (remote_source, handle) = source_ref_over_quic::<u64>(bi_stream, STREAMREF_ID, settings);
    let values = remote_source.run_collect().expect("remote source collect");
    handle.wait().expect("stream ref handle completes");

    // Sanity check so a truncated or corrupted transfer cannot pass as a valid measurement.
    let expected_checksum: u64 = (0_u64..STREAMREF_N).sum();
    let received_checksum: u64 = values.iter().sum();
    assert_eq!(
        received_checksum, expected_checksum,
        "streamref quic checksum mismatch"
    );

    values.len() as u64
}
/// State of the shared QUIC stream-ref server created by `streamref_quic_reuse_server`,
/// which serves many bidirectional streams per connection.
struct StreamRefQuicReuseServer {
/// Local address reported by the server's binding.
addr: SocketAddr,
/// Client configuration produced by `quic_configs`; cloned when the reused client connection is opened.
client_config: quinn::ClientConfig,
/// Completion handle of the running server graph; stored but never read.
/// NOTE(review): presumably retained so the server is not torn down — confirm against `StreamCompletion`'s drop behavior.
_completion: StreamCompletion<NotUsed>,
}
/// Returns the lazily started QUIC server used by `streamref_remote_reuse_1024`.
///
/// For an incoming connection the server repeatedly accepts a bidirectional stream and
/// serves a source ref over it that emits `0..STREAMREF_N`. The accept loop has no exit
/// other than a panic from one of its `expect` calls.
/// NOTE(review): because the per-connection callback never returns, presumably no further
/// incoming connection is handled while it runs — confirm against `Sink::foreach` semantics.
///
/// # Panics
///
/// Panics on the first call if the server graph fails to materialize or the bind fails.
fn streamref_quic_reuse_server() -> &'static StreamRefQuicReuseServer {
    static INSTANCE: OnceLock<StreamRefQuicReuseServer> = OnceLock::new();

    /// Starts the server and blocks until its local address is known.
    fn start() -> StreamRefQuicReuseServer {
        let (server_config, client_config) = quic_configs();

        let serve_streams_forever = Sink::foreach(|incoming: QuicIncomingConnection| {
            let conn = incoming.connection();
            loop {
                let bi_stream = conn
                    .accept_bi_available(STREAMREF_CHUNK_SIZE)
                    .run_with(Sink::head())
                    .expect("reuse accept bi materializes")
                    .wait()
                    .expect("reuse accept bi stream");
                let numbers = Source::from_iter(0_u64..STREAMREF_N)
                    .run_with(StreamRefs::source_ref())
                    .expect("reuse source ref materializes");
                let settings = StreamRefSettings::default();
                let serving =
                    serve_source_ref_over_quic(bi_stream, numbers, STREAMREF_ID, settings)
                        .expect("reuse serve source ref over quic");
                // Finish serving this stream before accepting the next one; the outcome
                // is deliberately ignored.
                let _ = serving.wait();
            }
        });

        let (pending_binding, completion) =
            TokioQuic::bind("127.0.0.1:0", server_config, STREAMREF_CHUNK_SIZE)
                .to_mat(serve_streams_forever, Keep::both)
                .run_with_materializer(materializer())
                .expect("QUIC reuse server materializes");
        let bound = pending_binding.wait().expect("QUIC reuse server binds");

        StreamRefQuicReuseServer {
            addr: bound.local_addr(),
            client_config,
            _completion: completion,
        }
    }

    INSTANCE.get_or_init(start)
}
/// Holder for the single client connection that `streamref_remote_reuse_1024` reuses across runs.
struct StreamRefQuicReuseConnection {
/// Established client connection to the reuse server; each run opens a new bidirectional stream on it.
connection: QuicConnection,
}
/// Returns the lazily established client connection to the reuse server.
///
/// The first call starts the reuse server if necessary and connects to it as `"localhost"`;
/// later calls return the same connection.
///
/// # Panics
///
/// Panics on the first call if the connection stream fails to materialize or connecting fails.
fn streamref_quic_reuse_connection() -> &'static StreamRefQuicReuseConnection {
    static SHARED: OnceLock<StreamRefQuicReuseConnection> = OnceLock::new();

    /// Connects to the reuse server and wraps the resulting connection.
    fn connect() -> StreamRefQuicReuseConnection {
        let server = streamref_quic_reuse_server();
        let client_config = server.client_config.clone();
        let connection = TokioQuic::connect(
            server.addr,
            "localhost",
            client_config,
            STREAMREF_CHUNK_SIZE,
        )
        .run_with(Sink::head())
        .expect("reuse QUIC client connection materializes")
        .wait()
        .expect("reuse QUIC client connects");
        StreamRefQuicReuseConnection { connection }
    }

    SHARED.get_or_init(connect)
}
/// Scenario body: opens a new bidirectional stream on the shared, already established QUIC
/// connection, collects the remote source, and returns the number of elements received.
///
/// # Panics
///
/// Panics if opening the stream, collecting, or waiting on the stream-ref handle fails, or
/// if the sum of the received values differs from the sum of `0..STREAMREF_N`.
fn streamref_remote_reuse_1024() -> u64 {
    let shared = streamref_quic_reuse_connection();

    let bi_stream = shared
        .connection
        .open_bi_stream_available(STREAMREF_CHUNK_SIZE)
        .run_with(Sink::head())
        .expect("reuse client bi stream materializes")
        .wait()
        .expect("reuse client opens bi stream");

    let settings = StreamRefSettings::default();
    let (remote_source, handle) = source_ref_over_quic::<u64>(bi_stream, STREAMREF_ID, settings);
    let values = remote_source
        .run_collect()
        .expect("reuse remote source collect");
    handle.wait().expect("reuse stream ref handle completes");

    // Sanity check so a truncated or corrupted transfer cannot pass as a valid measurement.
    let expected_checksum: u64 = (0_u64..STREAMREF_N).sum();
    let received_checksum: u64 = values.iter().sum();
    assert_eq!(
        received_checksum, expected_checksum,
        "reuse streamref quic checksum mismatch"
    );

    values.len() as u64
}
/// Decodes standard-alphabet base64 text (`A-Z`, `a-z`, `0-9`, `+`, `/`) into bytes.
///
/// Decoding stops at the first `=`; anything after it is ignored. Leftover bits that do not
/// fill a whole byte are dropped.
///
/// # Panics
///
/// Panics on any byte before the first `=` that is outside the alphabet, including whitespace.
fn decode_base64(input: &str) -> Vec<u8> {
    /// Maps one alphabet symbol to its 6-bit value, panicking on anything else.
    fn sextet(byte: u8) -> u32 {
        let value = if byte.is_ascii_uppercase() {
            byte - b'A'
        } else if byte.is_ascii_lowercase() {
            byte - b'a' + 26
        } else if byte.is_ascii_digit() {
            byte - b'0' + 52
        } else if byte == b'+' {
            62
        } else if byte == b'/' {
            63
        } else {
            panic!("invalid base64 byte {byte}")
        };
        u32::from(value)
    }

    let mut decoded = Vec::with_capacity(input.len() * 3 / 4);
    let mut window = 0_u32;
    let mut pending_bits = 0_u8;

    for symbol in input.bytes().take_while(|&symbol| symbol != b'=') {
        // Bits shifted past the top of `window` have already been emitted and are discarded.
        window = (window << 6) | sextet(symbol);
        pending_bits += 6;
        if pending_bits < 8 {
            continue;
        }
        pending_bits -= 8;
        decoded.push(((window >> pending_bits) & 0xff) as u8);
    }

    decoded
}