use std::collections::HashMap;
use std::sync::Arc;
use crate::client::ConnectivityState;
use crate::client::load_balancing::ChannelController;
use crate::client::load_balancing::DynLbConfig;
use crate::client::load_balancing::DynLbPolicyBuilder;
use crate::client::load_balancing::GLOBAL_LB_REGISTRY;
use crate::client::load_balancing::LbPolicy;
use crate::client::load_balancing::LbState;
use crate::client::load_balancing::ParsedJsonLbConfig;
use crate::client::load_balancing::Subchannel;
use crate::client::load_balancing::SubchannelState;
use crate::client::load_balancing::WorkScheduler;
use crate::client::load_balancing::child_manager::ChildManager;
use crate::client::load_balancing::child_manager::ChildUpdate;
use crate::client::name_resolution::ResolverUpdate;
use crate::rt::GrpcRuntime;
/// Parsed configuration for [`GracefulSwitchPolicy`], as produced by
/// `GracefulSwitchPolicy::parse_config`.
#[derive(Debug)]
pub(crate) struct GracefulSwitchLbConfig {
/// Builder of the child policy that this configuration selects.
child_builder: Arc<DynLbPolicyBuilder>,
/// Result of the child builder's own `parse_config`, forwarded to the
/// child together with each resolver update.
child_config: Option<DynLbConfig>,
}
/// Load-balancing policy that delegates to a child policy and, when the
/// configured child policy changes, keeps reporting the previously active
/// child until the replacement is ready to take over (see `maybe_swap`).
#[derive(Debug)]
pub(crate) struct GracefulSwitchPolicy {
    /// Holds the child policies; all children share the identifier `()`.
    child_manager: ChildManager<()>,
    /// Last state handed to the channel controller, used to suppress
    /// duplicate picker updates.
    last_update: Option<LbState>,
    /// Builder of the child currently treated as active. `None` until the
    /// first resolver update that carries a config.
    active_child_builder: Option<Arc<DynLbPolicyBuilder>>,
}
impl LbPolicy for GracefulSwitchPolicy {
type LbConfig = GracefulSwitchLbConfig;
fn resolver_update(
&mut self,
update: ResolverUpdate,
config: Option<&Self::LbConfig>,
channel_controller: &mut dyn ChannelController,
) -> Result<(), String> {
let config = config.ok_or("graceful switch received no config")?;
if self.active_child_builder.is_none() {
self.active_child_builder = Some(config.child_builder.clone());
}
let active_child_builder = self.active_child_builder.as_ref().unwrap();
let mut children = Vec::with_capacity(2);
children.push(ChildUpdate {
child_policy_builder: config.child_builder.clone(),
child_identifier: (),
child_update: Some((update, config.child_config.as_ref())),
});
if config.child_builder.name() != active_child_builder.name() {
children.push(ChildUpdate {
child_policy_builder: active_child_builder.clone(),
child_identifier: (),
child_update: None,
});
}
let res = self.child_manager.update(children, channel_controller);
self.update_picker(channel_controller);
res
}
fn subchannel_update(
&mut self,
subchannel: Arc<dyn Subchannel>,
state: &SubchannelState,
channel_controller: &mut dyn ChannelController,
) {
self.child_manager
.subchannel_update(subchannel, state, channel_controller);
self.update_picker(channel_controller);
}
fn work(&mut self, channel_controller: &mut dyn ChannelController) {
self.child_manager.work(channel_controller);
self.update_picker(channel_controller);
}
fn exit_idle(&mut self, channel_controller: &mut dyn ChannelController) {
self.child_manager.exit_idle(channel_controller);
self.update_picker(channel_controller);
}
}
/// Labels a child policy as either the current one or a pending replacement.
// NOTE(review): nothing in the visible part of this file references this
// enum — confirm whether it is still needed.
#[derive(Debug, PartialEq, Eq, Clone)]
enum ChildKind {
Current,
Pending,
}
impl GracefulSwitchPolicy {
    /// Creates a policy with no children and no active child builder.
    pub fn new(runtime: GrpcRuntime, work_scheduler: Arc<dyn WorkScheduler>) -> Self {
        Self {
            child_manager: ChildManager::new(runtime, work_scheduler),
            last_update: None,
            active_child_builder: None,
        }
    }

