apollo_router/context/
mod.rs

1//! Provide a [`Context`] for the plugin chain of responsibilities.
2//!
3//! Router plugins accept a mutable [`Context`] when invoked and this contains a DashMap which
4//! allows additional data to be passed back and forth along the request invocation pipeline.
5
6use std::sync::Arc;
7use std::time::Instant;
8
9use apollo_compiler::ExecutableDocument;
10use apollo_compiler::validation::Valid;
11use dashmap::DashMap;
12use dashmap::mapref::multiple::RefMulti;
13use dashmap::mapref::multiple::RefMutMulti;
14use derivative::Derivative;
15use extensions::sync::ExtensionsMutex;
16use serde::Deserialize;
17use serde::Serialize;
18use tower::BoxError;
19
20use crate::json_ext::Value;
21use crate::services::layers::query_analysis::ParsedDocument;
22
23pub(crate) mod deprecated;
24pub(crate) mod extensions;
25
/// Context key under which the operation name is stored.
pub(crate) const OPERATION_NAME: &str = "apollo::supergraph::operation_name";
/// Context key under which the operation kind is stored.
pub(crate) const OPERATION_KIND: &str = "apollo::supergraph::operation_kind";
/// Context key indicating whether the response body contains at least one GraphQL error.
pub(crate) const CONTAINS_GRAPHQL_ERROR: &str = "apollo::telemetry::contains_graphql_error";
/// Context key for a map of errors that were already counted in a previous layer. This is
/// subject to change and is NOT supported for user access.
pub(crate) const COUNTED_ERRORS: &str = "apollo::telemetry::counted_errors";
/// Context key for the full list of errors in the router response. This allows plugins to read
/// the errors without having to deserialize the router response. This is subject to change and
/// is NOT supported for user access.
pub(crate) const ROUTER_RESPONSE_ERRORS: &str = "apollo::router::response_errors";
39
40pub(crate) use deprecated::context_key_from_deprecated;
41pub(crate) use deprecated::context_key_to_deprecated;
42
/// Holds [`Context`] entries: a reference-counted [`DashMap`] from `String` keys to JSON
/// [`Value`]s.
pub(crate) type Entries = Arc<DashMap<String, Value>>;
45
/// A map of arbitrary JSON values, for use by plugins.
///
/// Context makes use of [`DashMap`] under the hood, which handles concurrent access
/// across threads without requiring callers to take a lock themselves. This is great
/// for usability but could lead to surprises when updates are highly contested.
///
/// Within the router, contention is likely to be highest within plugins which
/// provide [`crate::services::SubgraphRequest`] or
/// [`crate::services::SubgraphResponse`] processing. At such times,
/// plugins should restrict themselves to the [`Context::get`] and [`Context::upsert`]
/// functions to minimise the possibility of mis-sequenced updates.
///
/// Cloning a `Context` clones the [`Arc`] around the entries, so all clones read and
/// write the same underlying map.
#[derive(Clone, Deserialize, Serialize, Derivative)]
#[serde(default)]
#[derivative(Debug)]
pub struct Context {
    // Custom key/value entries stored in the context. This is the only field that is not
    // marked `#[serde(skip)]`.
    entries: Entries,

    // Extension storage exposed through `Context::extensions`; skipped by serde.
    #[serde(skip)]
    extensions: ExtensionsMutex,

    /// Creation time. Set from `Instant::now()` by the constructors in this file; skipped by
    /// serde.
    #[serde(skip)]
    pub(crate) created_at: Instant,

