#ifndef GRPC_SRC_CORE_EXT_XDS_XDS_CLIENT_H
#define GRPC_SRC_CORE_EXT_XDS_XDS_CLIENT_H
#include <grpc/support/port_platform.h>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "upb/reflection/def.hpp"
#include <grpc/event_engine/event_engine.h>
#include "src/core/ext/xds/xds_api.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/ext/xds/xds_resource_type.h"
#include "src/core/ext/xds/xds_transport.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/uri/uri_parser.h"
namespace grpc_core {
extern TraceFlag grpc_xds_client_trace;
extern TraceFlag grpc_xds_client_refcount_trace;
class XdsClient : public DualRefCounted<XdsClient> {
public:
class ResourceWatcherInterface : public RefCounted<ResourceWatcherInterface> {
public:
virtual void OnGenericResourceChanged(
const XdsResourceType::ResourceData* resource)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnError(absl::Status status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnResourceDoesNotExist()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
};
XdsClient(
std::unique_ptr<XdsBootstrap> bootstrap,
OrphanablePtr<XdsTransportFactory> transport_factory,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
std::string user_agent_name, std::string user_agent_version,
Duration resource_request_timeout = Duration::Seconds(15));
~XdsClient() override;
const XdsBootstrap& bootstrap() const {
return *bootstrap_; }
XdsTransportFactory* transport_factory() const {
return transport_factory_.get();
}
void Orphan() override;
void WatchResource(const XdsResourceType* type, absl::string_view name,
RefCountedPtr<ResourceWatcherInterface> watcher);
void CancelResourceWatch(const XdsResourceType* type,
absl::string_view listener_name,
ResourceWatcherInterface* watcher,
bool delay_unsubscription = false);
RefCountedPtr<XdsClusterDropStats> AddClusterDropStats(
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name);
void RemoveClusterDropStats(const XdsBootstrap::XdsServer& xds_server,
absl::string_view cluster_name,
absl::string_view eds_service_name,
XdsClusterDropStats* cluster_drop_stats);
RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats(
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name,
RefCountedPtr<XdsLocalityName> locality);
void RemoveClusterLocalityStats(
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name,
const RefCountedPtr<XdsLocalityName>& locality,
XdsClusterLocalityStats* cluster_locality_stats);
void ResetBackoff();
std::string DumpClientConfigBinary();
grpc_event_engine::experimental::EventEngine* engine() {
return engine_.get();
}
private:
struct XdsResourceKey {
std::string id;
std::vector<URI::QueryParam> query_params;
bool operator<(const XdsResourceKey& other) const {
int c = id.compare(other.id);
if (c != 0) return c < 0;
return query_params < other.query_params;
}
};
struct XdsResourceName {
std::string authority;
XdsResourceKey key;
};
class ChannelState : public DualRefCounted<ChannelState> {
public:
template <typename T>
class RetryableCall;
class AdsCallState;
class LrsCallState;
ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
const XdsBootstrap::XdsServer& server);
~ChannelState() override;
void Orphan() override;
XdsClient* xds_client() const { return xds_client_.get(); }
AdsCallState* ads_calld() const;
LrsCallState* lrs_calld() const;
void ResetBackoff();
void MaybeStartLrsCall();
void StopLrsCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
const absl::Status& status() const { return status_; }
void SubscribeLocked(const XdsResourceType* type,
const XdsResourceName& name)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void UnsubscribeLocked(const XdsResourceType* type,
const XdsResourceName& name,
bool delay_unsubscription)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
private:
void OnConnectivityFailure(absl::Status status);
void SetChannelStatusLocked(absl::Status status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
WeakRefCountedPtr<XdsClient> xds_client_;
const XdsBootstrap::XdsServer& server_;
OrphanablePtr<XdsTransportFactory::XdsTransport> transport_;
bool shutting_down_ = false;
OrphanablePtr<RetryableCall<AdsCallState>> ads_calld_;
OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_;
std::map<const XdsResourceType*, std::string >
resource_type_version_map_;
absl::Status status_;
};
struct ResourceState {
std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>>
watchers;
std::unique_ptr<XdsResourceType::ResourceData> resource;
XdsApi::ResourceMetadata meta;
bool ignored_deletion = false;
};
struct AuthorityState {
RefCountedPtr<ChannelState> channel_state;
std::map<const XdsResourceType*, std::map<XdsResourceKey, ResourceState>>
resource_map;
};
struct LoadReportState {
struct LocalityState {
XdsClusterLocalityStats* locality_stats = nullptr;
XdsClusterLocalityStats::Snapshot deleted_locality_stats;
};
XdsClusterDropStats* drop_stats = nullptr;
XdsClusterDropStats::Snapshot deleted_drop_stats;
std::map<RefCountedPtr<XdsLocalityName>, LocalityState,
XdsLocalityName::Less>
locality_stats;
Timestamp last_report_time = Timestamp::Now();
};
using LoadReportMap = std::map<
std::pair<std::string , std::string >,
LoadReportState>;
struct LoadReportServer {
RefCountedPtr<ChannelState> channel_state;
LoadReportMap load_report_map;
};
void NotifyWatchersOnErrorLocked(
const std::map<ResourceWatcherInterface*,
RefCountedPtr<ResourceWatcherInterface>>& watchers,
absl::Status status);
void NotifyWatchersOnResourceDoesNotExist(
const std::map<ResourceWatcherInterface*,
RefCountedPtr<ResourceWatcherInterface>>& watchers);
void MaybeRegisterResourceTypeLocked(const XdsResourceType* resource_type)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
const XdsResourceType* GetResourceTypeLocked(absl::string_view resource_type)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
absl::StatusOr<XdsResourceName> ParseXdsResourceName(
absl::string_view name, const XdsResourceType* type);
static std::string ConstructFullXdsResourceName(
absl::string_view authority, absl::string_view resource_type,
const XdsResourceKey& key);
XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked(
const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters,
const std::set<std::string>& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
RefCountedPtr<ChannelState> GetOrCreateChannelStateLocked(
const XdsBootstrap::XdsServer& server, const char* reason)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
std::unique_ptr<XdsBootstrap> bootstrap_;
OrphanablePtr<XdsTransportFactory> transport_factory_;
const Duration request_timeout_;
const bool xds_federation_enabled_;
XdsApi api_;
WorkSerializer work_serializer_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
Mutex mu_;
std::map<absl::string_view , const XdsResourceType*>
resource_types_ ABSL_GUARDED_BY(mu_);
upb::SymbolTable symtab_ ABSL_GUARDED_BY(mu_);
std::map<const XdsBootstrap::XdsServer*, ChannelState*>
xds_server_channel_map_ ABSL_GUARDED_BY(mu_);
std::map<std::string , AuthorityState> authority_state_map_
ABSL_GUARDED_BY(mu_);
std::map<const XdsBootstrap::XdsServer*, LoadReportServer>
xds_load_report_server_map_ ABSL_GUARDED_BY(mu_);
std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>>
invalid_watchers_ ABSL_GUARDED_BY(mu_);
bool shutting_down_ ABSL_GUARDED_BY(mu_) = false;
};
}
#endif