use std::collections::BTreeMap;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use opentelemetry::KeyValue;
use schemars::JsonSchema;
use schemars::Schema;
use schemars::SchemaGenerator;
use serde::Deserialize;
use serde::Deserializer;
use serde::de::Error;
use serde::de::MapAccess;
use serde::de::Visitor;
use serde_json::Map;
use serde_json::Value;
use tower::BoxError;
use super::Stage;
use crate::Context;
use crate::plugins::telemetry::config_new::DefaultForLevel;
use crate::plugins::telemetry::config_new::Selector;
use crate::plugins::telemetry::config_new::Selectors;
use crate::plugins::telemetry::config_new::attributes::DefaultAttributeRequirementLevel;
use crate::plugins::telemetry::otlp::TelemetryDataKind;
/// A set of built-in attributes of type `Att` paired with named, user-defined
/// entries of type `Ext`.
///
/// The type itself carries no trait bounds: every `impl` block declares exactly
/// the bounds it relies on (for example `Att: Default` for the `Default` impl),
/// so the container can be named and constructed for any pair of types.
#[derive(Clone, Debug)]
pub(crate) struct Extendable<Att, Ext> {
    /// The built-in attribute configuration.
    pub(crate) attributes: Att,
    /// User-defined entries, keyed by the name they were configured under.
    pub(crate) custom: HashMap<String, Ext>,
}
impl<Att, Ext> DefaultForLevel for Extendable<Att, Ext>
where
    Att: DefaultForLevel + Default,
{
    /// Applies the level defaults to the built-in attributes only; the custom
    /// entries are not touched.
    fn defaults_for_level(
        &mut self,
        requirement_level: DefaultAttributeRequirementLevel,
        kind: TelemetryDataKind,
    ) {
        Att::defaults_for_level(&mut self.attributes, requirement_level, kind);
    }
}
impl Extendable<(), ()> {
pub(crate) fn empty_arc<A, E>() -> Arc<Extendable<A, E>>
where
A: Default,
{
Default::default()
}
}
impl<'de, Att, Ext> Deserialize<'de> for Extendable<Att, Ext>
where
Att: Default + Deserialize<'de> + Debug + Sized,
Ext: Deserialize<'de> + Debug + Sized,
{
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
struct ExtendableVisitor<Att, Ext> {
_phantom: std::marker::PhantomData<(Att, Ext)>,
}
impl<'de, Att, Ext> Visitor<'de> for ExtendableVisitor<Att, Ext>
where
Att: Default + Deserialize<'de> + Debug,
Ext: Deserialize<'de> + Debug,
{
type Value = Extendable<Att, Ext>;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, "a map structure")
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: MapAccess<'de>,
{
let mut attributes = Map::new();
let mut custom: HashMap<String, Ext> = HashMap::new();
while let Some(key) = map.next_key()? {
let value: Value = map.next_value()?;
match Ext::deserialize(value.clone()) {
Ok(value) => {
custom.insert(key, value);
}
Err(_err) => {
let mut temp_attributes: Map<String, Value> = Map::new();
temp_attributes.insert(key.clone(), value.clone());
Att::deserialize(Value::Object(temp_attributes)).map_err(|e| {
A::Error::custom(format!("failed to parse attribute '{key}': {e}"))
})?;
attributes.insert(key, value);
}
}
}
let attributes =
Att::deserialize(Value::Object(attributes)).map_err(A::Error::custom)?;
Ok(Extendable { attributes, custom })
}
}
deserializer.deserialize_map(ExtendableVisitor::<Att, Ext> {
_phantom: Default::default(),
})
}
}
impl<A, E> JsonSchema for Extendable<A, E>
where
    A: Default + JsonSchema,
    E: JsonSchema,
{
    /// Schema name derived from the names of the attribute and custom schemas.
    fn schema_name() -> std::borrow::Cow<'static, str> {
        format!("Extended{}With{}", A::schema_name(), E::schema_name()).into()
    }

    /// Produces the schema of `A` with its `additionalProperties` replaced by
    /// the one generated for `HashMap<String, E>`, when that schema has one.
    ///
    /// If the attribute schema is not an object, or the map schema carries no
    /// `additionalProperties`, the attribute schema is returned unchanged.
    fn json_schema(generator: &mut SchemaGenerator) -> Schema {
        let custom = generator.subschema_for::<HashMap<String, E>>();
        let mut schema = A::json_schema(generator);

        if let Some(additional_properties) = custom.get("additionalProperties")
            && let Some(object) = schema.as_object_mut()
        {
            // `insert` rather than index assignment: mutably indexing a
            // `serde_json::Map` with a key that is not present panics, which
            // would happen for any attribute schema that does not already
            // declare `additionalProperties`.
            object.insert(
                "additionalProperties".to_string(),
                additional_properties.clone(),
            );
        }

        schema
    }
}
impl<A, E> Default for Extendable<A, E>
where
    A: Default,
{
    /// Default attributes and an empty set of custom entries.
    fn default() -> Self {
        let attributes = A::default();
        let custom = HashMap::default();
        Self { attributes, custom }
    }
}
impl<A, E, Request, Response, EventResponse> Selectors<Request, Response, EventResponse>
    for Extendable<A, E>