    /// Identifier of this context; skipped by serde. `Context::new` sets it to a freshly
    /// generated lowercase hyphenated UUID v4, while the `FromIterator` implementation leaves
    /// it empty.
    #[serde(skip)]
    pub(crate) id: String,
}
74
75impl Context {
76    /// Create a new context.
77    pub fn new() -> Self {
78        let id = uuid::Uuid::new_v4()
79            .as_hyphenated()
80            .encode_lower(&mut uuid::Uuid::encode_buffer())
81            .to_string();
82        Context {
83            entries: Default::default(),
84            extensions: ExtensionsMutex::default(),
85            created_at: Instant::now(),
86            id,
87        }
88    }
89}
90
91impl FromIterator<(String, Value)> for Context {
92    fn from_iter<T: IntoIterator<Item = (String, Value)>>(iter: T) -> Self {
93        Self {
94            entries: Arc::new(DashMap::from_iter(iter)),
95            extensions: ExtensionsMutex::default(),
96            created_at: Instant::now(),
97            id: String::new(),
98        }
99    }
100}
101
102impl Context {
103    /// Returns extensions of the context.
104    ///
105    /// You can use `Extensions` to pass data between plugins that is not serializable. Such data is not accessible from Rhai or co-processoers.
106    ///
107    /// It is CRITICAL to avoid holding on to the mutex guard for too long, particularly across async calls.
108    /// Doing so may cause performance degradation or even deadlocks.
109    ///
110    /// See related clippy lint for examples: <https://rust-lang.github.io/rust-clippy/master/index.html#/await_holding_lock>
111    pub fn extensions(&self) -> &ExtensionsMutex {
112        &self.extensions
113    }
114
115    /// Returns true if the context contains a value for the specified key.
116    pub fn contains_key<K>(&self, key: K) -> bool
117    where
118        K: Into<String>,
119    {
120        self.entries.contains_key(&key.into())
121    }
122
123    /// Get a value from the context using the provided key.
124    ///
125    /// Semantics:
126    ///  - If the operation fails, that's because we can't deserialize the value.
127    ///  - If the operation succeeds, the value is an [`Option`].
128    pub fn get<K, V>(&self, key: K) -> Result<Option<V>, BoxError>
129    where
130        K: Into<String>,
131        V: for<'de> serde::Deserialize<'de>,
132    {
133        self.entries
134            .get(&key.into())
135            .map(|v| serde_json_bytes::from_value(v.value().clone()))
136            .transpose()
137            .map_err(|e| e.into())
138    }
139
140    /// Insert a value int the context using the provided key and value.
141    ///
142    /// Semantics:
143    ///  - If the operation fails, then the pair has not been inserted.
144    ///  - If the operation succeeds, the result is the old value as an [`Option`].
145    pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<V>, BoxError>
146    where
147        K: Into<String>,
148        V: for<'de> serde::Deserialize<'de> + Serialize,
149    {
150        match serde_json_bytes::to_value(value) {
151            Ok(value) => self
152                .entries
153                .insert(key.into(), value)
154                .map(|v| serde_json_bytes::from_value(v))
155                .transpose()
156                .map_err(|e| e.into()),
157            Err(e) => Err(e.into()),
158        }
159    }
160
161    /// Insert a value in the context using the provided key and value.
162    ///
163    /// Semantics: the result is the old value as an [`Option`].
164    pub fn insert_json_value<K>(&self, key: K, value: Value) -> Option<Value>
165    where
166        K: Into<String>,
167    {
168        self.entries.insert(key.into(), value)
169    }
170
171    /// Get a json value from the context using the provided key.
172    pub fn get_json_value<K>(&self, key: K) -> Option<Value>
173    where
174        K: Into<String>,
175    {
176        self.entries.get(&key.into()).map(|v| v.value().clone())
177    }
178
179    /// Upsert a value in the context using the provided key and resolving
180    /// function.
181    ///
182    /// The resolving function must yield a value to be used in the context. It
183    /// is provided with the current value to use in evaluating which value to
184    /// yield.
185    ///
186    /// Semantics:
187    ///  - If the operation fails, then the pair has not been inserted (or a current
188    ///    value updated).
189    ///  - If the operation succeeds, the pair have either updated an existing value
190    ///    or been inserted.
191    pub fn upsert<K, V>(&self, key: K, upsert: impl FnOnce(V) -> V) -> Result<(), BoxError>
192    where
193        K: Into<String>,
194        V: for<'de> serde::Deserialize<'de> + Serialize + Default,
195    {
196        let key = key.into();
197        self.entries
198            .entry(key.clone())
199            .or_try_insert_with(|| serde_json_bytes::to_value::<V>(Default::default()))?;
200        let mut result = Ok(());
201        self.entries
202            .alter(&key, |_, v| match serde_json_bytes::from_value(v.clone()) {
203                Ok(value) => match serde_json_bytes::to_value((upsert)(value)) {
204                    Ok(value) => value,
205                    Err(e) => {
206                        result = Err(e);
207                        v
208                    }
209                },
210                Err(e) => {
211                    result = Err(e);
212                    v
213                }
214            });
215        result.map_err(|e| e.into())
216    }
217
218    /// Upsert a JSON value in the context using the provided key and resolving
219    /// function.
220    ///
221    /// The resolving function must yield a value to be used in the context. It
222    /// is provided with the current value to use in evaluating which value to
223    /// yield.
224    pub(crate) fn upsert_json_value<K>(&self, key: K, upsert: impl FnOnce(Value) -> Value)
225    where
226        K: Into<String>,
227    {
228        let key = key.into();
229        self.entries.entry(key.clone()).or_insert(Value::Null);
230        self.entries.alter(&key, |_, v| upsert(v));
231    }
232
233    /// Convert the context into an iterator.
234    pub(crate) fn try_into_iter(
235        self,
236    ) -> Result<impl IntoIterator<Item = (String, Value)>, BoxError> {
237        Ok(Arc::try_unwrap(self.entries)
238            .map_err(|_e| anyhow::anyhow!("cannot take ownership of dashmap"))?
239            .into_iter())
240    }
241
242    /// Iterate over the entries.
243    pub fn iter(&self) -> impl Iterator<Item = RefMulti<'_, String, Value>> + '_ {
244        self.entries.iter()
245    }
246
247    /// Iterate mutably over the entries.
248    pub fn iter_mut(&self) -> impl Iterator<Item = RefMutMulti<'_, String, Value>> + '_ {
249        self.entries.iter_mut()
250    }
251
252    pub(crate) fn extend(&self, other: &Context) {
253        for kv in other.entries.iter() {
254            self.entries.insert(kv.key().clone(), kv.value().clone());
255        }
256    }
257
258    pub(crate) fn retain(&self, f: impl Fn(&String, &Value) -> bool) {
259        self.entries.retain(|k, v| f(k, v));
260    }
261
262    pub(crate) fn len(&self) -> usize {
263        self.entries.len()
264    }
265
266    /// Read only access to the executable document for internal router plugins.
267    pub(crate) fn executable_document(&self) -> Option<Arc<Valid<ExecutableDocument>>> {
268        self.extensions()
269            .with_lock(|lock| lock.get::<ParsedDocument>().map(|d| d.executable.clone()))
270    }
271}
272
273impl Default for Context {
274    fn default() -> Self {
275        Self::new()
276    }
277}
278
#[cfg(test)]
mod test {
    use crate::Configuration;
    use crate::Context;
    use crate::spec::Query;
    use crate::spec::Schema;

