use tari_shutdown::Shutdown;
use tari_test_utils::{async_assert_eventually, unpack_enum};
use tokio::sync::mpsc;
use crate::{
connection_manager::PeerConnection,
protocol::{
rpc::{
test::{
greeting_service::{GreetingClient, GreetingServer, GreetingService},
mock::create_mocked_rpc_context,
},
NamedProtocolService,
RpcServer,
},
ProtocolEvent,
ProtocolId,
ProtocolNotification,
},
runtime,
runtime::task,
test_utils::mocks::{new_peer_connection_mock_pair, PeerConnectionMockState},
};
/// Builds the fixture shared by the pool tests.
///
/// A mocked peer connection pair is created. One side is handed back to the caller, while the
/// other side is served by a spawned `RpcServer` hosting the greeting service, configured with
/// `num_concurrent_sessions` as its maximum number of simultaneous sessions.
///
/// Returns the caller-side connection, the mock state belonging to that connection and a
/// `Shutdown`. The `Shutdown` is only created and returned here; this function does not pass it
/// to the server or to the forwarding task.
async fn setup(num_concurrent_sessions: usize) -> (PeerConnection, PeerConnectionMockState, Shutdown) {
    let (client_conn, client_state, server_conn, server_state) = new_peer_connection_mock_pair().await;
    let (notif_tx, notif_rx) = mpsc::channel(1);
    let shutdown = Shutdown::new();
    let (context, _) = create_mocked_rpc_context();

    // The server receives its inbound substreams through `notif_rx`.
    let server = RpcServer::builder()
        .with_maximum_simultaneous_sessions(num_concurrent_sessions)
        .finish()
        .add_service(GreetingServer::new(GreetingService::default()));
    task::spawn(server.serve(notif_rx, context));

    // Forward every substream arriving on the server side of the mock pair to the RPC server as
    // a greeting-protocol notification. The loop ends once no further substreams are yielded.
    task::spawn(async move {
        while let Some(substream) = server_state.next_incoming_substream().await {
            let notification = ProtocolNotification::new(
                ProtocolId::from_static(GreetingClient::PROTOCOL_NAME),
                ProtocolEvent::NewInboundSubstream(server_conn.peer_node_id().clone(), substream),
            );
            notif_tx.send(notification).await.unwrap();
        }
    });

    (client_conn, client_state, shutdown)
}
/// Tests for `LazyPool`, driven through a mocked peer connection and a greeting RPC server.
mod lazy_pool {
    use super::*;
    use crate::protocol::rpc::client::pool::{LazyPool, RpcClientPoolError};

    /// No substream is opened when the pool is constructed; each of the first two requests on a
    /// pool of capacity 2 opens one more substream.
    #[tokio::test]
    async fn it_connects_lazily() {
        let (peer, state, _shutdown) = setup(2).await;
        let mut pool = LazyPool::<GreetingClient>::new(peer, 2, Default::default());
        assert_eq!(state.num_open_substreams(), 0);

        // The leases are kept alive (named bindings) until the end of the test.
        let _first = pool.get_least_used_or_connect().await.unwrap();
        assert_eq!(state.num_open_substreams(), 1);
        let _second = pool.get_least_used_or_connect().await.unwrap();
        assert_eq!(state.num_open_substreams(), 2);
    }

    /// When the previous lease has already been dropped, a second request leaves both the active
    /// connection count and the open substream count at 1.
    #[tokio::test]
    async fn it_reuses_unused_connections() {
        let (peer, state, _shutdown) = setup(2).await;
        let mut pool = LazyPool::<GreetingClient>::new(peer, 2, Default::default());

        // Binding to `_` drops the returned lease immediately.
        let _ = pool.get_least_used_or_connect().await.unwrap();
        assert_eq!(pool.refresh_num_active_connections(), 1);
        async_assert_eventually!(state.num_open_substreams(), expect = 1);

        let _ = pool.get_least_used_or_connect().await.unwrap();
        assert_eq!(pool.refresh_num_active_connections(), 1);
        async_assert_eventually!(state.num_open_substreams(), expect = 1);
    }

