use std::{
alloc::{GlobalAlloc, Layout, System},
env,
future::Future,
hint::black_box,
net::SocketAddr,
sync::{
Arc, Barrier, Mutex, OnceLock,
atomic::{AtomicU64, Ordering},
mpsc as std_mpsc,
},
thread,
time::{Duration, Instant},
};
use datum::{
Keep, Materializer, NotUsed, Sink, Source, StreamCompletion, StreamRefId, StreamRefSettings,
StreamRefs,
};
use datum_net::Datagram;
use datum_net::quic::{
crypto::rustls::{QuicClientConfig, QuicServerConfig},
quinn,
rustls::{
ClientConfig as QuicRustlsClientConfig, RootCertStore as QuicRootCertStore,
ServerConfig as QuicRustlsServerConfig,
pki_types::{
CertificateDer as QuicCertificateDer, PrivateKeyDer as QuicPrivateKeyDer,
PrivatePkcs8KeyDer as QuicPrivatePkcs8KeyDer,
},
},
};
use datum_net::tls::rustls::{
ClientConfig, RootCertStore, ServerConfig,
pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer, ServerName},
};
use datum_net::{
QuicConnection, QuicIncomingConnection, StreamRefProtocolDiagnostics,
StreamRefProtocolMessageCounts, TlsIncomingConnection, TokioQuic, TokioTls, TokioUdp,
serve_source_ref_over_quic, serve_source_ref_over_tcp, serve_source_ref_over_tcp_stream,
serve_source_ref_over_tcp_with_diagnostics, source_ref_over_quic, source_ref_over_tcp,
source_ref_over_tcp_stream, source_ref_over_tcp_with_diagnostics,
};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream, UdpSocket};
use tokio::runtime::Runtime as TokioRuntime;
use tokio::sync::mpsc as tokio_mpsc;
use tokio_rustls::{TlsAcceptor, TlsConnector};
// --- TLS scenario parameters -------------------------------------------------
// Chunk size handed to `TokioTls::bind` / `TokioTls::outgoing_connection`, and
// the read-buffer size of the raw tokio-rustls echo server.
const TLS_CHUNK_SIZE: usize = 8192;
// Bytes sent (and echoed back) per TLS round trip.
const TLS_PAYLOAD_BYTES: usize = 64;
// Round trips performed per connection by the concurrent TLS scenarios (the
// `x1024` suffix in their names).
const TLS_CONCURRENT_ROUNDTRIPS: usize = 1024;
// Capacity of the reader-task -> worker channel in the bridged TLS scenario.
const TLS_CONCURRENT_RECEIVE_BUFFER: usize = 2048;
// Connection count below which `tls_shard_count` returns 0, making the sharded
// scenario fall back to the single-runtime async path.
const TLS_SHARDED_MIN_CONNECTIONS: usize = 64;
// --- UDP scenario parameters -------------------------------------------------
// Consumed by code outside this view; meanings inferred from names — TODO confirm.
const UDP_DATAGRAMS: usize = 128;
const UDP_PAYLOAD_BYTES: usize = 64;
const UDP_DATAGRAM_SIZE: usize = 2048;
const UDP_RECEIVE_BUFFER: usize = 256;
const UDP_CONCURRENT_ROUNDTRIPS: usize = 1024;
const UDP_CONCURRENT_RECEIVE_BUFFER: usize = 2048;
// --- QUIC scenario parameters ------------------------------------------------
// Consumed outside this view; presumably mirror the TLS ones — TODO confirm.
const QUIC_PAYLOAD_BYTES: usize = 64;
const QUIC_CONCURRENT_ROUNDTRIPS: usize = 1024;
const QUIC_CONCURRENT_RECEIVE_BUFFER: usize = 2048;
// --- StreamRef scenario parameters -------------------------------------------
// Consumed outside this view.
const STREAMREF_N: u64 = 1024;
const STREAMREF_CHUNK_SIZE: usize = 8192;
const STREAMREF_ID: StreamRefId = StreamRefId::from_u128(1);
// --- Timing shared by every scenario (see `timed_mode`) ----------------------
// Number of warm-up windows (reported only in the warmup_* columns), number of
// measured windows, and the minimum length of each window. A window always runs
// at least one operation, even if that operation outlasts the window.
const BENCH_WARMUP_WINDOWS: usize = 5;
const BENCH_MEASUREMENT_WINDOWS: usize = 5;
const BENCH_WINDOW: Duration = Duration::from_secs(1);
// Base64 of a DER X.509 certificate used by `tls_configs` (decoded there via
// `decode_base64`). Decoding the DER suggests: subject and issuer CN
// `localhost` (self-signed), SANs `localhost` and `127.0.0.1`.
// NOTE(review): the encoded validity window appears to be 2026-06-16 ..
// 2036-06-13 UTC; TLS handshakes will be rejected as "not yet valid" on a host
// whose clock is earlier — confirm.
const CERT_DER_B64: &str = "MIIDSTCCAjGgAwIBAgIUN1YxPtPF67raIV0xnXhIFpgYFMcwDQYJKoZIhvcNAQELBQAwFDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTI2MDYxNjE1NDE1OFoXDTM2MDYxMzE1NDE1OFowFDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoUfl8KwUMwSJOEmRjQoqm+A3Qx+trcIU7JkUiYaVDMO/om/5R1wrRAngeOdxNiX2CQbtodN7GGZBOOXIhBxrW97ZDji+JZSxIOEkX/u6dAopbRsGWf60ueSkPBVOAXpSVylCocUeoD4THNpW63VTYXeREQqWhvZiHm0TgApSeA755Jsokk8ka9+Uvn/7V4QRzmhQEarcJQQgRedWIi+SnrHlnT5SYkxqIx/1TKud3yfvxIInZZregis6a7psJmyXZmzwclcZp7k0cVP9YAm/P/mY/J19WckDQiK3DgvjFE5BOYCJT5q1O05pHYNQEOiJ40UG+8Ah3wZVbYCCTsGzQwIDAQABo4GSMIGPMB0GA1UdDgQWBBTQCPPq+yHQwkFUFa4HEVorPT4pDTAfBgNVHSMEGDAWgBTQCPPq+yHQwkFUFa4HEVorPT4pDTAMBgNVHRMBAf8EAjAAMA4GA1UdDwEB/wQEAwIFoDATBgNVHSUEDDAKBggrBgEFBQcDATAaBgNVHREEEzARgglsb2NhbGhvc3SHBH8AAAEwDQYJKoZIhvcNAQELBQADggEBAD+ruR1cUL836JZmoEl5s8KhRFgm97J+1GzbOyRVtkK8iVc6NLYgzNr2bFjTxQfmaDasziPyd4xxycXYesxF2iPr6ZAu/FMg+Wk6rfpthe2nutYLKEnMznBQmISeN5iB9r1/NpR72h2+aRzOobKUjrdNIxU2G6GRI9GDmajhyo2MPSPHz00zj+WubDBR4mCKAguv1ABft4M/RB+nKudB4bKCukFPbDLQAlKMW+KmbeWvzujrHq6y9EcgSexlEB3sNxJtk2P+imy3R8OdsvrF9BSc+feuz75sNN0MdllxUZRcnoo2BfvA2Io0t/SkUdMDBZpnqVeG99uRQFBkkcD0/RM=";
// Base64 of the PKCS#8 DER private key paired with `CERT_DER_B64` in `tls_configs`.
const KEY_DER_B64: &str = "MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQChR+XwrBQzBIk4SZGNCiqb4DdDH62twhTsmRSJhpUMw7+ib/lHXCtECeB453E2JfYJBu2h03sYZkE45ciEHGtb3tkOOL4llLEg4SRf+7p0CiltGwZZ/rS55KQ8FU4BelJXKUKhxR6gPhMc2lbrdVNhd5ERCpaG9mIebROAClJ4DvnkmyiSTyRr35S+f/tXhBHOaFARqtwlBCBF51YiL5KeseWdPlJiTGojH/VMq53fJ+/Egidlmt6CKzprumwmbJdmbPByVxmnuTRxU/1gCb8/+Zj8nX1ZyQNCIrcOC+MUTkE5gIlPmrU7Tmkdg1AQ6InjRQb7wCHfBlVtgIJOwbNDAgMBAAECggEAFt2WLJKBBKCqgbfwclIUK2hU/WfC3GUtSUQExOC3RgmEpFeCvIrFOY666/lINDiakroQO8a7Z4Cr05DEZnKPCAXmuRzzXGmh2TsCSLdsPmJocCcKNf0mC0ruu/P7TLtWbDQV1dCYIHjchdUwyI4oRaPJheR95eGNlKm7P4Z3Hb0r4dncp/P47DjBGI3KiIzV1lk39sljT7G1+4LpIvj4uxTGnwvL0dnl66Dd8s6Xu3Sh4ciofPV3zA2mG1y86OKHUIh0cdXEFbOZjwCWuGlMatK4UgiXi3s/5YyokGDSWZlz/lIx6UgNAbwRBRWvwHVIEiuxv0NDqtEGEgeLVsHfVQKBgQDhxw6TqaSeWIL40PhB1VTXcQbEtt4uWfS/dIUMiyPDBYmwjTDl64c7gvF2lsdKsyvoBZaKTo5H6Z7n79s4qY1SMn5SeauRfxNtOOsvwhCjM0Lfd5aAm0CiabajJSyDzF9vN10dV3YrtC3mYlK+fKGHbzEUZqywL2r9fftMZHMKhQKBgQC23qsqDaMvPW3K9uot9jIxZDnm8AaMnkYJYBQ6d5esYqGfpxvg4ZHfaaz7/6zZBLZ3PM382BD9UruRz95d+Dz7sTe6IqdRkLTlqZ3jz22zatdz3GzDL5ywAg1Em18qiORuP6EfJBpj3r2T9frqYCJZy3c71KbrHpTNkVSZPvSFJwKBgH9ttvd2jvfkIv/GCeg2PK1gGJ7cS2hD01RfslxTB7shnAXumoOaPHhFPn1dOlwZ5hUNWlrzXXCPFeX4RXWklKOo5g0sqL30yAUkU/Ffib5ix0KTH3m0pnVETxWjfI1iwxZlXAOcgpDGIAXgxqzGcPCcvEm0bZD/IlayTSxfoWJ1AoGBALO46S2eU4eE7tevjX14XeIO550gvr8KejiSla7UhFsmYEPA/3QnHK8ec6UCpURwDkfnmWZqj1/4JGHsklV2N7EBfYxQrHJMLo0cAxG2ddEEKxUySC4YJ/0PNyz7Ki9qdF4p6NbrK4iFi3nUITKhIoFvpxJFL3saBhJ2R1eI+NW3AoGACQsZiQS4at0hHYb+Mo835AorlfbayWZB2mggSs71DXEFNq+/LJvJUbqDaOPPTW3yZG6znCuQgOVFoN23eEUps9KTDFAahsBbwu2PlIPUG7qIRciNbl0ArpoLq7x0hpNucsYqTd+nFBXABCnEKDkRKPtxbGpZKl/VFnnZ3A1TaQQ=";
// Base64 DER certificate for the QUIC scenarios (used outside this view).
// Decoding suggests: subject CN `localhost`, issued by `DatumTestCA`
// (`QUIC_CA_DER_B64`), SANs `localhost` and `127.0.0.1`.
// NOTE(review): validity appears to be 2026-06-18 .. 2036-06-15 UTC — confirm.
const QUIC_CERT_DER_B64: &str = "MIIDFjCCAf6gAwIBAgIUf6vScDq/VI97ByepXNchgNpefNowDQYJKoZIhvcNAQELBQAwFjEUMBIGA1UEAwwLRGF0dW1UZXN0Q0EwHhcNMjYwNjE4MDcxNjE3WhcNMzYwNjE1MDcxNjE3WjAUMRIwEAYDVQQDDAlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC6tXmPtOi05d1Gc1MG+mcl/8kzBuBtV9KTDuE7lauvoM7gDeDzv+j86oMpX5qVZNLFVR7lxF4+4Q7y3TxQA6FqPKTXkUoZrIjAmpbFArGYzu61314okg0ggkZaGjenPlM98MHhVozVmycjPsR8icnDMmWC+HYl7aQGss4Pp4GaS8WUDUr0oRcxUTvkffhRPj+jFvpA6cH3W2G1h+Rf3KLmcffIxIYgiWlqiAZXwHQbqdpd/hlgSEKFs4MhwcnNspqfxxD9kV0kGgTpDbgE8Pjr2/7QNHk347Zks3bzy6PDE2NVsUjHUjeCINJqd2xPu+pwSdtXUSy2vOnAQeNbglk/AgMBAAGjXjBcMBoGA1UdEQQTMBGCCWxvY2FsaG9zdIcEfwAAATAdBgNVHQ4EFgQUP5J9Y1t1AQlEZU7w4LB9KAv2avQwHwYDVR0jBBgwFoAUFF954UOKjNuQkkA/L7cF1ndSFTQwDQYJKoZIhvcNAQELBQADggEBALKxpKTyd/0O0cu8Hxaz8Ys8OqVjsboHEMmk149LgYnVPfNyTYj8fmDDgjoPykwrH/GlOt5VerKZxiQ9n1t0oh7gTkUAJHIK0dtW/GEJ3SgnhDOlmOx9tdAdJHfnMonqlL1EMIF9e3Z/fc67PVK9pqQgrWOp4zP2yRIKEnnogB11bxbHZMd4FtuXdvEuPWR2rWAQUnpbvwkpkukkxuUsKahLPTDxR29DycPKDKH6J+5UZa6EV0eYF0GXZQJiMnRJej8RvRKmPlMv+gg/XM8GfAEELi7cixQ2Fy0FqySzYe2FmhRGEZgG353l2m6P8GUSmlfVwr4aLEq6PfI5e8lS/jk=";
// Base64 PKCS#8 DER private key for the QUIC scenarios (used outside this view);
// presumably paired with `QUIC_CERT_DER_B64` — TODO confirm.
const QUIC_KEY_DER_B64: &str = "MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC6tXmPtOi05d1Gc1MG+mcl/8kzBuBtV9KTDuE7lauvoM7gDeDzv+j86oMpX5qVZNLFVR7lxF4+4Q7y3TxQA6FqPKTXkUoZrIjAmpbFArGYzu61314okg0ggkZaGjenPlM98MHhVozVmycjPsR8icnDMmWC+HYl7aQGss4Pp4GaS8WUDUr0oRcxUTvkffhRPj+jFvpA6cH3W2G1h+Rf3KLmcffIxIYgiWlqiAZXwHQbqdpd/hlgSEKFs4MhwcnNspqfxxD9kV0kGgTpDbgE8Pjr2/7QNHk347Zks3bzy6PDE2NVsUjHUjeCINJqd2xPu+pwSdtXUSy2vOnAQeNbglk/AgMBAAECggEADgj/Sti4F4UiBZ48sw0WvOoF92KAc2fDKg/XyZFeNxUZCAmzC8+01XJRisYniixqla/o4y/IP+TOKV6NHpyvQRuu7o966QQpwYS9abDipdCEbEKOwvD7sKJqZHKNFXLfcgwLnwNREeuYXRNmVNBMzPIYZGiGBSLgjipZyB545R7E+QbxttO41xAZTjCPSBIdMp2UtXtjzXlyC9y/QSZhB2faKvCoPWADiiMliLuTNDZ5WASfAfTzykYmo/jIxMMfs++QuV4TmcnSKVUOKz2Lai14mWaQbRqj9TNBwyPes4J/VPC0yKCBT5DVBeDRfcrVumDWvFcyy1MHUy+zAZ8A6QKBgQD0KEow8DRDMUGijJBzgf+jknE1Dy17V6k3JjtYb7aBH+ear9FQIbH25JDZeHqBctX/BKcC/ogxpKabL40FsQ0V+JTGezrbXlV1yq6Ahb90bzkwFIGO+yknp1xaulwI1CyQLe+x6EpGa/eQrwfrjt/B2nH/J2mSDcnqmyjcxoIEGQKBgQDDw9heSKGJatLpe5T/9QcZJ/b/unJ/lVD3fUCWwH271WiIB8j7HmV+uhoCFhA/G/8JLnjI36kpu2fY5W1KTmGuYIL7jbPPSJ3tMPa3pn41SEnOFx7uTsG5si7QcL6f4EN1amyDSh0Re3u4BFlfhEd2fntibSxsW+Vf1qgzv3szFwKBgF9S95xhu9Xb/kcrYB5oy9keDKrtkRFN6VgJyjO98eA6B5+iMzN45FWDaXYujo3JuvkE5PvLSnsvUIjYKNPnCzffxJ41cIZDL1R83GJ7VYNtBUPbTbX4I1pDAlC9377mr7te4+1uxsj10EMhwxLZxkyWL9mxlosabRH2oB6KNirJAoGAGVAHeBl4GaagYuRB3d627j/GpkZba7fI4ldltJhrKZBslbo7+IxznzE9D6v+qcpa7br+jyqsWuMKiHcR7Ju58aw1A4XNDC5djT/ZhuGWTBluwaEntLSmQ/aQKw+I9m8vCc2a/lbk70/CLYcSb1FRVDcCiTWImvI+9GdzFE3cBj0CgYEAv2QyLhmWNQnu0HT1M8de64zOdQQGbFCUyJdHhY1PY9QOW1A7Ew36g2KdwOf8EP6ygYwF4JUm0FtKo6o/85YL6QMpyRHNahLqDEF3l9Mjeu38sq9CqQ+8yYHgZSrBDnSEDlM5INIvBhz+j02nWO3IYP8DwBwSQ/tFWWyjmRkpX0s=";
// Base64 DER certificate of the self-signed `DatumTestCA` that issued
// `QUIC_CERT_DER_B64` (decoded subject/issuer); presumably the QUIC client's
// trust root — TODO confirm against the QUIC setup outside this view.
const QUIC_CA_DER_B64: &str = "MIIDDTCCAfWgAwIBAgIUInMUFEEO3+4Uu17Jo+TlS+wVqB4wDQYJKoZIhvcNAQELBQAwFjEUMBIGA1UEAwwLRGF0dW1UZXN0Q0EwHhcNMjYwNjE4MDcxNjE3WhcNMzYwNjE1MDcxNjE3WjAWMRQwEgYDVQQDDAtEYXR1bVRlc3RDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAM+duiVpggGSTs4wjI1N0fzBW2mdTudSfTaqY/VcOdDrJXTIz/IV5T0B6LbbadkRU0tpA3uSpRIsTzg8tAc/Cg8k7AMh7JwqfqA7D2TcryLwf16TaqHS7xqpajMitudo1raW+UBKb8J5XzQeewgzPA7B1Wizfjj2V/LZRWDcKlaiQuunjZJztS8tKf4kyKxPvrg0fE2wMFD8UNd+9QcK1wgUidJz2g3WafYWlaYTWN+QqyIitHESVYg4QZZJRPWaviepkY27Hqe8CkqT5NnmdPstZpBN2fFZzRsa+31s2vWggPg7l+WO/B4fcooXNM5ra3rg1TLJKWYKb5m3aeVh0bcCAwEAAaNTMFEwHQYDVR0OBBYEFBRfeeFDiozbkJJAPy+3BdZ3UhU0MB8GA1UdIwQYMBaAFBRfeeFDiozbkJJAPy+3BdZ3UhU0MA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAJkAQIquWpKKhdq1obRxDxjkOL1aYnDjbq3B24TJ6wZj98vXXzQvFNgwfSxI/8jSx3X5CStkukHAKRvq4J/u+NDZGOPeScTbz4AsEkOUdj4w4q5QK/VXucXsjgJYXFaUpqXHM3ZxvCAWi0HR50nYlErMwskLDULGJN5fDP6kwmB1xC36WxuALCf1dSjDQy0ikB4If07CDLmjcwfMKjjNPWcb9OyfUgmggRCaIO1APesOz5+FQ8fRyekOrxQUQCX/fNT3wYQIJ/hFc+I2BnrKsbI+crbufeJ70CVUnI6yuR6Wx9VVtaUQ6tVPJjO11lEuAEBYDIC/VYqwN9fG34c0sxs=";
/// Allocator that forwards every request to [`System`] and adds the number of
/// newly requested bytes to [`ALLOCATED_BYTES`].
///
/// Frees are never subtracted, so the counter is a running total of bytes
/// requested since it was last reset — not a live-heap gauge.
struct CountingAllocator;

