Skip to main content

apollo_router/context/
mod.rs

1//! Provide a [`Context`] for the plugin chain of responsibilities.
2//!
3//! Router plugins accept a mutable [`Context`] when invoked and this contains a DashMap which
4//! allows additional data to be passed back and forth along the request invocation pipeline.
5
6use std::sync::Arc;
7use std::time::Instant;
8
9use apollo_compiler::ExecutableDocument;
10use apollo_compiler::validation::Valid;
11use dashmap::DashMap;
12use dashmap::mapref::multiple::RefMulti;
13use dashmap::mapref::multiple::RefMutMulti;
14use derivative::Derivative;
15use extensions::sync::ExtensionsMutex;
16use serde::Deserialize;
17use serde::Serialize;
18use tower::BoxError;
19
20use crate::json_ext::Value;
21use crate::services::layers::query_analysis::ParsedDocument;
22
23pub(crate) mod deprecated;
24pub(crate) mod extensions;
25
26/// Context key for the operation name.
27pub(crate) const OPERATION_NAME: &str = "apollo::supergraph::operation_name";
28/// Context key for the operation kind.
29pub(crate) const OPERATION_KIND: &str = "apollo::supergraph::operation_kind";
30/// Context key for the persisted query ID.
31pub(crate) const PERSISTED_QUERY_ID: &str = "apollo::supergraph::persisted_query_id";
32/// The key to know if the response body contains at least 1 GraphQL error. This value is sticky:
33/// once set to true for any chunk in a deferred response stream, it remains true for the lifetime
34/// of the request.
35pub(crate) const CONTAINS_GRAPHQL_ERROR: &str = "apollo::telemetry::contains_graphql_error";
36/// The key to know if the *current* stream chunk contains GraphQL errors. Updated per-chunk (not
37/// sticky) by the supergraph `check_for_errors` stream map so that the router layer can evaluate
38/// per-chunk conditions without parsing raw bytes. This is safe despite streaming because Rust
39/// async streams are lazy and pull-based: when the router layer pulls a bytes chunk, it drives the
40/// entire upstream chain synchronously — `check_for_errors` sets this key on the graphql::Response
41/// before it is serialized to bytes and returned to the router layer, so the key is always current
42/// for the chunk being processed.
43pub(crate) const CHUNK_CONTAINS_GRAPHQL_ERROR: &str =
44    "apollo::telemetry::chunk_contains_graphql_error";
45/// The key to a map of errors that were already counted in a previous layer. This is subject to
46/// change and is NOT supported for user access.
47pub(crate) const COUNTED_ERRORS: &str = "apollo::telemetry::counted_errors";
48/// The key for the full list of errors in the router response. This allows us to pull the value in
49/// plugins without having to deserialize the router response. This is subject to change and is NOT
50/// supported for user access.
51pub(crate) const ROUTER_RESPONSE_ERRORS: &str = "apollo::router::response_errors";
52
53pub(crate) use deprecated::context_key_from_deprecated;
54pub(crate) use deprecated::context_key_to_deprecated;
55
56/// Holds [`Context`] entries.
57pub(crate) type Entries = Arc<DashMap<String, Value>>;
58
59/// A map of arbitrary JSON values, for use by plugins.
60///
61/// Context makes use of [`DashMap`] under the hood which tries to handle concurrency
62/// by allowing concurrency across threads without requiring locking. This is great
63/// for usability but could lead to surprises when updates are highly contested.
64///
65/// Within the router, contention is likely to be highest within plugins which
66/// provide [`crate::services::SubgraphRequest`] or
67/// [`crate::services::SubgraphResponse`] processing. At such times,
68/// plugins should restrict themselves to the [`Context::get`] and [`Context::upsert`]
69/// functions to minimise the possibility of mis-sequenced updates.
70#[derive(Clone, Deserialize, Serialize, Derivative)]
71#[serde(default)]
72#[derivative(Debug)]
73pub struct Context {
74    // Allows adding custom entries to the context.
75    entries: Entries,
76
77    #[serde(skip)]
78    extensions: ExtensionsMutex,
79
80    /// Creation time
81    #[serde(skip)]
82    pub(crate) created_at: Instant,
83
84    #[serde(skip)]
85    pub(crate) id: String,
86}
87
88impl Context {
89    /// Create a new context.
90    pub fn new() -> Self {
91        let id = uuid::Uuid::new_v4()
92            .as_hyphenated()
93            .encode_lower(&mut uuid::Uuid::encode_buffer())
94            .to_string();
95        Context {
96            entries: Default::default(),
97            extensions: ExtensionsMutex::default(),
98            created_at: Instant::now(),
99            id,
100        }
101    }
102}
103
104impl FromIterator<(String, Value)> for Context {
105    fn from_iter<T: IntoIterator<Item = (String, Value)>>(iter: T) -> Self {
106        Self {
107            entries: Arc::new(DashMap::from_iter(iter)),
108            extensions: ExtensionsMutex::default(),
109            created_at: Instant::now(),
110            id: String::new(),
111        }
112    }
113}
114
115impl Context {
116    /// Returns extensions of the context.
117    ///
118    /// You can use `Extensions` to pass data between plugins that is not serializable. Such data is not accessible from Rhai or co-processoers.
119    ///
120    /// It is CRITICAL to avoid holding on to the mutex guard for too long, particularly across async calls.
121    /// Doing so may cause performance degradation or even deadlocks.
122    ///
123    /// See related clippy lint for examples: <https://rust-lang.github.io/rust-clippy/master/index.html#/await_holding_lock>
124    pub fn extensions(&self) -> &ExtensionsMutex {
125        &self.extensions
126    }
127
128    /// Returns true if the context contains a value for the specified key.
129    pub fn contains_key<K>(&self, key: K) -> bool
130    where
131        K: Into<String>,
132    {
133        self.entries.contains_key(&key.into())
134    }
135
136    /// Get a value from the context using the provided key.
137    ///
138    /// Semantics:
139    ///  - If the operation fails, that's because we can't deserialize the value.
140    ///  - If the operation succeeds, the value is an [`Option`].
141    pub fn get<K, V>(&self, key: K) -> Result<Option<V>, BoxError>
142    where
143        K: Into<String>,
144        V: for<'de> serde::Deserialize<'de>,
145    {
146        self.entries
147            .get(&key.into())
148            .map(|v| serde_json_bytes::from_value(v.value().clone()))
149            .transpose()
150            .map_err(|e| e.into())
151    }
152
153    /// Insert a value int the context using the provided key and value.
154    ///
155    /// Semantics:
156    ///  - If the operation fails, then the pair has not been inserted.
157    ///  - If the operation succeeds, the result is the old value as an [`Option`].
158    pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<V>, BoxError>
159    where
160        K: Into<String>,
161        V: for<'de> serde::Deserialize<'de> + Serialize,
162    {
163        match serde_json_bytes::to_value(value) {
164            Ok(value) => self
165                .entries
166                .insert(key.into(), value)
167                .map(|v| serde_json_bytes::from_value(v))
168                .transpose()
169                .map_err(|e| e.into()),
170            Err(e) => Err(e.into()),
171        }
172    }
173
174    /// Insert a value in the context using the provided key and value.
175    ///
176    /// Semantics: the result is the old value as an [`Option`].
177    pub fn insert_json_value<K>(&self, key: K, value: Value) -> Option<Value>
178    where
179        K: Into<String>,
180    {
181        self.entries.insert(key.into(), value)
182    }
183
184    /// Get a json value from the context using the provided key.
185    pub fn get_json_value<K>(&self, key: K) -> Option<Value>
186    where
187        K: Into<String>,
188    {
189        self.entries.get(&key.into()).map(|v| v.value().clone())
190    }
191
192    /// Upsert a value in the context using the provided key and resolving
193    /// function.
194    ///
195    /// The resolving function must yield a value to be used in the context. It
196    /// is provided with the current value to use in evaluating which value to
197    /// yield.
198    ///
199    /// Semantics:
200    ///  - If the operation fails, then the pair has not been inserted (or a current
201    ///    value updated).
202    ///  - If the operation succeeds, the pair have either updated an existing value
203    ///    or been inserted.
204    pub fn upsert<K, V>(&self, key: K, upsert: impl FnOnce(V) -> V) -> Result<(), BoxError>
205    where
206        K: Into<String>,
207        V: for<'de> serde::Deserialize<'de> + Serialize + Default,
208    {
209        let key = key.into();
210        self.entries
211            .entry(key.clone())
212            .or_try_insert_with(|| serde_json_bytes::to_value::<V>(Default::default()))?;
213        let mut result = Ok(());
214        self.entries
215            .alter(&key, |_, v| match serde_json_bytes::from_value(v.clone()) {
216                Ok(value) => match serde_json_bytes::to_value((upsert)(value)) {
217                    Ok(value) => value,
218                    Err(e) => {
219                        result = Err(e);
220                        v
221                    }
222                },
223                Err(e) => {
224                    result = Err(e);
225                    v
226                }
227            });
228        result.map_err(|e| e.into())
229    }
230
231    /// Upsert a JSON value in the context using the provided key and resolving
232    /// function.
233    ///
234    /// The resolving function must yield a value to be used in the context. It
235    /// is provided with the current value to use in evaluating which value to
236    /// yield.
237    pub(crate) fn upsert_json_value<K>(&self, key: K, upsert: impl FnOnce(Value) -> Value)
238    where
239        K: Into<String>,
240    {
241        let key = key.into();
242        self.entries.entry(key.clone()).or_insert(Value::Null);
243        self.entries.alter(&key, |_, v| upsert(v));
244    }
245
246    /// Convert the context into an iterator.
247    pub(crate) fn try_into_iter(
248        self,
249    ) -> Result<impl IntoIterator<Item = (String, Value)>, BoxError> {
250        Ok(Arc::try_unwrap(self.entries)
251            .map_err(|_e| anyhow::anyhow!("cannot take ownership of dashmap"))?
252            .into_iter())
253    }
254
255    /// Iterate over the entries.
256    pub fn iter(&self) -> impl Iterator<Item = RefMulti<'_, String, Value>> + '_ {
257        self.entries.iter()
258    }
259
260    /// Iterate mutably over the entries.
261    pub fn iter_mut(&self) -> impl Iterator<Item = RefMutMulti<'_, String, Value>> + '_ {
262        self.entries.iter_mut()
263    }
264
265    pub(crate) fn extend(&self, other: &Context) {
266        for kv in other.entries.iter() {
267            self.entries.insert(kv.key().clone(), kv.value().clone());
268        }
269    }
270
271    pub(crate) fn retain(&self, f: impl Fn(&String, &Value) -> bool) {
272        self.entries.retain(|k, v| f(k, v));
273    }
274
275    pub(crate) fn len(&self) -> usize {
276        self.entries.len()
277    }
278
279    /// Read only access to the executable document for internal router plugins.
280    pub(crate) fn executable_document(&self) -> Option<Arc<Valid<ExecutableDocument>>> {
281        self.extensions()
282            .with_lock(|lock| lock.get::<ParsedDocument>().map(|d| d.executable.clone()))
283    }
284}
285
286impl Default for Context {
287    fn default() -> Self {
288        Self::new()
289    }
290}
291
292#[cfg(test)]
293mod test {
294    use crate::Configuration;
295    use crate::Context;
296    use crate::spec::Query;
297    use crate::spec::Schema;
298
299    #[test]
300    fn test_context_insert() {
301        let c = Context::new();
302        assert!(c.insert("key1", 1).is_ok());
303        assert_eq!(c.get("key1").unwrap(), Some(1));
304    }
305
306    #[test]
307    fn test_context_overwrite() {
308        let c = Context::new();
309        assert!(c.insert("overwrite", 2).is_ok());
310        assert!(c.insert("overwrite", 3).is_ok());
311        assert_eq!(c.get("overwrite").unwrap(), Some(3));
312    }
313
314    #[test]
315    fn test_context_upsert() {
316        let c = Context::new();
317        assert!(c.insert("present", 1).is_ok());
318        assert!(c.upsert("present", |v: usize| v + 1).is_ok());
319        assert_eq!(c.get("present").unwrap(), Some(2));
320        assert!(c.upsert("not_present", |v: usize| v + 1).is_ok());
321        assert_eq!(c.get("not_present").unwrap(), Some(1));
322    }
323
324    #[test]
325    fn test_context_marshall_errors() {
326        let c = Context::new();
327        assert!(c.insert("string", "Some value".to_string()).is_ok());
328        assert!(c.upsert("string", |v: usize| v + 1).is_err());
329    }
330
331    #[test]
332    fn it_iterates_over_context() {
333        let c = Context::new();
334        assert!(c.insert("one", 1).is_ok());
335        assert!(c.insert("two", 2).is_ok());
336        assert_eq!(c.iter().count(), 2);
337        assert_eq!(
338            c.iter()
339                // Fiddly because of the conversion from bytes to usize, but ...
340                .map(|r| serde_json_bytes::from_value::<usize>(r.value().clone()).unwrap())
341                .sum::<usize>(),
342            3
343        );
344    }
345
346    #[test]
347    fn it_iterates_mutably_over_context() {
348        let c = Context::new();
349        assert!(c.insert("one", 1usize).is_ok());
350        assert!(c.insert("two", 2usize).is_ok());
351        assert_eq!(c.iter().count(), 2);
352        c.iter_mut().for_each(|mut r| {
353            // Fiddly because of the conversion from bytes to usize, but ...
354            let new: usize = serde_json_bytes::from_value::<usize>(r.value().clone()).unwrap() + 1;
355            *r = new.into();
356        });
357        assert_eq!(c.get("one").unwrap(), Some(2));
358        assert_eq!(c.get("two").unwrap(), Some(3));
359    }
360
361    #[test]
362    fn context_extensions() {
363        // This is mostly tested in the extensions module.
364        let c = Context::new();
365        c.extensions().with_lock(|lock| lock.insert(1usize));
366        let v = c
367            .extensions()
368            .with_lock(|lock| lock.get::<usize>().cloned());
369        assert_eq!(v, Some(1usize));
370    }
371
372    #[test]
373    fn test_executable_document_access() {
374        let c = Context::new();
375        let schema = include_str!("../testdata/minimal_supergraph.graphql");
376        let schema = Schema::parse(schema, &Default::default()).unwrap();
377        let document =
378            Query::parse_document("{ me }", None, &schema, &Configuration::default()).unwrap();
379        assert!(c.executable_document().is_none());
380        c.extensions().with_lock(|lock| lock.insert(document));
381        assert!(c.executable_document().is_some());
382    }
383}