    /// Parses a JSON load-balancing config into a [`GracefulSwitchLbConfig`].
    ///
    /// The config is converted to a list of maps, each of which must hold
    /// exactly one `policy name -> policy config` entry. Entries whose name
    /// is not in `GLOBAL_LB_REGISTRY` are skipped; the first registered one
    /// is parsed by its builder and returned.
    ///
    /// # Errors
    ///
    /// Fails when the conversion fails, when an element does not contain
    /// exactly one entry, when the selected builder rejects its config, or
    /// when no listed policy is registered.
    pub fn parse_config(config: &ParsedJsonLbConfig) -> Result<GracefulSwitchLbConfig, String> {
        let candidates: Vec<HashMap<String, serde_json::Value>> = config
            .convert_to()
            .map_err(|e| format!("failed to parse JSON config: {}", e))?;

        for candidate in candidates {
            if candidate.len() != 1 {
                return Err(format!(
                    "Each element in array must contain exactly one policy name/config; found {:?}",
                    candidate.keys()
                ));
            }
            // Cannot fail: the length was just checked to be exactly one.
            let (policy_name, policy_config) = candidate.into_iter().next().unwrap();

            let child_builder = match GLOBAL_LB_REGISTRY.get_policy(policy_name.as_str()) {
                Some(builder) => builder,
                // Unknown policy: fall through to the next candidate.
                None => continue,
            };
            let child_config = child_builder.parse_config(&ParsedJsonLbConfig {
                value: policy_config,
            })?;
            return Ok(GracefulSwitchLbConfig {
                child_builder,
                child_config,
            });
        }

        Err("no supported policies found in config".into())
    }

    /// Sends the state chosen by `maybe_swap` to the channel, unless there is
    /// nothing to report or it equals the last state that was sent.
    fn update_picker(&mut self, channel_controller: &mut dyn ChannelController) {
        if let Some(update) = self.maybe_swap(channel_controller) {
            let unchanged = match &self.last_update {
                Some(previous) => previous == &update,
                None => false,
            };
            if !unchanged {
                channel_controller.update_picker(update.clone());
                self.last_update = Some(update);
            }
        }
    }