/// Running total of bytes requested through [`CountingAllocator`].
static ALLOCATED_BYTES: AtomicU64 = AtomicU64::new(0);
/// Highest process thread count sampled during the current measurement.
static OBSERVED_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);

unsafe impl GlobalAlloc for CountingAllocator {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds `GlobalAlloc::alloc`'s contract for
        // `layout`; we forward it unchanged to the system allocator.
        let block = unsafe { System.alloc(layout) };
        if block.is_null() {
            return block;
        }
        ALLOCATED_BYTES.fetch_add(layout.size() as u64, Ordering::Relaxed);
        block
    }

    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // SAFETY: same contract as `alloc`, forwarded unchanged.
        let block = unsafe { System.alloc_zeroed(layout) };
        if block.is_null() {
            return block;
        }
        ALLOCATED_BYTES.fetch_add(layout.size() as u64, Ordering::Relaxed);
        block
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: `ptr`/`layout` come from this allocator, which obtained them
        // from `System`, so handing them back to `System` is valid.
        unsafe { System.dealloc(ptr, layout) };
    }

    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // SAFETY: the caller upholds `GlobalAlloc::realloc`'s contract; the
        // block originated from `System`, so `System.realloc` may resize it.
        let resized = unsafe { System.realloc(ptr, layout, new_size) };
        if resized.is_null() {
            return resized;
        }
        let requested = if resized == ptr {
            // Resized in place: only growth asks for new bytes.
            new_size.saturating_sub(layout.size())
        } else {
            // Moved: the entire new block is a fresh request.
            new_size
        };
        if requested > 0 {
            ALLOCATED_BYTES.fetch_add(requested as u64, Ordering::Relaxed);
        }
        resized
    }
}
// Route every heap allocation in this binary through `CountingAllocator`, so
// `ALLOCATED_BYTES` reflects all allocations (harness, runtimes and scenarios alike).
#[global_allocator]
static GLOBAL: CountingAllocator = CountingAllocator;
/// One row of the benchmark report: a named operation plus its timing plan.
struct Scenario {
    /// Label printed in the `scenario` column and matched by the filter.
    name: &'static str,
    /// Executes one benchmark operation; the result feeds a checksum so the
    /// work cannot be optimized away.
    run: fn() -> u64,
    /// Warm-up / measurement window configuration.
    mode: MeasureMode,
}