    /// A freshly inserted value can be read back under the same key.
    #[test]
    fn test_context_insert() {
        let ctx = Context::new();
        let inserted = ctx.insert("key1", 1);
        assert!(inserted.is_ok());
        let stored = ctx.get("key1").unwrap();
        assert_eq!(stored, Some(1));
    }

    /// Inserting twice under the same key keeps the most recent value.
    #[test]
    fn test_context_overwrite() {
        let ctx = Context::new();
        for value in [2, 3] {
            assert!(ctx.insert("overwrite", value).is_ok());
        }
        let stored = ctx.get("overwrite").unwrap();
        assert_eq!(stored, Some(3));
    }

    /// Upsert updates an existing entry and starts from the default for a missing one.
    #[test]
    fn test_context_upsert() {
        let ctx = Context::new();
        let increment = |v: usize| v + 1;

        assert!(ctx.insert("present", 1).is_ok());
        assert!(ctx.upsert("present", increment).is_ok());
        assert_eq!(ctx.get("present").unwrap(), Some(2));

        assert!(ctx.upsert("not_present", increment).is_ok());
        assert_eq!(ctx.get("not_present").unwrap(), Some(1));
    }

    /// Upserting with a type that does not match the stored value reports an error.
    #[test]
    fn test_context_marshall_errors() {
        let ctx = Context::new();
        let text = "Some value".to_string();
        assert!(ctx.insert("string", text).is_ok());
        let outcome = ctx.upsert("string", |v: usize| v + 1);
        assert!(outcome.is_err());
    }

    /// Shared iteration visits every entry.
    #[test]
    fn it_iterates_over_context() {
        let ctx = Context::new();
        assert!(ctx.insert("one", 1).is_ok());
        assert!(ctx.insert("two", 2).is_ok());
        assert_eq!(ctx.iter().count(), 2);

        // Fiddly because of the conversion from bytes to usize, but ...
        let mut total: usize = 0;
        for entry in ctx.iter() {
            total += serde_json_bytes::from_value::<usize>(entry.value().clone()).unwrap();
        }
        assert_eq!(total, 3);
    }

    /// Mutable iteration can rewrite every entry in place.
    #[test]
    fn it_iterates_mutably_over_context() {
        let ctx = Context::new();
        assert!(ctx.insert("one", 1usize).is_ok());
        assert!(ctx.insert("two", 2usize).is_ok());
        assert_eq!(ctx.iter().count(), 2);

        for mut entry in ctx.iter_mut() {
            // Fiddly because of the conversion from bytes to usize, but ...
            let current = serde_json_bytes::from_value::<usize>(entry.value().clone()).unwrap();
            let bumped: usize = current + 1;
            *entry = bumped.into();
        }

        assert_eq!(ctx.get("one").unwrap(), Some(2));
        assert_eq!(ctx.get("two").unwrap(), Some(3));
    }

    /// Values stored in the extensions can be read back.
    #[test]
    fn context_extensions() {
        // This is mostly tested in the extensions module.
        let ctx = Context::new();
        ctx.extensions().with_lock(|lock| lock.insert(1usize));
        let stored = ctx
            .extensions()
            .with_lock(|lock| lock.get::<usize>().cloned());
        assert_eq!(stored, Some(1usize));
    }

    /// The executable document is only available once a parsed document has been stored.
    #[test]
    fn test_executable_document_access() {
        let ctx = Context::new();
        let sdl = include_str!("../testdata/minimal_supergraph.graphql");
        let schema = Schema::parse(sdl, &Default::default()).unwrap();
        let configuration = Configuration::default();
        let document = Query::parse_document("{ me }", None, &schema, &configuration).unwrap();

        assert!(ctx.executable_document().is_none());
        ctx.extensions().with_lock(|lock| lock.insert(document));
        assert!(ctx.executable_document().is_some());
    }
}