Skip to main content

apollo_router/context/
mod.rs

1//! Provide a [`Context`] for the plugin chain of responsibilities.
2//!
3//! Router plugins accept a mutable [`Context`] when invoked and this contains a DashMap which
4//! allows additional data to be passed back and forth along the request invocation pipeline.
5
6use std::sync::Arc;
7use std::time::Duration;
8use std::time::Instant;
9
10use apollo_compiler::ExecutableDocument;
11use apollo_compiler::validation::Valid;
12use dashmap::DashMap;
13use dashmap::mapref::multiple::RefMulti;
14use dashmap::mapref::multiple::RefMutMulti;
15use derivative::Derivative;
16use extensions::sync::ExtensionsMutex;
17use parking_lot::Mutex;
18use serde::Deserialize;
19use serde::Serialize;
20use tower::BoxError;
21
22use crate::json_ext::Value;
23use crate::services::layers::query_analysis::ParsedDocument;
24
25pub(crate) mod extensions;
26
/// Context key under which the resolved operation name is stored.
///
/// This is subject to change and should not be relied on.
pub(crate) const OPERATION_NAME: &str = "operation_name";
/// Context key under which the resolved operation kind is stored.
///
/// This is subject to change and should not be relied on.
pub(crate) const OPERATION_KIND: &str = "operation_kind";
/// Context key used to record whether the response body contains at least one GraphQL error.
pub(crate) const CONTAINS_GRAPHQL_ERROR: &str = "apollo::telemetry::contains_graphql_error";
33
34/// Holds [`Context`] entries.
35pub(crate) type Entries = Arc<DashMap<String, Value>>;
36
37/// A map of arbitrary JSON values, for use by plugins.
38///
39/// Context makes use of [`DashMap`] under the hood which tries to handle concurrency
40/// by allowing concurrency across threads without requiring locking. This is great
41/// for usability but could lead to surprises when updates are highly contested.
42///
43/// Within the router, contention is likely to be highest within plugins which
44/// provide [`crate::services::SubgraphRequest`] or
45/// [`crate::services::SubgraphResponse`] processing. At such times,
46/// plugins should restrict themselves to the [`Context::get`] and [`Context::upsert`]
47/// functions to minimise the possibility of mis-sequenced updates.
48#[derive(Clone, Deserialize, Serialize, Derivative)]
49#[serde(default)]
50#[derivative(Debug)]
51pub struct Context {
52    // Allows adding custom entries to the context.
53    entries: Entries,
54
55    #[serde(skip)]
56    extensions: ExtensionsMutex,
57
58    /// Creation time
59    #[serde(skip)]
60    pub(crate) created_at: Instant,
61
62    #[serde(skip)]
63    #[derivative(Debug = "ignore")]
64    busy_timer: Arc<Mutex<BusyTimer>>,
65
66    #[serde(skip)]
67    pub(crate) id: String,
68}
69
70impl Context {
71    /// Create a new context.
72    pub fn new() -> Self {
73        let id = uuid::Uuid::new_v4()
74            .as_hyphenated()
75            .encode_lower(&mut uuid::Uuid::encode_buffer())
76            .to_string();
77        Context {
78            entries: Default::default(),
79            extensions: ExtensionsMutex::default(),
80            created_at: Instant::now(),
81            busy_timer: Arc::new(Mutex::new(BusyTimer::new())),
82            id,
83        }
84    }
85}
86
87impl Context {
88    /// Returns extensions of the context.
89    ///
90    /// You can use `Extensions` to pass data between plugins that is not serializable. Such data is not accessible from Rhai or co-processoers.
91    ///
92    /// It is CRITICAL to avoid holding on to the mutex guard for too long, particularly across async calls.
93    /// Doing so may cause performance degradation or even deadlocks.
94    ///
95    /// See related clippy lint for examples: <https://rust-lang.github.io/rust-clippy/master/index.html#/await_holding_lock>
96    pub fn extensions(&self) -> &ExtensionsMutex {
97        &self.extensions
98    }
99
100    /// Returns true if the context contains a value for the specified key.
101    pub fn contains_key<K>(&self, key: K) -> bool
102    where
103        K: Into<String>,
104    {
105        self.entries.contains_key(&key.into())
106    }
107
108    /// Get a value from the context using the provided key.
109    ///
110    /// Semantics:
111    ///  - If the operation fails, that's because we can't deserialize the value.
112    ///  - If the operation succeeds, the value is an [`Option`].
113    pub fn get<K, V>(&self, key: K) -> Result<Option<V>, BoxError>
114    where
115        K: Into<String>,
116        V: for<'de> serde::Deserialize<'de>,
117    {
118        self.entries
119            .get(&key.into())
120            .map(|v| serde_json_bytes::from_value(v.value().clone()))
121            .transpose()
122            .map_err(|e| e.into())
123    }
124
125    /// Insert a value int the context using the provided key and value.
126    ///
127    /// Semantics:
128    ///  - If the operation fails, then the pair has not been inserted.
129    ///  - If the operation succeeds, the result is the old value as an [`Option`].
130    pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<V>, BoxError>
131    where
132        K: Into<String>,
133        V: for<'de> serde::Deserialize<'de> + Serialize,
134    {
135        match serde_json_bytes::to_value(value) {
136            Ok(value) => self
137                .entries
138                .insert(key.into(), value)
139                .map(|v| serde_json_bytes::from_value(v))
140                .transpose()
141                .map_err(|e| e.into()),
142            Err(e) => Err(e.into()),
143        }
144    }
145
146    /// Insert a value in the context using the provided key and value.
147    ///
148    /// Semantics: the result is the old value as an [`Option`].
149    pub fn insert_json_value<K>(&self, key: K, value: Value) -> Option<Value>
150    where
151        K: Into<String>,
152    {
153        self.entries.insert(key.into(), value)
154    }
155
156    /// Get a json value from the context using the provided key.
157    pub fn get_json_value<K>(&self, key: K) -> Option<Value>
158    where
159        K: Into<String>,
160    {
161        self.entries.get(&key.into()).map(|v| v.value().clone())
162    }
163
164    /// Upsert a value in the context using the provided key and resolving
165    /// function.
166    ///
167    /// The resolving function must yield a value to be used in the context. It
168    /// is provided with the current value to use in evaluating which value to
169    /// yield.
170    ///
171    /// Semantics:
172    ///  - If the operation fails, then the pair has not been inserted (or a current
173    ///    value updated).
174    ///  - If the operation succeeds, the pair have either updated an existing value
175    ///    or been inserted.
176    pub fn upsert<K, V>(&self, key: K, upsert: impl FnOnce(V) -> V) -> Result<(), BoxError>
177    where
178        K: Into<String>,
179        V: for<'de> serde::Deserialize<'de> + Serialize + Default,
180    {
181        let key = key.into();
182        self.entries
183            .entry(key.clone())
184            .or_try_insert_with(|| serde_json_bytes::to_value::<V>(Default::default()))?;
185        let mut result = Ok(());
186        self.entries
187            .alter(&key, |_, v| match serde_json_bytes::from_value(v.clone()) {
188                Ok(value) => match serde_json_bytes::to_value((upsert)(value)) {
189                    Ok(value) => value,
190                    Err(e) => {
191                        result = Err(e);
192                        v
193                    }
194                },
195                Err(e) => {
196                    result = Err(e);
197                    v
198                }
199            });
200        result.map_err(|e| e.into())
201    }
202
203    /// Upsert a JSON value in the context using the provided key and resolving
204    /// function.
205    ///
206    /// The resolving function must yield a value to be used in the context. It
207    /// is provided with the current value to use in evaluating which value to
208    /// yield.
209    pub(crate) fn upsert_json_value<K>(&self, key: K, upsert: impl FnOnce(Value) -> Value)
210    where
211        K: Into<String>,
212    {
213        let key = key.into();
214        self.entries.entry(key.clone()).or_insert(Value::Null);
215        self.entries.alter(&key, |_, v| upsert(v));
216    }
217
218    /// Convert the context into an iterator.
219    pub(crate) fn try_into_iter(
220        self,
221    ) -> Result<impl IntoIterator<Item = (String, Value)>, BoxError> {
222        Ok(Arc::try_unwrap(self.entries)
223            .map_err(|_e| anyhow::anyhow!("cannot take ownership of dashmap"))?
224            .into_iter())
225    }
226
227    /// Iterate over the entries.
228    pub fn iter(&self) -> impl Iterator<Item = RefMulti<'_, String, Value>> + '_ {
229        self.entries.iter()
230    }
231
232    /// Iterate mutably over the entries.
233    pub fn iter_mut(&self) -> impl Iterator<Item = RefMutMulti<'_, String, Value>> + '_ {
234        self.entries.iter_mut()
235    }
236
237    /// Notify the busy timer that we're waiting on a network request
238    ///
239    /// When a plugin makes a network call that would block request handling, this
240    /// indicates to the processing time counter that it should stop measuring while
241    /// we wait for the call to finish. When the value returned by this method is
242    /// dropped, the router will start measuring again, unless we are still covered
243    /// by another active request (ex: parallel subgraph calls)
244    pub fn enter_active_request(&self) -> BusyTimerGuard {
245        self.busy_timer.lock().increment_active_requests();
246        BusyTimerGuard {
247            busy_timer: self.busy_timer.clone(),
248        }
249    }
250
251    /// Time actually spent working on this request
252    ///
253    /// This is the request duration without the time spent waiting for external calls
254    /// (coprocessor and subgraph requests). This metric is an approximation of
255    /// the time spent, because in the case of parallel subgraph calls, some
256    /// router processing time could happen during a network call (and so would
257    /// not be accounted for) and make another task late.
258    /// This is reported under the `apollo_router_processing_time` metric
259    pub fn busy_time(&self) -> Duration {
260        self.busy_timer.lock().current()
261    }
262
263    pub(crate) fn extend(&self, other: &Context) {
264        for kv in other.entries.iter() {
265            self.entries.insert(kv.key().clone(), kv.value().clone());
266        }
267    }
268
269    /// Read only access to the executable document. This is UNSTABLE and may be changed or removed in future router releases.
270    /// In addition, ExecutableDocument is UNSTABLE, and may be changed or removed in future apollo-rs releases.
271    #[doc(hidden)]
272    pub fn unsupported_executable_document(&self) -> Option<Arc<Valid<ExecutableDocument>>> {
273        self.extensions()
274            .with_lock(|lock| lock.get::<ParsedDocument>().map(|d| d.executable.clone()))
275    }
276}
277
278pub struct BusyTimerGuard {
279    busy_timer: Arc<Mutex<BusyTimer>>,
280}
281
282impl Drop for BusyTimerGuard {
283    fn drop(&mut self) {
284        self.busy_timer.lock().decrement_active_requests()
285    }
286}
287
288impl Default for Context {
289    fn default() -> Self {
290        Self::new()
291    }
292}
293
/// Measures the total overhead of the router
///
/// This works by measuring the time spent executing when there is no active subgraph request.
/// This is still not a perfect solution, there are cases where preprocessing a subgraph request
/// happens while another one is running and still shifts the end of the span, but for now this
/// should serve as a reasonable solution without complex post processing of spans
pub(crate) struct BusyTimer {
    /// Number of active requests; time is only measured while this is zero.
    active_requests: u32,
    /// Busy time accumulated over the measuring intervals that have already ended.
    busy_ns: Duration,
    /// Start of the measuring interval in progress, or `None` while measuring is paused.
    start: Option<Instant>,
}

