async_graphql_extension_apollo_tracing/
lib.rs

//! # Apollo Extensions for async_graphql
//!  <div align="center">
//!  <!-- CI -->
//!  <img src="https://github.com/Miaxos/async_graphql_apollo_studio_extension/actions/workflows/ci.yml/badge.svg" />
//!  <!-- Crates version -->
//!  <a href="https://crates.io/crates/async-graphql-extension-apollo-tracing">
//!    <img src="https://img.shields.io/crates/v/async-graphql-extension-apollo-tracing.svg?style=flat-square"
//!    alt="Crates.io version" />
//!  </a>
//!  <!-- Downloads -->
//!  <a href="https://crates.io/crates/async-graphql-extension-apollo-tracing">
//!    <img src="https://img.shields.io/crates/d/async-graphql-extension-apollo-tracing.svg?style=flat-square"
//!      alt="Download" />
//!  </a>
//! </div>
//!
//! ## Features
//!
//! * Full support for traces & errors
//! * Batched traces transfer
//! * Client segmentation
//! * Tracing
//! * Schema register protocol implemented
//!
//! ## Crate Features
//!
//! * `compression` - To enable GZIP Compression when sending traces to Apollo Studio.
28mod compression;
29mod proto;
30pub mod register;
31mod report_aggregator;
32
33mod runtime;
34mod packages;
35
36use futures::SinkExt;
37use protobuf::{well_known_types::timestamp::Timestamp, EnumOrUnknown, MessageField};
38use report_aggregator::ReportAggregator;
39use runtime::spawn;
40use packages::serde_json;
41
42#[macro_use]
43extern crate tracing;
44
45use std::collections::HashMap;
46use std::sync::Arc;
47use std::sync::RwLock;
48
49use async_graphql::QueryPathSegment;
50use chrono::{DateTime, Utc};
51use futures::lock::Mutex;
52use std::convert::TryFrom;
53
54use async_graphql::extensions::{
55    Extension, ExtensionContext, ExtensionFactory, NextExecute, NextParseQuery, NextResolve,
56    ResolveInfo,
57};
58use async_graphql::parser::types::{ExecutableDocument, OperationType, Selection};
59use async_graphql::{Response, ServerResult, Value, Variables};
60use proto::reports::{
61    trace::{self, node, Node},
62    Trace,
63};
64use std::convert::TryInto;
65
66pub use proto::reports::trace::http::Method;
67
68/// Apollo Tracing Extension to send traces to Apollo Studio
69/// The extension to include to your `async_graphql` instance to connect with Apollo Studio.
70///
71/// <https://www.apollographql.com/docs/studio/setup-analytics/#adding-support-to-a-third-party-server-advanced>
72///
73/// Apollo Tracing works by creating traces from GraphQL calls, which contains extra data about the
74/// request being processed. These traces are then batched sent to Apollo Studio.
75///
76/// The extension will start a separate function on a separate thread which will aggregate traces
77/// and batch send them.
78///
79/// To add additional data to your metrics, you should add a ApolloTracingDataExt to your
80/// query_data when you process a query with async_graphql.
81pub struct ApolloTracing {
82    report: Arc<ReportAggregator>,
83}
84
85/// The structure where you can add additional context for Apollo Studio.
86/// This structure must be added to your query data.
87///
88/// It'll allow you to [segment your
89/// users](https://www.apollographql.com/docs/studio/client-awareness/)
90///
91/// * `client_name` - You can segment your users by the client they are using to access your
92/// GraphQL API, it's really usefull when you have mobile and web users for instance. Usually we
93/// add a header `apollographql-client-name` to store this data. Apollo Studio will allow you to
94/// aggregate your metrics by Client Name.
95/// * `client_version` - You can segment your users by the client but it's usefull to also have the
96/// version your clients are using, especially when you are serving your API for mobile users,
97/// it'll allow you to follow metrics depending on which version your users are. Usually we add a
98/// header `apollographql-client-version` to store this data.
99/// * `method` - The HTTP Method.
100/// * `status_code` - The status code return by your GraphQL API. It's a little weird to have to put it
101/// before executing the graphql function, it'll be changed later but usually it's just a 200.
102#[derive(Debug, Clone, Default, derive_builder::Builder)]
103#[builder(pattern = "owned", setter(into, strip_option))]
104pub struct ApolloTracingDataExt {
105    #[builder(default)]
106    pub client_name: Option<String>,
107    #[builder(default)]
108    pub client_version: Option<String>,
109    #[builder(default)]
110    pub method: Option<Method>,
111    #[builder(default)]
112    pub status_code: Option<u32>,
113}
114
115impl ApolloTracing {
116    /// We initialize the ApolloTracing Extension by starting our aggregator async function which
117    /// will receive every traces and send them to the Apollo Studio Ingress for processing
118    ///
119    /// * autorization_token - Token to send metrics to apollo studio.
120    /// * hostname - Hostname like yourdomain-graphql-1.io
121    /// * graph_ref - `ref@variant`  Graph reference with variant
122    /// * release_name - Your release version or release name from Git for example
123    pub fn new(
124        authorization_token: String,
125        hostname: String,
126        graph_id: String,
127        variant: String,
128        service_version: String,
129    ) -> ApolloTracing {
130        let report = ReportAggregator::initialize(
131            authorization_token,
132            hostname,
133            graph_id,
134            variant,
135            service_version,
136        );
137
138        ApolloTracing {
139            report: Arc::new(report),
140        }
141    }
142}
143
144impl ExtensionFactory for ApolloTracing {
145    fn create(&self) -> Arc<dyn Extension> {
146        Arc::new(ApolloTracingExtension {
147            inner: Mutex::new(Inner {
148                start_time: Utc::now(),
149                end_time: Utc::now(),
150            }),
151            report: self.report.clone(),
152            nodes: RwLock::new(HashMap::new()),
153            root_node: Arc::new(RwLock::new(Node::default())),
154            operation_name: RwLock::new("schema".to_string()),
155        })
156    }
157}
158
159struct Inner {
160    start_time: DateTime<Utc>,
161    end_time: DateTime<Utc>,
162}
163
164struct ApolloTracingExtension {
165    inner: Mutex<Inner>,
166    report: Arc<ReportAggregator>,
167    nodes: RwLock<HashMap<String, Arc<RwLock<Node>>>>,
168    root_node: Arc<RwLock<Node>>,
169    operation_name: RwLock<String>,
170}
171
172#[async_trait::async_trait]
173impl Extension for ApolloTracingExtension {
174    #[instrument(level = "debug", skip(self, ctx, next))]
175    async fn parse_query(
176        &self,
177        ctx: &ExtensionContext<'_>,
178        query: &str,
179        variables: &Variables,
180        next: NextParseQuery<'_>,
181    ) -> ServerResult<ExecutableDocument> {
182        let document = next.run(ctx, query, variables).await?;
183        let is_schema = document
184            .operations
185            .iter()
186            .filter(|(_, operation)| operation.node.ty == OperationType::Query)
187            .any(|(_, operation)| operation.node.selection_set.node.items.iter().any(|selection| matches!(&selection.node, Selection::Field(field) if field.node.name.node == "__schema")));
188        if !is_schema {
189            let result: String =
190                ctx.stringify_execute_doc(&document, &Variables::from_json(serde_json::from_str("{}").unwrap()));
191            let name = document
192                .operations
193                .iter()
194                .next()
195                .and_then(|x| x.0)
196                .map(|x| x.as_str())
197                .unwrap_or("no_name");
198            let query_type = format!("# {name}\n {query}", name = name, query = result);
199            *self.operation_name.write().unwrap() = query_type;
200        }
201        Ok(document)
202    }
203
204    #[instrument(level = "debug", skip(self, ctx, next))]
205    async fn execute(
206        &self,
207        ctx: &ExtensionContext<'_>,
208        operation_name: Option<&str>,
209        next: NextExecute<'_>,
210    ) -> Response {
211        let start_time = Utc::now();
212        self.inner.lock().await.start_time = start_time;
213
214        let resp = next.run(ctx, operation_name).await;
215        // Here every responses are executed
216        // The next execute should aggregates a node a not a trace
217        let mut inner = self.inner.lock().await;
218        inner.end_time = Utc::now();
219
220        let tracing_extension = ctx
221            .data::<ApolloTracingDataExt>()
222            .ok()
223            .cloned()
224            .unwrap_or_default();
225
226        let client_name = tracing_extension
227            .client_name
228            .unwrap_or_else(|| "no client name".to_string());
229        let client_version = tracing_extension
230            .client_version
231            .unwrap_or_else(|| "no client version".to_string());
232        let method = tracing_extension
233            .method
234            .or(<Method as protobuf::Enum>::from_str("UNKNOWN"));
235        let status_code = tracing_extension.status_code.unwrap_or(0);
236
237        let mut trace: Trace = Trace {
238            client_name,
239            client_version,
240            duration_ns: (inner.end_time - inner.start_time)
241                .num_nanoseconds()
242                .map(|x| x.try_into().unwrap())
243                .unwrap_or(0),
244            ..Default::default()
245        };
246
247        trace.details = Some(trace::Details {
248            operation_name: operation_name
249                .map(|x| x.to_string())
250                .unwrap_or_else(|| "no operation".to_string()),
251            ..Default::default()
252        })
253        .into();
254
255        trace.http = Some(trace::HTTP {
256            method: EnumOrUnknown::new(method.unwrap()),
257            status_code,
258            ..Default::default()
259        })
260        .into();
261
262        trace.end_time = MessageField::some(Timestamp {
263            nanos: inner.end_time.timestamp_subsec_nanos().try_into().unwrap(),
264            seconds: inner.end_time.timestamp(),
265            special_fields: Default::default(),
266        });
267
268        trace.start_time =
269            protobuf::MessageField::some(protobuf::well_known_types::timestamp::Timestamp {
270                nanos: inner
271                    .start_time
272                    .timestamp_subsec_nanos()
273                    .try_into()
274                    .unwrap(),
275                seconds: inner.start_time.timestamp(),
276                special_fields: Default::default(),
277            });
278
279        let root_node = self.root_node.read().unwrap();
280        trace.root = Some(root_node.clone()).into();
281
282        let mut sender = self.report.sender();
283
284        let operation_name = self.operation_name.read().unwrap().clone();
285
286        let _handle = spawn(async move {
287            if let Err(e) = sender.send((operation_name, trace)).await {
288                error!(error = ?e);
289            }
290        });
291        resp
292    }
293
294    #[instrument(level = "debug", skip(self, ctx, info, next))]
295    async fn resolve(
296        &self,
297        ctx: &ExtensionContext<'_>,
298        info: ResolveInfo<'_>,
299        next: NextResolve<'_>,
300    ) -> ServerResult<Option<Value>> {
301        // We do create a node when it's invoked which we insert at the right place inside the
302        // struct.
303
304        let path = info.path_node.to_string_vec().join(".");
305        let field_name = info.path_node.field_name().to_string();
306        let parent_type = info.parent_type.to_string();
307        let _return_type = info.return_type.to_string();
308        let start_time = Utc::now() - self.inner.lock().await.start_time;
309        let path_node = info.path_node;
310
311        let node: Node = Node {
312            end_time: 0,
313            id: match path_node.segment {
314                QueryPathSegment::Name(name) => Some(node::Id::ResponseName(name.to_string())),
315                QueryPathSegment::Index(index) => {
316                    Some(node::Id::Index(index.try_into().unwrap_or(0)))
317                }
318            },
319            start_time: match start_time
320                .num_nanoseconds()
321                .and_then(|x| u64::try_from(x).ok())
322            {
323                Some(duration) => duration,
324                None => Utc::now()
325                    .timestamp_nanos_opt()
326                    .unwrap_or_default()
327                    .try_into()
328                    .unwrap_or_default(),
329            },
330            parent_type: parent_type.to_string(),
331            original_field_name: field_name,
332            ..Default::default()
333        };
334
335        let node = Arc::new(RwLock::new(node));
336        self.nodes.write().unwrap().insert(path, node.clone());
337        let parent_node = path_node.parent.map(|x| x.to_string_vec().join("."));
338        // Use the path to create a new node
339        // https://github.com/apollographql/apollo-server/blob/291c17e255122d4733b23177500188d68fac55ce/packages/apollo-server-core/src/plugin/traceTreeBuilder.ts
340        let res = match next.run(ctx, info).await {
341            Ok(res) => Ok(res),
342            Err(e) => {
343                let json = match serde_json::to_string(&e) {
344                    Ok(content) => content,
345                    Err(e) => format!("{{ \"error\": \"{e:?}\" }}"),
346                };
347                let error = trace::Error {
348                    message: e.message.clone(),
349                    location: e
350                        .locations
351                        .clone()
352                        .into_iter()
353                        .map(|x| trace::Location {
354                            line: x.line as u32,
355                            column: x.column as u32,
356                            special_fields: protobuf::SpecialFields::default(),
357                        })
358                        .collect(),
359                    json,
360                    ..Default::default()
361                };
362
363                node.write().unwrap().error = vec![error];
364                Err(e)
365            }
366        };
367        let end_time = Utc::now() - self.inner.lock().await.start_time;
368
369        node.write().unwrap().end_time = match end_time
370            .num_nanoseconds()
371            .and_then(|x| u64::try_from(x).ok())
372        {
373            Some(duration) => duration,
374            None => Utc::now()
375                .timestamp_nanos_opt()
376                .unwrap_or_default()
377                .try_into()
378                .unwrap_or_default(),
379        };
380
381        match parent_node {
382            None => {
383                let mut root_node = self.root_node.write().unwrap();
384                let child = &mut root_node.child;
385                let node = node.read().unwrap();
386                // Can't copy or pass a ref to Protobuf
387                // So we clone
388                child.push(node.clone());
389            }
390            Some(parent) => {
391                let nodes = self.nodes.read().unwrap();
392                let node_read = nodes.get(&parent).unwrap();
393                let mut parent = node_read.write().unwrap();
394                let child = &mut parent.child;
395                let node = node.read().unwrap();
396                // Can't copy or pass a ref to Protobuf
397                // So we clone
398                child.push(node.clone());
399            }
400        };
401
402        res
403    }
404}