use std::mem::replace;
use std::sync::Arc;
use crate::client::ConnectivityState;
use crate::client::load_balancing::ChannelController;
use crate::client::load_balancing::LbPolicy;
use crate::client::load_balancing::LbPolicyBuilder;
use crate::client::load_balancing::LbPolicyOptions;
use crate::client::load_balancing::LbState;
use crate::client::load_balancing::PickResult;
use crate::client::load_balancing::Picker;
use crate::client::load_balancing::Subchannel;
use crate::client::load_balancing::SubchannelState;
use crate::client::load_balancing::WorkScheduler;
use crate::client::name_resolution::ResolverUpdate;
use crate::core::RequestHeaders;
/// Load-balancing policy wrapper that postpones building its delegate policy.
///
/// The delegate is only created by `T` once `exit_idle` is called (or `work`
/// is called before the delegate exists); until then resolver updates are
/// buffered rather than forwarded.
#[derive(Debug)]
pub(crate) struct Lazy<T>
where
    T: LbPolicyBuilder,
{
    /// Current lifecycle stage: not yet built, or built and delegating.
    inner: Inner<T>,
}
/// Lifecycle stage of a [`Lazy`] policy.
#[derive(Debug)]
enum Inner<T>
where
    T: LbPolicyBuilder,
{
    /// Placeholder left in place by `exit_idle` while it moves the pending
    /// state out to build the delegate. `resolver_update` and `exit_idle`
    /// treat observing this variant as unreachable.
    Void,
    /// The delegate policy has not been built yet.
    Pending(Pending<T>),
    /// The delegate policy has been built; calls are forwarded to it.
    Built(T::LbPolicy),
}
/// Everything [`Lazy`] holds on to before the delegate policy exists.
#[derive(Debug)]
struct Pending<T>
where
    T: LbPolicyBuilder,
{
    /// Builder that will create the delegate policy.
    delegate_builder: T,
    /// Options handed to `delegate_builder` when the delegate is built.
    options: LbPolicyOptions,
    /// Most recent resolver update (and its optional config) received while
    /// pending; each new update overwrites the previous one. `None` until the
    /// first update arrives.
    latest_state: Option<(ResolverUpdate, Option<<T::LbPolicy as LbPolicy>::LbConfig>)>,
}
impl<T> Lazy<T>
where
    T: LbPolicyBuilder,
{
    /// Creates a lazy policy that has not yet built its delegate.
    ///
    /// Before returning, this publishes an `Idle` state to
    /// `channel_controller` whose picker is a [`WakeUpPicker`] sharing the
    /// work scheduler found in `options`. The builder and options are stored
    /// for the later build.
    pub fn new(
        delegate_builder: T,
        options: LbPolicyOptions,
        channel_controller: &mut dyn ChannelController,
    ) -> Self {
        // Clone the scheduler first: `options` itself is moved into the
        // pending state below.
        let scheduler = options.work_scheduler.clone();
        let idle_state = LbState {
            connectivity_state: ConnectivityState::Idle,
            picker: Arc::new(WakeUpPicker::new(scheduler)),
        };
        channel_controller.update_picker(idle_state);

        let pending = Pending {
            delegate_builder,
            options,
            latest_state: None,
        };
        Self {
            inner: Inner::Pending(pending),
        }
    }
}
impl<T> LbPolicy for Lazy<T>
where
    T: LbPolicyBuilder,
    <T::LbPolicy as LbPolicy>::LbConfig: Clone,
{
    type LbConfig = <T::LbPolicy as LbPolicy>::LbConfig;

    /// Forwards the update to the delegate if it exists; otherwise stores it
    /// (replacing any previously stored update) and reports success.
    ///
    /// The config is cloned when buffering because only a borrow is provided.
    fn resolver_update(
        &mut self,
        update: ResolverUpdate,
        config: Option<&Self::LbConfig>,
        channel_controller: &mut dyn ChannelController,
    ) -> Result<(), String> {
        match &mut self.inner {
            Inner::Built(delegate) => delegate.resolver_update(update, config, channel_controller),
            Inner::Pending(pending) => {
                let buffered = (update, config.cloned());
                pending.latest_state = Some(buffered);
                Ok(())
            }
            Inner::Void => unreachable!(),
        }
    }

    /// Forwards the subchannel update to the delegate; does nothing while no
    /// delegate has been built.
    fn subchannel_update(
        &mut self,
        subchannel: Arc<dyn Subchannel>,
        state: &SubchannelState,
        channel_controller: &mut dyn ChannelController,
    ) {
        match &mut self.inner {
            Inner::Built(delegate) => {
                delegate.subchannel_update(subchannel, state, channel_controller);
            }
            Inner::Pending(_) | Inner::Void => {}
        }
    }

    /// Forwards to the delegate's `work` once built. Before that, a `work`
    /// call is handled exactly like `exit_idle`, i.e. it builds the delegate.
    fn work(&mut self, channel_controller: &mut dyn ChannelController) {
        if let Inner::Built(delegate) = &mut self.inner {
            delegate.work(channel_controller);
            return;
        }
        self.exit_idle(channel_controller);
    }

    /// Forwards to the delegate's `exit_idle` once built; otherwise builds the
    /// delegate now.
    ///
    /// On build, a buffered resolver update (if any) is replayed into the new
    /// delegate, and re-resolution is requested if the delegate rejects it.
    /// With no buffered update, the delegate's `exit_idle` is called instead.
    fn exit_idle(&mut self, channel_controller: &mut dyn ChannelController) {
        if let Inner::Built(delegate) = &mut self.inner {
            delegate.exit_idle(channel_controller);
            return;
        }

        // Move the pending state out by value, leaving `Void` behind until the
        // built delegate is stored at the end of this function.
        let pending = match replace(&mut self.inner, Inner::Void) {
            Inner::Pending(pending) => pending,
            _ => unreachable!(),
        };
        let Pending {
            delegate_builder,
            options,
            latest_state,
        } = pending;

        let mut delegate = delegate_builder.build(options);
        match latest_state {
            Some((update, config)) => {
                let rejected = delegate
                    .resolver_update(update, config.as_ref(), channel_controller)
                    .is_err();
                if rejected {
                    channel_controller.request_resolution();
                }
            }
            None => delegate.exit_idle(channel_controller),
        }
        self.inner = Inner::Built(delegate);
    }
}
/// Picker that [`Lazy::new`] publishes alongside the `Idle` state.
///
/// It holds the work scheduler so that a pick can schedule work on the policy.
#[derive(Debug)]
pub struct WakeUpPicker {
    /// Scheduler notified on every pick.
    work_scheduler: Arc<dyn WorkScheduler>,
}