impl BusyTimer {
    /// Create a timer that starts measuring immediately.
    fn new() -> Self {
        BusyTimer::default()
    }

    /// Register one more active request.
    ///
    /// On the transition from zero to one active request, the interval in progress is
    /// added to the accumulated busy time and measuring is paused.
    fn increment_active_requests(&mut self) {
        if self.active_requests == 0 {
            // `take()` already leaves `None` behind, which is what marks the pause.
            if let Some(start) = self.start.take() {
                self.busy_ns += start.elapsed();
            }
        }

        self.active_requests += 1;
    }

    /// Unregister one active request.
    ///
    /// Must be paired with an earlier call to `increment_active_requests`. When the
    /// count gets back to zero, a new measuring interval starts.
    fn decrement_active_requests(&mut self) {
        self.active_requests -= 1;

        if self.active_requests == 0 {
            self.start = Some(Instant::now());
        }
    }

    /// Busy time measured so far: the accumulated time plus, when measuring is in
    /// progress, the time elapsed in the current interval.
    fn current(&self) -> Duration {
        match self.start {
            Some(start) => self.busy_ns + start.elapsed(),
            None => self.busy_ns,
        }
    }
}

impl Default for BusyTimer {
    /// A timer with no active request and no accumulated time, measuring from now.
    fn default() -> Self {
        Self {
            active_requests: 0,
            busy_ns: Duration::ZERO,
            start: Some(Instant::now()),
        }
    }
}
348
#[cfg(test)]
mod test {
    use crate::Configuration;
    use crate::Context;
    use crate::spec::Query;
    use crate::spec::Schema;