/// How long a scenario is exercised: a number of fixed-length windows for
/// warm-up and for measurement.
#[derive(Clone, Copy)]
struct MeasureMode {
    /// Minimum duration of each window.
    window: Duration,
    /// Windows run before measuring (reported separately).
    warmup_windows: usize,
    /// Windows whose operations are measured.
    measurement_windows: usize,
}
fn main() {
let filter = scenario_filter();
let scenarios = [
Scenario {
name: "tls_echo_roundtrip_64b",
run: tls_echo_roundtrip_64b,
mode: timed_mode(),
},
Scenario {
name: "tls_bridge_concurrent_1x1024",
run: tls_bridge_concurrent_1x1024,
mode: timed_mode(),
},
Scenario {
name: "tls_async_concurrent_1x1024",
run: tls_async_concurrent_1x1024,
mode: timed_mode(),
},
Scenario {
name: "tls_bridge_concurrent_16x1024",
run: tls_bridge_concurrent_16x1024,
mode: timed_mode(),
},
Scenario {
name: "tls_async_concurrent_16x1024",
run: tls_async_concurrent_16x1024,
mode: timed_mode(),
},
Scenario {
name: "tls_bridge_concurrent_64x1024",
run: tls_bridge_concurrent_64x1024,
mode: timed_mode(),
},
Scenario {
name: "tls_async_concurrent_64x1024",
run: tls_async_concurrent_64x1024,
mode: timed_mode(),
},
Scenario {
name: "tls_bridge_concurrent_256x1024",
run: tls_bridge_concurrent_256x1024,
mode: timed_mode(),
},
Scenario {
name: "tls_async_concurrent_256x1024",
run: tls_async_concurrent_256x1024,
mode: timed_mode(),
},
Scenario {
name: "tls_bridge_concurrent_1024x1024",
run: tls_bridge_concurrent_1024x1024,
mode: timed_mode(),
},
Scenario {
name: "tls_async_concurrent_1024x1024",
run: tls_async_concurrent_1024x1024,
mode: timed_mode(),
},
Scenario {
name: "tls_sharded_concurrent_1x1024",
run: tls_sharded_concurrent_1x1024,
mode: timed_mode(),
},
Scenario {
name: "tls_sharded_concurrent_16x1024",
run: tls_sharded_concurrent_16x1024,
mode: timed_mode(),
},
Scenario {
name: "tls_sharded_concurrent_64x1024",
run: tls_sharded_concurrent_64x1024,
mode: timed_mode(),
},
Scenario {
name: "tls_sharded_concurrent_256x1024",
run: tls_sharded_concurrent_256x1024,
mode: timed_mode(),
},
Scenario {
name: "tls_sharded_concurrent_1024x1024",
run: tls_sharded_concurrent_1024x1024,
mode: timed_mode(),
},
Scenario {
name: "udp_loopback_send_receive_128x64b",
run: udp_loopback_send_receive_128x64b,
mode: timed_mode(),
},
Scenario {
name: "udp_bridge_concurrent_1x1024",
run: udp_bridge_concurrent_1x1024,
mode: timed_mode(),
},
Scenario {
name: "udp_async_concurrent_1x1024",
run: udp_async_concurrent_1x1024,
mode: timed_mode(),
},
Scenario {
name: "udp_bridge_concurrent_16x1024",
run: udp_bridge_concurrent_16x1024,
mode: timed_mode(),
},
Scenario {
name: "udp_async_concurrent_16x1024",
run: udp_async_concurrent_16x1024,
mode: timed_mode(),
},
Scenario {
name: "udp_bridge_concurrent_64x1024",
run: udp_bridge_concurrent_64x1024,
mode: timed_mode(),
},
Scenario {
name: "udp_async_concurrent_64x1024",
run: udp_async_concurrent_64x1024,
mode: timed_mode(),
},
Scenario {
name: "udp_bridge_concurrent_256x1024",
run: udp_bridge_concurrent_256x1024,
mode: timed_mode(),
},
Scenario {
name: "udp_async_concurrent_256x1024",
run: udp_async_concurrent_256x1024,
mode: timed_mode(),
},
Scenario {
name: "udp_bridge_concurrent_1024x1024",
run: udp_bridge_concurrent_1024x1024,
mode: timed_mode(),
},
Scenario {
name: "udp_async_concurrent_1024x1024",
run: udp_async_concurrent_1024x1024,
mode: timed_mode(),
},
Scenario {
name: "quic_bridge_concurrent_1x1024",
run: quic_bridge_concurrent_1x1024,
mode: timed_mode(),
},
Scenario {
name: "quic_async_concurrent_1x1024",
run: quic_async_concurrent_1x1024,
mode: timed_mode(),
},
Scenario {
name: "quic_bridge_concurrent_16x1024",
run: quic_bridge_concurrent_16x1024,
mode: timed_mode(),
},
Scenario {
name: "quic_async_concurrent_16x1024",
run: quic_async_concurrent_16x1024,
mode: timed_mode(),
},
Scenario {
name: "quic_bridge_concurrent_64x1024",
run: quic_bridge_concurrent_64x1024,
mode: timed_mode(),
},
Scenario {
name: "quic_async_concurrent_64x1024",
run: quic_async_concurrent_64x1024,
mode: timed_mode(),
},
Scenario {
name: "quic_bridge_concurrent_256x1024",
run: quic_bridge_concurrent_256x1024,
mode: timed_mode(),
},
Scenario {
name: "quic_async_concurrent_256x1024",
run: quic_async_concurrent_256x1024,
mode: timed_mode(),
},
Scenario {
name: "quic_bridge_concurrent_1024x1024",
run: quic_bridge_concurrent_1024x1024,
mode: timed_mode(),
},
Scenario {
name: "quic_async_concurrent_1024x1024",
run: quic_async_concurrent_1024x1024,
mode: timed_mode(),
},
Scenario {
name: "streamref_remote_1024",
run: streamref_remote_1024,
mode: timed_mode(),
},
Scenario {
name: "streamref_remote_reuse_1024",
run: streamref_remote_reuse_1024,
mode: timed_mode(),
},
Scenario {
name: "streamref_remote_tcp_1024",
run: streamref_remote_tcp_1024,
mode: timed_mode(),
},
Scenario {
name: "streamref_remote_tcp_reuse_1024",
run: streamref_remote_tcp_reuse_1024,
mode: timed_mode(),
},
Scenario {
name: "streamref_remote_tcp_concurrent_1x1024",
run: streamref_remote_tcp_concurrent_1x1024,
mode: timed_mode(),
},
Scenario {
name: "streamref_remote_tcp_concurrent_16x1024",
run: streamref_remote_tcp_concurrent_16x1024,
mode: timed_mode(),
},
Scenario {
name: "streamref_remote_tcp_concurrent_64x1024",
run: streamref_remote_tcp_concurrent_64x1024,
mode: timed_mode(),
},
Scenario {
name: "streamref_remote_tcp_concurrent_256x1024",
run: streamref_remote_tcp_concurrent_256x1024,
mode: timed_mode(),
},
Scenario {
name: "streamref_remote_tcp_n_1",
run: streamref_remote_tcp_n_1,
mode: timed_mode(),
},
Scenario {
name: "streamref_remote_tcp_n_4",
run: streamref_remote_tcp_n_4,
mode: timed_mode(),
},
Scenario {
name: "streamref_remote_tcp_n_16",
run: streamref_remote_tcp_n_16,
mode: timed_mode(),
},
Scenario {
name: "streamref_remote_tcp_n_64",
run: streamref_remote_tcp_n_64,
mode: timed_mode(),
},
Scenario {
name: "streamref_remote_tcp_n_256",
run: streamref_remote_tcp_n_256,
mode: timed_mode(),
},
Scenario {
name: "streamref_remote_tcp_n_1024",
run: streamref_remote_tcp_n_1024,
mode: timed_mode(),
},
Scenario {
name: "streamref_remote_tcp_n_4096",
run: streamref_remote_tcp_n_4096,
mode: timed_mode(),
},
Scenario {
name: "streamref_remote_tcp_n_16384",
run: streamref_remote_tcp_n_16384,
mode: timed_mode(),
},
];
println!(
"scenario\titerations\tns_per_op\tallocated_bytes_per_op\tcpu_ns_per_op\tpeak_rss_kb\tthread_count\twarmup_iterations\twarmup_cpu_ns_per_op\twall_p50_us\twall_p99_us"
);
for scenario in scenarios {
if !scenario_enabled(scenario.name, filter.as_deref()) {
continue;
}
let warmup = warmup_scenario(scenario.run, scenario.mode);
let measurement = measure_scenario(scenario.run, scenario.mode);
let snapshot = process_snapshot();
let thread_count = snapshot
.thread_count
.max(OBSERVED_THREAD_COUNT.load(Ordering::Relaxed));
let ns_per_op = measurement.elapsed.as_nanos() as f64 / measurement.iterations as f64;
let allocated_bytes_per_op =
measurement.allocated_bytes as f64 / measurement.iterations as f64;
let cpu_ns_per_op = measurement.cpu_ns as f64 / measurement.iterations as f64;
let warmup_cpu_ns_per_op = if warmup.iterations == 0 {
0.0
} else {
warmup.cpu_ns as f64 / warmup.iterations as f64
};
println!(
"{}\t{}\t{ns_per_op:.2}\t{allocated_bytes_per_op:.2}\t{cpu_ns_per_op:.2}\t{}\t{}\t{}\t{warmup_cpu_ns_per_op:.2}\t{:.3}\t{:.3}",
scenario.name,
measurement.iterations,
snapshot.peak_rss_kb,
thread_count,
warmup.iterations,
duration_us(measurement.wall_p50),
duration_us(measurement.wall_p99)
);
}
let (_, counts) = streamref_remote_tcp_1024_with_diagnostics();
eprintln!(
"streamref_remote_tcp_1024_protocol_counts\tCumulativeDemand={}\tSequencedOnNext={}\tAck={}",
counts.cumulative_demand, counts.sequenced_on_next, counts.ack
);
}
/// Reads the comma-separated scenario allow-list from `DATUM_NET_COMPARE_FILTER`.
///
/// Entries are trimmed and blank entries are dropped. Returns `None` (meaning
/// "run everything") when the variable is unset, not valid Unicode, or contains
/// no non-blank entries.
fn scenario_filter() -> Option<Vec<String>> {
    let raw = env::var("DATUM_NET_COMPARE_FILTER").ok()?;
    let mut names = Vec::new();
    for entry in raw.split(',') {
        let entry = entry.trim();
        if !entry.is_empty() {
            names.push(entry.to_owned());
        }
    }
    if names.is_empty() { None } else { Some(names) }
}
/// Returns whether `name` should run: always when there is no filter,
/// otherwise only when it appears verbatim in the filter list.
fn scenario_enabled(name: &str, filter: Option<&[String]>) -> bool {
    match filter {
        None => true,
        Some(allowed) => allowed.iter().any(|entry| entry.as_str() == name),
    }
}
/// Process-wide `Materializer`, created on first use and shared by every
/// scenario afterwards.
fn materializer() -> &'static Materializer {
    static SHARED_MATERIALIZER: OnceLock<Materializer> = OnceLock::new();
    SHARED_MATERIALIZER.get_or_init(Materializer::new)
}
/// Total user + system CPU time consumed by this process so far, in nanoseconds,
/// read from `/proc/self/stat` (`utime` and `stime`, fields 14 and 15).
///
/// Returns 0 if the file cannot be read or parsed (e.g. on non-Linux hosts).
/// The comm field is skipped by splitting after the *last* `)`, since a process
/// name may itself contain parentheses or spaces.
///
/// NOTE(review): assumes a clock tick of 10 ms (`CLK_TCK == 100`), the common
/// Linux value; it is not queried from the system.
fn process_cpu_ns() -> u128 {
    const NS_PER_TICK: u128 = 10_000_000;
    let stat = match std::fs::read_to_string("/proc/self/stat") {
        Ok(contents) => contents,
        Err(_) => return 0,
    };
    let after_comm = match stat.rfind(')') {
        Some(paren) => &stat[paren + 1..],
        None => return 0,
    };
    // After the comm field, index 0 is `state`; `utime`/`stime` sit at 11 and 12.
    let mut fields = after_comm.split_whitespace().skip(11);
    let (Some(utime), Some(stime)) = (fields.next(), fields.next()) else {
        return 0;
    };
    let user_ticks = utime.parse::<u64>().unwrap_or(0) as u128;
    let system_ticks = stime.parse::<u64>().unwrap_or(0) as u128;
    (user_ticks + system_ticks) * NS_PER_TICK
}
/// Figures parsed from `/proc/self/status`; each field is 0 when unavailable.
struct ProcessSnapshot {
    /// Peak resident set size (`VmHWM`), in kB.
    peak_rss_kb: u64,
    /// Thread count (`Threads`) at the time of the read.
    thread_count: u64,
}

/// Totals from a scenario's warm-up windows.
struct WarmupResult {
    /// Process CPU time spent during warm-up, in nanoseconds.
    cpu_ns: u128,
    /// Operations executed during warm-up.
    iterations: u64,
}

/// Totals and latency percentiles from a scenario's measured windows.
struct MeasurementResult {
    /// Operations executed while measuring.
    iterations: u64,
    /// Wall-clock time across all measured windows.
    elapsed: Duration,
    /// Process CPU time spent while measuring, in nanoseconds.
    cpu_ns: u128,
    /// Bytes requested from the global allocator while measuring.
    allocated_bytes: u64,
    /// Median single-operation wall time.
    wall_p50: Duration,
    /// 99th-percentile single-operation wall time.
    wall_p99: Duration,
}
/// The measurement plan used by every scenario: `BENCH_WARMUP_WINDOWS` warm-up
/// windows and `BENCH_MEASUREMENT_WINDOWS` measured windows of `BENCH_WINDOW` each.
const fn timed_mode() -> MeasureMode {
    MeasureMode {
        window: BENCH_WINDOW,
        measurement_windows: BENCH_MEASUREMENT_WINDOWS,
        warmup_windows: BENCH_WARMUP_WINDOWS,
    }
}
/// Runs `mode.warmup_windows` unsampled windows of `run`, returning how many
/// operations ran and how much process CPU time they consumed.
fn warmup_scenario(run: fn() -> u64, mode: MeasureMode) -> WarmupResult {
    let cpu_before = process_cpu_ns();
    let (iterations, checksum) = (0..mode.warmup_windows)
        .map(|_| run_for_window(run, mode.window))
        .fold((0_u64, 0_u64), |(total, digest), (count, sum)| {
            (total + count, digest.wrapping_add(sum))
        });
    // Keep the results observable so the operations cannot be elided.
    black_box(checksum);
    WarmupResult {
        iterations,
        cpu_ns: process_cpu_ns().saturating_sub(cpu_before),
    }
}
fn measure_scenario(run: fn() -> u64, mode: MeasureMode) -> MeasurementResult {
let cpu_start = process_cpu_ns();
ALLOCATED_BYTES.store(0, Ordering::Relaxed);
OBSERVED_THREAD_COUNT.store(0, Ordering::Relaxed);
let started = Instant::now();
let mut iterations = 0_u64;
let mut checksum = 0_u64;
let mut wall_samples = Vec::new();
for _ in 0..mode.measurement_windows {
let (window_iterations, window_checksum) =
run_for_window_sampled(run, mode.window, &mut wall_samples);
iterations += window_iterations;
checksum = checksum.wrapping_add(window_checksum);
}
let elapsed = started.elapsed();
let allocated_bytes = ALLOCATED_BYTES.load(Ordering::Relaxed);
let cpu_ns = process_cpu_ns().saturating_sub(cpu_start);
black_box(checksum);
MeasurementResult {
iterations,
elapsed,
allocated_bytes,
cpu_ns,
wall_p50: percentile_duration(&mut wall_samples, 50),
wall_p99: percentile_duration(&mut wall_samples, 99),
}
}
fn record_observed_thread_count() {
let thread_count = process_snapshot().thread_count;
OBSERVED_THREAD_COUNT.fetch_max(thread_count, Ordering::Relaxed);
}
/// Reads `VmHWM` and `Threads` from `/proc/self/status`.
///
/// Missing file, missing lines, or unparsable values all yield 0. If a key
/// appears more than once, the last occurrence wins.
fn process_snapshot() -> ProcessSnapshot {
    let mut snapshot = ProcessSnapshot {
        peak_rss_kb: 0,
        thread_count: 0,
    };
    let Ok(status) = std::fs::read_to_string("/proc/self/status") else {
        return snapshot;
    };
    for line in status.lines() {
        let Some((key, value)) = line.split_once(':') else {
            continue;
        };
        match key {
            "VmHWM" => snapshot.peak_rss_kb = first_u64(value),
            "Threads" => snapshot.thread_count = first_u64(value),
            _ => {}
        }
    }
    snapshot
}
/// Parses the first whitespace-separated token of `input` as a `u64`,
/// returning 0 when there is no token or it is not a valid unsigned integer.
fn first_u64(input: &str) -> u64 {
    match input.split_whitespace().next() {
        Some(token) => token.parse().unwrap_or(0),
        None => 0,
    }
}
/// Runs `run` repeatedly for at least `duration` (and at least once), returning
/// `(iterations, checksum)`. Per-operation timings are collected and discarded.
fn run_for_window(run: fn() -> u64, duration: Duration) -> (u64, u64) {
    run_for_window_sampled(run, duration, &mut Vec::new())
}
/// Runs `run` at least once, then keeps running it until `duration` has elapsed
/// since the window started.
///
/// Each operation's wall time is appended to `wall_samples`. Returns
/// `(iterations, checksum)`, where the checksum is the wrapping sum of every
/// `run()` result.
fn run_for_window_sampled(
    run: fn() -> u64,
    duration: Duration,
    wall_samples: &mut Vec<Duration>,
) -> (u64, u64) {
    let window_start = Instant::now();
    let mut count = 0_u64;
    let mut digest = 0_u64;
    loop {
        let op_start = Instant::now();
        let value = black_box(run());
        digest = digest.wrapping_add(value);
        wall_samples.push(op_start.elapsed());
        count += 1;
        if window_start.elapsed() >= duration {
            break (count, digest);
        }
    }
}
/// Returns the `percentile`-th percentile of `samples` using the lower
/// nearest-rank rule: the element at index `(len - 1) * percentile / 100` of
/// the sorted samples.
///
/// Values of `percentile` above 100 are clamped to 100 (the maximum sample);
/// previously they indexed past the end of the slice and panicked. Returns
/// `Duration::ZERO` for an empty slice.
///
/// The slice is reordered in place. Only the selected element is guaranteed to
/// be in its sorted position afterwards, which is why it is taken by `&mut`.
fn percentile_duration(samples: &mut [Duration], percentile: usize) -> Duration {
    if samples.is_empty() {
        return Duration::ZERO;
    }
    let percentile = percentile.min(100);
    let rank = (samples.len() - 1) * percentile / 100;
    // Linear-time selection: only the element at `rank` has to land in its
    // sorted position, so a full O(n log n) sort is unnecessary.
    *samples.select_nth_unstable(rank).1
}
/// Converts `duration` to fractional microseconds for the report.
fn duration_us(duration: Duration) -> f64 {
    const NANOS_PER_MICRO: f64 = 1e3;
    let nanos = duration.as_nanos() as f64;
    nanos / NANOS_PER_MICRO
}
/// Shared `datum` TLS echo server used by `tls_echo_roundtrip_64b`, together
/// with everything a client needs to reach and trust it.
struct TlsEchoServer {
    addr: SocketAddr,
    client_config: Arc<ClientConfig>,
    server_name: ServerName<'static>,
    /// Never read; retained alongside the server state for the process lifetime.
    _completion: StreamCompletion<NotUsed>,
}