    /// Once the pool holds two sessions, further requests share them: the third lease has a lease
    /// count of 2 while exactly one of the first two still has a count of 1, and after the fourth
    /// request every lease reports a count of 2. The open substream count never exceeds 2.
    #[tokio::test]
    async fn it_reuses_least_used_connections() {
        let (peer, state, _shutdown) = setup(2).await;
        let mut pool = LazyPool::<GreetingClient>::new(peer, 2, Default::default());

        let first = pool.get_least_used_or_connect().await.unwrap();
        assert_eq!(state.num_open_substreams(), 1);
        let second = pool.get_least_used_or_connect().await.unwrap();
        assert_eq!(state.num_open_substreams(), 2);

        let third = pool.get_least_used_or_connect().await.unwrap();
        assert_eq!(third.lease_count(), 2);
        // Exactly one of the first two leases must still be the sole lease on its session.
        assert!((first.lease_count() == 1) ^ (second.lease_count() == 1));
        assert_eq!(state.num_open_substreams(), 2);

        let fourth = pool.get_least_used_or_connect().await.unwrap();
        assert_eq!(fourth.lease_count(), 2);
        assert_eq!(state.num_open_substreams(), 2);

        for lease in [&first, &second, &third].iter() {
            assert_eq!(lease.lease_count(), 2);
        }
    }

    /// With a pool capacity of 1, a second request succeeds without opening a second substream,
    /// even though the first lease is still held.
    #[tokio::test]
    async fn it_reuses_used_connections_if_necessary() {
        let (peer, state, _shutdown) = setup(2).await;
        let mut pool = LazyPool::<GreetingClient>::new(peer, 1, Default::default());

        let first = pool.get_least_used_or_connect().await.unwrap();
        assert_eq!(state.num_open_substreams(), 1);
        let second = pool.get_least_used_or_connect().await.unwrap();
        assert_eq!(state.num_open_substreams(), 1);

        drop(first);
        drop(second);
    }

    /// The server is set up with a maximum of one simultaneous session while the pool capacity is
    /// 2: the second request still succeeds, only one substream is open and both leases report a
    /// lease count of 2.
    #[tokio::test]
    async fn it_gracefully_handles_insufficient_server_sessions() {
        let (peer, state, _shutdown) = setup(1).await;
        let mut pool = LazyPool::<GreetingClient>::new(peer, 2, Default::default());

        let first = pool.get_least_used_or_connect().await.unwrap();
        assert_eq!(state.num_open_substreams(), 1);
        let second = pool.get_least_used_or_connect().await.unwrap();
        assert_eq!(state.num_open_substreams(), 1);

        for lease in [&first, &second].iter() {
            assert_eq!(lease.lease_count(), 2);
        }
    }

    /// After one of two clients is closed and dropped, the pool reports a single active
    /// connection; the next request brings both the active connection count and the open
    /// substream count back to 2.
    #[tokio::test]
    async fn it_prunes_disconnected_sessions() {
        let (peer, state, _shutdown) = setup(2).await;
        let mut pool = LazyPool::<GreetingClient>::new(peer, 2, Default::default());

        let mut closing_client = pool.get_least_used_or_connect().await.unwrap();
        assert_eq!(state.num_open_substreams(), 1);
        let _surviving_client = pool.get_least_used_or_connect().await.unwrap();
        assert_eq!(state.num_open_substreams(), 2);

        closing_client.close().await;
        drop(closing_client);
        async_assert_eventually!(state.num_open_substreams(), expect = 1);
        assert_eq!(pool.refresh_num_active_connections(), 1);

        let _replacement_client = pool.get_least_used_or_connect().await.unwrap();
        assert_eq!(pool.refresh_num_active_connections(), 2);
        assert_eq!(state.num_open_substreams(), 2);
    }

    /// Once the underlying peer connection has been disconnected, requesting a client fails with
    /// `RpcClientPoolError::PeerConnectionDropped`.
    #[tokio::test]
    async fn it_fails_when_peer_connected_disconnects() {
        let (mut peer, _, _shutdown) = setup(2).await;
        let mut pool = LazyPool::<GreetingClient>::new(peer.clone(), 2, Default::default());

        // Hold one lease across the disconnect.
        let _held_client = pool.get_least_used_or_connect().await.unwrap();
        peer.disconnect().await.unwrap();

        let err = pool.get_least_used_or_connect().await.unwrap_err();
        unpack_enum!(RpcClientPoolError::PeerConnectionDropped { .. } = err);
    }
}