    /// Decides which child's state should be exposed, promoting the pending
    /// child when appropriate.
    ///
    /// Returns `None` when the child manager reports no child update or when
    /// no active builder has been set yet. The active child is kept only
    /// while it is `Ready` and the pending child is still `Connecting`; in
    /// every other case the pending child becomes active and all other
    /// children are dropped from the child manager.
    ///
    /// # Panics
    ///
    /// Panics if no child matches the active builder's name.
    fn maybe_swap(&mut self, _channel_controller: &mut dyn ChannelController) -> Option<LbState> {
        if !self.child_manager.child_updated() {
            return None;
        }
        let active_name = self.active_child_builder.as_ref()?.name();

        // Children are told apart by builder name; a later match overwrites
        // an earlier one.
        let (mut current, mut pending) = (None, None);
        for child in self.child_manager.children() {
            let slot = if child.builder.name() == active_name {
                &mut current
            } else {
                &mut pending
            };
            *slot = Some(child);
        }

        let current = current.expect("There should always be an active child policy");
        let pending = match pending {
            Some(pending) => pending,
            None => return Some(current.state.clone()),
        };

        let keep_current = current.state.connectivity_state == ConnectivityState::Ready
            && pending.state.connectivity_state == ConnectivityState::Connecting;
        if keep_current {
            return Some(current.state.clone());
        }

        // Copy out what is needed before mutating the child manager.
        let promoted_builder = pending.builder.clone();
        let promoted_state = pending.state.clone();
        self.active_child_builder = Some(promoted_builder.clone());
        self.child_manager
            .retain_children([((), promoted_builder)]);
        Some(promoted_state)
    }
}
#[cfg(test)]
mod test {
use std::panic;
use std::sync::Arc;
use std::sync::mpsc;
use std::time::Duration;
use crate::client::load_balancing::ChannelController;
use crate::client::load_balancing::LbPolicy;
use crate::client::load_balancing::LbState;
use crate::client::load_balancing::ParsedJsonLbConfig;
use crate::client::load_balancing::Pick;
use crate::client::load_balancing::PickResult;
use crate::client::load_balancing::Picker;
use crate::client::load_balancing::Subchannel;
use crate::client::load_balancing::SubchannelState;
use crate::client::load_balancing::graceful_switch::GracefulSwitchPolicy;
use crate::client::load_balancing::test_utils::StubPolicyData;
use crate::client::load_balancing::test_utils::StubPolicyFuncs;
use crate::client::load_balancing::test_utils::TestChannelController;
use crate::client::load_balancing::test_utils::TestEvent;
use crate::client::load_balancing::test_utils::TestSubchannel;
use crate::client::load_balancing::test_utils::TestWorkScheduler;
use crate::client::load_balancing::test_utils::reg_stub_policy;
use crate::client::load_balancing::test_utils::{self};
use crate::client::name_resolution::Address;
use crate::client::name_resolution::Endpoint;
use crate::client::name_resolution::ResolverUpdate;
use crate::core::RequestHeaders;
use crate::metadata::MetadataMap;
use crate::rt::default_runtime;
/// Short (10 ms) duration, presumably meant as an upper bound when waiting
/// for test events.
const DEFAULT_TEST_SHORT_TIMEOUT: Duration = Duration::from_millis(10);
/// Subchannels created by a stub policy for one resolver update.
struct TestSubchannelList {
    subchannels: Vec<Arc<dyn Subchannel>>,
}

impl TestSubchannelList {
    /// Creates one subchannel per address through `channel_controller`, in
    /// address order. An empty slice yields an empty list.
    fn new(addresses: &[Address], channel_controller: &mut dyn ChannelController) -> Self {
        let subchannels = addresses
            .iter()
            .map(|address| {
                // Only the subchannel is kept; the returned state is ignored.
                let (sc, _state) = channel_controller.new_subchannel(address);
                sc
            })
            .collect();
        TestSubchannelList { subchannels }
    }