/// Lazily binds the shared `datum` TLS echo server on an ephemeral loopback port.
///
/// Each accepted connection's source is run straight into its own sink, so
/// every byte is echoed back. The per-connection handler blocks until that echo
/// stream completes, so connections are presumably served one at a time
/// (assuming `Sink::foreach` invokes the handler sequentially) — TODO confirm.
fn tls_echo_server() -> &'static TlsEchoServer {
    static SERVER: OnceLock<TlsEchoServer> = OnceLock::new();
    SERVER.get_or_init(|| {
        let (server_config, client_config, server_name) = tls_configs();
        let echo_connection = |connection: TlsIncomingConnection| {
            let (incoming, outgoing) = connection.into_parts();
            incoming
                .run_with(outgoing)
                .expect("TLS echo stream materializes")
                .wait()
                .expect("TLS echo stream completes");
        };
        let listener = TokioTls::bind("127.0.0.1:0", server_config, TLS_CHUNK_SIZE);
        let (pending_binding, completion) = listener
            .to_mat(Sink::foreach(echo_connection), Keep::both)
            .run_with_materializer(materializer())
            .expect("TLS echo server materializes");
        let bound = pending_binding.wait().expect("TLS echo server binds");
        TlsEchoServer {
            addr: bound.local_addr(),
            client_config,
            server_name,
            _completion: completion,
        }
    })
}
/// Builds a matching rustls server/client configuration pair from the embedded
/// certificate and key, plus the `localhost` server name clients should verify.
///
/// The client's root store contains only the embedded (self-signed) certificate,
/// so it trusts exactly the server it is benchmarked against.
fn tls_configs() -> (Arc<ServerConfig>, Arc<ClientConfig>, ServerName<'static>) {
    let certificate = CertificateDer::from(decode_base64(CERT_DER_B64));
    let private_key = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(decode_base64(KEY_DER_B64)));

    let server = ServerConfig::builder()
        .with_no_client_auth()
        .with_single_cert(vec![certificate.clone()], private_key)
        .expect("TLS server config");

    let mut trusted = RootCertStore::empty();
    trusted
        .add(certificate)
        .expect("trust self-signed TLS cert");
    let client = ClientConfig::builder()
        .with_root_certificates(trusted)
        .with_no_client_auth();

    let name = ServerName::try_from("localhost")
        .expect("TLS server name")
        .to_owned();
    (Arc::new(server), Arc::new(client), name)
}
/// One round trip through the shared `datum` TLS echo server on a fresh
/// outgoing connection: sends `TLS_PAYLOAD_BYTES` bytes and returns the length
/// of the first echoed chunk.
fn tls_echo_roundtrip_64b() -> u64 {
    let server = tls_echo_server();
    let request = Source::single(vec![b'p'; TLS_PAYLOAD_BYTES]);
    let connection = TokioTls::outgoing_connection(
        server.addr,
        server.server_name.clone(),
        Arc::clone(&server.client_config),
        TLS_CHUNK_SIZE,
    );
    let echoed = request
        .via(connection)
        .run_with_materializer(Sink::head(), materializer())
        .expect("TLS client stream materializes")
        .wait()
        .expect("TLS echo round trip completes");
    echoed.len() as u64
}
/// Handle to a raw `tokio-rustls` echo server task; dropping the handle aborts
/// the task, so the server lives exactly as long as its owner.
struct RawTlsEchoServer {
    addr: SocketAddr,
    task: tokio::task::JoinHandle<()>,
}

impl Drop for RawTlsEchoServer {
    fn drop(&mut self) {
        let Self { task, .. } = self;
        task.abort();
    }
}
/// Binds a loopback TCP listener and spawns a task that accepts exactly one
/// client, completes the TLS handshake, and echoes everything it reads.
///
/// The task stops silently on accept/handshake failure, EOF, or any I/O error.
/// Each echoed chunk is flushed immediately so the response is not held back.
async fn start_tls_echo_server(server_config: Arc<ServerConfig>) -> RawTlsEchoServer {
    let listener = TcpListener::bind("127.0.0.1:0")
        .await
        .expect("TLS concurrent echo server binds");
    let addr = listener.local_addr().expect("TLS concurrent echo addr");
    let acceptor = TlsAcceptor::from(server_config);
    let task = tokio::spawn(async move {
        let tcp = match listener.accept().await {
            Ok((tcp, _)) => tcp,
            Err(_) => return,
        };
        let mut tls = match acceptor.accept(tcp).await {
            Ok(tls) => tls,
            Err(_) => return,
        };
        let mut chunk = vec![0_u8; TLS_CHUNK_SIZE];
        while let Ok(len @ 1..) = tls.read(&mut chunk).await {
            if tls.write_all(&chunk[..len]).await.is_err() || tls.flush().await.is_err() {
                break;
            }
        }
    });
    RawTlsEchoServer { addr, task }
}
/// Opens a TCP connection to `addr` and performs a TLS client handshake for
/// `server_name`, panicking if either step fails.
async fn connect_tls_client(
    addr: SocketAddr,
    server_name: ServerName<'static>,
    client_config: Arc<ClientConfig>,
) -> tokio_rustls::client::TlsStream<TcpStream> {
    let connector = TlsConnector::from(client_config);
    let socket = TcpStream::connect(addr)
        .await
        .expect("TLS concurrent client connects TCP");
    connector
        .connect(server_name, socket)
        .await
        .expect("TLS concurrent client handshakes")
}
/// Fills `buffer` completely from `reader`.
///
/// Returns `true` once every byte of `buffer` has been read. Returns `false` if
/// the reader reaches EOF first or reports an I/O error; the error itself is
/// discarded, since callers only need to know the exchange ended. An empty
/// `buffer` returns `true` immediately.
async fn read_exact_payload<R>(reader: &mut R, buffer: &mut [u8]) -> bool
where
    R: AsyncRead + Unpin,
{
    // `AsyncReadExt::read_exact` already loops over short reads and reports a
    // premature EOF as `UnexpectedEof`. That matches the contract of the former
    // hand-rolled fill loop, so the loop is not needed.
    reader.read_exact(buffer).await.is_ok()
}
/// A fixed set of Tokio runtimes, each driven by its own OS thread (see
/// `start_bench_shard_runtime`), onto which tasks are pinned by shard index.
struct ShardedBenchRuntime {
    shards: Vec<BenchShardRuntime>,
}

/// One shard: a handle to its runtime plus the thread that drives it.
struct BenchShardRuntime {
    handle: tokio::runtime::Handle,
    /// Never joined; the driving thread runs for the life of the process.
    _thread: thread::JoinHandle<()>,
}

impl ShardedBenchRuntime {
    /// Spawns `future` on shard `shard % shard_count` and detaches it. Callers
    /// that need the task's result must report it themselves (e.g. over a
    /// channel), because the `JoinHandle` is dropped here.
    ///
    /// Only `&self` is required. The previous `&'static self` bound was
    /// unnecessary, since the future itself is already `'static`.
    ///
    /// # Panics
    /// Panics if there are no shards (the modulo would divide by zero).
    fn spawn<Fut>(&self, shard: usize, future: Fut)
    where
        Fut: Future<Output = ()> + Send + 'static,
    {
        let slot = shard % self.shards.len();
        self.shards[slot].handle.spawn(future);
    }
}
fn tls_sharded_bench_runtime() -> &'static ShardedBenchRuntime {
static RUNTIME: OnceLock<ShardedBenchRuntime> = OnceLock::new();
RUNTIME.get_or_init(|| {
let logical = std::thread::available_parallelism()
.map(usize::from)
.unwrap_or(1)
.max(1);
let physical = num_cpus::get_physical().max(1);
let shards = physical.min(logical);
let shards = (0..shards)
.map(start_bench_shard_runtime)
.collect::<Vec<_>>();
ShardedBenchRuntime { shards }
})
}
/// Spawns a named OS thread that owns a current-thread Tokio runtime and blocks
/// inside it forever, driving any task spawned through the returned handle.
///
/// Waits until the thread has built its runtime and sent back the handle.
fn start_bench_shard_runtime(index: usize) -> BenchShardRuntime {
    let (handle_tx, handle_rx) = std_mpsc::sync_channel(1);
    let worker = thread::Builder::new()
        .name(format!("datum-net-compare-tls-shard-{index}"))
        .spawn(move || {
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("TLS sharded benchmark runtime");
            handle_tx
                .send(runtime.handle().clone())
                .expect("TLS shard handle sent");
            // A current-thread runtime only makes progress inside `block_on`,
            // so park this thread there for the rest of the process.
            runtime.block_on(std::future::pending::<()>());
        })
        .expect("TLS sharded benchmark thread");
    BenchShardRuntime {
        handle: handle_rx.recv().expect("TLS shard handle received"),
        _thread: worker,
    }
}
fn tls_shard_count(concurrency: usize) -> usize {
let logical = std::thread::available_parallelism()
.map(usize::from)
.unwrap_or(1);
let physical = num_cpus::get_physical().max(1);
let cores = physical.min(logical);
if cores < 2 || concurrency < TLS_SHARDED_MIN_CONNECTIONS {
0
} else {
cores.min(concurrency)
}
}
// Fixed-concurrency TLS entry points referenced by the scenario table. In each
// `Nx1024` name, N is the connection count and 1024 is
// `TLS_CONCURRENT_ROUNDTRIPS` per connection.

// Bridged: blocking OS threads driving async I/O on a shared runtime.
fn tls_bridge_concurrent_1x1024() -> u64 {
    tls_bridge_concurrent(1)
}
fn tls_bridge_concurrent_16x1024() -> u64 {
    tls_bridge_concurrent(16)
}
fn tls_bridge_concurrent_64x1024() -> u64 {
    tls_bridge_concurrent(64)
}
fn tls_bridge_concurrent_256x1024() -> u64 {
    tls_bridge_concurrent(256)
}
fn tls_bridge_concurrent_1024x1024() -> u64 {
    tls_bridge_concurrent(1024)
}

// Async: one Tokio task per connection on a single shared runtime.
fn tls_async_concurrent_1x1024() -> u64 {
    tls_async_concurrent(1)
}
fn tls_async_concurrent_16x1024() -> u64 {
    tls_async_concurrent(16)
}
fn tls_async_concurrent_64x1024() -> u64 {
    tls_async_concurrent(64)
}
fn tls_async_concurrent_256x1024() -> u64 {
    tls_async_concurrent(256)
}
fn tls_async_concurrent_1024x1024() -> u64 {
    tls_async_concurrent(1024)
}