    /// A freshly inserted value can be read back.
    #[test]
    fn test_context_insert() {
        let context = Context::new();
        context.insert("key1", 1).expect("insert should succeed");
        assert_eq!(context.get("key1").unwrap(), Some(1));
    }

    /// Inserting twice under the same key keeps the latest value.
    #[test]
    fn test_context_overwrite() {
        let context = Context::new();
        context
            .insert("overwrite", 2)
            .expect("first insert should succeed");
        context
            .insert("overwrite", 3)
            .expect("second insert should succeed");
        assert_eq!(context.get("overwrite").unwrap(), Some(3));
    }

    /// Upsert updates an existing value and starts from the default for a missing one.
    #[test]
    fn test_context_upsert() {
        let context = Context::new();
        context.insert("present", 1).expect("insert should succeed");

        context
            .upsert("present", |v: usize| v + 1)
            .expect("upsert of an existing key should succeed");
        assert_eq!(context.get("present").unwrap(), Some(2));

        context
            .upsert("not_present", |v: usize| v + 1)
            .expect("upsert of a missing key should succeed");
        assert_eq!(context.get("not_present").unwrap(), Some(1));
    }

    /// Upserting with a type that does not match the stored value is an error.
    #[test]
    fn test_context_marshall_errors() {
        let context = Context::new();
        context
            .insert("string", "Some value".to_string())
            .expect("insert should succeed");
        let outcome = context.upsert("string", |v: usize| v + 1);
        assert!(outcome.is_err());
    }