where
    A: Default + Selectors<Request, Response, EventResponse>,
    E: Selector<Request = Request, Response = Response, EventResponse = EventResponse>,
{
    /// Built-in request attributes, followed by one entry per custom selector
    /// that yields a value for this request.
    fn on_request(&self, request: &Request) -> Vec<KeyValue> {
        let mut collected = self.attributes.on_request(request);
        for (name, selector) in &self.custom {
            if let Some(selected) = selector.on_request(request) {
                collected.push(KeyValue::new(name.clone(), selected));
            }
        }
        collected
    }

    /// Built-in response attributes, followed by one entry per custom selector
    /// that yields a value for this response.
    fn on_response(&self, response: &Response) -> Vec<KeyValue> {
        let mut collected = self.attributes.on_response(response);
        for (name, selector) in &self.custom {
            if let Some(selected) = selector.on_response(response) {
                collected.push(KeyValue::new(name.clone(), selected));
            }
        }
        collected
    }

    /// Built-in error attributes, followed by one entry per custom selector
    /// that yields a value for this error.
    fn on_error(&self, error: &BoxError, ctx: &Context) -> Vec<KeyValue> {
        let mut collected = self.attributes.on_error(error, ctx);
        for (name, selector) in &self.custom {
            if let Some(selected) = selector.on_error(error, ctx) {
                collected.push(KeyValue::new(name.clone(), selected));
            }
        }
        collected
    }

    /// Built-in response-event attributes, followed by one entry per custom
    /// selector that yields a value for this event.
    fn on_response_event(&self, response: &EventResponse, ctx: &Context) -> Vec<KeyValue> {
        let mut collected = self.attributes.on_response_event(response, ctx);
        for (name, selector) in &self.custom {
            if let Some(selected) = selector.on_response_event(response, ctx) {
                collected.push(KeyValue::new(name.clone(), selected));
            }
        }
        collected
    }

    /// Lets the built-in attributes write into `attrs` first, then appends one
    /// entry per custom selector that yields a value for this response field.
    fn on_response_field(
        &self,
        attrs: &mut Vec<KeyValue>,
        ty: &apollo_compiler::executable::NamedType,
        field: &apollo_compiler::executable::Field,
        value: &serde_json_bytes::Value,
        ctx: &Context,
    ) {
        self.attributes
            .on_response_field(attrs, ty, field, value, ctx);
        for (name, selector) in &self.custom {
            if let Some(selected) = selector.on_response_field(ty, field, value, ctx) {
                attrs.push(KeyValue::new(name.clone(), selected));
            }
        }
    }
}
impl<A, E, Request, Response, EventResponse> Extendable<A, E>
where
    A: Default + Selectors<Request, Response, EventResponse>,
    E: Selector<Request = Request, Response = Response, EventResponse = EventResponse>,
{
    /// Checks the custom entries against an optional stage restriction.
    ///
    /// Only a restriction to `Stage::Request` is checked: the first custom
    /// entry found whose selector is not active in that stage produces an
    /// error naming the entry. Any other `restricted_stage` value is accepted.
    pub(crate) fn validate(&self, restricted_stage: Option<Stage>) -> Result<(), String> {
        if !matches!(restricted_stage, Some(Stage::Request)) {
            return Ok(());
        }

        let offender = self
            .custom
            .iter()
            .find(|(_, selector)| !selector.is_active(Stage::Request));

        match offender {
            Some((name, _)) => Err(format!(
                "cannot set the attribute {name:?} because it is using a selector computed in another stage than 'request' so it will not be computed"
            )),
            None => Ok(()),
        }
    }
}
#[cfg(test)]
mod test {
    use std::sync::Arc;
    use parking_lot::Mutex;
    use crate::plugins::telemetry::config::AttributeValue;
    use crate::plugins::telemetry::config_new::attributes::StandardAttribute;
    use crate::plugins::telemetry::config_new::conditional::Conditional;
    use crate::plugins::telemetry::config_new::conditions::Condition;
    use crate::plugins::telemetry::config_new::conditions::SelectorOrValue;
    use crate::plugins::telemetry::config_new::extendable::Extendable;
    use crate::plugins::telemetry::config_new::http_common::attributes::HttpCommonAttributes;
    use crate::plugins::telemetry::config_new::http_server::attributes::HttpServerAttributes;
    use crate::plugins::telemetry::config_new::router::attributes::RouterAttributes;
    use crate::plugins::telemetry::config_new::router::selectors::RouterSelector;
    use crate::plugins::telemetry::config_new::selectors::OperationName;
    use crate::plugins::telemetry::config_new::selectors::ResponseStatus;
    use crate::plugins::telemetry::config_new::supergraph::attributes::SupergraphAttributes;
    use crate::plugins::telemetry::config_new::supergraph::selectors::SupergraphSelector;

