ydb 0.10.2

This crate contains generated low-level gRPC code from the YDB API protobuf definitions; it is used as the base for the ydb crate.
Documentation
use crate::grpc_wrapper::runtime_interceptors::{
    ChannelResponse, GrpcInterceptor, InterceptorError, InterceptorRequest, InterceptorResult,
    RequestMetadata,
};
use crate::Discovery;
use http::uri::PathAndQuery;
use http::Uri;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tracing::trace;

/// Message describing a failed call, passed over the internal channel from
/// the interceptor to the background pessimization loop.
pub(crate) struct ChannelErrorInfo {
    /// URI of the endpoint the failed request was addressed to; the loop
    /// hands it to `Discovery::pessimization`.
    pub(crate) endpoint: Uri,
}

/// Interceptor that reports the endpoint of every failed call so that the
/// discovery component can pessimize it.
pub(crate) struct DiscoveryPessimizationInterceptor {
    /// Sending half of the unbounded channel carrying [`ChannelErrorInfo`]
    /// messages about failed calls.
    sender: UnboundedSender<ChannelErrorInfo>,
}

impl DiscoveryPessimizationInterceptor {
    /// Creates the interceptor and starts its background worker.
    ///
    /// An unbounded channel is created; the receiving half is moved into a
    /// task started with `tokio::spawn`, the sending half is kept in the
    /// returned value. Because `tokio::spawn` is used, this is expected to be
    /// called from within a Tokio runtime.
    pub fn new(discovery: Arc<Box<dyn Discovery>>) -> Self {
        let (tx, rx) = mpsc::unbounded_channel();
        let worker = Self::node_pessimization_loop(discovery, rx);
        // The join handle is intentionally dropped: the task runs detached.
        tokio::spawn(worker);
        Self { sender: tx }
    }

    /// Drains `errors`, passing the endpoint of each received message to
    /// `discovery.pessimization`.
    ///
    /// The loop finishes as soon as `recv` yields `None`.
    async fn node_pessimization_loop(
        discovery: Arc<Box<dyn Discovery>>,
        mut errors: UnboundedReceiver<ChannelErrorInfo>,
    ) {
        while let Some(report) = errors.recv().await {
            discovery.pessimization(&report.endpoint);
        }
    }
}

impl GrpcInterceptor for DiscoveryPessimizationInterceptor {
    /// Remembers the request URI in `metadata` so it is available when the
    /// response is inspected, then passes the request through unchanged.
    fn on_call(
        &self,
        metadata: &mut RequestMetadata,
        req: InterceptorRequest,
    ) -> InterceptorResult<InterceptorRequest> {
        let request_uri = req.uri().clone();
        *metadata = Some(Box::new(request_uri));
        Ok(req)
    }

    /// Inspects the call result; for an `Err` the endpoint of the request is
    /// sent to the background pessimization loop.
    ///
    /// A successful result is returned untouched. For a failed one, the URI
    /// stored by `on_call` is taken from `metadata`, its path and query are
    /// replaced with an empty value, and the resulting URI is sent through
    /// the channel. The original `res` is then returned.
    ///
    /// # Errors
    ///
    /// Returns a custom `InterceptorError` (instead of `res`) if the URI
    /// cannot be rebuilt after trimming its path.
    ///
    /// # Panics
    ///
    /// Panics when `res` is an error and `metadata` is `None` or does not
    /// hold a `Uri`.
    fn on_feature_poll_ready(
        &self,
        metadata: &mut RequestMetadata,
        res: Result<ChannelResponse, InterceptorError>,
    ) -> Result<ChannelResponse, InterceptorError> {
        /// Short, loggable description of the channel send outcome.
        fn result_to_str(res: Result<(), SendError<ChannelErrorInfo>>) -> &'static str {
            match res {
                Ok(()) => "OK",
                Err(_) => "receiver closed",
            }
        }

        // Nothing to report for successful calls.
        if res.is_ok() {
            return res;
        }

        let stored = metadata.as_mut().unwrap();
        let request_uri = stored.downcast_mut::<Uri>().unwrap().clone();

        // Keep only scheme and authority: drop the path and query.
        let mut parts = request_uri.into_parts();
        parts.path_and_query = Some(PathAndQuery::from_static(""));
        let rebuilt = Uri::from_parts(parts);
        let endpoint = rebuilt.map_err(|err| {
            InterceptorError::custom(format!(
                "failed to trim uri path for send node pessimize err: '{err:?}'"
            ))
        })?;

        let report = ChannelErrorInfo {
            endpoint: endpoint.clone(),
        };
        let outcome = self.sender.send(report);
        trace!(
            "GrpcInterceptor sent error for uri: '{}' with result: {:?}",
            &endpoint,
            result_to_str(outcome)
        );

        res
    }
}