    /// Returns `true` when `sc` is one of the subchannels in this list.
    fn contains(&self, sc: &Arc<dyn Subchannel>) -> bool {
        self.subchannels.contains(sc)
    }
}
/// Picker that identifies itself through the address of the subchannel it
/// returns, letting tests tell which stub policy produced it.
#[derive(Debug)]
struct TestPicker {
    name: &'static str,
}

impl TestPicker {
    /// Creates a picker tagged with `name`.
    fn new(name: &'static str) -> Self {
        TestPicker { name }
    }
}

impl Picker for TestPicker {
    /// Always picks a fresh `TestSubchannel` whose address is this picker's
    /// name, with empty metadata and no completion callback.
    fn pick(&self, _req: &RequestHeaders) -> PickResult {
        let tagged_address = Address {
            address: self.name.to_string().into(),
            ..Default::default()
        };
        // Only the sending half is handed over; the receiver is dropped here.
        let sender = mpsc::channel().0;
        let subchannel = Arc::new(TestSubchannel::new(tagged_address, sender));
        PickResult::Pick(Pick {
            subchannel,
            metadata: MetadataMap::new(),
            on_complete: None,
        })
    }
}
/// Per-policy state that the stub callbacks store in `StubPolicyData::test_data`.
struct TestState {
/// Subchannels created while handling the most recent successful resolver update.
subchannel_list: TestSubchannelList,
}
fn create_funcs_for_gracefulswitch_tests(name: &'static str) -> StubPolicyFuncs {
StubPolicyFuncs {
resolver_update: Some(Arc::new(
move |data: &mut StubPolicyData, update: ResolverUpdate, _, channel_controller| {
if let Ok(ref endpoints) = update.endpoints {
let addresses: Vec<_> = endpoints
.iter()
.flat_map(|ep| ep.addresses.clone())
.collect();
let scl = TestSubchannelList::new(&addresses, channel_controller);
let child_state = TestState {
subchannel_list: scl,
};
data.test_data = Some(Box::new(child_state));
} else {
data.test_data = None;
}
Ok(())
},
)),
subchannel_update: Some(Arc::new(
move |data: &mut StubPolicyData, updated_subchannel, state, channel_controller| {
let test_data = data.test_data.as_mut().unwrap();
let test_state = test_data.downcast_mut::<TestState>().unwrap();
let scl = &mut test_state.subchannel_list;
assert!(
scl.contains(&updated_subchannel),
"subchannel_update received an update for a subchannel it does not own."
);
channel_controller.update_picker(LbState {
connectivity_state: state.connectivity_state,
picker: Arc::new(TestPicker { name }),
});
},
)),
..Default::default()
}
}
/// Creates the fixtures shared by all tests: the receiving end of the test
/// event channel, a fresh `GracefulSwitchPolicy`, and a channel controller
/// that reports its calls on that channel.
fn setup() -> (
    mpsc::Receiver<TestEvent>,
    GracefulSwitchPolicy,
    Box<dyn ChannelController>,
) {
    let (tx_events, rx_events) = mpsc::channel::<TestEvent>();
    // The work scheduler and the channel controller report on the same channel.
    let work_scheduler = Arc::new(TestWorkScheduler {
        tx_events: tx_events.clone(),
    });
    let tcc = Box::new(TestChannelController { tx_events });
    let graceful_switch = GracefulSwitchPolicy::new(default_runtime(), work_scheduler);
    (rx_events, graceful_switch, tcc)
}
/// Builds an `Endpoint` holding a single address made from `addr`; every
/// other field keeps its default value.
fn create_endpoint_with_one_address(addr: String) -> Endpoint {
    let only_address = Address {
        address: addr.into(),
        ..Default::default()
    };
    Endpoint {
        addresses: vec![only_address],
        ..Default::default()
    }
}
fn verify_subchannel_creation_from_policy(
rx_events: &mut mpsc::Receiver<TestEvent>,
) -> Arc<dyn Subchannel> {
match rx_events.recv().unwrap() {
TestEvent::NewSubchannel(sc) => sc,
other => panic!("unexpected event {:?}", other),
}
}
fn verify_correct_picker_from_policy(rx_events: &mut mpsc::Receiver<TestEvent>, name: &str) {
println!("verify ready picker");
let event = rx_events.recv().unwrap();
let TestEvent::UpdatePicker(update) = event else {
panic!("unexpected event {:?}", event);
};
let req = test_utils::new_request_headers();
println!("{:?}", update.connectivity_state);
let pick = update.picker.pick(&req);
let PickResult::Pick(pick) = pick else {
panic!("unexpected pick result: {:?}", pick);
};
let received_address = &pick.subchannel.address().address.to_string();
let expected_address = name.to_string();
assert_eq!(received_address, &expected_address);
}
fn move_subchannel_to_state(
lb_policy: &mut impl LbPolicy,
subchannel: Arc<dyn Subchannel>,
tcc: &mut dyn ChannelController,
state: &SubchannelState,
) {
lb_policy.subchannel_update(subchannel, state, tcc);
}
/// The first resolver update selects the first registered policy in the
/// config, and that policy's picker is published once its subchannel is ready.
#[test]
fn gracefulswitch_successful_first_update() {
    const POLICY_ONE: &str = "stub-gracefulswitch_successful_first_update-one";
    const POLICY_TWO: &str = "stub-gracefulswitch_successful_first_update-two";
    for policy in [POLICY_ONE, POLICY_TWO] {
        reg_stub_policy(policy, create_funcs_for_gracefulswitch_tests(policy));
    }
    let (mut rx_events, mut graceful_switch, mut tcc) = setup();

    // Both policies are listed; the first one is expected to be chosen.
    let lb_config = GracefulSwitchPolicy::parse_config(&ParsedJsonLbConfig {
        value: serde_json::json!([{ POLICY_ONE: {} }, { POLICY_TWO: {} }]),
    })
    .unwrap();
    let update = ResolverUpdate {
        endpoints: Ok(vec![create_endpoint_with_one_address(
            "127.0.0.1:1234".to_string(),
        )]),
        ..Default::default()
    };
    graceful_switch
        .resolver_update(update, Some(&lb_config), tcc.as_mut())
        .unwrap();

    let subchannel = verify_subchannel_creation_from_policy(&mut rx_events);
    move_subchannel_to_state(
        &mut graceful_switch,
        subchannel,
        &mut *tcc,
        &SubchannelState::ready(),
    );
    verify_correct_picker_from_policy(&mut rx_events, POLICY_ONE);
}
/// A second resolver update naming a different policy switches to that
/// policy once its subchannel becomes ready.
#[test]
fn gracefulswitch_switching_to_resolver_update() {
    const POLICY_ONE: &str = "stub-gracefulswitch_switching_to_resolver_update-one";
    const POLICY_TWO: &str = "stub-gracefulswitch_switching_to_resolver_update-two";
    let (mut rx_events, mut graceful_switch, mut tcc) = setup();
    for policy in [POLICY_ONE, POLICY_TWO] {
        reg_stub_policy(policy, create_funcs_for_gracefulswitch_tests(policy));
    }
    let parse = |value: serde_json::Value| {
        GracefulSwitchPolicy::parse_config(&ParsedJsonLbConfig { value }).unwrap()
    };

    // Start with the first policy and bring it to ready.
    let first_config = parse(serde_json::json!([{ POLICY_ONE: {} }]));
    let update = ResolverUpdate {
        endpoints: Ok(vec![create_endpoint_with_one_address(
            "127.0.0.1:1234".to_string(),
        )]),
        ..Default::default()
    };
    graceful_switch
        .resolver_update(update.clone(), Some(&first_config), tcc.as_mut())
        .unwrap();
    let first_subchannel = verify_subchannel_creation_from_policy(&mut rx_events);
    move_subchannel_to_state(
        &mut graceful_switch,
        first_subchannel,
        &mut *tcc,
        &SubchannelState::ready(),
    );
    verify_correct_picker_from_policy(&mut rx_events, POLICY_ONE);

    // Same endpoints, but now the second policy is configured.
    let second_config = parse(serde_json::json!([{ POLICY_TWO: {} }]));
    graceful_switch
        .resolver_update(update, Some(&second_config), tcc.as_mut())
        .unwrap();
    let second_subchannel = verify_subchannel_creation_from_policy(&mut rx_events);
    move_subchannel_to_state(
        &mut graceful_switch,
        second_subchannel,
        &mut *tcc,
        &SubchannelState::ready(),
    );
    verify_correct_picker_from_policy(&mut rx_events, POLICY_TWO);
    assert_channel_empty(&mut rx_events);
}
/// Asserts that a non-blocking receive on the test event channel yields no
/// event.
fn assert_channel_empty(rx_events: &mut mpsc::Receiver<TestEvent>) {
    let next_event = rx_events.try_recv();
    assert!(next_event.is_err());
}
/// Re-sending a config that names the already-active policy forwards the
/// update to that policy without producing a new picker update.
#[test]
fn gracefulswitch_two_policies_same_type() {
    const POLICY: &str = "stub-gracefulswitch_two_policies_same_type-one";
    let (mut rx_events, mut graceful_switch, mut tcc) = setup();
    reg_stub_policy(POLICY, create_funcs_for_gracefulswitch_tests(POLICY));
    let parse = |value: serde_json::Value| {
        GracefulSwitchPolicy::parse_config(&ParsedJsonLbConfig { value }).unwrap()
    };

    let first_config = parse(serde_json::json!([{ POLICY: {} }]));
    let update = ResolverUpdate {
        endpoints: Ok(vec![create_endpoint_with_one_address(
            "127.0.0.1:1234".to_string(),
        )]),
        ..Default::default()
    };
    graceful_switch
        .resolver_update(update.clone(), Some(&first_config), tcc.as_mut())
        .unwrap();
    let first_subchannel = verify_subchannel_creation_from_policy(&mut rx_events);
    move_subchannel_to_state(
        &mut graceful_switch,
        first_subchannel,
        &mut *tcc,
        &SubchannelState::ready(),
    );
    verify_correct_picker_from_policy(&mut rx_events, POLICY);

    // The same policy again: only a subchannel creation is expected.
    let second_config = parse(serde_json::json!([{ POLICY: {} }]));
    graceful_switch
        .resolver_update(update, Some(&second_config), tcc.as_mut())
        .unwrap();
    let recreated = verify_subchannel_creation_from_policy(&mut rx_events);
    assert_eq!(&*recreated.address().address, "127.0.0.1:1234");
    assert_channel_empty(&mut rx_events);
}
/// When the current policy never became ready, the newly configured policy
/// takes over as soon as it reports a state.
#[test]
fn gracefulswitch_current_not_ready_pending_update() {
    const POLICY_ONE: &str = "stub-gracefulswitch_current_not_ready_pending_update-one";
    const POLICY_TWO: &str = "stub-gracefulswitch_current_not_ready_pending_update-two";
    let (mut rx_events, mut graceful_switch, mut tcc) = setup();
    for policy in [POLICY_ONE, POLICY_TWO] {
        reg_stub_policy(policy, create_funcs_for_gracefulswitch_tests(policy));
    }
    let parse = |value: serde_json::Value| {
        GracefulSwitchPolicy::parse_config(&ParsedJsonLbConfig { value }).unwrap()
    };

    // First policy: its subchannel is created but never reports a state.
    let first_config = parse(serde_json::json!([{ POLICY_ONE: {} }]));
    let first_update = ResolverUpdate {
        endpoints: Ok(vec![create_endpoint_with_one_address(
            "127.0.0.1:1234".to_string(),
        )]),
        ..Default::default()
    };
    graceful_switch
        .resolver_update(first_update, Some(&first_config), tcc.as_mut())
        .unwrap();
    // Held (but otherwise unused) for the remainder of the test.
    let _current_subchannel = verify_subchannel_creation_from_policy(&mut rx_events);
    assert_channel_empty(&mut rx_events);

    // Second policy with a different endpoint.
    let second_update = ResolverUpdate {
        endpoints: Ok(vec![create_endpoint_with_one_address(
            "0.0.0.0.0".to_string(),
        )]),
        ..Default::default()
    };
    let second_config = parse(serde_json::json!([
        { POLICY_TWO: { "shuffleAddressList": false } }
    ]));
    graceful_switch
        .resolver_update(second_update, Some(&second_config), tcc.as_mut())
        .unwrap();
    let second_subchannel = verify_subchannel_creation_from_policy(&mut rx_events);
    assert_channel_empty(&mut rx_events);

    move_subchannel_to_state(
        &mut graceful_switch,
        second_subchannel,
        &mut *tcc,
        &SubchannelState::ready(),
    );
    verify_correct_picker_from_policy(&mut rx_events, POLICY_TWO);
    assert_channel_empty(&mut rx_events);
}
/// While the current policy is ready and the pending one is connecting, no
/// update is published; once the current policy leaves ready, the pending
/// policy takes over.
#[test]
fn gracefulswitch_current_leaving_ready() {
    const POLICY_ONE: &str = "stub-gracefulswitch_current_leaving_ready-one";
    const POLICY_TWO: &str = "stub-gracefulswitch_current_leaving_ready-two";
    let (mut rx_events, mut graceful_switch, mut tcc) = setup();
    for policy in [POLICY_ONE, POLICY_TWO] {
        reg_stub_policy(policy, create_funcs_for_gracefulswitch_tests(policy));
    }
    let parse = |value: serde_json::Value| {
        GracefulSwitchPolicy::parse_config(&ParsedJsonLbConfig { value }).unwrap()
    };

    // Bring the first policy to ready.
    let first_config = parse(serde_json::json!([{ POLICY_ONE: {} }]));
    let first_update = ResolverUpdate {
        endpoints: Ok(vec![create_endpoint_with_one_address(
            "127.0.0.1:1234".to_string(),
        )]),
        ..Default::default()
    };
    graceful_switch
        .resolver_update(first_update, Some(&first_config), tcc.as_mut())
        .unwrap();
    let current_subchannel = verify_subchannel_creation_from_policy(&mut rx_events);
    move_subchannel_to_state(
        &mut graceful_switch,
        current_subchannel.clone(),
        &mut *tcc,
        &SubchannelState::ready(),
    );
    verify_correct_picker_from_policy(&mut rx_events, POLICY_ONE);

    // Configure the second policy on a different endpoint.
    let second_update = ResolverUpdate {
        endpoints: Ok(vec![create_endpoint_with_one_address(
            "127.0.0.1:1235".to_string(),
        )]),
        ..Default::default()
    };
    let second_config = parse(serde_json::json!([{ POLICY_TWO: {} }]));
    graceful_switch
        .resolver_update(second_update, Some(&second_config), tcc.as_mut())
        .unwrap();
    let pending_subchannel = verify_subchannel_creation_from_policy(&mut rx_events);

    // Pending is connecting while current is ready: nothing is published.
    move_subchannel_to_state(
        &mut graceful_switch,
        pending_subchannel,
        &mut *tcc,
        &SubchannelState::connecting(),
    );
    assert_channel_empty(&mut rx_events);

    // Current leaves ready: the pending policy's picker is published.
    move_subchannel_to_state(
        &mut graceful_switch,
        current_subchannel,
        &mut *tcc,
        &SubchannelState::connecting(),
    );
    verify_correct_picker_from_policy(&mut rx_events, POLICY_TWO);
}
/// When the pending policy reports a state other than connecting, it replaces
/// the (ready) current policy, and its later state changes are published too.
#[test]
fn gracefulswitch_pending_leaving_connecting() {
    // Policy names are unique to this test so that it does not share entries
    // in the global registry with other tests.
    const POLICY_ONE: &str = "stub-gracefulswitch_pending_leaving_connecting-one";
    const POLICY_TWO: &str = "stub-gracefulswitch_pending_leaving_connecting-two";
    let (mut rx_events, mut graceful_switch, mut tcc) = setup();
    reg_stub_policy(POLICY_ONE, create_funcs_for_gracefulswitch_tests(POLICY_ONE));
    reg_stub_policy(POLICY_TWO, create_funcs_for_gracefulswitch_tests(POLICY_TWO));

    // Bring the first policy to ready.
    let service_config = serde_json::json!([{ POLICY_ONE: serde_json::json!({}) }]);
    let parsed_config = GracefulSwitchPolicy::parse_config(&ParsedJsonLbConfig {
        value: service_config,
    })
    .unwrap();
    let endpoint = create_endpoint_with_one_address("127.0.0.1:1234".to_string());
    let endpoint2 = create_endpoint_with_one_address("127.0.0.1:1235".to_string());
    let update = ResolverUpdate {
        endpoints: Ok(vec![endpoint]),
        ..Default::default()
    };
    graceful_switch
        .resolver_update(update, Some(&parsed_config), &mut *tcc)
        .unwrap();
    let current_subchannel = verify_subchannel_creation_from_policy(&mut rx_events);
    move_subchannel_to_state(
        &mut graceful_switch,
        current_subchannel,
        tcc.as_mut(),
        &SubchannelState::ready(),
    );
    verify_correct_picker_from_policy(&mut rx_events, POLICY_ONE);

    // Configure the second policy on a different endpoint.
    let new_service_config = serde_json::json!([{ POLICY_TWO: serde_json::json!({}) }]);
    let new_update = ResolverUpdate {
        endpoints: Ok(vec![endpoint2]),
        ..Default::default()
    };
    let new_parsed_config = GracefulSwitchPolicy::parse_config(&ParsedJsonLbConfig {
        value: new_service_config,
    })
    .unwrap();
    graceful_switch
        .resolver_update(new_update, Some(&new_parsed_config), &mut *tcc)
        .unwrap();
    let pending_subchannel = verify_subchannel_creation_from_policy(&mut rx_events);

    // Pending reports transient failure (not connecting): it takes over.
    move_subchannel_to_state(
        &mut graceful_switch,
        pending_subchannel.clone(),
        tcc.as_mut(),
        &SubchannelState::transient_failure("n/a"),
    );
    verify_correct_picker_from_policy(&mut rx_events, POLICY_TWO);

    // Further state changes of the now-active policy are published as well.
    move_subchannel_to_state(
        &mut graceful_switch,
        pending_subchannel,
        tcc.as_mut(),
        &SubchannelState::connecting(),
    );
    verify_correct_picker_from_policy(&mut rx_events, POLICY_TWO);
}
/// After the pending policy replaces the current one, the test's own handle
/// must be the only remaining strong reference to the old policy's subchannel.
#[test]
fn gracefulswitch_subchannels_removed_after_current_child_swapped() {
    const POLICY_ONE: &str =
        "stub-gracefulswitch_subchannels_removed_after_current_child_swapped-one";
    const POLICY_TWO: &str =
        "stub-gracefulswitch_subchannels_removed_after_current_child_swapped-two";
    let (mut rx_events, mut graceful_switch, mut tcc) = setup();
    reg_stub_policy(POLICY_ONE, create_funcs_for_gracefulswitch_tests(POLICY_ONE));
    reg_stub_policy(POLICY_TWO, create_funcs_for_gracefulswitch_tests(POLICY_TWO));

    // Bring the first policy to ready, keeping a handle to its subchannel.
    let service_config = serde_json::json!([{ POLICY_ONE: serde_json::json!({}) }]);
    let parsed_config = GracefulSwitchPolicy::parse_config(&ParsedJsonLbConfig {
        value: service_config,
    })
    .unwrap();
    let endpoint = create_endpoint_with_one_address("127.0.0.1:1234".to_string());
    let update = ResolverUpdate {
        endpoints: Ok(vec![endpoint]),
        ..Default::default()
    };
    graceful_switch
        .resolver_update(update, Some(&parsed_config), &mut *tcc)
        .unwrap();
    let current_subchannel = verify_subchannel_creation_from_policy(&mut rx_events);
    move_subchannel_to_state(
        &mut graceful_switch,
        current_subchannel.clone(),
        tcc.as_mut(),
        &SubchannelState::ready(),
    );
    verify_correct_picker_from_policy(&mut rx_events, POLICY_ONE);

    // Configure the second policy on a different endpoint.
    let new_service_config = serde_json::json!([
        { POLICY_TWO: serde_json::json!({ "shuffleAddressList": false }) }
    ]);
    let second_endpoint = create_endpoint_with_one_address("127.0.0.1:1235".to_string());
    let second_update = ResolverUpdate {
        endpoints: Ok(vec![second_endpoint]),
        ..Default::default()
    };
    let new_parsed_config = GracefulSwitchPolicy::parse_config(&ParsedJsonLbConfig {
        value: new_service_config,
    })
    .unwrap();
    graceful_switch
        .resolver_update(second_update, Some(&new_parsed_config), &mut *tcc)
        .unwrap();
    let pending_subchannel = verify_subchannel_creation_from_policy(&mut rx_events);

    // Idle is not connecting, so the pending policy takes over.
    println!("moving subchannel to idle");
    move_subchannel_to_state(
        &mut graceful_switch,
        pending_subchannel,
        tcc.as_mut(),
        &SubchannelState::idle(),
    );
    verify_correct_picker_from_policy(&mut rx_events, POLICY_TWO);

    // Only the handle held by this test should still reference the old
    // policy's subchannel.
    assert_eq!(Arc::strong_count(&current_subchannel), 1);
}
}