use std::sync::Arc;
use opentelemetry::Key;
use opentelemetry::KeyValue;
use parking_lot::Mutex;
use schemars::JsonSchema;
use serde::Deserialize;
use tower::BoxError;
use crate::Context;
use crate::plugins::telemetry::config_new::conditions::Condition;
use crate::plugins::telemetry::config_new::connector::ConnectorRequest;
use crate::plugins::telemetry::config_new::connector::ConnectorResponse;
use crate::plugins::telemetry::config_new::connector::attributes::ConnectorAttributes;
use crate::plugins::telemetry::config_new::connector::selectors::ConnectorSelector;
use crate::plugins::telemetry::config_new::events::CustomEvent;
use crate::plugins::telemetry::config_new::events::CustomEvents;
use crate::plugins::telemetry::config_new::events::Event;
use crate::plugins::telemetry::config_new::events::EventLevel;
use crate::plugins::telemetry::config_new::events::StandardEvent;
use crate::plugins::telemetry::config_new::events::StandardEventConfig;
use crate::plugins::telemetry::config_new::events::log_event;
use crate::plugins::telemetry::config_new::extendable::Extendable;
/// Level and condition of the standard connector *request* event.
///
/// A value of this type is inserted into the request context's extensions by
/// `on_request` in this file. The condition is not evaluated at insertion time;
/// whatever reads the value back from the context lives outside this file.
#[derive(Clone)]
pub(crate) struct ConnectorEventRequest {
/// Level taken from the configured standard request event.
pub(crate) level: EventLevel,
/// Condition taken from the configured standard request event, shared behind a lock.
/// NOTE(review): the `Mutex` presumably exists because evaluating a `Condition`
/// needs mutable access — confirm against the consumer.
pub(crate) condition: Arc<Mutex<Condition<ConnectorSelector>>>,
}
/// Level and condition of the standard connector *response* event.
///
/// A value of this type is inserted into the request context's extensions by
/// `on_request` in this file, so that it is available once a response exists.
/// The code that reads it back and evaluates the condition is outside this file.
#[derive(Clone)]
pub(crate) struct ConnectorEventResponse {
/// Level taken from the configured standard response event.
pub(crate) level: EventLevel,
/// Condition taken from the configured standard response event. Unlike the request
/// variant it is shared through a plain `Arc`, without a lock.
pub(crate) condition: Arc<Condition<ConnectorSelector>>,
}
// Configuration of the three standard connector events (request, response, error).
//
// `deny_unknown_fields` makes deserialization reject keys other than the three below,
// and `default` fills in any key that is absent from the input.
//
// Plain `//` comments are used here on purpose: this type derives `JsonSchema`, and
// schemars copies `///` doc comments into the generated schema as descriptions, which
// would change the emitted schema.
#[derive(Clone, Deserialize, JsonSchema, Debug, Default)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct ConnectorEventsConfig {
// Settings for the standard request event; consumed by `new_connector_events`.
pub(crate) request: StandardEventConfig<ConnectorSelector>,
// Settings for the standard response event; consumed by `new_connector_events`.
pub(crate) response: StandardEventConfig<ConnectorSelector>,
// Settings for the standard error event; consumed by `new_connector_events`.
pub(crate) error: StandardEventConfig<ConnectorSelector>,
}
/// [`CustomEvents`] instantiated for the connector service: connector request and
/// response types, connector attributes and connector selectors.
/// NOTE(review): the third type parameter is `()`; presumably that slot is unused for
/// connectors — confirm against the definition of `CustomEvents`.
pub(crate) type ConnectorEvents =
CustomEvents<ConnectorRequest, ConnectorResponse, (), ConnectorAttributes, ConnectorSelector>;
pub(crate) fn new_connector_events(
config: &Extendable<ConnectorEventsConfig, Event<ConnectorAttributes, ConnectorSelector>>,
) -> ConnectorEvents {
let custom_events = config
.custom
.iter()
.filter_map(|(name, config)| CustomEvent::from_config(name, config))
.collect();
ConnectorEvents {
request: StandardEvent::from_config(&config.attributes.request),
response: StandardEvent::from_config(&config.attributes.response),
error: StandardEvent::from_config(&config.attributes.error),
custom: custom_events,
}
}
impl CustomEvents<ConnectorRequest, ConnectorResponse, (), ConnectorAttributes, ConnectorSelector> {
pub(crate) fn on_request(&mut self, request: &ConnectorRequest) {
if let Some(request_event) = self.request.take() {
request.context.extensions().with_lock(|lock| {
lock.insert(ConnectorEventRequest {
level: request_event.level,
condition: Arc::new(Mutex::new(request_event.condition)),
})
});
}
if let Some(response_event) = self.response.take() {
request.context.extensions().with_lock(|lock| {
lock.insert(ConnectorEventResponse {
level: response_event.level,
condition: Arc::new(response_event.condition),
})
});
}
for custom_event in &mut self.custom {
custom_event.on_request(request);
}
}
pub(crate) fn on_response(&mut self, response: &ConnectorResponse) {
for custom_event in &mut self.custom {
custom_event.on_response(response);
}
}
pub(crate) fn on_error(&mut self, error: &BoxError, ctx: &Context) {
if let Some(error_event) = &mut self.error
&& error_event.condition.evaluate_error(error, ctx)
{
log_event(
error_event.level,
"connector.http.error",
vec![KeyValue::new(
Key::from_static_str("error"),
opentelemetry::Value::String(error.to_string().into()),
)],
"",
);
}
for custom_event in &mut self.custom {
custom_event.on_error(error, ctx);
}
}
}
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use apollo_compiler::name;
    use apollo_federation::connectors::ConnectId;
    use apollo_federation::connectors::ConnectSpec;
    use apollo_federation::connectors::Connector;
    use apollo_federation::connectors::HttpJsonTransport;
    use apollo_federation::connectors::JSONSelection;
    use apollo_federation::connectors::SourceName;
    use apollo_federation::connectors::StringTemplate;
    use apollo_federation::connectors::runtime::http_json_transport::HttpRequest;
    use apollo_federation::connectors::runtime::http_json_transport::HttpResponse;
    use apollo_federation::connectors::runtime::http_json_transport::TransportRequest;
    use apollo_federation::connectors::runtime::http_json_transport::TransportResponse;
    use apollo_federation::connectors::runtime::key::ResponseKey;
    use apollo_federation::connectors::runtime::responses::MappedResponse;
    use http::HeaderValue;
    use tracing::instrument::WithSubscriber;

    use super::*;
    use crate::assert_snapshot_subscriber;
    use crate::plugins::telemetry::Telemetry;
    use crate::plugins::test::PluginTestHarness;
    use crate::services::connector::request_service::Request;
    use crate::services::connector::request_service::Response;
    use crate::services::router::body;

    /// Builds the telemetry plugin harness from the shared custom-events fixture.
    async fn custom_events_harness() -> PluginTestHarness<Telemetry> {
        PluginTestHarness::builder()
            .config(include_str!("../../testdata/custom_events.router.yaml"))
            .build()
            .await
            .expect("test harness")
    }

    /// Builds a connector request whose HTTP request carries `header_name: log`.
    ///
    /// Everything except the header name is identical between the tests in this
    /// module, so the fixture lives here once.
    fn connector_request(header_name: &'static str) -> Request {
        let mut http_request = http::Request::builder().body("".into()).unwrap();
        http_request
            .headers_mut()
            .insert(header_name, HeaderValue::from_static("log"));

        let connector = Connector {
            id: ConnectId::new(
                "subgraph".into(),
                Some(SourceName::cast("source")),
                name!(Query),
                name!(users),
                None,
                0,
            ),
            transport: HttpJsonTransport {
                source_template: None,
                connect_template: StringTemplate::from_str("/test").unwrap(),
                ..Default::default()
            },
            selection: JSONSelection::empty(),
            config: None,
            max_requests: None,
            entity_resolver: None,
            spec: ConnectSpec::V0_1,
            schema_subtypes_map: Default::default(),
            batch_settings: None,
            request_headers: Default::default(),
            response_headers: Default::default(),
            request_variable_keys: Default::default(),
            response_variable_keys: Default::default(),
            error_settings: Default::default(),
            label: "label".into(),
        };

        Request {
            context: crate::Context::default(),
            connector: Arc::new(connector),
            transport_request: TransportRequest::Http(HttpRequest {
                inner: http_request,
                debug: Default::default(),
            }),
            key: ResponseKey::RootField {
                name: "hello".to_string(),
                inputs: Default::default(),
                selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
            },
            mapping_problems: vec![],
            supergraph_request: Default::default(),
            operation: Default::default(),
        }
    }

    /// Builds a successful (HTTP 200, empty JSON object) response for `request`,
    /// echoing `header_name: log` in the response headers.
    fn ok_response(request: &Request, header_name: &'static str) -> Response {
        Response {
            context: request.context.clone(),
            transport_result: Ok(TransportResponse::Http(HttpResponse {
                inner: http::Response::builder()
                    .status(200)
                    .header(header_name, HeaderValue::from_static("log"))
                    .body(body::empty())
                    .expect("expecting valid response")
                    .into_parts()
                    .0,
            })),
            mapped_response: MappedResponse::Data {
                data: serde_json::json!({})
                    .try_into()
                    .expect("expecting valid JSON"),
                key: request.key.clone(),
                problems: vec![],
            },
        }
    }

    // In both tests the snapshot macro and the service call stay in the test body, and
    // the response callback stays a non-capturing closure (the header name is a literal).
    // NOTE(review): presumably the snapshot name is derived from where the macro is
    // expanded, so moving it into a helper could rename the snapshots — confirm before
    // refactoring further.

    #[tokio::test(flavor = "multi_thread")]
    async fn test_connector_events_request() {
        let test_harness = custom_events_harness().await;

        async {
            test_harness
                .call_connector_request_service(connector_request("x-log-request"), |request| {
                    ok_response(&request, "x-log-request")
                })
                .await
                .expect("expecting successful response");
        }
        .with_subscriber(assert_snapshot_subscriber!())
        .await
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_connector_events_response() {
        let test_harness = custom_events_harness().await;

        async {
            test_harness
                .call_connector_request_service(connector_request("x-log-response"), |request| {
                    ok_response(&request, "x-log-response")
                })
                .await
                .expect("expecting successful response");
        }
        .with_subscriber(assert_snapshot_subscriber!())
        .await
    }
}