use std::fmt::Debug;
use std::io::Write;
use std::str::FromStr;
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use bytes::BytesMut;
use flate2::Compression;
use flate2::write::GzEncoder;
use http::StatusCode;
use http::header::ACCEPT;
use http::header::CONTENT_ENCODING;
use http::header::CONTENT_TYPE;
use http::header::RETRY_AFTER;
use http::header::USER_AGENT;
use opentelemetry::ExportError;
pub(crate) use prost::*;
use reqwest::Client;
use serde::ser::SerializeStruct;
use serde_json::Value;
use sys_info::hostname;
use tokio::sync::mpsc;
use tower::BoxError;
use url::Url;
use super::apollo::Report;
use super::apollo::SingleReport;
use super::config::ApolloMetricsReferenceMode;
use crate::plugins::telemetry::tracing::BatchProcessorConfig;
const BACKOFF_INCREMENT: Duration = Duration::from_millis(50);
const ROUTER_REPORT_TYPE_METRICS: &str = "metrics";
pub(crate) const ROUTER_REPORT_TYPE_TRACES: &str = "traces";
const ROUTER_TRACING_PROTOCOL_APOLLO: &str = "apollo";
pub(crate) const ROUTER_TRACING_PROTOCOL_OTLP: &str = "otlp";
#[derive(thiserror::Error, Debug)]
pub(crate) enum ApolloExportError {
#[error("Apollo exporter server error: {0}")]
ServerError(String),
#[error("Apollo exporter client error: {0}")]
ClientError(String),
#[error("Apollo exporter unavailable error: {0}")]
Unavailable(String),
#[error("Apollo Studio not accepting reports for {1} seconds")]
StudioBackoff(Report, u64),
}
impl ExportError for ApolloExportError {
fn exporter_name(&self) -> &'static str {
"ApolloExporter"
}
}
#[derive(Clone, Default)]
pub(crate) enum Sender {
#[default]
Noop,
Apollo(mpsc::Sender<SingleReport>),
}
impl Sender {
pub(crate) fn send(&self, report: SingleReport) {
match &self {
Sender::Noop => {}
Sender::Apollo(channel) => {
if let Err(err) = channel.to_owned().try_send(report) {
tracing::warn!(
"could not send metrics to telemetry, metric will be dropped: {}",
err
);
}
}
}
}
}
pub(crate) struct ApolloExporter {
batch_config: BatchProcessorConfig,
endpoint: Url,
apollo_key: String,
header: proto::reports::ReportHeader,
client: Client,
strip_traces: AtomicBool,
studio_backoff: Mutex<Instant>,
metrics_reference_mode: ApolloMetricsReferenceMode,
}
impl ApolloExporter {
pub(crate) fn new(
endpoint: &Url,
batch_config: &BatchProcessorConfig,
apollo_key: &str,
apollo_graph_ref: &str,
schema_id: &str,
metrics_reference_mode: ApolloMetricsReferenceMode,
) -> Result<ApolloExporter, BoxError> {
let header = proto::reports::ReportHeader {
graph_ref: apollo_graph_ref.to_string(),
hostname: hostname()?,
agent_version: format!(
"{}@{}",
std::env!("CARGO_PKG_NAME"),
std::env!("CARGO_PKG_VERSION")
),
runtime_version: "rust".to_string(),
uname: get_uname()?,
executable_schema_id: schema_id.to_string(),
..Default::default()
};
tracing::debug!("creating apollo exporter {}", endpoint);
Ok(ApolloExporter {
endpoint: endpoint.clone(),
batch_config: batch_config.clone(),
apollo_key: apollo_key.to_string(),
client: reqwest::Client::builder()
.no_gzip()
.timeout(batch_config.max_export_timeout)
.build()
.map_err(BoxError::from)?,
header,
strip_traces: Default::default(),
studio_backoff: Mutex::new(Instant::now()),
metrics_reference_mode,
})
}
pub(crate) fn start(self) -> Sender {
let (tx, mut rx) = mpsc::channel::<SingleReport>(self.batch_config.max_queue_size);
tokio::spawn(async move {
let timeout = tokio::time::interval(self.batch_config.scheduled_delay);
let mut report = Report::default();
let mut backoff_warn = true;
tokio::pin!(timeout);
loop {
tokio::select! {
biased;
_ = timeout.tick() => {
match self.submit_report(std::mem::take(&mut report)).await {
Ok(_) => backoff_warn = true,
Err(err) => {
match err {
ApolloExportError::StudioBackoff(unsubmitted, remaining) => {
if backoff_warn {
tracing::warn!("Apollo Studio not accepting reports for {remaining} seconds");
backoff_warn = false;
}
report = unsubmitted;
},
_ => tracing::error!("failed to submit Apollo report: {}", err)
}
}
}
}
single_report = rx.recv() => {
if let Some(r) = single_report {
report += r;
} else {
tracing::debug!("terminating apollo exporter");
break;
}
},
};
}
if let Err(e) = self.submit_report(std::mem::take(&mut report)).await {
tracing::error!("failed to submit Apollo report: {}", e)
}
});
Sender::Apollo(tx)
}
pub(crate) async fn submit_report(&self, report: Report) -> Result<(), ApolloExportError> {
if report.licensed_operation_count_by_type.is_empty() && report.traces_per_query.is_empty()
{
return Ok(());
}
let expires_at = *self.studio_backoff.lock().unwrap();
let now = Instant::now();
if expires_at > now {
let remaining = expires_at - now;
return Err(ApolloExportError::StudioBackoff(
report,
remaining.as_secs(),
));
}
let extended_references_enabled = matches!(
self.metrics_reference_mode,
ApolloMetricsReferenceMode::Extended
);
tracing::debug!("submitting report: {:?}", report);
let mut content = BytesMut::new();
let mut proto_report =
report.build_proto_report(self.header.clone(), extended_references_enabled);
prost::Message::encode(&proto_report, &mut content)
.map_err(|e| ApolloExportError::ClientError(e.to_string()))?;
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder
.write_all(&content)
.map_err(|e| ApolloExportError::ClientError(e.to_string()))?;
let compressed_content = encoder
.finish()
.map_err(|e| ApolloExportError::ClientError(e.to_string()))?;
let mut backoff = Duration::from_millis(0);
let req = self
.client
.post(self.endpoint.clone())
.body(compressed_content)
.header("X-Api-Key", self.apollo_key.clone())
.header(CONTENT_ENCODING, "gzip")
.header(CONTENT_TYPE, "application/protobuf")
.header(ACCEPT, "application/json")
.header(
USER_AGENT,
format!(
"{} / {} usage reporting",
std::env!("CARGO_PKG_NAME"),
std::env!("CARGO_PKG_VERSION")
),
)
.build()
.map_err(|e| ApolloExportError::Unavailable(e.to_string()))?;
let mut msg = "default error message".to_string();
let mut has_traces = false;
for (_, traces_and_stats) in proto_report.traces_per_query.iter_mut() {
if !traces_and_stats.trace.is_empty() {
has_traces = true;
if self.strip_traces.load(Ordering::SeqCst) {
traces_and_stats.trace.clear();
}
}
}
let retries = if has_traces { 5 } else { 1 };
for i in 0..retries {
let task_req = req.try_clone().expect("requests must be clone-able");
match self.client.execute(task_req).await {
Ok(v) => {
let status = v.status();
let opt_header_retry = v.headers().get(RETRY_AFTER).cloned();
let data = v
.text()
.await
.map_err(|e| ApolloExportError::ServerError(e.to_string()))?;
if status.is_client_error() {
tracing::error!("client error reported at ingress: {}", data);
return Err(ApolloExportError::ClientError(data));
} else if status.is_server_error() {
tracing::warn!("attempt: {}, could not transfer: {}", i + 1, data);
msg = data;
if status == StatusCode::TOO_MANY_REQUESTS {
let mut retry_after = 0;
if let Some(returned_retry_after) =
opt_header_retry.and_then(|v| v.to_str().ok()?.parse::<u64>().ok())
{
retry_after = returned_retry_after;
*self.studio_backoff.lock().unwrap() =
Instant::now() + Duration::from_secs(retry_after);
}
return Err(ApolloExportError::StudioBackoff(report, retry_after));
}
} else {
tracing::debug!("ingress response text: {:?}", data);
let report_type = if has_traces {
ROUTER_REPORT_TYPE_TRACES
} else {
ROUTER_REPORT_TYPE_METRICS
};
u64_counter!(
"apollo.router.telemetry.studio.reports",
"The number of reports submitted to Studio by the Router",
1,
report.type = report_type,
report.protocol = ROUTER_TRACING_PROTOCOL_APOLLO,
report.extended_references_enabled = extended_references_enabled
);
if has_traces && !self.strip_traces.load(Ordering::SeqCst) {
if let Ok(response) = serde_json::Value::from_str(&data) {
if let Some(Value::Bool(true)) = response.get("tracesIgnored") {
tracing::warn!(
"traces will not be sent to Apollo as this account is on a free plan"
);
self.strip_traces.store(true, Ordering::SeqCst);
}
}
}
return Ok(());
}
}
Err(e) => {
tracing::warn!("attempt: {}, could not transfer: {}", i + 1, e);
msg = e.to_string();
}
}
backoff += BACKOFF_INCREMENT;
tokio::time::sleep(backoff).await;
}
Err(ApolloExportError::Unavailable(msg))
}
}
#[cfg(not(target_os = "windows"))]
pub(crate) fn get_uname() -> Result<String, std::io::Error> {
let u = uname::uname()?;
Ok(format!(
"{}, {}, {}, {}, {},",
u.sysname, u.nodename, u.release, u.version, u.machine
))
}
#[cfg(target_os = "windows")]
pub(crate) fn get_uname() -> Result<String, std::io::Error> {
let sysname = sys_info::os_type().unwrap_or_else(|_| "Windows".to_owned());
let nodename = sys_info::hostname().unwrap_or_else(|_| "unknown".to_owned());
let release = sys_info::os_release().unwrap_or_else(|_| "unknown".to_owned());
let version = "unknown";
let machine = "unknown";
Ok(format!(
"{}, {}, {}, {}, {}",
sysname, nodename, release, version, machine
))
}
#[allow(unreachable_pub)]
pub(crate) mod proto {
pub(crate) mod reports {
#![allow(clippy::derive_partial_eq_without_eq)]
tonic::include_proto!("reports");
}
}
pub(crate) fn serialize_timestamp<S>(
timestamp: &Option<prost_types::Timestamp>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match timestamp {
Some(ts) => {
let mut ts_strukt = serializer.serialize_struct("Timestamp", 2)?;
ts_strukt.serialize_field("seconds", &ts.seconds)?;
ts_strukt.serialize_field("nanos", &ts.nanos)?;
ts_strukt.end()
}
None => serializer.serialize_none(),
}
}