use std::fmt::Debug;
use std::sync::Arc;
use std::sync::Once;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use crate::client::ConnectivityState;
use crate::client::load_balancing::ChannelController;
use crate::client::load_balancing::DynLbPolicyBuilder;
use crate::client::load_balancing::FailingPicker;
use crate::client::load_balancing::GLOBAL_LB_REGISTRY;
use crate::client::load_balancing::LbPolicy;
use crate::client::load_balancing::LbPolicyBuilder;
use crate::client::load_balancing::LbPolicyOptions;
use crate::client::load_balancing::LbState;
use crate::client::load_balancing::PickResult;
use crate::client::load_balancing::Picker;
use crate::client::load_balancing::Subchannel;
use crate::client::load_balancing::SubchannelState;
use crate::client::load_balancing::child_manager::ChildManager;
use crate::client::load_balancing::child_manager::ChildUpdate;
use crate::client::load_balancing::pick_first;
use crate::client::name_resolution::Endpoint;
use crate::client::name_resolution::ResolverUpdate;
use crate::core::RequestHeaders;
pub(crate) static POLICY_NAME: &str = "round_robin";
static START: Once = Once::new();
#[derive(Debug)]
pub(crate) struct RoundRobinBuilder {}
impl LbPolicyBuilder for RoundRobinBuilder {
type LbPolicy = RoundRobinPolicy;
fn build(&self, options: LbPolicyOptions) -> Self::LbPolicy {
let child_manager = ChildManager::new(options.runtime, options.work_scheduler);
RoundRobinPolicy::new(
child_manager,
GLOBAL_LB_REGISTRY
.get_policy(pick_first::POLICY_NAME)
.unwrap(),
)
}
fn name(&self) -> &'static str {
POLICY_NAME
}
}
#[derive(Debug)]
pub(crate) struct RoundRobinPolicy {
child_manager: ChildManager<Endpoint>,
pick_first_builder: Arc<DynLbPolicyBuilder>,
}
impl RoundRobinPolicy {
fn new(
child_manager: ChildManager<Endpoint>,
pick_first_builder: Arc<DynLbPolicyBuilder>,
) -> Self {
Self {
child_manager,
pick_first_builder,
}
}
fn move_to_transient_failure(
&mut self,
error: String,
channel_controller: &mut dyn ChannelController,
) {
channel_controller.update_picker(LbState {
connectivity_state: ConnectivityState::TransientFailure,
picker: Arc::new(FailingPicker { error }),
});
channel_controller.request_resolution();
}
fn update_picker(&mut self, channel_controller: &mut dyn ChannelController) {
if !self.child_manager.child_updated() {
return;
}
let aggregate_state = self.child_manager.aggregate_states();
let pickers = self
.child_manager
.children()
.filter(|cs| cs.state.connectivity_state == aggregate_state)
.map(|cs| cs.state.picker.clone())
.collect();
let picker_update = LbState {
connectivity_state: aggregate_state,
picker: Arc::new(RoundRobinPicker::new(pickers)),
};
channel_controller.update_picker(picker_update);
}
fn handle_resolver_error(
&mut self,
resolver_update: ResolverUpdate,
channel_controller: &mut dyn ChannelController,
) -> Result<(), String> {
let err = format!(
"Received error from name resolver: {}",
resolver_update.endpoints.as_ref().unwrap_err()
);
if self.child_manager.children().next().is_none() {
self.move_to_transient_failure(err.clone(), channel_controller);
return Err(err);
}
let _ = self
.child_manager
.resolver_update(resolver_update, None, channel_controller);
self.update_picker(channel_controller);
Err(err)
}
}
impl LbPolicy for RoundRobinPolicy {
type LbConfig = ();
fn resolver_update(
&mut self,
update: ResolverUpdate,
config: Option<&Self::LbConfig>,
channel_controller: &mut dyn ChannelController,
) -> Result<(), String> {
if update.endpoints.is_err() {
return self.handle_resolver_error(update, channel_controller);
}
let updates = update.endpoints.as_ref().unwrap().iter().map(|e| {
let update = ResolverUpdate {
attributes: crate::attributes::Attributes::default(),
endpoints: Ok(vec![e.clone()]),
service_config: update.service_config.clone(),
resolution_note: None,
};
ChildUpdate {
child_identifier: e.clone(),
child_policy_builder: self.pick_first_builder.clone(),
child_update: Some((update, None)),
}
});
self.child_manager
.update(updates, channel_controller)
.unwrap();
if self.child_manager.children().next().is_none() {
let err = "Received empty address list from the name resolver";
self.move_to_transient_failure(err.into(), channel_controller);
return Err(err.into());
}
self.update_picker(channel_controller);
Ok(())
}
fn subchannel_update(
&mut self,
subchannel: Arc<dyn Subchannel>,
state: &SubchannelState,
channel_controller: &mut dyn ChannelController,
) {
self.child_manager
.subchannel_update(subchannel, state, channel_controller);
self.update_picker(channel_controller);
}
fn work(&mut self, channel_controller: &mut dyn ChannelController) {
self.child_manager.work(channel_controller);
self.update_picker(channel_controller);
}
fn exit_idle(&mut self, channel_controller: &mut dyn ChannelController) {
self.child_manager.exit_idle(channel_controller);
self.update_picker(channel_controller);
}
}
pub(crate) fn reg() {
START.call_once(|| {
GLOBAL_LB_REGISTRY.add_builder(RoundRobinBuilder {});
});
}
#[derive(Debug)]
struct RoundRobinPicker {
pickers: Vec<Arc<dyn Picker>>,
next: AtomicUsize,
}
impl RoundRobinPicker {
fn new(pickers: Vec<Arc<dyn Picker>>) -> Self {
let random_index: usize = rand::random_range(..pickers.len());
Self {
pickers,
next: AtomicUsize::new(random_index),
}
}
}
impl Picker for RoundRobinPicker {
fn pick(&self, request_headers: &RequestHeaders) -> PickResult {
let len = self.pickers.len();
let idx = self.next.fetch_add(1, Ordering::Relaxed) % len;
self.pickers[idx].pick(request_headers)
}
}
#[cfg(test)]
mod test {
use std::collections::HashSet;
use std::panic;
use std::sync::Arc;
use std::sync::mpsc;
use crate::StatusCodeError;
use crate::client::ConnectivityState;
use crate::client::load_balancing::ChannelController;
use crate::client::load_balancing::FailingPicker;
use crate::client::load_balancing::GLOBAL_LB_REGISTRY;
use crate::client::load_balancing::LbPolicy;
use crate::client::load_balancing::LbState;
use crate::client::load_balancing::Pick;
use crate::client::load_balancing::PickResult;
use crate::client::load_balancing::Picker;
use crate::client::load_balancing::QueuingPicker;
use crate::client::load_balancing::Subchannel;
use crate::client::load_balancing::SubchannelState;
use crate::client::load_balancing::child_manager::ChildManager;
use crate::client::load_balancing::pick_first;
use crate::client::load_balancing::round_robin::RoundRobinPolicy;
use crate::client::load_balancing::round_robin::{self};
use crate::client::load_balancing::test_utils::StubPolicyData;
use crate::client::load_balancing::test_utils::StubPolicyFuncs;
use crate::client::load_balancing::test_utils::TestChannelController;
use crate::client::load_balancing::test_utils::TestEvent;
use crate::client::load_balancing::test_utils::TestWorkScheduler;
use crate::client::load_balancing::test_utils::{self};
use crate::client::name_resolution::Address;
use crate::client::name_resolution::Endpoint;
use crate::client::name_resolution::ResolverUpdate;
use crate::core::RequestHeaders;
use crate::metadata::MetadataMap;
use crate::rt::default_runtime;
const DEFAULT_TEST_SHORT_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(100);
type SetupResult = (
mpsc::Receiver<TestEvent>,
RoundRobinPolicy,
Box<dyn ChannelController>,
);
fn setup(test_name: &'static str) -> SetupResult {
pick_first::reg();
round_robin::reg();
test_utils::reg_stub_policy(test_name, create_funcs_for_roundrobin_tests());
let (tx_events, rx_events) = mpsc::channel();
let work_scheduler = Arc::new(TestWorkScheduler {
tx_events: tx_events.clone(),
});
let child_manager = ChildManager::new(default_runtime(), work_scheduler);
let tcc = Box::new(TestChannelController { tx_events });
let child_policy_builder = GLOBAL_LB_REGISTRY.get_policy(test_name).unwrap();
let lb_policy = RoundRobinPolicy::new(child_manager, child_policy_builder);
(rx_events, lb_policy, tcc)
}
struct TestSubchannelList {
subchannels: Vec<Arc<dyn Subchannel>>,
}
impl TestSubchannelList {
fn new(addresses: &[Address], channel_controller: &mut dyn ChannelController) -> Self {
TestSubchannelList {
subchannels: addresses
.iter()
.map(|a| channel_controller.new_subchannel(a).0)
.collect(),
}
}
fn contains(&self, sc: &Arc<dyn Subchannel>) -> bool {
self.subchannels.contains(sc)
}
}
fn create_endpoints(num_endpoints: usize, num_addresses: usize) -> Vec<Endpoint> {
let mut endpoints = Vec::with_capacity(num_endpoints);
for i in 0..num_endpoints {
let mut addresses: Vec<Address> = Vec::with_capacity(num_addresses);
for j in 0..num_addresses {
addresses.push(Address {
address: format!("{}.{}.{}.{}:{}", i + 1, i + 1, i + 1, i + 1, j).into(),
..Default::default()
});
}
endpoints.push(Endpoint {
addresses,
..Default::default()
})
}
endpoints
}
fn send_resolver_update_to_policy(
lb_policy: &mut impl LbPolicy,
endpoints: Vec<Endpoint>,
tcc: &mut dyn ChannelController,
) {
let update = ResolverUpdate {
endpoints: Ok(endpoints),
..Default::default()
};
let _ = lb_policy.resolver_update(update, None, tcc);
}
fn send_resolver_error_to_policy(
lb_policy: &mut RoundRobinPolicy,
err: String,
tcc: &mut dyn ChannelController,
) {
let update = ResolverUpdate {
endpoints: Err(err),
..Default::default()
};
let _ = lb_policy.resolver_update(update, None, tcc);
}
fn move_subchannel_to_state(
lb_policy: &mut impl LbPolicy,
subchannel: Arc<dyn Subchannel>,
state: &SubchannelState,
tcc: &mut dyn ChannelController,
) {
lb_policy.subchannel_update(subchannel, state, tcc);
}
fn move_subchannel_to_transient_failure(
lb_policy: &mut impl LbPolicy,
subchannel: Arc<dyn Subchannel>,
err: &str,
tcc: &mut dyn ChannelController,
) {
lb_policy.subchannel_update(
subchannel,
&SubchannelState {
connectivity_state: ConnectivityState::TransientFailure,
last_connection_error: Some(err.into()),
},
tcc,
);
}
#[derive(Debug)]
struct OneSubchannelPicker {
sc: Arc<dyn Subchannel>,
}
impl Picker for OneSubchannelPicker {
fn pick(&self, _: &RequestHeaders) -> PickResult {
PickResult::Pick(Pick {
subchannel: self.sc.clone(),
on_complete: None,
metadata: MetadataMap::new(),
})
}
}
fn addresses_from_endpoints(endpoints: &[Endpoint]) -> Vec<Address> {
let mut addresses: Vec<Address> = endpoints
.iter()
.flat_map(|ep| ep.addresses.clone())
.collect();
let mut uniques = HashSet::new();
addresses.retain(|e| uniques.insert(e.clone()));
addresses
}
struct PickFirstState {
subchannel_list: Option<TestSubchannelList>,
selected_subchannel: Option<Arc<dyn Subchannel>>,
addresses: Vec<Address>,
connectivity_state: ConnectivityState,
}
fn create_funcs_for_roundrobin_tests() -> StubPolicyFuncs {
StubPolicyFuncs {
resolver_update: Some(Arc::new(
|data: &mut StubPolicyData, update: ResolverUpdate, _, channel_controller| {
let state = data
.test_data
.get_or_insert_with(|| {
Box::new(PickFirstState {
subchannel_list: None,
selected_subchannel: None,
addresses: vec![],
connectivity_state: ConnectivityState::Connecting,
})
})
.downcast_mut::<PickFirstState>()
.unwrap();
if let Err(error) = update.endpoints {
if state.addresses.is_empty()
|| state.connectivity_state == ConnectivityState::TransientFailure
{
channel_controller.update_picker(LbState {
connectivity_state: ConnectivityState::TransientFailure,
picker: Arc::new(FailingPicker {
error: error.to_string(),
}),
});
state.connectivity_state = ConnectivityState::TransientFailure;
channel_controller.request_resolution();
}
return Ok(());
};
let endpoints = update.endpoints.unwrap();
let new_addresses = addresses_from_endpoints(&endpoints);
if new_addresses.is_empty() {
channel_controller.update_picker(LbState {
connectivity_state: ConnectivityState::TransientFailure,
picker: Arc::new(FailingPicker {
error: "Received empty address list from the name resolver"
.to_string(),
}),
});
state.connectivity_state = ConnectivityState::TransientFailure;
channel_controller.request_resolution();
return Err("Received empty address list from the name resolver".into());
}
if state.connectivity_state != ConnectivityState::Idle {
state.subchannel_list =
Some(TestSubchannelList::new(&new_addresses, channel_controller));
}
state.addresses = new_addresses;
Ok(())
},
)),
subchannel_update: Some(Arc::new(
|data: &mut StubPolicyData, subchannel, state, channel_controller| {
let test_data = data.test_data.as_mut().unwrap(); let test_state = test_data.downcast_mut::<PickFirstState>().unwrap();
let scl = &mut test_state.subchannel_list.as_ref().unwrap();
assert!(
scl.contains(&subchannel),
"subchannel_update received an update for a subchannel it does not own."
);
test_state.connectivity_state = state.connectivity_state;
match state.connectivity_state {
ConnectivityState::Ready => {
channel_controller.update_picker(LbState {
connectivity_state: state.connectivity_state,
picker: Arc::new(OneSubchannelPicker { sc: subchannel }),
});
}
ConnectivityState::Idle => {}
ConnectivityState::Connecting => {
channel_controller.update_picker(LbState {
connectivity_state: state.connectivity_state,
picker: Arc::new(QueuingPicker {}),
});
}
ConnectivityState::TransientFailure => {
channel_controller.update_picker(LbState {
connectivity_state: state.connectivity_state,
picker: Arc::new(FailingPicker {
error: state
.last_connection_error
.as_ref()
.unwrap()
.to_string(),
}),
});
}
}
},
)),
..Default::default()
}
}
fn create_endpoint(num_addresses: usize) -> Endpoint {
let mut addresses = Vec::with_capacity(num_addresses);
for i in 0..num_addresses {
addresses.push(Address {
address: format!("{}.{}.{}.{}:{}", i, i, i, i, i).into(),
..Default::default()
});
}
Endpoint {
addresses,
..Default::default()
}
}
fn verify_subchannel_creation(
rx_events: &mut mpsc::Receiver<TestEvent>,
number_of_subchannels: usize,
) -> Vec<Arc<dyn Subchannel>> {
let mut subchannels = Vec::new();
for _ in 0..number_of_subchannels {
match rx_events.recv().unwrap() {
TestEvent::NewSubchannel(sc) => {
subchannels.push(sc);
}
other => panic!("unexpected event {:?}", other),
};
}
subchannels
}
fn verify_connecting_picker(rx_events: &mut mpsc::Receiver<TestEvent>) -> Arc<dyn Picker> {
println!("verify connecting picker");
match rx_events.recv().unwrap() {
TestEvent::UpdatePicker(update) => {
println!("connectivity state is {}", update.connectivity_state);
assert!(update.connectivity_state == ConnectivityState::Connecting);
let req = test_utils::new_request_headers();
assert!(update.picker.pick(&req) == PickResult::Queue);
update.picker
}
other => panic!("unexpected event {:?}", other),
}
}
fn verify_ready_picker(
rx_events: &mut mpsc::Receiver<TestEvent>,
subchannel: Arc<dyn Subchannel>,
) -> Arc<dyn Picker> {
println!("verify ready picker");
match rx_events.recv().unwrap() {
TestEvent::UpdatePicker(update) => {
println!(
"connectivity state for ready picker is {}",
update.connectivity_state
);
assert!(update.connectivity_state == ConnectivityState::Ready);
let req = test_utils::new_request_headers();
match update.picker.pick(&req) {
PickResult::Pick(pick) => {
println!("selected subchannel is {}", pick.subchannel);
println!("should've been selected subchannel is {}", subchannel);
assert!(pick.subchannel == subchannel.clone());
update.picker.clone()
}
other => panic!("unexpected pick result {}", other),
}
}
other => panic!("unexpected event {:?}", other),
}
}
fn verify_roundrobin_ready_picker(
rx_events: &mut mpsc::Receiver<TestEvent>,
) -> Arc<dyn Picker> {
println!("verify ready picker");
match rx_events.recv().unwrap() {
TestEvent::UpdatePicker(update) => {
println!(
"connectivity state for ready picker is {}",
update.connectivity_state
);
assert!(update.connectivity_state == ConnectivityState::Ready);
let req = test_utils::new_request_headers();
match update.picker.pick(&req) {
PickResult::Pick(pick) => update.picker.clone(),
other => panic!("unexpected pick result {}", other),
}
}
other => panic!("unexpected event {:?}", other),
}
}
fn verify_transient_failure_picker(
rx_events: &mut mpsc::Receiver<TestEvent>,
want_error: String,
) -> Arc<dyn Picker> {
(match rx_events.recv().unwrap() {
TestEvent::UpdatePicker(update) => {
assert!(update.connectivity_state == ConnectivityState::TransientFailure);
let req = test_utils::new_request_headers();
match update.picker.pick(&req) {
PickResult::Fail(status) => {
assert!(status.code() == StatusCodeError::Unavailable);
dbg!(status.message());
dbg!(&want_error);
assert!(status.message().contains(&want_error));
update.picker.clone()
}
other => panic!("unexpected pick result {}", other),
}
}
other => panic!("unexpected event {:?}", other),
}) as _
}
fn verify_resolution_request(rx_events: &mut mpsc::Receiver<TestEvent>) {
println!("verifying resolution request");
match rx_events.recv().unwrap() {
TestEvent::RequestResolution => {}
other => panic!("unexpected event {:?}", other),
};
}
fn verify_no_activity(rx_events: &mut mpsc::Receiver<TestEvent>) {
assert!(rx_events.try_recv().is_err());
}
#[test]
fn roundrobin_resolver_error_before_a_valid_update() {
let (mut rx_events, mut lb_policy, mut tcc) =
setup("stub-roundrobin_resolver_error_before_a_valid_update");
let tcc = tcc.as_mut();
let resolver_error = String::from("resolver error");
send_resolver_error_to_policy(&mut lb_policy, resolver_error.clone(), tcc);
verify_transient_failure_picker(&mut rx_events, resolver_error);
}
#[test]
fn roundrobin_resolver_error_after_a_valid_update_in_ready() {
let (mut rx_events, mut lb_policy, mut tcc) =
setup("stub-roundrobin_resolver_error_after_a_valid_update_in_ready");
let tcc = tcc.as_mut();
let endpoint = create_endpoint(1);
send_resolver_update_to_policy(&mut lb_policy, vec![endpoint], tcc);
let subchannels = verify_subchannel_creation(&mut rx_events, 1);
move_subchannel_to_state(
&mut lb_policy,
subchannels[0].clone(),
&SubchannelState::connecting(),
tcc,
);
verify_connecting_picker(&mut rx_events);
move_subchannel_to_state(
&mut lb_policy,
subchannels[0].clone(),
&SubchannelState::ready(),
tcc,
);
let picker = verify_ready_picker(&mut rx_events, subchannels[0].clone());
let resolver_error = String::from("resolver error");
send_resolver_error_to_policy(&mut lb_policy, resolver_error.clone(), tcc);
verify_no_activity(&mut rx_events);
let req = test_utils::new_request_headers();
match picker.pick(&req) {
PickResult::Pick(pick) => {
assert!(pick.subchannel == subchannels[0].clone());
}
other => panic!("unexpected pick result {}", other),
}
}
#[test]
fn roundrobin_resolver_error_after_a_valid_update_in_connecting() {
let (mut rx_events, mut lb_policy, mut tcc) =
setup("stub-roundrobin_resolver_error_after_a_valid_update_in_connecting");
let tcc = tcc.as_mut();
let endpoint = create_endpoint(1);
send_resolver_update_to_policy(&mut lb_policy, vec![endpoint], tcc);
let subchannels = verify_subchannel_creation(&mut rx_events, 1);
move_subchannel_to_state(
&mut lb_policy,
subchannels[0].clone(),
&SubchannelState::connecting(),
tcc,
);
let picker = verify_connecting_picker(&mut rx_events);
let resolver_error = String::from("resolver error");
send_resolver_error_to_policy(&mut lb_policy, resolver_error, tcc);
verify_no_activity(&mut rx_events);
let req = test_utils::new_request_headers();
match picker.pick(&req) {
PickResult::Queue => {}
other => panic!("unexpected pick result {}", other),
}
}
#[test]
fn roundrobin_resolver_error_after_a_valid_update_in_tf() {
let (mut rx_events, mut lb_policy, mut tcc) =
setup("stub-roundrobin_resolver_error_after_a_valid_update_in_tf");
let tcc = tcc.as_mut();
let endpoint = create_endpoint(1);
send_resolver_update_to_policy(&mut lb_policy, vec![endpoint], tcc);
let subchannels = verify_subchannel_creation(&mut rx_events, 1);
move_subchannel_to_state(
&mut lb_policy,
subchannels[0].clone(),
&SubchannelState::connecting(),
tcc,
);
verify_connecting_picker(&mut rx_events);
let connection_error = String::from("test connection error");
move_subchannel_to_transient_failure(
&mut lb_policy,
subchannels[0].clone(),
&connection_error,
tcc,
);
verify_transient_failure_picker(&mut rx_events, connection_error);
let resolver_error = String::from("resolver error");
send_resolver_error_to_policy(&mut lb_policy, resolver_error.clone(), tcc);
verify_resolution_request(&mut rx_events);
verify_transient_failure_picker(&mut rx_events, resolver_error);
}
#[test]
fn roundrobin_picks_are_round_robin() {
let (mut rx_events, mut lb_policy, mut tcc) =
setup("stub-roundrobin_picks_are_round_robin");
let tcc = tcc.as_mut();
let endpoints = create_endpoints(2, 1);
send_resolver_update_to_policy(&mut lb_policy, endpoints, tcc);
let subchannels = verify_subchannel_creation(&mut rx_events, 2);
move_subchannel_to_state(
&mut lb_policy,
subchannels[0].clone(),
&SubchannelState::connecting(),
tcc,
);
verify_connecting_picker(&mut rx_events);
move_subchannel_to_state(
&mut lb_policy,
subchannels[0].clone(),
&SubchannelState::ready(),
tcc,
);
verify_ready_picker(&mut rx_events, subchannels[0].clone());
move_subchannel_to_state(
&mut lb_policy,
subchannels[1].clone(),
&SubchannelState::ready(),
tcc,
);
let picker = verify_roundrobin_ready_picker(&mut rx_events);
let req = test_utils::new_request_headers();
let mut picked = Vec::new();
for _ in 0..4 {
match picker.pick(&req) {
PickResult::Pick(pick) => {
println!("picked subchannel is {}", pick.subchannel);
picked.push(pick.subchannel.clone())
}
other => panic!("unexpected pick result {}", other),
}
}
assert!(
picked[0] != picked[1].clone(),
"Should alternate between subchannels"
);
assert_eq!(&picked[0], &picked[2]);
assert_eq!(&picked[1], &picked[3]);
assert!(picked.contains(&subchannels[0]));
assert!(picked.contains(&subchannels[1]));
}
#[test]
fn roundrobin_endpoints_removed() {
let (mut rx_events, mut lb_policy, mut tcc) = setup("stub-roundrobin_addresses_removed");
let tcc = tcc.as_mut();
let endpoints = create_endpoints(2, 1);
send_resolver_update_to_policy(&mut lb_policy, endpoints, tcc);
let subchannels = verify_subchannel_creation(&mut rx_events, 2);
move_subchannel_to_state(
&mut lb_policy,
subchannels[0].clone(),
&SubchannelState::connecting(),
tcc,
);
verify_connecting_picker(&mut rx_events);
move_subchannel_to_state(
&mut lb_policy,
subchannels[0].clone(),
&SubchannelState::connecting(),
tcc,
);
verify_connecting_picker(&mut rx_events);
let update = ResolverUpdate {
endpoints: Ok(vec![]),
..Default::default()
};
let _ = lb_policy.resolver_update(update, None, tcc);
let want_error = "Received empty address list from the name resolver";
verify_transient_failure_picker(&mut rx_events, want_error.to_string());
verify_resolution_request(&mut rx_events);
}
#[test]
fn roundrobin_one_endpoint_down() {
let (mut rx_events, mut lb_policy, mut tcc) = setup("stub-roundrobin_one_endpoint_down");
let tcc = tcc.as_mut();
let endpoints = create_endpoints(2, 1);
send_resolver_update_to_policy(&mut lb_policy, endpoints, tcc);
let subchannels = verify_subchannel_creation(&mut rx_events, 2);
move_subchannel_to_state(
&mut lb_policy,
subchannels[0].clone(),
&SubchannelState::connecting(),
tcc,
);
verify_connecting_picker(&mut rx_events);
move_subchannel_to_state(
&mut lb_policy,
subchannels[0].clone(),
&SubchannelState::ready(),
tcc,
);
let picker = verify_ready_picker(&mut rx_events, subchannels[0].clone());
move_subchannel_to_state(
&mut lb_policy,
subchannels[1].clone(),
&SubchannelState::ready(),
tcc,
);
let picker = verify_roundrobin_ready_picker(&mut rx_events);
let req = test_utils::new_request_headers();
let mut picked = Vec::new();
for _ in 0..4 {
match picker.pick(&req) {
PickResult::Pick(pick) => {
println!("picked subchannel is {}", pick.subchannel);
picked.push(pick.subchannel.clone())
}
other => panic!("unexpected pick result {}", other),
}
}
assert!(
picked[0] != picked[1].clone(),
"Should alternate between subchannels"
);
assert_eq!(&picked[0], &picked[2]);
assert_eq!(&picked[1], &picked[3]);
assert!(picked.contains(&subchannels[0]));
assert!(picked.contains(&subchannels[1]));
let subchannel_being_removed = subchannels[1].clone();
let error = "endpoint down";
move_subchannel_to_transient_failure(&mut lb_policy, subchannels[1].clone(), error, tcc);
let new_picker = verify_roundrobin_ready_picker(&mut rx_events);
let req = test_utils::new_request_headers();
let mut picked = Vec::new();
for _ in 0..4 {
match new_picker.pick(&req) {
PickResult::Pick(pick) => {
println!("picked subchannel is {}", pick.subchannel);
picked.push(pick.subchannel.clone())
}
other => panic!("unexpected pick result {}", other),
}
}
assert_eq!(&picked[0], &picked[2]);
assert_eq!(&picked[1], &picked[3]);
assert!(picked.contains(&subchannels[0]));
assert!(!picked.contains(&subchannel_being_removed));
}
#[test]
fn roundrobin_pick_after_resolved_updated_hosts() {
let (mut rx_events, mut lb_policy, mut tcc) =
setup("stub-roundrobin_pick_after_resolved_updated_hosts");
let tcc = tcc.as_mut();
let addr_one = Address {
address: "subchannel_one".to_string().into(),
..Default::default()
};
let addr_two = Address {
address: "subchannel_two".to_string().into(),
..Default::default()
};
let endpoint_one = Endpoint {
addresses: vec![addr_one],
..Default::default()
};
let endpoint_two = Endpoint {
addresses: vec![addr_two],
..Default::default()
};
send_resolver_update_to_policy(
&mut lb_policy,
vec![endpoint_one, endpoint_two.clone()],
tcc,
);
let all_subchannels = verify_subchannel_creation(&mut rx_events, 2);
let subchannel_one = all_subchannels
.iter()
.find(|sc| sc.address().address == "subchannel_one".to_string().into())
.unwrap();
let subchannel_two = all_subchannels
.iter()
.find(|sc| sc.address().address == "subchannel_two".to_string().into())
.unwrap();
move_subchannel_to_state(
&mut lb_policy,
subchannel_one.clone(),
&SubchannelState::connecting(),
tcc,
);
verify_connecting_picker(&mut rx_events);
move_subchannel_to_state(
&mut lb_policy,
subchannel_two.clone(),
&SubchannelState::connecting(),
tcc,
);
verify_connecting_picker(&mut rx_events);
move_subchannel_to_state(
&mut lb_policy,
subchannel_one.clone(),
&SubchannelState::ready(),
tcc,
);
verify_ready_picker(&mut rx_events, subchannel_one.clone());
move_subchannel_to_state(
&mut lb_policy,
subchannel_two.clone(),
&SubchannelState::ready(),
tcc,
);
let picker = verify_roundrobin_ready_picker(&mut rx_events);
let req = test_utils::new_request_headers();
let mut picked = Vec::new();
for _ in 0..4 {
match picker.pick(&req) {
PickResult::Pick(pick) => picked.push(pick.subchannel.clone()),
other => panic!("unexpected pick result {}", other),
}
}
assert!(picked.contains(subchannel_one));
assert!(picked.contains(subchannel_two));
let new_addr = Address {
address: "new".to_string().into(),
..Default::default()
};
let new_endpoint = Endpoint {
addresses: vec![new_addr],
..Default::default()
};
send_resolver_update_to_policy(&mut lb_policy, vec![endpoint_two, new_endpoint], tcc);
let new_subchannels = verify_subchannel_creation(&mut rx_events, 2);
let new_sc = new_subchannels
.iter()
.find(|sc| sc.address().address == "new".to_string().into())
.unwrap();
let old_sc = new_subchannels
.iter()
.find(|sc| sc.address().address == "subchannel_two".to_string().into())
.unwrap();
move_subchannel_to_state(
&mut lb_policy,
old_sc.clone(),
&SubchannelState::ready(),
tcc,
);
let _ = verify_roundrobin_ready_picker(&mut rx_events);
move_subchannel_to_state(
&mut lb_policy,
new_sc.clone(),
&SubchannelState::connecting(),
tcc,
);
let _ = verify_roundrobin_ready_picker(&mut rx_events);
move_subchannel_to_state(
&mut lb_policy,
new_sc.clone(),
&SubchannelState::ready(),
tcc,
);
let new_picker = verify_roundrobin_ready_picker(&mut rx_events);
let req = test_utils::new_request_headers();
let mut picked = Vec::new();
for _ in 0..4 {
match new_picker.pick(&req) {
PickResult::Pick(pick) => picked.push(pick.subchannel.clone()),
other => panic!("unexpected pick result {}", other),
}
}
assert!(picked.contains(old_sc));
assert!(picked.contains(new_sc));
assert!(!picked.contains(subchannel_one));
}
#[test]
fn roundrobin_stay_transient_failure_until_ready() {
let (mut rx_events, mut lb_policy, mut tcc) =
setup("stub-roundrobin_stay_transient_failure_until_ready");
let tcc = tcc.as_mut();
let endpoints = create_endpoints(2, 1);
send_resolver_update_to_policy(&mut lb_policy, endpoints, tcc);
let subchannels = verify_subchannel_creation(&mut rx_events, 2);
move_subchannel_to_state(
&mut lb_policy,
subchannels[0].clone(),
&SubchannelState::connecting(),
tcc,
);
verify_connecting_picker(&mut rx_events);
move_subchannel_to_state(
&mut lb_policy,
subchannels[1].clone(),
&SubchannelState::connecting(),
tcc,
);
verify_connecting_picker(&mut rx_events);
let first_error = String::from("test connection error 1");
move_subchannel_to_transient_failure(
&mut lb_policy,
subchannels[0].clone(),
&first_error,
tcc,
);
verify_connecting_picker(&mut rx_events);
move_subchannel_to_transient_failure(
&mut lb_policy,
subchannels[1].clone(),
&first_error,
tcc,
);
verify_transient_failure_picker(&mut rx_events, first_error);
move_subchannel_to_state(
&mut lb_policy,
subchannels[0].clone(),
&SubchannelState::ready(),
tcc,
);
verify_ready_picker(&mut rx_events, subchannels[0].clone());
}
#[test]
fn roundrobin_zero_endpoints_from_resolver_before_valid_update() {
let (mut rx_events, mut lb_policy, mut tcc) =
setup("stub-roundrobin_zero_endpoints_from_resolver_before_valid_update");
let tcc = tcc.as_mut();
send_resolver_update_to_policy(&mut lb_policy, vec![], tcc);
verify_transient_failure_picker(
&mut rx_events,
"Received empty address list from the name resolver".to_string(),
);
}
#[test]
fn roundrobin_zero_endpoints_from_resolver_after_valid_update() {
let (mut rx_events, mut lb_policy, mut tcc) =
setup("stub-roundrobin_zero_endpoints_from_resolver_after_valid_update");
let tcc = tcc.as_mut();
let endpoint = create_endpoint(1);
send_resolver_update_to_policy(&mut lb_policy, vec![endpoint], tcc);
let subchannels = verify_subchannel_creation(&mut rx_events, 1);
move_subchannel_to_state(
&mut lb_policy,
subchannels[0].clone(),
&SubchannelState::connecting(),
tcc,
);
verify_connecting_picker(&mut rx_events);
move_subchannel_to_state(
&mut lb_policy,
subchannels[0].clone(),
&SubchannelState::ready(),
tcc,
);
verify_ready_picker(&mut rx_events, subchannels[0].clone());
let update = ResolverUpdate {
endpoints: Ok(vec![]),
..Default::default()
};
assert!(lb_policy.resolver_update(update, None, tcc).is_err());
verify_transient_failure_picker(
&mut rx_events,
"Received empty address list from the name resolver".to_string(),
);
verify_resolution_request(&mut rx_events);
}
#[test]
fn roundrobin_with_multiple_backends_first_backend_is_ready() {
let (mut rx_events, mut lb_policy, mut tcc) =
setup("stub-roundrobin_with_multiple_backends_first_backend_is_ready");
let tcc = tcc.as_mut();
let endpoint = create_endpoints(2, 1);
send_resolver_update_to_policy(&mut lb_policy, endpoint, tcc);
let subchannels = verify_subchannel_creation(&mut rx_events, 2);
move_subchannel_to_state(
&mut lb_policy,
subchannels[0].clone(),
&SubchannelState::connecting(),
tcc,
);
verify_connecting_picker(&mut rx_events);
move_subchannel_to_state(
&mut lb_policy,
subchannels[0].clone(),
&SubchannelState::ready(),
tcc,
);
let picker = verify_ready_picker(&mut rx_events, subchannels[0].clone());
let req = test_utils::new_request_headers();
let first_sc = match picker.pick(&req) {
PickResult::Pick(p) => p.subchannel.clone(),
other => panic!("unexpected pick result {}", other),
};
for _ in 0..7 {
match picker.pick(&req) {
PickResult::Pick(p) => {
assert!(
Arc::ptr_eq(&first_sc, &p.subchannel),
"READY picker should contain exactly one subchannel"
);
}
other => panic!("unexpected pick result {}", other),
}
}
}
#[test]
fn roundrobin_resolver_update_contains_currently_ready_subchannel() {
let (mut rx_events, mut lb_policy, mut tcc) =
setup("stub-roundrobin_resolver_update_contains_currently_ready_subchannel");
let tcc = tcc.as_mut();
let endpoints = create_endpoint(2);
send_resolver_update_to_policy(&mut lb_policy, vec![endpoints], tcc);
let subchannels = verify_subchannel_creation(&mut rx_events, 2);
move_subchannel_to_state(
&mut lb_policy,
subchannels[0].clone(),
&SubchannelState::connecting(),
tcc,
);
verify_connecting_picker(&mut rx_events);
move_subchannel_to_state(
&mut lb_policy,
subchannels[0].clone(),
&SubchannelState::ready(),
tcc,
);
verify_ready_picker(&mut rx_events, subchannels[0].clone());
let mut endpoints = create_endpoint(4);
endpoints.addresses.reverse();
send_resolver_update_to_policy(&mut lb_policy, vec![endpoints], tcc);
let subchannels = verify_subchannel_creation(&mut rx_events, 4);
lb_policy.subchannel_update(subchannels[0].clone(), &SubchannelState::idle(), tcc);
lb_policy.subchannel_update(subchannels[1].clone(), &SubchannelState::idle(), tcc);
lb_policy.subchannel_update(subchannels[2].clone(), &SubchannelState::idle(), tcc);
lb_policy.subchannel_update(subchannels[3].clone(), &SubchannelState::ready(), tcc);
verify_ready_picker(&mut rx_events, subchannels[3].clone());
}
}