use core::panic;
use std::any::Any;
use std::error::Error;
use std::fmt::Debug;
use std::fmt::Display;
use std::sync::Arc;
use crate::StatusCodeError;
use crate::StatusError;
use crate::client::ConnectivityState;
use crate::client::load_balancing::subchannel::Subchannel;
use crate::client::load_balancing::subchannel::SubchannelState;
use crate::client::name_resolution::Address;
use crate::client::name_resolution::ResolverUpdate;
use crate::core::RequestHeaders;
use crate::metadata::MetadataMap;
use crate::rt::GrpcRuntime;
pub(crate) mod child_manager;
pub(crate) mod graceful_switch;
pub(crate) mod lazy;
pub(crate) mod pick_first;
pub(crate) mod round_robin;
pub(crate) mod subchannel;
pub(crate) mod subchannel_sharing;
#[cfg(test)]
pub(crate) mod test_utils;
pub(crate) mod registry;
pub(crate) use registry::GLOBAL_LB_REGISTRY;
pub(crate) trait LbPolicyBuilder: Send + Sync + Debug + 'static {
type LbPolicy: LbPolicy;
fn build(&self, options: LbPolicyOptions) -> Self::LbPolicy;
fn name(&self) -> &'static str;
fn parse_config(
&self,
_config: &ParsedJsonLbConfig,
) -> Result<Option<<Self::LbPolicy as LbPolicy>::LbConfig>, String> {
Ok(None)
}
}
pub(crate) trait LbPolicy: Send + Sync + Debug + 'static {
type LbConfig: Any + Send + Sync + Debug + 'static;
fn resolver_update(
&mut self,
update: ResolverUpdate,
config: Option<&Self::LbConfig>,
channel_controller: &mut dyn ChannelController,
) -> Result<(), String>;
fn subchannel_update(
&mut self,
subchannel: Arc<dyn Subchannel>,
state: &SubchannelState,
channel_controller: &mut dyn ChannelController,
);
fn work(&mut self, channel_controller: &mut dyn ChannelController);
fn exit_idle(&mut self, channel_controller: &mut dyn ChannelController);
}
#[derive(Debug)]
pub(crate) struct LbPolicyOptions {
pub work_scheduler: Arc<dyn WorkScheduler>,
pub runtime: GrpcRuntime,
}
pub(crate) trait WorkScheduler: Send + Sync + Debug {
fn schedule_work(&self);
}
#[derive(Debug)]
pub(crate) struct ParsedJsonLbConfig {
value: serde_json::Value,
}
impl ParsedJsonLbConfig {
pub fn new(json: &str) -> Result<Self, String> {
match serde_json::from_str(json) {
Ok(value) => Ok(ParsedJsonLbConfig { value }),
Err(e) => Err(format!("failed to parse LB config JSON: {e}")),
}
}
pub(crate) fn from_value(value: serde_json::Value) -> Self {
Self { value }
}
pub fn convert_to<T: serde::de::DeserializeOwned>(
&self,
) -> Result<T, Box<dyn Error + Send + Sync>> {
let res: T = match serde_json::from_value(self.value.clone()) {
Ok(v) => v,
Err(e) => {
return Err(format!("{e}").into());
}
};
Ok(res)
}
}
pub(crate) trait ChannelController: Send + Sync {
fn new_subchannel(&mut self, address: &Address) -> (Arc<dyn Subchannel>, SubchannelState);
fn update_picker(&mut self, update: LbState);
fn request_resolution(&mut self);
}
pub(crate) trait Picker: Send + Sync + Debug {
fn pick(&self, request: &RequestHeaders) -> PickResult;
}
#[derive(Debug)]
pub(crate) enum PickResult {
Pick(Pick),
Queue,
Fail(StatusError),
Drop(StatusError),
}
impl PickResult {
pub fn unwrap_pick(self) -> Pick {
let PickResult::Pick(pick) = self else {
panic!("Called `PickResult::unwrap_pick` on a `Queue` or `Err` value");
};
pick
}
}
impl PartialEq for PickResult {
fn eq(&self, other: &Self) -> bool {
match self {
PickResult::Pick(pick) => match other {
PickResult::Pick(other_pick) => pick.subchannel == other_pick.subchannel.clone(),
_ => false,
},
PickResult::Queue => matches!(other, PickResult::Queue),
PickResult::Fail(status) => {
false
}
PickResult::Drop(status) => {
false
}
}
}
}
impl Display for PickResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Pick(_) => write!(f, "Pick"),
Self::Queue => write!(f, "Queue"),
Self::Fail(st) => write!(f, "Fail({st:?})"),
Self::Drop(st) => write!(f, "Drop({st:?})"),
}
}
}
#[derive(Clone, Debug)]
pub(crate) struct LbState {
pub connectivity_state: super::ConnectivityState,
pub picker: Arc<dyn Picker>,
}
impl PartialEq for LbState {
fn eq(&self, other: &Self) -> bool {
self.connectivity_state == other.connectivity_state
&& std::ptr::addr_eq(Arc::as_ptr(&self.picker), Arc::as_ptr(&other.picker))
}
}
impl Eq for LbState {}
impl LbState {
pub fn initial() -> Self {
Self {
connectivity_state: ConnectivityState::Connecting,
picker: Arc::new(QueuingPicker {}),
}
}
}
pub(crate) type CompletionCallback = Box<dyn Fn() + Send + Sync>;
pub(crate) struct Pick {
pub subchannel: Arc<dyn Subchannel>,
pub metadata: MetadataMap,
pub on_complete: Option<CompletionCallback>,
}
impl Debug for Pick {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Pick")
.field("subchannel", &self.subchannel)
.field("metadata", &self.metadata)
.field("on_complete", &format_args!("{:p}", &self.on_complete))
.finish()
}
}
#[derive(Debug)]
pub(crate) struct OneSubchannelPicker {
sc: Arc<dyn Subchannel>,
}
impl Picker for OneSubchannelPicker {
fn pick(&self, _: &RequestHeaders) -> PickResult {
PickResult::Pick(Pick {
subchannel: self.sc.clone(),
metadata: MetadataMap::new(),
on_complete: None,
})
}
}
#[derive(Debug)]
pub(crate) struct QueuingPicker;
impl Picker for QueuingPicker {
fn pick(&self, _request: &RequestHeaders) -> PickResult {
PickResult::Queue
}
}
#[derive(Debug)]
pub(crate) struct FailingPicker {
pub error: String,
}
impl Picker for FailingPicker {
fn pick(&self, _: &RequestHeaders) -> PickResult {
PickResult::Fail(StatusError::new(
StatusCodeError::Unavailable,
self.error.clone(),
))
}
}
pub(crate) type DynLbConfig = Arc<dyn Any + Send + Sync>;
pub(crate) type DynLbPolicyBuilder = dyn LbPolicyBuilder<LbPolicy = Box<DynLbPolicy>>;
pub(crate) type DynLbPolicy = dyn LbPolicy<LbConfig = DynLbConfig>;
impl<T: LbPolicy + ?Sized> LbPolicy for Box<T> {
type LbConfig = T::LbConfig;
fn resolver_update(
&mut self,
update: ResolverUpdate,
config: Option<&Self::LbConfig>,
channel_controller: &mut dyn ChannelController,
) -> Result<(), String> {
(**self).resolver_update(update, config, channel_controller)
}
fn subchannel_update(
&mut self,
subchannel: Arc<dyn Subchannel>,
state: &SubchannelState,
channel_controller: &mut dyn ChannelController,
) {
(**self).subchannel_update(subchannel, state, channel_controller);
}
fn work(&mut self, channel_controller: &mut dyn ChannelController) {
(**self).work(channel_controller);
}
fn exit_idle(&mut self, channel_controller: &mut dyn ChannelController) {
(**self).exit_idle(channel_controller)
}
}