impl WakeUpPicker {
    /// Wraps the given work scheduler in a picker.
    fn new(work_scheduler: Arc<dyn WorkScheduler>) -> Self {
        WakeUpPicker { work_scheduler }
    }
}
impl Picker for WakeUpPicker {
    /// Schedules work on the stored work scheduler and returns
    /// [`PickResult::Queue`].
    ///
    /// The request headers are not inspected, hence the underscore-prefixed
    /// parameter name (avoids the `unused_variables` lint).
    fn pick(&self, _request: &RequestHeaders) -> PickResult {
        self.work_scheduler.schedule_work();
        PickResult::Queue
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::mpsc;

    use super::*;
    use crate::client::load_balancing::test_utils::TestChannelController;
    use crate::client::load_balancing::test_utils::TestEvent;
    use crate::client::load_balancing::test_utils::TestWorkScheduler;
    use crate::client::load_balancing::test_utils::new_request_headers;

    /// Calls observed on the mock builder/policy, in the order they happened.
    #[derive(Debug, PartialEq, Eq)]
    enum MockEvent {
        Build,
        ResolverUpdate,
        SubchannelUpdate,
        Work,
        ExitIdle,
    }

    /// Acts as both the builder and the built policy; every call is reported
    /// on `tx` so tests can assert on the exact call sequence.
    #[derive(Debug, Clone)]
    struct MockPolicy {
        tx: mpsc::Sender<MockEvent>,
    }

    impl MockPolicy {
        /// Returns the mock together with the receiving end of its event log.
        fn new() -> (Self, mpsc::Receiver<MockEvent>) {
            let (tx, rx) = mpsc::channel();
            (Self { tx }, rx)
        }
    }

    impl LbPolicyBuilder for MockPolicy {
        type LbPolicy = Self;

        fn build(&self, _options: LbPolicyOptions) -> Self {
            self.tx.send(MockEvent::Build).unwrap();
            self.clone()
        }

        fn name(&self) -> &'static str {
            "mock"
        }
    }

    impl LbPolicy for MockPolicy {
        type LbConfig = ();

        fn resolver_update(
            &mut self,
            _update: ResolverUpdate,
            _config: Option<&()>,
            _channel_controller: &mut dyn ChannelController,
        ) -> Result<(), String> {
            self.tx.send(MockEvent::ResolverUpdate).unwrap();
            Ok(())
        }

        fn subchannel_update(
            &mut self,
            _subchannel: Arc<dyn Subchannel>,
            _state: &SubchannelState,
            _channel_controller: &mut dyn ChannelController,
        ) {
            self.tx.send(MockEvent::SubchannelUpdate).unwrap();
        }

        fn work(&mut self, _channel_controller: &mut dyn ChannelController) {
            self.tx.send(MockEvent::Work).unwrap();
        }

