use std::{
net::SocketAddr,
net::{IpAddr, Ipv4Addr},
pin::Pin,
};
use futures::Stream;
use tonic::{Request, Response, Status, Streaming};
use xds_api::pb::envoy::{
admin::v3::ClientResourceStatus,
service::status::v3::{
client_config::GenericXdsConfig,
client_status_discovery_service_server::{
ClientStatusDiscoveryService, ClientStatusDiscoveryServiceServer,
},
ClientConfig, ClientStatusRequest, ClientStatusResponse,
},
};
use crate::xds::{cache::CacheReader, XdsConfig};
pub async fn local_server(cache: CacheReader, port: u16) -> Result<(), tonic::transport::Error> {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let reflection = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(xds_api::FILE_DESCRIPTOR_SET)
.with_service_name("envoy.service.status.v3.ClientStatusDiscoveryService")
.build_v1()
.unwrap();
tonic::transport::Server::builder()
.add_service(reflection)
.add_service(ClientStatusDiscoveryServiceServer::new(Server { cache }))
.serve(socket_addr)
.await
}
struct Server {
cache: CacheReader,
}
type ClientStatusResponseStream =
Pin<Box<dyn Stream<Item = Result<ClientStatusResponse, Status>> + Send>>;
#[tonic::async_trait]
impl ClientStatusDiscoveryService for Server {
type StreamClientStatusStream = ClientStatusResponseStream;
async fn stream_client_status(
&self,
_request: Request<Streaming<ClientStatusRequest>>,
) -> Result<Response<Self::StreamClientStatusStream>, Status> {
return Err(Status::unimplemented(
"streaming client status is not supported",
));
}
async fn fetch_client_status(
&self,
request: Request<ClientStatusRequest>,
) -> Result<Response<ClientStatusResponse>, Status> {
let request = request.into_inner();
if !request.node_matchers.is_empty() {
return Err(Status::invalid_argument(
"node_matchers are unsupported for a single client CSDS endpoint",
));
}
let node = request.node;
let generic_xds_configs: Vec<_> = self.cache.iter_xds().map(to_generic_config).collect();
Ok(Response::new(ClientStatusResponse {
config: vec![ClientConfig {
node,
generic_xds_configs,
..Default::default()
}],
}))
}
}
fn to_generic_config(config: XdsConfig) -> GenericXdsConfig {
let client_status = match (&config.xds, &config.last_error) {
(_, Some(_)) => ClientResourceStatus::Nacked,
(Some(_), None) => ClientResourceStatus::Acked,
_ => ClientResourceStatus::Unknown,
};
let version_info = config.version.map(|v| v.to_string()).unwrap_or_default();
GenericXdsConfig {
type_url: config.type_url,
name: config.name,
version_info,
xds_config: config.xds,
client_status: client_status.into(),
..Default::default()
}
}