// Sharded: connections spread over per-core runtimes (falls back to async when
// sharding is not worthwhile).
fn tls_sharded_concurrent_1x1024() -> u64 {
    tls_sharded_concurrent(1)
}
fn tls_sharded_concurrent_16x1024() -> u64 {
    tls_sharded_concurrent(16)
}
fn tls_sharded_concurrent_64x1024() -> u64 {
    tls_sharded_concurrent(64)
}
fn tls_sharded_concurrent_256x1024() -> u64 {
    tls_sharded_concurrent(256)
}
fn tls_sharded_concurrent_1024x1024() -> u64 {
    tls_sharded_concurrent(1024)
}
/// Bridged TLS scenario. `concurrency` OS threads each own one raw
/// `tokio-rustls` connection to a dedicated single-connection echo server, with
/// all async work running on the runtime returned by `udp_bench_runtime()`.
///
/// Each thread sends with `Handle::block_on(write_all + flush)`. A spawned
/// reader task forwards every `TLS_PAYLOAD_BYTES` response over a bounded Tokio
/// channel, which the thread drains with `blocking_recv`. Each thread performs
/// `TLS_CONCURRENT_ROUNDTRIPS` round trips. Returns the wrapping sum of all
/// response lengths.
///
/// The three barriers have `concurrency + 1` parties: the workers plus this
/// coordinating thread. The coordinator samples the process thread count after
/// every worker has connected and spawned its reader, and before any round trip
/// starts.
///
/// NOTE(review): `std::sync::Barrier` has no timeout. If a worker panics before
/// reaching a barrier (e.g. a failed handshake), the remaining threads block
/// forever instead of the benchmark failing.
/// NOTE(review): presumably `udp_bench_runtime()` is shared with the UDP
/// scenarios despite its name — confirm it is intended for TLS too.
fn tls_bridge_concurrent(concurrency: usize) -> u64 {
    let start = Arc::new(Barrier::new(concurrency + 1));
    let connected = Arc::new(Barrier::new(concurrency + 1));
    let release = Arc::new(Barrier::new(concurrency + 1));
    let handle = udp_bench_runtime().handle().clone();
    let (server_config, client_config, server_name) = tls_configs();
    let payload = Arc::new(vec![b'p'; TLS_PAYLOAD_BYTES]);
    thread::scope(|scope| {
        let mut workers = Vec::with_capacity(concurrency);
        for _ in 0..concurrency {
            let start = Arc::clone(&start);
            let connected = Arc::clone(&connected);
            let release = Arc::clone(&release);
            let handle = handle.clone();
            let server_config = Arc::clone(&server_config);
            let client_config = Arc::clone(&client_config);
            let server_name = server_name.clone();
            let payload = Arc::clone(&payload);
            workers.push(scope.spawn(move || {
                // `_server` is a named binding, so it lives until this closure
                // returns; dropping it aborts the echo server task.
                let (stream, _server) = handle.block_on(async move {
                    let server = start_tls_echo_server(server_config).await;
                    let stream = connect_tls_client(server.addr, server_name, client_config).await;
                    (stream, server)
                });
                let (mut reader, mut writer) = tokio::io::split(stream);
                let (sender, mut receiver) =
                    tokio_mpsc::channel::<Vec<u8>>(TLS_CONCURRENT_RECEIVE_BUFFER);
                // Reader task: one fixed-size response per message. The
                // per-message `clone` allocation is part of what this scenario
                // measures.
                let read_task = handle.spawn(async move {
                    let mut buffer = vec![0_u8; TLS_PAYLOAD_BYTES];
                    while read_exact_payload(&mut reader, &mut buffer).await {
                        if sender.send(buffer.clone()).await.is_err() {
                            return;
                        }
                    }
                });
                // NOTE(review): no work happens between `start` and `connected`
                // on the worker side; the two waits are back to back.
                start.wait();
                connected.wait();
                release.wait();
                let mut total = 0_u64;
                for _ in 0..TLS_CONCURRENT_ROUNDTRIPS {
                    handle
                        .block_on(async {
                            writer.write_all(&payload).await?;
                            writer.flush().await
                        })
                        .expect("TLS bridge send succeeds");
                    let response = receiver
                        .blocking_recv()
                        .expect("TLS bridge receives response");
                    total = total.wrapping_add(response.len() as u64);
                }
                // Best-effort shutdown (errors ignored), then stop the reader in
                // case it is still parked in a read.
                let _ = handle.block_on(async { writer.shutdown().await });
                read_task.abort();
                total
            }));
        }
        start.wait();
        connected.wait();
        record_observed_thread_count();
        release.wait();
        workers
            .into_iter()
            .map(|worker| worker.join().expect("TLS bridge worker joins"))
            .fold(0_u64, u64::wrapping_add)
    })
}
/// Async TLS scenario: `concurrency` Tokio tasks on the runtime returned by
/// `udp_bench_runtime()`. Each task owns a dedicated single-connection raw echo
/// server and a `tokio-rustls` client, and performs `TLS_CONCURRENT_ROUNDTRIPS`
/// sequential write/flush/read round trips of `TLS_PAYLOAD_BYTES`.
///
/// The driving future joins the three `concurrency + 1`-party barriers so it can
/// sample the process thread count once every task has connected and before any
/// round trip starts. Returns the wrapping sum of bytes received.
fn tls_async_concurrent(concurrency: usize) -> u64 {
    let (server_config, client_config, server_name) = tls_configs();
    udp_bench_runtime().block_on(async move {
        let parties = concurrency + 1;
        let start = Arc::new(tokio::sync::Barrier::new(parties));
        let connected = Arc::new(tokio::sync::Barrier::new(parties));
        let release = Arc::new(tokio::sync::Barrier::new(parties));
        let payload = Arc::new(vec![b'p'; TLS_PAYLOAD_BYTES]);

        let workers: Vec<_> = (0..concurrency)
            .map(|_| {
                let start = Arc::clone(&start);
                let connected = Arc::clone(&connected);
                let release = Arc::clone(&release);
                let server_config = Arc::clone(&server_config);
                let client_config = Arc::clone(&client_config);
                let server_name = server_name.clone();
                let payload = Arc::clone(&payload);
                tokio::spawn(async move {
                    // `echo` must outlive the round trips: dropping it aborts the server.
                    let echo = start_tls_echo_server(server_config).await;
                    let mut stream =
                        connect_tls_client(echo.addr, server_name, client_config).await;
                    let mut response = vec![0_u8; TLS_PAYLOAD_BYTES];
                    start.wait().await;
                    connected.wait().await;
                    release.wait().await;
                    let mut received_bytes = 0_u64;
                    for _ in 0..TLS_CONCURRENT_ROUNDTRIPS {
                        stream
                            .write_all(&payload)
                            .await
                            .expect("TLS async send succeeds");
                        stream.flush().await.expect("TLS async flush succeeds");
                        let filled = read_exact_payload(&mut stream, &mut response).await;
                        assert!(filled, "TLS async receives response");
                        received_bytes = received_bytes.wrapping_add(response.len() as u64);
                    }
                    let _ = stream.shutdown().await;
                    received_bytes
                })
            })
            .collect();

        start.wait().await;
        connected.wait().await;
        record_observed_thread_count();
        release.wait().await;

        let mut total = 0_u64;
        for worker in workers {
            let bytes = worker.await.expect("TLS async worker joins");
            total = total.wrapping_add(bytes);
        }
        total
    })
}
/// Runs `concurrency` TLS echo round-trip workers spread across the shards of
/// `tls_sharded_bench_runtime()`, returning the wrapping sum of all echoed bytes.
///
/// Falls back to `tls_async_concurrent` when `tls_shard_count` yields fewer than two shards.
///
/// Each worker does the following:
/// - starts its own TLS echo server and connects a client;
/// - waits on the start/connected/release barriers;
/// - performs `TLS_CONCURRENT_ROUNDTRIPS` write/flush/read cycles;
/// - reports its byte total over a std channel.
///
/// A coordinator task records the observed thread count between the `connected` and `release`
/// barriers.
///
/// # Panics
///
/// Panics (via `expect`) if the coordinator channel closes without a message, or if fewer
/// than `concurrency` worker results arrive before every result sender has been dropped.
/// NOTE(review): a worker that panics before reaching the barriers leaves the coordinator,
/// and therefore this thread, blocked.
fn tls_sharded_concurrent(concurrency: usize) -> u64 {
    let shards = tls_shard_count(concurrency);
    if shards < 2 {
        return tls_async_concurrent(concurrency);
    }
    let runtime = tls_sharded_bench_runtime();
    let (server_config, client_config, server_name) = tls_configs();
    // Each barrier is shared by every worker plus the coordinator task (hence `+ 1`).
    let start = Arc::new(tokio::sync::Barrier::new(concurrency + 1));
    let connected = Arc::new(tokio::sync::Barrier::new(concurrency + 1));
    let release = Arc::new(tokio::sync::Barrier::new(concurrency + 1));
    let payload = Arc::new(vec![b'p'; TLS_PAYLOAD_BYTES]);
    let (result_sender, result_receiver) = std_mpsc::channel();
    for index in 0..concurrency {
        let start = Arc::clone(&start);
        let connected = Arc::clone(&connected);
        let release = Arc::clone(&release);
        let server_config = Arc::clone(&server_config);
        let client_config = Arc::clone(&client_config);
        let server_name = server_name.clone();
        let payload = Arc::clone(&payload);
        let result_sender = result_sender.clone();
        // Round-robin placement; the first argument is assumed to select the shard.
        runtime.spawn(index % shards, async move {
            // `server` stays bound until the task ends so the echo server outlives the client.
            let server = start_tls_echo_server(server_config).await;
            let mut stream = connect_tls_client(server.addr, server_name, client_config).await;
            let mut buffer = vec![0_u8; TLS_PAYLOAD_BYTES];
            start.wait().await;
            connected.wait().await;
            release.wait().await;
            let mut total = 0_u64;
            for _ in 0..TLS_CONCURRENT_ROUNDTRIPS {
                stream
                    .write_all(&payload)
                    .await
                    .expect("TLS sharded send succeeds");
                stream.flush().await.expect("TLS sharded flush succeeds");
                assert!(
                    read_exact_payload(&mut stream, &mut buffer).await,
                    "TLS sharded receives response"
                );
                total = total.wrapping_add(buffer.len() as u64);
            }
            let _ = stream.shutdown().await;
            // Send failure is ignored; a missing result surfaces as a `recv` panic below.
            let _ = result_sender.send(total);
        });
    }
    // Drop the original sender so `recv` errors instead of blocking once all worker clones
    // are gone.
    drop(result_sender);
    let (coordinator_sender, coordinator_receiver) = std_mpsc::channel();
    // The coordinator takes the `+ 1` slot in each barrier and records the thread count
    // once every worker is connected, before releasing them.
    runtime.spawn(0, async move {
        start.wait().await;
        connected.wait().await;
        record_observed_thread_count();
        release.wait().await;
        let _ = coordinator_sender.send(());
    });
    // Block this thread until the coordinator has passed the release barrier.
    coordinator_receiver
        .recv()
        .expect("TLS sharded coordinator completes");
    let mut total = 0_u64;
    for _ in 0..concurrency {
        total = total.wrapping_add(
            result_receiver
                .recv()
                .expect("TLS sharded worker completes"),
        );
    }
    total
}
/// Process-wide loopback UDP receiver built by `udp_receive_server`.
///
/// Field order is drop order, which is why the fields are not rearranged.
struct UdpReceiveServer {
    /// Local address the datum `TokioUdp` source is bound to.
    addr: SocketAddr,
    /// Payload lengths of received datagrams, forwarded by the server's `Sink::foreach`.
    receiver: Mutex<std_mpsc::Receiver<usize>>,
    /// Completion handle of the receive stream, retained with the server and never awaited.
    _completion: StreamCompletion<NotUsed>,
}
/// Returns the process-wide UDP receive server, binding it on first use.
///
/// The server is a datum `TokioUdp` source on an ephemeral loopback port. Each received
/// datagram is reduced to its payload length and forwarded over a std channel.
///
/// # Panics
///
/// Panics if the stream fails to materialize or the socket fails to bind.
fn udp_receive_server() -> &'static UdpReceiveServer {
    static SERVER: OnceLock<UdpReceiveServer> = OnceLock::new();
    SERVER.get_or_init(|| {
        let (lengths_tx, lengths_rx) = std_mpsc::channel();
        let datagrams = TokioUdp::bind("127.0.0.1:0", UDP_DATAGRAM_SIZE, UDP_RECEIVE_BUFFER);
        // Forward only the payload length; a closed channel is deliberately ignored.
        let forward_lengths = Sink::foreach(move |datagram: Datagram| {
            let _ = lengths_tx.send(datagram.payload().len());
        });
        let (pending_binding, completion) = datagrams
            .to_mat(forward_lengths, Keep::both)
            .run_with_materializer(materializer())
            .expect("UDP receive server materializes");
        let bound = pending_binding
            .wait()
            .expect("UDP receive server binds");
        UdpReceiveServer {
            addr: bound.local_addr(),
            receiver: Mutex::new(lengths_rx),
            _completion: completion,
        }
    })
}
/// Sends `UDP_DATAGRAMS` datagrams of `UDP_PAYLOAD_BYTES` bytes to the shared receive server
/// through `TokioUdp::send_sink`.
///
/// Returns the total payload bytes the receiver observed.
///
/// # Panics
///
/// Panics if sending fails, the receiver lock is poisoned, or any datagram is not observed
/// within 30 seconds.
fn udp_loopback_send_receive_128x64b() -> u64 {
    let server = udp_receive_server();
    {
        // Discard lengths left over from an earlier run before sending this batch.
        let stale = server.receiver.lock().expect("UDP receiver lock");
        for _leftover in stale.try_iter() {}
    }
    let target = server.addr;
    let outgoing = (0..UDP_DATAGRAMS)
        .map(move |_| Datagram::new(vec![b'p'; UDP_PAYLOAD_BYTES], target));
    Source::from_iterable(outgoing)
        .run_with_materializer(TokioUdp::send_sink("127.0.0.1:0"), materializer())
        .expect("UDP sender materializes")
        .wait()
        .expect("UDP sender completes");
    let lengths = server.receiver.lock().expect("UDP receiver lock");
    (0..UDP_DATAGRAMS)
        .map(|_| {
            lengths
                .recv_timeout(Duration::from_secs(30))
                .expect("UDP receiver observes datagram") as u64
        })
        .sum()
}
/// Handle to a loopback UDP echo task started by `start_udp_echo_server`.
struct UdpEchoServer {
    /// Address the echo socket is bound to.
    addr: SocketAddr,
    /// Spawned echo task; aborted when this handle is dropped.
    task: tokio::task::JoinHandle<()>,
}
// Aborting on drop ties the echo task's lifetime to this handle, so a benchmark worker's
// server stops as soon as the worker releases it.
impl Drop for UdpEchoServer {
    fn drop(&mut self) {
        self.task.abort();
    }
}
/// Returns the shared 8-worker multi-thread Tokio runtime, creating it on first use.
///
/// Despite the name, the TLS and QUIC concurrency benchmarks also run on this runtime.
/// Worker threads are named `datum-net-compare-udp`, and all Tokio drivers (I/O, time) are
/// enabled.
fn udp_bench_runtime() -> &'static TokioRuntime {
    static RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
    RUNTIME.get_or_init(|| {
        let mut builder = tokio::runtime::Builder::new_multi_thread();
        builder
            .worker_threads(8)
            .thread_name("datum-net-compare-udp")
            .enable_all();
        builder.build().expect("UDP benchmark runtime")
    })
}
/// Binds a Tokio UDP socket on an ephemeral loopback port and spawns a task that echoes every
/// datagram back to its sender.
///
/// The task ends on the first receive error; send errors are ignored. Dropping the returned
/// `UdpEchoServer` aborts the task.
async fn start_udp_echo_server() -> UdpEchoServer {
    let echo_socket = UdpSocket::bind("127.0.0.1:0")
        .await
        .expect("UDP echo server binds");
    let addr = echo_socket.local_addr().expect("UDP echo local addr");
    let task = tokio::spawn(async move {
        let mut scratch = vec![0_u8; UDP_DATAGRAM_SIZE];
        while let Ok((len, peer)) = echo_socket.recv_from(&mut scratch).await {
            let _ = echo_socket.send_to(&scratch[..len], peer).await;
        }
    });
    UdpEchoServer { addr, task }
}
/// UDP bridge benchmark: 1 worker thread, `UDP_CONCURRENT_ROUNDTRIPS` round trips each.
fn udp_bridge_concurrent_1x1024() -> u64 {
    udp_bridge_concurrent(1_usize)
}
/// UDP async benchmark: 1 worker task, `UDP_CONCURRENT_ROUNDTRIPS` round trips each.
fn udp_async_concurrent_1x1024() -> u64 {
    udp_async_concurrent(1_usize)
}
/// UDP bridge benchmark: 16 worker threads.
fn udp_bridge_concurrent_16x1024() -> u64 {
    udp_bridge_concurrent(16_usize)
}
/// UDP async benchmark: 16 worker tasks.
fn udp_async_concurrent_16x1024() -> u64 {
    udp_async_concurrent(16_usize)
}
/// UDP bridge benchmark: 64 worker threads.
fn udp_bridge_concurrent_64x1024() -> u64 {
    udp_bridge_concurrent(64_usize)
}
/// UDP async benchmark: 64 worker tasks.
fn udp_async_concurrent_64x1024() -> u64 {
    udp_async_concurrent(64_usize)
}
/// UDP bridge benchmark: 256 worker threads.
fn udp_bridge_concurrent_256x1024() -> u64 {
    udp_bridge_concurrent(256_usize)
}
/// UDP async benchmark: 256 worker tasks.
fn udp_async_concurrent_256x1024() -> u64 {
    udp_async_concurrent(256_usize)
}
/// UDP bridge benchmark: 1024 worker threads.
fn udp_bridge_concurrent_1024x1024() -> u64 {
    udp_bridge_concurrent(1024_usize)
}
/// UDP async benchmark: 1024 worker tasks.
fn udp_async_concurrent_1024x1024() -> u64 {
    udp_async_concurrent(1024_usize)
}
/// Runs `concurrency` UDP echo round-trip workers as scoped OS threads that bridge into
/// `udp_bench_runtime()`.
///
/// Each worker does the following:
/// - uses `handle.block_on` to start its own echo server and a connected client socket;
/// - spawns a read task on the runtime that forwards every received datagram over a bounded
///   tokio mpsc channel;
/// - after the start/connected/release barriers, performs `UDP_CONCURRENT_ROUNDTRIPS`
///   blocking send / `blocking_recv` cycles.
///
/// The calling thread is the extra barrier party and records the observed thread count
/// between `connected` and `release`. Returns the wrapping sum of all received payload bytes.
///
/// # Panics
///
/// Panics on bind/connect/send failure, on a short send, if the response channel closes, or
/// if a worker thread panics.
fn udp_bridge_concurrent(concurrency: usize) -> u64 {
    let start = Arc::new(Barrier::new(concurrency + 1));
    let connected = Arc::new(Barrier::new(concurrency + 1));
    let release = Arc::new(Barrier::new(concurrency + 1));
    let handle = udp_bench_runtime().handle().clone();
    let payload = Arc::new(vec![b'p'; UDP_PAYLOAD_BYTES]);
    thread::scope(|scope| {
        let mut workers = Vec::with_capacity(concurrency);
        for _ in 0..concurrency {
            let start = Arc::clone(&start);
            let connected = Arc::clone(&connected);
            let release = Arc::clone(&release);
            let payload = Arc::clone(&payload);
            let handle = handle.clone();
            workers.push(scope.spawn(move || {
                // `_server` is a named binding, so the echo server (aborted on drop) lives
                // until this worker returns.
                let (socket, _server) = handle.block_on(async {
                    let server = start_udp_echo_server().await;
                    let socket = Arc::new(
                        UdpSocket::bind("127.0.0.1:0")
                            .await
                            .expect("UDP bridge client binds"),
                    );
                    socket
                        .connect(server.addr)
                        .await
                        .expect("UDP bridge client connects");
                    (socket, server)
                });
                let (sender, mut receiver) =
                    tokio_mpsc::channel::<Vec<u8>>(UDP_CONCURRENT_RECEIVE_BUFFER);
                // The socket is shared between this thread (sends) and the read task.
                let read_socket = Arc::clone(&socket);
                let read_task = handle.spawn(async move {
                    let mut buffer = vec![0_u8; UDP_DATAGRAM_SIZE];
                    loop {
                        let read = match read_socket.recv(&mut buffer).await {
                            Ok(read) => read,
                            Err(_) => return,
                        };
                        // If the channel rejects the datagram (full or receiver gone), drain
                        // and discard whatever is already queued on the socket until it would
                        // block, retrying on EINTR and exiting on any other error.
                        if sender.try_send(buffer[..read].to_vec()).is_err() {
                            loop {
                                match read_socket.try_recv(&mut buffer) {
                                    Ok(_) => {}
                                    Err(error)
                                        if error.kind() == std::io::ErrorKind::WouldBlock =>
                                    {
                                        break;
                                    }
                                    Err(error)
                                        if error.kind() == std::io::ErrorKind::Interrupted => {}
                                    Err(_) => return,
                                }
                            }
                        }
                    }
                });
                start.wait();
                connected.wait();
                release.wait();
                let mut total = 0_u64;
                for _ in 0..UDP_CONCURRENT_ROUNDTRIPS {
                    let sent = handle
                        .block_on(async { socket.send(&payload).await })
                        .expect("UDP bridge send succeeds");
                    assert_eq!(sent, UDP_PAYLOAD_BYTES, "UDP bridge short send");
                    // NOTE(review): no timeout here; a lost datagram would block forever.
                    let response = receiver
                        .blocking_recv()
                        .expect("UDP bridge receives response");
                    total = total.wrapping_add(response.len() as u64);
                }
                read_task.abort();
                total
            }));
        }
        start.wait();
        connected.wait();
        record_observed_thread_count();
        release.wait();
        workers
            .into_iter()
            .map(|worker| worker.join().expect("UDP bridge worker joins"))
            .fold(0_u64, u64::wrapping_add)
    })
}
/// Runs `concurrency` UDP echo round-trip workers as tasks on `udp_bench_runtime()`.
///
/// Each task starts its own echo server and binds and connects a client socket. After the
/// start/connected/release barriers (shared with this coordinating future) it performs
/// `UDP_CONCURRENT_ROUNDTRIPS` send/recv cycles. The observed thread count is recorded
/// between `connected` and `release`. Returns the wrapping sum of all received byte counts.
///
/// # Panics
///
/// Panics on bind/connect/send/recv failure, on a short send, or if a worker task panics.
fn udp_async_concurrent(concurrency: usize) -> u64 {
    udp_bench_runtime().block_on(async move {
        // Every worker plus this coordinating future participates in each barrier.
        let parties = concurrency + 1;
        let setup_done = Arc::new(tokio::sync::Barrier::new(parties));
        let all_connected = Arc::new(tokio::sync::Barrier::new(parties));
        let go = Arc::new(tokio::sync::Barrier::new(parties));
        let datagram = Arc::new(vec![b'p'; UDP_PAYLOAD_BYTES]);
        let tasks: Vec<_> = (0..concurrency)
            .map(|_| {
                let setup_done = Arc::clone(&setup_done);
                let all_connected = Arc::clone(&all_connected);
                let go = Arc::clone(&go);
                let datagram = Arc::clone(&datagram);
                tokio::spawn(async move {
                    // `echo` stays bound for the whole task; dropping it aborts the echo task.
                    let echo = start_udp_echo_server().await;
                    let client = UdpSocket::bind("127.0.0.1:0")
                        .await
                        .expect("UDP async client binds");
                    client
                        .connect(echo.addr)
                        .await
                        .expect("UDP async client connects");
                    let mut inbox = vec![0_u8; UDP_DATAGRAM_SIZE];
                    setup_done.wait().await;
                    all_connected.wait().await;
                    go.wait().await;
                    let mut received = 0_u64;
                    for _round in 0..UDP_CONCURRENT_ROUNDTRIPS {
                        let written = client
                            .send(&datagram)
                            .await
                            .expect("UDP async send succeeds");
                        assert_eq!(written, UDP_PAYLOAD_BYTES, "UDP async short send");
                        let len = client
                            .recv(&mut inbox)
                            .await
                            .expect("UDP async receives response");
                        received = received.wrapping_add(len as u64);
                    }
                    received
                })
            })
            .collect();
        setup_done.wait().await;
        all_connected.wait().await;
        record_observed_thread_count();
        go.wait().await;
        let mut sum = 0_u64;
        for task in tasks {
            let bytes = task.await.expect("UDP async worker joins");
            sum = sum.wrapping_add(bytes);
        }
        sum
    })
}
/// Builds matching quinn server and client configurations from the embedded base64 DER
/// fixtures.
///
/// The server presents `QUIC_CERT_DER_B64` with the PKCS#8 key `QUIC_KEY_DER_B64` and does
/// not authenticate clients. The client trusts only the CA in `QUIC_CA_DER_B64`.
///
/// # Panics
///
/// Panics if any certificate, key or crypto configuration is rejected.
fn quic_configs() -> (quinn::ServerConfig, quinn::ClientConfig) {
    let server_cert = QuicCertificateDer::from(decode_base64(QUIC_CERT_DER_B64));
    let server_key_bytes = decode_base64(QUIC_KEY_DER_B64);
    let server_key = QuicPrivateKeyDer::Pkcs8(QuicPrivatePkcs8KeyDer::from(server_key_bytes));
    let server_tls = QuicRustlsServerConfig::builder()
        .with_no_client_auth()
        .with_single_cert(vec![server_cert], server_key)
        .expect("QUIC server config");

    let mut trusted = QuicRootCertStore::empty();
    trusted
        .add(QuicCertificateDer::from(decode_base64(QUIC_CA_DER_B64)))
        .expect("trust QUIC CA cert");
    let client_tls = QuicRustlsClientConfig::builder()
        .with_root_certificates(trusted)
        .with_no_client_auth();

    // Wrap into quinn crypto configs in the same order as before: server first, then client.
    let server_quic = QuicServerConfig::try_from(server_tls).expect("QUIC server crypto config");
    let server = quinn::ServerConfig::with_crypto(Arc::new(server_quic));
    let client_quic = QuicClientConfig::try_from(client_tls).expect("QUIC client crypto config");
    let client = quinn::ClientConfig::new(Arc::new(client_quic));
    (server, client)
}
/// Handle to a raw quinn echo task started by `start_quic_echo_server`.
struct RawQuicEchoServer {
    /// Local address of the server endpoint.
    addr: SocketAddr,
    /// Spawned echo task (owns the endpoint); aborted when this handle is dropped.
    task: tokio::task::JoinHandle<()>,
}
// Aborting on drop ties the echo task's lifetime to this handle.
impl Drop for RawQuicEchoServer {
    fn drop(&mut self) {
        self.task.abort();
    }
}
/// Raw quinn client connection returned by `connect_quic_client`.
///
/// Field order is drop order (`_endpoint` is dropped before `connection`), so the fields are
/// kept in this order.
struct RawQuicClient {
    /// Client endpoint; held only so it is not dropped while `connection` is in use
    /// (presumed necessity; confirm against quinn `Endpoint` drop semantics).
    _endpoint: quinn::Endpoint,
    /// Established connection to the echo server.
    connection: quinn::Connection,
}
async fn start_quic_echo_server(server_config: quinn::ServerConfig) -> RawQuicEchoServer {
let endpoint = quinn::Endpoint::server(
server_config,
"127.0.0.1:0"
.parse()
.expect("QUIC echo server bind address"),
)
.expect("QUIC echo server endpoint");
let addr = endpoint.local_addr().expect("QUIC echo local addr");
let task = tokio::spawn(async move {
let Some(incoming) = endpoint.accept().await else {
return;
};
let Ok(connection) = incoming.await else {
return;
};
let Ok((mut send, mut recv)) = connection.accept_bi().await else {
return;
};
let mut buffer = vec![0_u8; STREAMREF_CHUNK_SIZE];
loop {
match recv.read(&mut buffer).await {
Ok(Some(read)) => {
if send.write_all(&buffer[..read]).await.is_err() {
return;
}
}
Ok(None) => {
let _ = send.finish();
return;
}
Err(_) => return,
}
}
});
RawQuicEchoServer { addr, task }
}
/// Creates a quinn client endpoint on an ephemeral wildcard port and connects to `addr`.
///
/// Uses `client_config` and the server name `localhost`. The endpoint is returned alongside
/// the connection so both live together.
///
/// # Panics
///
/// Panics if the endpoint cannot be created or the connection cannot be established.
async fn connect_quic_client(
    addr: SocketAddr,
    client_config: quinn::ClientConfig,
) -> RawQuicClient {
    let local: SocketAddr = "0.0.0.0:0"
        .parse()
        .expect("QUIC echo client bind address");
    let mut endpoint = quinn::Endpoint::client(local).expect("QUIC echo client endpoint");
    endpoint.set_default_client_config(client_config);
    let connecting = endpoint
        .connect(addr, "localhost")
        .expect("QUIC echo client connect starts");
    let connection = connecting.await.expect("QUIC echo client connects");
    RawQuicClient {
        _endpoint: endpoint,
        connection,
    }
}
/// Fills `buffer` completely from `recv`.
///
/// Returns `true` once every byte has been read. Returns `false` if the stream ends early or
/// a read error occurs.
async fn read_exact_quic_payload(recv: &mut quinn::RecvStream, buffer: &mut [u8]) -> bool {
    let mut offset = 0_usize;
    while offset < buffer.len() {
        match recv.read(&mut buffer[offset..]).await {
            Ok(Some(count)) => offset += count,
            Ok(None) | Err(_) => return false,
        }
    }
    true
}
/// QUIC bridge benchmark: 1 worker thread, `QUIC_CONCURRENT_ROUNDTRIPS` round trips each.
fn quic_bridge_concurrent_1x1024() -> u64 {
    quic_bridge_concurrent(1_usize)
}
/// QUIC async benchmark: 1 worker task, `QUIC_CONCURRENT_ROUNDTRIPS` round trips each.
fn quic_async_concurrent_1x1024() -> u64 {
    quic_async_concurrent(1_usize)
}
/// QUIC bridge benchmark: 16 worker threads.
fn quic_bridge_concurrent_16x1024() -> u64 {
    quic_bridge_concurrent(16_usize)
}
/// QUIC async benchmark: 16 worker tasks.
fn quic_async_concurrent_16x1024() -> u64 {
    quic_async_concurrent(16_usize)
}
/// QUIC bridge benchmark: 64 worker threads.
fn quic_bridge_concurrent_64x1024() -> u64 {
    quic_bridge_concurrent(64_usize)
}
/// QUIC async benchmark: 64 worker tasks.
fn quic_async_concurrent_64x1024() -> u64 {
    quic_async_concurrent(64_usize)
}
/// QUIC bridge benchmark: 256 worker threads.
fn quic_bridge_concurrent_256x1024() -> u64 {
    quic_bridge_concurrent(256_usize)
}
/// QUIC async benchmark: 256 worker tasks.
fn quic_async_concurrent_256x1024() -> u64 {
    quic_async_concurrent(256_usize)
}
/// QUIC bridge benchmark: 1024 worker threads.
fn quic_bridge_concurrent_1024x1024() -> u64 {
    quic_bridge_concurrent(1024_usize)
}
/// QUIC async benchmark: 1024 worker tasks.
fn quic_async_concurrent_1024x1024() -> u64 {
    quic_async_concurrent(1024_usize)
}
/// Runs `concurrency` QUIC echo round-trip workers as scoped OS threads that bridge into
/// `udp_bench_runtime()`.
///
/// Each worker does the following:
/// - uses `handle.block_on` to start its own raw quinn echo server, connect a client and
///   open a bidirectional stream;
/// - spawns a read task that forwards each `QUIC_PAYLOAD_BYTES` response over a bounded
///   tokio mpsc channel;
/// - after the start/connected/release barriers, performs `QUIC_CONCURRENT_ROUNDTRIPS`
///   blocking `write_all` / `blocking_recv` cycles.
///
/// The calling thread is the extra barrier party and records the observed thread count
/// between `connected` and `release`. Returns the wrapping sum of all received payload bytes.
///
/// # Panics
///
/// Panics on setup or send failure, if the response channel closes, or if a worker thread
/// panics.
fn quic_bridge_concurrent(concurrency: usize) -> u64 {
    let start = Arc::new(Barrier::new(concurrency + 1));
    let connected = Arc::new(Barrier::new(concurrency + 1));
    let release = Arc::new(Barrier::new(concurrency + 1));
    let handle = udp_bench_runtime().handle().clone();
    let (server_config, client_config) = quic_configs();
    let payload = Arc::new(vec![b'p'; QUIC_PAYLOAD_BYTES]);
    thread::scope(|scope| {
        let mut workers = Vec::with_capacity(concurrency);
        for _ in 0..concurrency {
            let start = Arc::clone(&start);
            let connected = Arc::clone(&connected);
            let release = Arc::clone(&release);
            let handle = handle.clone();
            let server_config = server_config.clone();
            let client_config = client_config.clone();
            let payload = Arc::clone(&payload);
            workers.push(scope.spawn(move || {
                // `_client` and `_server` are named bindings, so the client endpoint and the
                // echo server (aborted on drop) stay alive until this worker returns.
                let (mut send, mut recv, _client, _server) = handle.block_on(async move {
                    let server = start_quic_echo_server(server_config).await;
                    let client = connect_quic_client(server.addr, client_config).await;
                    let (send, recv) = client
                        .connection
                        .open_bi()
                        .await
                        .expect("QUIC bridge opens bi stream");
                    (send, recv, client, server)
                });
                let (sender, mut receiver) =
                    tokio_mpsc::channel::<Vec<u8>>(QUIC_CONCURRENT_RECEIVE_BUFFER);
                // Forward each complete response frame until the stream ends or the
                // receiving side is gone.
                let read_task = handle.spawn(async move {
                    let mut buffer = vec![0_u8; QUIC_PAYLOAD_BYTES];
                    while read_exact_quic_payload(&mut recv, &mut buffer).await {
                        if sender.send(buffer.clone()).await.is_err() {
                            return;
                        }
                    }
                });
                start.wait();
                connected.wait();
                release.wait();
                let mut total = 0_u64;
                for _ in 0..QUIC_CONCURRENT_ROUNDTRIPS {
                    handle
                        .block_on(async { send.write_all(&payload).await })
                        .expect("QUIC bridge send succeeds");
                    let response = receiver
                        .blocking_recv()
                        .expect("QUIC bridge receives response");
                    total = total.wrapping_add(response.len() as u64);
                }
                let _ = send.finish();
                read_task.abort();
                total
            }));
        }
        start.wait();
        connected.wait();
        record_observed_thread_count();
        release.wait();
        workers
            .into_iter()
            .map(|worker| worker.join().expect("QUIC bridge worker joins"))
            .fold(0_u64, u64::wrapping_add)
    })
}
/// Runs `concurrency` QUIC echo round-trip workers as tasks on `udp_bench_runtime()`.
///
/// Each task starts its own raw quinn echo server, connects a client and opens a
/// bidirectional stream. After the start/connected/release barriers (shared with this
/// coordinating future) it performs `QUIC_CONCURRENT_ROUNDTRIPS` write/read cycles of a
/// `QUIC_PAYLOAD_BYTES` payload. The observed thread count is recorded between `connected`
/// and `release`. Returns the wrapping sum of all echoed bytes.
///
/// # Panics
///
/// Panics on setup, send or receive failure, or if a worker task panics.
fn quic_async_concurrent(concurrency: usize) -> u64 {
    let (server_config, client_config) = quic_configs();
    udp_bench_runtime().block_on(async move {
        // Every worker plus this coordinating future participates in each barrier.
        let parties = concurrency + 1;
        let setup_done = Arc::new(tokio::sync::Barrier::new(parties));
        let all_connected = Arc::new(tokio::sync::Barrier::new(parties));
        let go = Arc::new(tokio::sync::Barrier::new(parties));
        let message = Arc::new(vec![b'p'; QUIC_PAYLOAD_BYTES]);
        let tasks: Vec<_> = (0..concurrency)
            .map(|_| {
                let setup_done = Arc::clone(&setup_done);
                let all_connected = Arc::clone(&all_connected);
                let go = Arc::clone(&go);
                let server_config = server_config.clone();
                let client_config = client_config.clone();
                let message = Arc::clone(&message);
                tokio::spawn(async move {
                    // `echo` and `peer` stay bound for the whole task so server, endpoint and
                    // connection outlive the stream.
                    let echo = start_quic_echo_server(server_config).await;
                    let peer = connect_quic_client(echo.addr, client_config).await;
                    let (mut outbound, mut inbound) = peer
                        .connection
                        .open_bi()
                        .await
                        .expect("QUIC async opens bi stream");
                    let mut reply = vec![0_u8; QUIC_PAYLOAD_BYTES];
                    setup_done.wait().await;
                    all_connected.wait().await;
                    go.wait().await;
                    let mut echoed = 0_u64;
                    for _round in 0..QUIC_CONCURRENT_ROUNDTRIPS {
                        outbound
                            .write_all(&message)
                            .await
                            .expect("QUIC async send succeeds");
                        let complete = read_exact_quic_payload(&mut inbound, &mut reply).await;
                        assert!(complete, "QUIC async receives response");
                        echoed = echoed.wrapping_add(reply.len() as u64);
                    }
                    let _ = outbound.finish();
                    echoed
                })
            })
            .collect();
        setup_done.wait().await;
        all_connected.wait().await;
        record_observed_thread_count();
        go.wait().await;
        let mut sum = 0_u64;
        for task in tasks {
            let bytes = task.await.expect("QUIC async worker joins");
            sum = sum.wrapping_add(bytes);
        }
        sum
    })
}
/// Process-wide QUIC stream-ref server built by `streamref_quic_server`.
struct StreamRefQuicServer {
    /// Local address of the datum `TokioQuic` listener.
    addr: SocketAddr,
    /// Client configuration that trusts this server's certificate.
    client_config: quinn::ClientConfig,
    /// Completion handle of the listener stream, retained with the server and never awaited.
    _completion: StreamCompletion<NotUsed>,
}
/// Returns the process-wide QUIC stream-ref server, binding it on first use.
///
/// For each incoming connection the server accepts the first bidirectional stream. It then
/// serves a fresh `0..STREAMREF_N` source ref on that stream under `STREAMREF_ID`. The
/// per-connection handler blocks until that serve handle finishes, so connections are
/// handled one after another.
fn streamref_quic_server() -> &'static StreamRefQuicServer {
    static SERVER: OnceLock<StreamRefQuicServer> = OnceLock::new();
    SERVER.get_or_init(|| {
        let (server_config, client_config) = quic_configs();
        let serve_connection = |connection: QuicIncomingConnection| {
            let bi_stream = connection
                .accept_bi_available(STREAMREF_CHUNK_SIZE)
                .run_with(Sink::head())
                .expect("accept bi materializes")
                .wait()
                .expect("accept bi stream");
            let numbers = Source::from_iter(0_u64..STREAMREF_N)
                .run_with(StreamRefs::source_ref())
                .expect("source ref materializes");
            let serving = serve_source_ref_over_quic(
                bi_stream,
                numbers,
                STREAMREF_ID,
                StreamRefSettings::default(),
            )
            .expect("serve source ref over quic");
            let _ = serving.wait();
        };
        let listener = TokioQuic::bind("127.0.0.1:0", server_config, STREAMREF_CHUNK_SIZE);
        let (pending_binding, completion) = listener
            .to_mat(Sink::foreach(serve_connection), Keep::both)
            .run_with_materializer(materializer())
            .expect("QUIC streamref server materializes");
        let bound = pending_binding
            .wait()
            .expect("QUIC streamref server binds");
        StreamRefQuicServer {
            addr: bound.local_addr(),
            client_config,
            _completion: completion,
        }
    })
}
/// Reads one remote stream-ref over a fresh QUIC connection to the shared stream-ref server.
///
/// Opens a bidirectional stream and folds the remote `u64` source into a wrapping sum. It
/// then waits for the stream-ref handle and checks the sum against `0..STREAMREF_N`.
///
/// # Panics
///
/// Panics if any connect/open/materialize step fails or the checksum does not match.
fn streamref_remote_1024() -> u64 {
    let server = streamref_quic_server();
    let expected_checksum: u64 = (0_u64..STREAMREF_N).sum();
    let quic = TokioQuic::connect(
        server.addr,
        "localhost",
        server.client_config.clone(),
        STREAMREF_CHUNK_SIZE,
    )
    .run_with(Sink::head())
    .expect("QUIC client connection materializes")
    .wait()
    .expect("QUIC client connects");
    let bi_stream = quic
        .open_bi_stream_available(STREAMREF_CHUNK_SIZE)
        .run_with(Sink::head())
        .expect("client bi stream materializes")
        .wait()
        .expect("client opens bi stream");
    let (remote, ref_handle) =
        source_ref_over_quic::<u64>(bi_stream, STREAMREF_ID, StreamRefSettings::default());
    let folded = remote
        .run_with_materializer(
            Sink::fold(0_u64, |acc, item| acc.wrapping_add(item)),
            materializer(),
        )
        .expect("remote source fold materializes")
        .wait()
        .expect("remote source fold");
    ref_handle.wait().expect("stream ref handle completes");
    assert_eq!(folded, expected_checksum, "streamref quic checksum mismatch");
    folded
}
/// Process-wide QUIC stream-ref server that serves many streams per connection, built by
/// `streamref_quic_reuse_server`.
struct StreamRefQuicReuseServer {
    /// Local address of the datum `TokioQuic` listener.
    addr: SocketAddr,
    /// Client configuration that trusts this server's certificate.
    client_config: quinn::ClientConfig,
    /// Completion handle of the listener stream, retained with the server and never awaited.
    _completion: StreamCompletion<NotUsed>,
}
/// Returns the process-wide QUIC stream-ref "reuse" server, binding it on first use.
///
/// For each incoming connection the server repeatedly accepts bidirectional streams. On each
/// stream it serves a fresh `0..STREAMREF_N` source ref under `STREAMREF_ID` and waits for that
/// serve to finish before accepting the next stream. It stops once accepting a stream fails.
fn streamref_quic_reuse_server() -> &'static StreamRefQuicReuseServer {
    static SERVER: OnceLock<StreamRefQuicReuseServer> = OnceLock::new();
    SERVER.get_or_init(|| {
        let (server_config, client_config) = quic_configs();
        let serve_connection = |connection: QuicIncomingConnection| {
            let quic = connection.connection();
            loop {
                let Ok(bi_stream) = quic
                    .accept_bi_available(STREAMREF_CHUNK_SIZE)
                    .run_with(Sink::head())
                    .expect("reuse accept bi materializes")
                    .wait()
                else {
                    break;
                };
                let numbers = Source::from_iter(0_u64..STREAMREF_N)
                    .run_with(StreamRefs::source_ref())
                    .expect("reuse source ref materializes");
                let serving = serve_source_ref_over_quic(
                    bi_stream,
                    numbers,
                    STREAMREF_ID,
                    StreamRefSettings::default(),
                )
                .expect("reuse serve source ref over quic");
                let _ = serving.wait();
            }
        };
        let listener = TokioQuic::bind("127.0.0.1:0", server_config, STREAMREF_CHUNK_SIZE);
        let (pending_binding, completion) = listener
            .to_mat(Sink::foreach(serve_connection), Keep::both)
            .run_with_materializer(materializer())
            .expect("QUIC reuse server materializes");
        let bound = pending_binding.wait().expect("QUIC reuse server binds");
        StreamRefQuicReuseServer {
            addr: bound.local_addr(),
            client_config,
            _completion: completion,
        }
    })
}
/// Process-wide QUIC client connection to the reuse server, shared across benchmark
/// iterations by `streamref_quic_reuse_connection`.
struct StreamRefQuicReuseConnection {
    /// Established datum QUIC connection; each iteration opens a new bi stream on it.
    connection: QuicConnection,
}
/// Returns the shared client connection to the QUIC reuse server, connecting on first use
/// (which also starts the server if needed).
///
/// # Panics
///
/// Panics if the connection cannot be materialized or established.
fn streamref_quic_reuse_connection() -> &'static StreamRefQuicReuseConnection {
    static CONN: OnceLock<StreamRefQuicReuseConnection> = OnceLock::new();
    CONN.get_or_init(|| {
        let server = streamref_quic_reuse_server();
        let pending = TokioQuic::connect(
            server.addr,
            "localhost",
            server.client_config.clone(),
            STREAMREF_CHUNK_SIZE,
        )
        .run_with(Sink::head())
        .expect("reuse QUIC client connection materializes");
        let connection = pending.wait().expect("reuse QUIC client connects");
        StreamRefQuicReuseConnection { connection }
    })
}
/// Reads one remote stream-ref over a new bi stream on the shared QUIC reuse connection.
///
/// Folds the remote `u64` source into a wrapping sum, waits for the stream-ref handle, and
/// checks the sum against `0..STREAMREF_N`.
///
/// # Panics
///
/// Panics if any open/materialize step fails or the checksum does not match.
fn streamref_remote_reuse_1024() -> u64 {
    let shared = streamref_quic_reuse_connection();
    let expected_checksum: u64 = (0_u64..STREAMREF_N).sum();
    let bi_stream = shared
        .connection
        .open_bi_stream_available(STREAMREF_CHUNK_SIZE)
        .run_with(Sink::head())
        .expect("reuse client bi stream materializes")
        .wait()
        .expect("reuse client opens bi stream");
    let (remote, ref_handle) =
        source_ref_over_quic::<u64>(bi_stream, STREAMREF_ID, StreamRefSettings::default());
    let folded = remote
        .run_with_materializer(
            Sink::fold(0_u64, |acc, item| acc.wrapping_add(item)),
            materializer(),
        )
        .expect("reuse remote source fold materializes")
        .wait()
        .expect("reuse remote source fold");
    ref_handle.wait().expect("reuse stream ref handle completes");
    assert_eq!(
        folded, expected_checksum,
        "reuse streamref quic checksum mismatch"
    );
    folded
}
/// Remote TCP stream-ref benchmark over `0..STREAMREF_N`.
fn streamref_remote_tcp_1024() -> u64 {
    streamref_remote_tcp_n(STREAMREF_N)
}
/// Remote TCP stream-ref benchmark over `0..1`.
fn streamref_remote_tcp_n_1() -> u64 {
    streamref_remote_tcp_n(1_u64)
}
/// Remote TCP stream-ref benchmark over `0..4`.
fn streamref_remote_tcp_n_4() -> u64 {
    streamref_remote_tcp_n(4_u64)
}
/// Remote TCP stream-ref benchmark over `0..16`.
fn streamref_remote_tcp_n_16() -> u64 {
    streamref_remote_tcp_n(16_u64)
}
/// Remote TCP stream-ref benchmark over `0..64`.
fn streamref_remote_tcp_n_64() -> u64 {
    streamref_remote_tcp_n(64_u64)
}
/// Remote TCP stream-ref benchmark over `0..256`.
fn streamref_remote_tcp_n_256() -> u64 {
    streamref_remote_tcp_n(256_u64)
}
/// Remote TCP stream-ref benchmark over `0..1024`.
fn streamref_remote_tcp_n_1024() -> u64 {
    streamref_remote_tcp_n(1_024_u64)
}
/// Remote TCP stream-ref benchmark over `0..4096`.
fn streamref_remote_tcp_n_4096() -> u64 {
    streamref_remote_tcp_n(4_096_u64)
}
/// Remote TCP stream-ref benchmark over `0..16384`.
fn streamref_remote_tcp_n_16384() -> u64 {
    streamref_remote_tcp_n(16_384_u64)
}
/// Serves `0..n` as a source ref on a fresh loopback TCP listener and reads it back remotely.
///
/// Waits for both the client and server stream-ref handles, checks the wrapping sum against
/// `0..n`, and returns it.
///
/// # Panics
///
/// Panics if any serve/connect/materialize step fails or the checksum does not match.
fn streamref_remote_tcp_n(n: u64) -> u64 {
    let served_ref = Source::from_iter(0_u64..n)
        .run_with_materializer(StreamRefs::source_ref(), materializer())
        .expect("TCP source ref materializes");
    let (listener_binding, serve_handle) = serve_source_ref_over_tcp(
        "127.0.0.1:0",
        served_ref,
        STREAMREF_ID,
        StreamRefSettings::default(),
    )
    .expect("serve source ref over tcp");
    let server_addr = listener_binding.local_addr();
    let (remote, client_handle) =
        source_ref_over_tcp::<u64, _>(server_addr, STREAMREF_ID, StreamRefSettings::default())
            .expect("source ref over tcp");
    let folded = remote
        .run_with_materializer(
            Sink::fold(0_u64, |acc, item| acc.wrapping_add(item)),
            materializer(),
        )
        .expect("TCP remote source fold materializes")
        .wait()
        .expect("TCP remote source fold");
    client_handle
        .wait()
        .expect("TCP stream ref handle completes");
    serve_handle
        .wait()
        .expect("TCP stream ref server handle completes");
    let expected_checksum: u64 = (0_u64..n).sum();
    assert_eq!(folded, expected_checksum, "streamref tcp checksum mismatch");
    folded
}
/// Same as a remote TCP stream-ref read of `0..STREAMREF_N`, but with one shared
/// `StreamRefProtocolDiagnostics` attached to both the server and the client side.
///
/// Returns the checked wrapping sum together with a snapshot of the protocol message counts.
///
/// # Panics
///
/// Panics if any serve/connect/materialize step fails or the checksum does not match.
fn streamref_remote_tcp_1024_with_diagnostics() -> (u64, StreamRefProtocolMessageCounts) {
    let diagnostics = StreamRefProtocolDiagnostics::new();
    let served_ref = Source::from_iter(0_u64..STREAMREF_N)
        .run_with_materializer(StreamRefs::source_ref(), materializer())
        .expect("diagnostic TCP source ref materializes");
    let (listener_binding, serve_handle) = serve_source_ref_over_tcp_with_diagnostics(
        "127.0.0.1:0",
        served_ref,
        STREAMREF_ID,
        StreamRefSettings::default(),
        Some(diagnostics.clone()),
    )
    .expect("diagnostic serve source ref over tcp");
    let server_addr = listener_binding.local_addr();
    let (remote, client_handle) = source_ref_over_tcp_with_diagnostics::<u64, _>(
        server_addr,
        STREAMREF_ID,
        StreamRefSettings::default(),
        Some(diagnostics.clone()),
    )
    .expect("diagnostic source ref over tcp");
    let folded = remote
        .run_with_materializer(
            Sink::fold(0_u64, |acc, item| acc.wrapping_add(item)),
            materializer(),
        )
        .expect("diagnostic TCP remote source fold materializes")
        .wait()
        .expect("diagnostic TCP remote source fold");
    client_handle
        .wait()
        .expect("diagnostic TCP stream ref handle completes");
    serve_handle
        .wait()
        .expect("diagnostic TCP stream ref server handle completes");
    let expected_checksum: u64 = (0_u64..STREAMREF_N).sum();
    assert_eq!(
        folded, expected_checksum,
        "diagnostic streamref tcp checksum mismatch"
    );
    let counts = diagnostics.snapshot();
    (folded, counts)
}
/// Process-wide TCP stream-ref server that handles one connection at a time, built by
/// `streamref_tcp_reuse_server`.
struct StreamRefTcpReuseServer {
    /// Local address of the listener.
    addr: SocketAddr,
}
/// Process-wide TCP stream-ref server used by the concurrent client benchmarks, built by
/// `streamref_tcp_concurrent_server`.
struct StreamRefTcpConcurrentServer {
    /// Local address of the listener.
    addr: SocketAddr,
}
/// Returns the shared multi-thread Tokio runtime for the TCP stream-ref benchmarks, creating
/// it on first use.
///
/// The worker count is left at Tokio's default. Threads are named `datum-net-compare-tcp`,
/// and all Tokio drivers (I/O, time) are enabled.
fn tcp_bench_runtime() -> &'static TokioRuntime {
    static RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
    RUNTIME.get_or_init(|| {
        let mut builder = tokio::runtime::Builder::new_multi_thread();
        builder.thread_name("datum-net-compare-tcp").enable_all();
        builder.build().expect("TCP benchmark runtime")
    })
}
/// Returns the process-wide sequential TCP stream-ref server, starting it on first use.
///
/// The listener is bound on an ephemeral loopback port. A detached OS thread then accepts
/// connections one at a time. For each connection it enables `TCP_NODELAY`, serves a fresh
/// `0..STREAMREF_N` source ref under `STREAMREF_ID`, and waits for that carrier to finish
/// before accepting the next connection. Any accept/serve failure panics the thread.
fn streamref_tcp_reuse_server() -> &'static StreamRefTcpReuseServer {
    static SERVER: OnceLock<StreamRefTcpReuseServer> = OnceLock::new();
    SERVER.get_or_init(|| {
        let rt = tcp_bench_runtime();
        let listener = rt
            .block_on(TcpListener::bind("127.0.0.1:0"))
            .expect("TCP reuse listener binds");
        let addr = listener.local_addr().expect("TCP reuse local addr");
        let rt_handle = rt.handle().clone();
        thread::spawn(move || loop {
            let (socket, _peer) = rt_handle
                .block_on(listener.accept())
                .expect("TCP reuse accept");
            socket.set_nodelay(true).expect("TCP reuse nodelay");
            let numbers = Source::from_iter(0_u64..STREAMREF_N)
                .run_with_materializer(StreamRefs::source_ref(), materializer())
                .expect("TCP reuse source ref materializes");
            // Serving is started inside `block_on` so it runs within the runtime context.
            let carrier = rt_handle
                .block_on(async {
                    serve_source_ref_over_tcp_stream(
                        socket,
                        numbers,
                        STREAMREF_ID,
                        StreamRefSettings::default(),
                    )
                })
                .expect("TCP reuse serve source ref over stream");
            let _ = carrier.wait();
        });
        StreamRefTcpReuseServer { addr }
    })
}
/// Returns the process-wide TCP stream-ref server for the concurrent benchmarks, starting it
/// on first use.
///
/// The listener is bound on an ephemeral loopback port. A detached OS thread then accepts
/// connections in a loop. For each connection it enables `TCP_NODELAY` and starts serving a
/// fresh `0..STREAMREF_N` source ref under `STREAMREF_ID`. It drops the carrier handle
/// without waiting, so the next connection can be accepted immediately.
/// NOTE(review): assumes dropping the carrier does not cancel the serve; confirm against
/// `serve_source_ref_over_tcp_stream` semantics.
fn streamref_tcp_concurrent_server() -> &'static StreamRefTcpConcurrentServer {
    static SERVER: OnceLock<StreamRefTcpConcurrentServer> = OnceLock::new();
    SERVER.get_or_init(|| {
        let rt = tcp_bench_runtime();
        let listener = rt
            .block_on(TcpListener::bind("127.0.0.1:0"))
            .expect("TCP concurrent listener binds");
        let addr = listener.local_addr().expect("TCP concurrent local addr");
        let rt_handle = rt.handle().clone();
        thread::spawn(move || loop {
            let (socket, _peer) = rt_handle
                .block_on(listener.accept())
                .expect("TCP concurrent accept");
            socket.set_nodelay(true).expect("TCP concurrent nodelay");
            let numbers = Source::from_iter(0_u64..STREAMREF_N)
                .run_with_materializer(StreamRefs::source_ref(), materializer())
                .expect("TCP concurrent source ref materializes");
            // Serving is started inside `block_on` so it runs within the runtime context.
            let carrier = rt_handle
                .block_on(async {
                    serve_source_ref_over_tcp_stream(
                        socket,
                        numbers,
                        STREAMREF_ID,
                        StreamRefSettings::default(),
                    )
                })
                .expect("TCP concurrent serve source ref over stream");
            drop(carrier);
        });
        StreamRefTcpConcurrentServer { addr }
    })
}
/// Reads one remote stream-ref over a fresh TCP connection to the sequential reuse server.
///
/// Folds the remote `u64` source into a wrapping sum, waits for the stream-ref handle, and
/// checks the sum against `0..STREAMREF_N`.
///
/// # Panics
///
/// Panics if connecting or materializing fails, or the checksum does not match.
fn streamref_remote_tcp_reuse_1024() -> u64 {
    let server = streamref_tcp_reuse_server();
    let opened = tcp_bench_runtime().block_on(async {
        let socket = TcpStream::connect(server.addr)
            .await
            .expect("TCP reuse client connects");
        socket.set_nodelay(true).expect("TCP reuse client nodelay");
        source_ref_over_tcp_stream::<u64>(socket, STREAMREF_ID, StreamRefSettings::default())
    });
    let (remote, ref_handle) = opened.expect("TCP reuse source ref over stream");
    let folded = remote
        .run_with_materializer(
            Sink::fold(0_u64, |acc, item| acc.wrapping_add(item)),
            materializer(),
        )
        .expect("TCP reuse remote source fold materializes")
        .wait()
        .expect("TCP reuse remote source fold");
    ref_handle
        .wait()
        .expect("TCP reuse stream ref handle completes");
    let expected_checksum: u64 = (0_u64..STREAMREF_N).sum();
    assert_eq!(
        folded, expected_checksum,
        "reuse streamref tcp checksum mismatch"
    );
    folded
}
/// Concurrent TCP stream-ref benchmark: 1 client thread, each reading `0..STREAMREF_N`.
fn streamref_remote_tcp_concurrent_1x1024() -> u64 {
    streamref_remote_tcp_concurrent(1_usize)
}
/// Concurrent TCP stream-ref benchmark: 16 client threads.
fn streamref_remote_tcp_concurrent_16x1024() -> u64 {
    streamref_remote_tcp_concurrent(16_usize)
}
/// Concurrent TCP stream-ref benchmark: 64 client threads.
fn streamref_remote_tcp_concurrent_64x1024() -> u64 {
    streamref_remote_tcp_concurrent(64_usize)
}
/// Concurrent TCP stream-ref benchmark: 256 client threads.
fn streamref_remote_tcp_concurrent_256x1024() -> u64 {
    streamref_remote_tcp_concurrent(256_usize)
}
/// Runs `concurrency` client threads that each read a remote `0..STREAMREF_N` stream-ref from
/// the shared concurrent TCP server.
///
/// Each worker does the following:
/// - waits on `start`, then connects with `TCP_NODELAY` and opens the stream-ref via
///   `handle.block_on`;
/// - records the observed thread count, then waits on `connected` and `release`;
/// - folds the remote source into a wrapping sum;
/// - waits for the carrier, checks the sum and records the thread count again.
///
/// The calling thread is the extra barrier party and records the thread count between
/// `connected` and `release`. Returns the wrapping sum of all worker sums.
///
/// # Panics
///
/// Panics on connect/materialize failure, a checksum mismatch, or a worker thread panic.
fn streamref_remote_tcp_concurrent(concurrency: usize) -> u64 {
    let server = streamref_tcp_concurrent_server();
    let start = Arc::new(Barrier::new(concurrency + 1));
    let connected = Arc::new(Barrier::new(concurrency + 1));
    let release = Arc::new(Barrier::new(concurrency + 1));
    let expected_checksum: u64 = (0_u64..STREAMREF_N).sum();
    let handle = tcp_bench_runtime().handle().clone();
    thread::scope(|scope| {
        let mut workers = Vec::with_capacity(concurrency);
        for _ in 0..concurrency {
            let start = Arc::clone(&start);
            let connected = Arc::clone(&connected);
            let release = Arc::clone(&release);
            let handle = handle.clone();
            workers.push(scope.spawn(move || {
                // Unlike the echo benchmarks, connection setup happens after `start`.
                start.wait();
                let (remote_source, carrier) = handle
                    .block_on(async {
                        let stream = TcpStream::connect(server.addr)
                            .await
                            .expect("TCP concurrent client connects");
                        stream
                            .set_nodelay(true)
                            .expect("TCP concurrent client nodelay");
                        source_ref_over_tcp_stream::<u64>(
                            stream,
                            STREAMREF_ID,
                            StreamRefSettings::default(),
                        )
                    })
                    .expect("TCP concurrent source ref over stream");
                record_observed_thread_count();
                connected.wait();
                release.wait();
                let sum = remote_source
                    .run_with_materializer(
                        Sink::fold(0_u64, |acc, item| acc.wrapping_add(item)),
                        materializer(),
                    )
                    .expect("TCP concurrent remote source fold materializes")
                    .wait()
                    .expect("TCP concurrent remote source fold");
                carrier
                    .wait()
                    .expect("TCP concurrent stream ref handle completes");
                assert_eq!(
                    sum, expected_checksum,
                    "concurrent streamref tcp checksum mismatch"
                );
                record_observed_thread_count();
                sum
            }));
        }
        start.wait();
        connected.wait();
        record_observed_thread_count();
        release.wait();
        workers
            .into_iter()
            .map(|worker| worker.join().expect("TCP concurrent worker joins"))
            .fold(0_u64, u64::wrapping_add)
    })
}
/// Decodes standard base64 text (RFC 4648 alphabet `A-Z a-z 0-9 + /`) into bytes.
///
/// Decoding stops at the first `=` padding character. ASCII whitespace, such as line breaks
/// in wrapped PEM-style fixtures, is skipped. Leftover bits after the last complete byte are
/// discarded.
///
/// # Panics
///
/// Panics if `input` contains a byte that is not in the base64 alphabet, `=`, or ASCII
/// whitespace.
fn decode_base64(input: &str) -> Vec<u8> {
    let mut output = Vec::with_capacity(input.len() * 3 / 4);
    // Bit accumulator. At most 13 bits are meaningful (< 8 pending + 6 new). Masking to 14 bits
    // keeps already-emitted bits from piling up in the high positions.
    let mut buffer = 0_u32;
    let mut bits = 0_u8;
    for byte in input.bytes() {
        let value = match byte {
            b'A'..=b'Z' => byte - b'A',
            b'a'..=b'z' => byte - b'a' + 26,
            b'0'..=b'9' => byte - b'0' + 52,
            b'+' => 62,
            b'/' => 63,
            b'=' => break,
            _ if byte.is_ascii_whitespace() => continue,
            _ => panic!("invalid base64 byte {byte}"),
        };
        buffer = ((buffer << 6) | u32::from(value)) & 0x3fff;
        bits += 6;
        if bits >= 8 {
            bits -= 8;
            output.push(((buffer >> bits) & 0xff) as u8);
        }
    }
    output
}