        fn exit_idle(&mut self, _channel_controller: &mut dyn ChannelController) {
            self.tx.send(MockEvent::ExitIdle).unwrap();
        }
    }

    /// Common fixture shared by every test in this module.
    struct Harness {
        /// Policy under test, freshly constructed and not yet built.
        lazy: Lazy<MockPolicy>,
        /// Channel controller that reports its calls on `channel_events`.
        cc: TestChannelController,
        /// Calls made to the mock builder/policy.
        policy_events: mpsc::Receiver<MockEvent>,
        /// Events from the test channel controller and test work scheduler.
        channel_events: mpsc::Receiver<TestEvent>,
    }

    /// Builds a `Lazy<MockPolicy>` wired to a test channel controller and a
    /// test work scheduler that share one event channel.
    fn setup() -> Harness {
        let (builder, policy_events) = MockPolicy::new();
        let (tx_events, channel_events) = mpsc::channel();
        let mut cc = TestChannelController {
            tx_events: tx_events.clone(),
        };
        let options = LbPolicyOptions {
            work_scheduler: Arc::new(TestWorkScheduler { tx_events }),
            runtime: crate::rt::default_runtime(),
        };
        let lazy = Lazy::new(builder, options, &mut cc);
        Harness {
            lazy,
            cc,
            policy_events,
            channel_events,
        }
    }

    #[test]
    fn test_lazy_build_on_exit_idle() {
        let Harness {
            mut lazy,
            mut cc,
            policy_events,
            channel_events,
        } = setup();

        // Construction must publish an Idle picker.
        let TestEvent::UpdatePicker(lb_state) = channel_events.recv().unwrap() else {
            panic!("expected UpdatePicker event");
        };
        assert_eq!(lb_state.connectivity_state, ConnectivityState::Idle);

        // A resolver update alone must not build the delegate.
        lazy.resolver_update(ResolverUpdate::default(), None, &mut cc)
            .unwrap();
        assert!(policy_events.try_recv().is_err());

        // Exiting idle builds the delegate and replays the buffered update.
        lazy.exit_idle(&mut cc);
        assert_eq!(policy_events.recv().unwrap(), MockEvent::Build);
        assert_eq!(policy_events.recv().unwrap(), MockEvent::ResolverUpdate);
        assert!(policy_events.try_recv().is_err());
    }

    #[test]
    fn test_lazy_build_on_pick() {
        let Harness {
            mut lazy,
            mut cc,
            policy_events,
            channel_events,
        } = setup();

        let TestEvent::UpdatePicker(lb_state) = channel_events.recv().unwrap() else {
            panic!("expected UpdatePicker event");
        };

        lazy.resolver_update(ResolverUpdate::default(), None, &mut cc)
            .unwrap();

        // A pick on the idle picker queues and schedules work.
        let res = lb_state.picker.pick(&new_request_headers());
        assert!(matches!(res, PickResult::Queue));
        let scheduled = channel_events.recv().unwrap();
        assert!(matches!(scheduled, TestEvent::ScheduleWork));

        // Running the scheduled work builds the delegate and replays the update.
        lazy.work(&mut cc);
        assert_eq!(policy_events.recv().unwrap(), MockEvent::Build);
        assert_eq!(policy_events.recv().unwrap(), MockEvent::ResolverUpdate);
        assert!(policy_events.try_recv().is_err());
    }

    #[test]
    fn test_lazy_exit_idle_without_update() {
        let Harness {
            mut lazy,
            mut cc,
            policy_events,
            channel_events,
        } = setup();

        assert!(matches!(
            channel_events.recv().unwrap(),
            TestEvent::UpdatePicker(_)
        ));

        // With nothing buffered, the new delegate is told to exit idle instead.
        lazy.exit_idle(&mut cc);
        assert_eq!(policy_events.recv().unwrap(), MockEvent::Build);
        assert_eq!(policy_events.recv().unwrap(), MockEvent::ExitIdle);
        assert!(policy_events.try_recv().is_err());
    }

    #[test]
    fn test_lazy_build_on_pick_without_update() {
        let Harness {
            mut lazy,
            mut cc,
            policy_events,
            channel_events,
        } = setup();

        let TestEvent::UpdatePicker(lb_state) = channel_events.recv().unwrap() else {
            panic!("expected UpdatePicker event");
        };

        let res = lb_state.picker.pick(&new_request_headers());
        assert!(matches!(res, PickResult::Queue));
        let scheduled = channel_events.recv().unwrap();
        assert!(matches!(scheduled, TestEvent::ScheduleWork));

        lazy.work(&mut cc);
        assert_eq!(policy_events.recv().unwrap(), MockEvent::Build);
        assert_eq!(policy_events.recv().unwrap(), MockEvent::ExitIdle);
        assert!(policy_events.try_recv().is_err());
    }
}