    /// `iter` visits every entry.
    #[test]
    fn it_iterates_over_context() {
        let context = Context::new();
        context.insert("one", 1).expect("insert should succeed");
        context.insert("two", 2).expect("insert should succeed");
        assert_eq!(context.iter().count(), 2);

        // Fiddly because of the conversion from bytes to usize, but ...
        let total: usize = context
            .iter()
            .map(|entry| serde_json_bytes::from_value::<usize>(entry.value().clone()).unwrap())
            .sum();
        assert_eq!(total, 3);
    }

    /// `iter_mut` allows every value to be replaced in place.
    #[test]
    fn it_iterates_mutably_over_context() {
        let context = Context::new();
        context.insert("one", 1usize).expect("insert should succeed");
        context.insert("two", 2usize).expect("insert should succeed");
        assert_eq!(context.iter().count(), 2);

        for mut entry in context.iter_mut() {
            // Fiddly because of the conversion from bytes to usize, but ...
            let incremented: usize =
                serde_json_bytes::from_value::<usize>(entry.value().clone()).unwrap() + 1;
            *entry = incremented.into();
        }

        assert_eq!(context.get("one").unwrap(), Some(2));
        assert_eq!(context.get("two").unwrap(), Some(3));
    }

    /// Values stored in the extensions can be read back by type.
    #[test]
    fn context_extensions() {
        // This is mostly tested in the extensions module.
        let context = Context::new();
        context
            .extensions()
            .with_lock(|mut lock| lock.insert(1usize));
        let stored = context
            .extensions()
            .with_lock(|lock| lock.get::<usize>().cloned());
        assert_eq!(stored, Some(1usize));
    }

    /// The executable document is only available once a parsed document has been stored.
    #[test]
    fn test_executable_document_access() {
        let context = Context::new();
        let schema = include_str!("../testdata/minimal_supergraph.graphql");
        let schema = Schema::parse(schema, &Default::default()).unwrap();
        let document =
            Query::parse_document("{ me }", None, &schema, &Configuration::default()).unwrap();

        assert!(context.unsupported_executable_document().is_none());
        context
            .extensions()
            .with_lock(|mut lock| lock.insert(document));
        assert!(context.unsupported_executable_document().is_some());
    }
}