async_graphql_extension_apollo_tracing/
lib.rs1mod compression;
29mod proto;
30pub mod register;
31mod report_aggregator;
32
33mod runtime;
34mod packages;
35
36use futures::SinkExt;
37use protobuf::{well_known_types::timestamp::Timestamp, EnumOrUnknown, MessageField};
38use report_aggregator::ReportAggregator;
39use runtime::spawn;
40use packages::serde_json;
41
42#[macro_use]
43extern crate tracing;
44
45use std::collections::HashMap;
46use std::sync::Arc;
47use std::sync::RwLock;
48
49use async_graphql::QueryPathSegment;
50use chrono::{DateTime, Utc};
51use futures::lock::Mutex;
52use std::convert::TryFrom;
53
54use async_graphql::extensions::{
55 Extension, ExtensionContext, ExtensionFactory, NextExecute, NextParseQuery, NextResolve,
56 ResolveInfo,
57};
58use async_graphql::parser::types::{ExecutableDocument, OperationType, Selection};
59use async_graphql::{Response, ServerResult, Value, Variables};
60use proto::reports::{
61 trace::{self, node, Node},
62 Trace,
63};
64use std::convert::TryInto;
65
66pub use proto::reports::trace::http::Method;
67
68pub struct ApolloTracing {
82 report: Arc<ReportAggregator>,
83}
84
85#[derive(Debug, Clone, Default, derive_builder::Builder)]
103#[builder(pattern = "owned", setter(into, strip_option))]
104pub struct ApolloTracingDataExt {
105 #[builder(default)]
106 pub client_name: Option<String>,
107 #[builder(default)]
108 pub client_version: Option<String>,
109 #[builder(default)]
110 pub method: Option<Method>,
111 #[builder(default)]
112 pub status_code: Option<u32>,
113}
114
115impl ApolloTracing {
116 pub fn new(
124 authorization_token: String,
125 hostname: String,
126 graph_id: String,
127 variant: String,
128 service_version: String,
129 ) -> ApolloTracing {
130 let report = ReportAggregator::initialize(
131 authorization_token,
132 hostname,
133 graph_id,
134 variant,
135 service_version,
136 );
137
138 ApolloTracing {
139 report: Arc::new(report),
140 }
141 }
142}
143
144impl ExtensionFactory for ApolloTracing {
145 fn create(&self) -> Arc<dyn Extension> {
146 Arc::new(ApolloTracingExtension {
147 inner: Mutex::new(Inner {
148 start_time: Utc::now(),
149 end_time: Utc::now(),
150 }),
151 report: self.report.clone(),
152 nodes: RwLock::new(HashMap::new()),
153 root_node: Arc::new(RwLock::new(Node::default())),
154 operation_name: RwLock::new("schema".to_string()),
155 })
156 }
157}
158
159struct Inner {
160 start_time: DateTime<Utc>,
161 end_time: DateTime<Utc>,
162}
163
164struct ApolloTracingExtension {
165 inner: Mutex<Inner>,
166 report: Arc<ReportAggregator>,
167 nodes: RwLock<HashMap<String, Arc<RwLock<Node>>>>,
168 root_node: Arc<RwLock<Node>>,
169 operation_name: RwLock<String>,
170}
171
172#[async_trait::async_trait]
173impl Extension for ApolloTracingExtension {
174 #[instrument(level = "debug", skip(self, ctx, next))]
175 async fn parse_query(
176 &self,
177 ctx: &ExtensionContext<'_>,
178 query: &str,
179 variables: &Variables,
180 next: NextParseQuery<'_>,
181 ) -> ServerResult<ExecutableDocument> {
182 let document = next.run(ctx, query, variables).await?;
183 let is_schema = document
184 .operations
185 .iter()
186 .filter(|(_, operation)| operation.node.ty == OperationType::Query)
187 .any(|(_, operation)| operation.node.selection_set.node.items.iter().any(|selection| matches!(&selection.node, Selection::Field(field) if field.node.name.node == "__schema")));
188 if !is_schema {
189 let result: String =
190 ctx.stringify_execute_doc(&document, &Variables::from_json(serde_json::from_str("{}").unwrap()));
191 let name = document
192 .operations
193 .iter()
194 .next()
195 .and_then(|x| x.0)
196 .map(|x| x.as_str())
197 .unwrap_or("no_name");
198 let query_type = format!("# {name}\n {query}", name = name, query = result);
199 *self.operation_name.write().unwrap() = query_type;
200 }
201 Ok(document)
202 }
203
204 #[instrument(level = "debug", skip(self, ctx, next))]
205 async fn execute(
206 &self,
207 ctx: &ExtensionContext<'_>,
208 operation_name: Option<&str>,
209 next: NextExecute<'_>,
210 ) -> Response {
211 let start_time = Utc::now();
212 self.inner.lock().await.start_time = start_time;
213
214 let resp = next.run(ctx, operation_name).await;
215 let mut inner = self.inner.lock().await;
218 inner.end_time = Utc::now();
219
220 let tracing_extension = ctx
221 .data::<ApolloTracingDataExt>()
222 .ok()
223 .cloned()
224 .unwrap_or_default();
225
226 let client_name = tracing_extension
227 .client_name
228 .unwrap_or_else(|| "no client name".to_string());
229 let client_version = tracing_extension
230 .client_version
231 .unwrap_or_else(|| "no client version".to_string());
232 let method = tracing_extension
233 .method
234 .or(<Method as protobuf::Enum>::from_str("UNKNOWN"));
235 let status_code = tracing_extension.status_code.unwrap_or(0);
236
237 let mut trace: Trace = Trace {
238 client_name,
239 client_version,
240 duration_ns: (inner.end_time - inner.start_time)
241 .num_nanoseconds()
242 .map(|x| x.try_into().unwrap())
243 .unwrap_or(0),
244 ..Default::default()
245 };
246
247 trace.details = Some(trace::Details {
248 operation_name: operation_name
249 .map(|x| x.to_string())
250 .unwrap_or_else(|| "no operation".to_string()),
251 ..Default::default()
252 })
253 .into();
254
255 trace.http = Some(trace::HTTP {
256 method: EnumOrUnknown::new(method.unwrap()),
257 status_code,
258 ..Default::default()
259 })
260 .into();
261
262 trace.end_time = MessageField::some(Timestamp {
263 nanos: inner.end_time.timestamp_subsec_nanos().try_into().unwrap(),
264 seconds: inner.end_time.timestamp(),
265 special_fields: Default::default(),
266 });
267
268 trace.start_time =
269 protobuf::MessageField::some(protobuf::well_known_types::timestamp::Timestamp {
270 nanos: inner
271 .start_time
272 .timestamp_subsec_nanos()
273 .try_into()
274 .unwrap(),
275 seconds: inner.start_time.timestamp(),
276 special_fields: Default::default(),
277 });
278
279 let root_node = self.root_node.read().unwrap();
280 trace.root = Some(root_node.clone()).into();
281
282 let mut sender = self.report.sender();
283
284 let operation_name = self.operation_name.read().unwrap().clone();
285
286 let _handle = spawn(async move {
287 if let Err(e) = sender.send((operation_name, trace)).await {
288 error!(error = ?e);
289 }
290 });
291 resp
292 }
293
294 #[instrument(level = "debug", skip(self, ctx, info, next))]
295 async fn resolve(
296 &self,
297 ctx: &ExtensionContext<'_>,
298 info: ResolveInfo<'_>,
299 next: NextResolve<'_>,
300 ) -> ServerResult<Option<Value>> {
301 let path = info.path_node.to_string_vec().join(".");
305 let field_name = info.path_node.field_name().to_string();
306 let parent_type = info.parent_type.to_string();
307 let _return_type = info.return_type.to_string();
308 let start_time = Utc::now() - self.inner.lock().await.start_time;
309 let path_node = info.path_node;
310
311 let node: Node = Node {
312 end_time: 0,
313 id: match path_node.segment {
314 QueryPathSegment::Name(name) => Some(node::Id::ResponseName(name.to_string())),
315 QueryPathSegment::Index(index) => {
316 Some(node::Id::Index(index.try_into().unwrap_or(0)))
317 }
318 },
319 start_time: match start_time
320 .num_nanoseconds()
321 .and_then(|x| u64::try_from(x).ok())
322 {
323 Some(duration) => duration,
324 None => Utc::now()
325 .timestamp_nanos_opt()
326 .unwrap_or_default()
327 .try_into()
328 .unwrap_or_default(),
329 },
330 parent_type: parent_type.to_string(),
331 original_field_name: field_name,
332 ..Default::default()
333 };
334
335 let node = Arc::new(RwLock::new(node));
336 self.nodes.write().unwrap().insert(path, node.clone());
337 let parent_node = path_node.parent.map(|x| x.to_string_vec().join("."));
338 let res = match next.run(ctx, info).await {
341 Ok(res) => Ok(res),
342 Err(e) => {
343 let json = match serde_json::to_string(&e) {
344 Ok(content) => content,
345 Err(e) => format!("{{ \"error\": \"{e:?}\" }}"),
346 };
347 let error = trace::Error {
348 message: e.message.clone(),
349 location: e
350 .locations
351 .clone()
352 .into_iter()
353 .map(|x| trace::Location {
354 line: x.line as u32,
355 column: x.column as u32,
356 special_fields: protobuf::SpecialFields::default(),
357 })
358 .collect(),
359 json,
360 ..Default::default()
361 };
362
363 node.write().unwrap().error = vec![error];
364 Err(e)
365 }
366 };
367 let end_time = Utc::now() - self.inner.lock().await.start_time;
368
369 node.write().unwrap().end_time = match end_time
370 .num_nanoseconds()
371 .and_then(|x| u64::try_from(x).ok())
372 {
373 Some(duration) => duration,
374 None => Utc::now()
375 .timestamp_nanos_opt()
376 .unwrap_or_default()
377 .try_into()
378 .unwrap_or_default(),
379 };
380
381 match parent_node {
382 None => {
383 let mut root_node = self.root_node.write().unwrap();
384 let child = &mut root_node.child;
385 let node = node.read().unwrap();
386 child.push(node.clone());
389 }
390 Some(parent) => {
391 let nodes = self.nodes.read().unwrap();
392 let node_read = nodes.get(&parent).unwrap();
393 let mut parent = node_read.write().unwrap();
394 let child = &mut parent.child;
395 let node = node.read().unwrap();
396 child.push(node.clone());
399 }
400 };
401
402 res
403 }
404}