mod common;
use std::time::Duration;
use bytes::Bytes;
use deadline::deadline;
use humansize::{format_size, ToF64, Unsigned, DECIMAL};
use peak_alloc::PeakAlloc;
use quickie::*;
// Route every heap allocation in this test binary through `PeakAlloc`, so the
// test below can query heap use via `current_usage()` and `peak_usage()`.
#[global_allocator]
static PEAK_ALLOC: PeakAlloc = PeakAlloc;
/// Renders `size` as a human-readable string by delegating to
/// `humansize::format_size` with the crate's `DECIMAL` format options.
///
/// In this file it is only used to pretty-print heap-usage figures.
fn fmt_size(size: impl ToF64 + Unsigned) -> String {
    let options = DECIMAL;
    format_size(size, options)
}
/// Checks that the node does not retain heap memory for connections that have
/// been closed.
///
/// The test runs `NUM_CONNS` sequential cycles; in each one a fresh raw endpoint
/// is created, a connection between it and the node is established (the
/// initiating side alternates every iteration), a short message is sent over a
/// unidirectional stream, and then both sides are shut down. Heap use is sampled
/// after every cycle; the sample taken once `BASELINE_CONNS` cycles have
/// completed serves as the baseline, and the heap use after the last cycle must
/// not exceed it.
#[tokio::test]
async fn cleanups_conns() {
    // Total number of connect / send / disconnect cycles to run.
    const NUM_CONNS: usize = 100;
    // Number of completed cycles after which the baseline heap use is recorded.
    const BASELINE_CONNS: usize = 32;

    let initial_heap = PEAK_ALLOC.current_usage();

    let (client_cfg, server_cfg) = common::client_and_server_config();
    let node = common::TestNode(Node::new(Config::new(
        Some(client_cfg.clone()),
        Some(server_cfg.clone()),
    )));

    // Heap taken up by the node before it is started.
    let idle_node_size = PEAK_ALLOC.current_usage() - initial_heap;

    let node_addr = node.start("127.0.0.1:0".parse().unwrap()).await.unwrap();

    let heap_after_node_setup = PEAK_ALLOC.current_usage();

    // Running sum of the per-cycle heap samples; divided by NUM_CONNS afterwards.
    let mut avg_heap = 0;
    let mut heap_after_32_conns = 0;

    for i in 0..NUM_CONNS {
        // Even iterations: the node initiates; odd iterations: the raw endpoint does.
        let node_initiates = i % 2 == 0;

        let raw_endpoint = common::raw_endpoint(client_cfg.clone(), server_cfg.clone());
        let raw_endpoint_addr = raw_endpoint.local_addr().unwrap();

        let (conn_id, connection, raw_endpoint) = if node_initiates {
            // Accept on the raw endpoint in the background while the node connects;
            // the endpoint is handed back so it can be closed at the end of the cycle.
            let connection_and_endpoint = tokio::spawn(async move {
                let conn = raw_endpoint
                    .accept()
                    .await
                    .expect("failed to accept a connection");
                (conn.await.unwrap(), raw_endpoint)
            });

            let conn_id = node
                .connect(raw_endpoint_addr, common::SERVER_NAME)
                .await
                .unwrap();
            let (connection, endpoint) = connection_and_endpoint.await.unwrap();

            (conn_id, connection, endpoint)
        } else {
            let connection = raw_endpoint
                .connect(node_addr, common::SERVER_NAME)
                .unwrap()
                .await
                .unwrap();

            // Wait until the node has registered the inbound connection.
            let node_clone = node.clone();
            deadline!(Duration::from_secs(1), move || node_clone.num_connections() == 1);

            // Every earlier connection has been closed, so this is the only one.
            let conn_id = node.get_connections().pop().unwrap().stable_id();

            (conn_id, connection, raw_endpoint)
        };

        // Send a short message from whichever side initiated the connection.
        if node_initiates {
            let stream_id = node.open_uni(conn_id).await.unwrap();
            node.send_msg(conn_id, stream_id, Bytes::from_static(b"herp derp"))
                .unwrap();
        } else {
            let mut send_stream = connection.open_uni().await.unwrap();
            send_stream.write_all(b"herp derp").await.unwrap();
        }

        // Tear down both sides before sampling the heap.
        assert!(node.disconnect(conn_id, Default::default(), &[]).await);
        connection.close(Default::default(), &[]);
        connection.closed().await;
        raw_endpoint.close(Default::default(), &[]);
        raw_endpoint.wait_idle().await;

        let current_heap = PEAK_ALLOC.current_usage();

        // `i` is zero-based, so `i + 1` cycles have completed at this point.
        if i + 1 == BASELINE_CONNS {
            heap_after_32_conns = current_heap;
        }

        avg_heap += current_heap;
    }

    avg_heap /= NUM_CONNS;

    let final_heap = PEAK_ALLOC.current_usage();
    // Only growth counts as a leak; a final heap below the baseline yields zero.
    let heap_growth = final_heap.saturating_sub(heap_after_32_conns);
    let max_heap = PEAK_ALLOC.peak_usage();
    let single_node_size = heap_after_node_setup - initial_heap;

    println!("---- heap use summary ----\n");
    println!("before node setup: {}", fmt_size(initial_heap));
    println!("after node setup: {}", fmt_size(heap_after_node_setup));
    println!(
        "after {} connections: {}",
        BASELINE_CONNS,
        fmt_size(heap_after_32_conns)
    );
    println!("after {} connections: {}", NUM_CONNS, fmt_size(final_heap));
    println!("average memory use: {}", fmt_size(avg_heap));
    println!("maximum memory use: {}", fmt_size(max_heap));
    println!();
    println!("idle node size: {}", fmt_size(idle_node_size));
    println!("started node size: {}", fmt_size(single_node_size));
    println!("leaked memory: {}", fmt_size(heap_growth));
    println!();

    assert_eq!(
        heap_growth, 0,
        "heap use grew between connection {} and connection {}",
        BASELINE_CONNS, NUM_CONNS
    );
}