    /// Boolean entries land in the built-in attributes, selector objects land
    /// in the custom map.
    #[test]
    fn test_extendable_serde() {
        let config = serde_json::json!({
            "graphql.operation.name": true,
            "graphql.operation.type": true,
            "custom_1": {
                "operation_name": "string"
            },
            "custom_2": {
                "operation_name": "string"
            }
        });
        let parsed: Extendable<SupergraphAttributes, SupergraphSelector> =
            serde_json::from_value(config).unwrap();

        let expected_attributes = SupergraphAttributes {
            graphql_document: None,
            graphql_operation_name: Some(StandardAttribute::Bool(true)),
            graphql_operation_type: Some(StandardAttribute::Bool(true)),
            cost: Default::default(),
        };
        assert_eq!(parsed.attributes, expected_attributes);

        // Both custom entries were configured with the same selector.
        let expected_selector = SupergraphSelector::OperationName {
            operation_name: OperationName::String,
            redact: None,
            default: None,
        };
        for name in ["custom_1", "custom_2"] {
            assert_eq!(parsed.custom.get(name), Some(&expected_selector));
        }
    }

    /// A key that is neither a known attribute nor a valid selector is rejected.
    #[test]
    fn test_extendable_serde_fail() {
        let config = serde_json::json!({
            "graphql.operation": true,
            "graphql.operation.type": true,
            "custom_1": {
                "operation_name": "string"
            },
            "custom_2": {
                "operation_name": "string"
            }
        });
        let outcome = serde_json::from_value::<
            Extendable<SupergraphAttributes, SupergraphSelector>,
        >(config);
        outcome.expect_err("Should have errored");
    }

    /// Custom entries may be conditional selectors, with or without a condition.
    #[test]
    fn test_extendable_serde_conditional() {
        let config = serde_json::json!({
            "http.request.method": true,
            "http.response.status_code": true,
            "url.path": true,
            "http.request.header.x-my-header": {
                "request_header": "x-my-header",
                "condition": {
                    "eq": [
                        200,
                        {
                            "response_status": "code"
                        }
                    ]
                }
            },
            "http.request.header.x-not-present": {
                "request_header": "x-not-present",
                "default": "nope"
            }
        });
        let parsed: Extendable<RouterAttributes, Conditional<RouterSelector>> =
            serde_json::from_value(config).unwrap();

        let expected_attributes = RouterAttributes {
            datadog_trace_id: None,
            trace_id: None,
            baggage: None,
            common: HttpCommonAttributes {
                http_request_method: Some(StandardAttribute::Bool(true)),
                http_response_status_code: Some(StandardAttribute::Bool(true)),
                ..Default::default()
            },
            server: HttpServerAttributes {
                url_path: Some(StandardAttribute::Bool(true)),
                ..Default::default()
            },
        };
        assert_eq!(parsed.attributes, expected_attributes);

        let status_is_200 = Condition::Eq([
            SelectorOrValue::Value(200.into()),
            SelectorOrValue::Selector(RouterSelector::ResponseStatus {
                response_status: ResponseStatus::Code,
            }),
        ]);
        let expected_with_condition = Conditional {
            selector: RouterSelector::RequestHeader {
                request_header: String::from("x-my-header"),
                redact: None,
                default: None,
            },
            condition: Some(Arc::new(Mutex::new(status_is_200))),
            value: Default::default(),
        };
        assert_eq!(
            parsed.custom.get("http.request.header.x-my-header"),
            Some(&expected_with_condition)
        );

        let expected_with_default = Conditional {
            selector: RouterSelector::RequestHeader {
                request_header: String::from("x-not-present"),
                redact: None,
                default: Some(AttributeValue::String("nope".to_string())),
            },
            condition: None,
            value: Default::default(),
        };
        assert_eq!(
            parsed.custom.get("http.request.header.x-not-present"),
            Some(&expected_with_default)
        );
    }
}