Skip to main content

apollo_router/context/
mod.rs

1//! Provide a [`Context`] for the plugin chain of responsibilities.
2//!
3//! Router plugins accept a mutable [`Context`] when invoked and this contains a DashMap which
4//! allows additional data to be passed back and forth along the request invocation pipeline.
5
6use std::sync::Arc;
7use std::time::Instant;
8
9use apollo_compiler::ExecutableDocument;
10use apollo_compiler::validation::Valid;
11use dashmap::DashMap;
12use dashmap::mapref::multiple::RefMulti;
13use dashmap::mapref::multiple::RefMutMulti;
14use derivative::Derivative;
15use extensions::sync::ExtensionsMutex;
16use serde::Deserialize;
17use serde::Serialize;
18use tower::BoxError;
19
20use crate::json_ext::Value;
21use crate::services::layers::query_analysis::ParsedDocument;
22
23pub(crate) mod deprecated;
24pub(crate) mod extensions;
25
/// Context key under which the operation name is stored.
pub(crate) const OPERATION_NAME: &str = "apollo::supergraph::operation_name";
/// Context key under which the operation kind is stored.
pub(crate) const OPERATION_KIND: &str = "apollo::supergraph::operation_kind";
/// Context key under which the persisted query ID is stored.
pub(crate) const PERSISTED_QUERY_ID: &str = "apollo::supergraph::persisted_query_id";
/// Context key telling whether the response body contains at least one GraphQL error.
pub(crate) const CONTAINS_GRAPHQL_ERROR: &str = "apollo::telemetry::contains_graphql_error";
/// Context key for a map of errors that were already counted in a previous layer.
///
/// This is subject to change and is NOT supported for user access.
pub(crate) const COUNTED_ERRORS: &str = "apollo::telemetry::counted_errors";
/// Context key for the full list of errors in the router response, so that plugins can read
/// them without having to deserialize the router response.
///
/// This is subject to change and is NOT supported for user access.
pub(crate) const ROUTER_RESPONSE_ERRORS: &str = "apollo::router::response_errors";
41
42pub(crate) use deprecated::context_key_from_deprecated;
43pub(crate) use deprecated::context_key_to_deprecated;
44
45/// Holds [`Context`] entries.
46pub(crate) type Entries = Arc<DashMap<String, Value>>;
47
48/// A map of arbitrary JSON values, for use by plugins.
49///
50/// Context makes use of [`DashMap`] under the hood which tries to handle concurrency
51/// by allowing concurrency across threads without requiring locking. This is great
52/// for usability but could lead to surprises when updates are highly contested.
53///
54/// Within the router, contention is likely to be highest within plugins which
55/// provide [`crate::services::SubgraphRequest`] or
56/// [`crate::services::SubgraphResponse`] processing. At such times,
57/// plugins should restrict themselves to the [`Context::get`] and [`Context::upsert`]
58/// functions to minimise the possibility of mis-sequenced updates.
59#[derive(Clone, Deserialize, Serialize, Derivative)]
60#[serde(default)]
61#[derivative(Debug)]
62pub struct Context {
63    // Allows adding custom entries to the context.
64    entries: Entries,
65
66    #[serde(skip)]
67    extensions: ExtensionsMutex,
68
69    /// Creation time
70    #[serde(skip)]
71    pub(crate) created_at: Instant,
72
73    #[serde(skip)]
74    pub(crate) id: String,
75}
76
77impl Context {
78    /// Create a new context.
79    pub fn new() -> Self {
80        let id = uuid::Uuid::new_v4()
81            .as_hyphenated()
82            .encode_lower(&mut uuid::Uuid::encode_buffer())
83            .to_string();
84        Context {
85            entries: Default::default(),
86            extensions: ExtensionsMutex::default(),
87            created_at: Instant::now(),
88            id,
89        }
90    }
91}
92
93impl FromIterator<(String, Value)> for Context {
94    fn from_iter<T: IntoIterator<Item = (String, Value)>>(iter: T) -> Self {
95        Self {
96            entries: Arc::new(DashMap::from_iter(iter)),
97            extensions: ExtensionsMutex::default(),
98            created_at: Instant::now(),
99            id: String::new(),
100        }
101    }
102}
103
104impl Context {
105    /// Returns extensions of the context.
106    ///
107    /// You can use `Extensions` to pass data between plugins that is not serializable. Such data is not accessible from Rhai or co-processoers.
108    ///
109    /// It is CRITICAL to avoid holding on to the mutex guard for too long, particularly across async calls.
110    /// Doing so may cause performance degradation or even deadlocks.
111    ///
112    /// See related clippy lint for examples: <https://rust-lang.github.io/rust-clippy/master/index.html#/await_holding_lock>
113    pub fn extensions(&self) -> &ExtensionsMutex {
114        &self.extensions
115    }
116
117    /// Returns true if the context contains a value for the specified key.
118    pub fn contains_key<K>(&self, key: K) -> bool
119    where
120        K: Into<String>,
121    {
122        self.entries.contains_key(&key.into())
123    }
124
125    /// Get a value from the context using the provided key.
126    ///
127    /// Semantics:
128    ///  - If the operation fails, that's because we can't deserialize the value.
129    ///  - If the operation succeeds, the value is an [`Option`].
130    pub fn get<K, V>(&self, key: K) -> Result<Option<V>, BoxError>
131    where
132        K: Into<String>,
133        V: for<'de> serde::Deserialize<'de>,
134    {
135        self.entries
136            .get(&key.into())
137            .map(|v| serde_json_bytes::from_value(v.value().clone()))
138            .transpose()
139            .map_err(|e| e.into())
140    }
141
142    /// Insert a value int the context using the provided key and value.
143    ///
144    /// Semantics:
145    ///  - If the operation fails, then the pair has not been inserted.
146    ///  - If the operation succeeds, the result is the old value as an [`Option`].
147    pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<V>, BoxError>
148    where
149        K: Into<String>,
150        V: for<'de> serde::Deserialize<'de> + Serialize,
151    {
152        match serde_json_bytes::to_value(value) {
153            Ok(value) => self
154                .entries
155                .insert(key.into(), value)
156                .map(|v| serde_json_bytes::from_value(v))
157                .transpose()
158                .map_err(|e| e.into()),
159            Err(e) => Err(e.into()),
160        }
161    }
162
163    /// Insert a value in the context using the provided key and value.
164    ///
165    /// Semantics: the result is the old value as an [`Option`].
166    pub fn insert_json_value<K>(&self, key: K, value: Value) -> Option<Value>
167    where
168        K: Into<String>,
169    {
170        self.entries.insert(key.into(), value)
171    }
172
173    /// Get a json value from the context using the provided key.
174    pub fn get_json_value<K>(&self, key: K) -> Option<Value>
175    where
176        K: Into<String>,
177    {
178        self.entries.get(&key.into()).map(|v| v.value().clone())
179    }
180
181    /// Upsert a value in the context using the provided key and resolving
182    /// function.
183    ///
184    /// The resolving function must yield a value to be used in the context. It
185    /// is provided with the current value to use in evaluating which value to
186    /// yield.
187    ///
188    /// Semantics:
189    ///  - If the operation fails, then the pair has not been inserted (or a current
190    ///    value updated).
191    ///  - If the operation succeeds, the pair have either updated an existing value
192    ///    or been inserted.
193    pub fn upsert<K, V>(&self, key: K, upsert: impl FnOnce(V) -> V) -> Result<(), BoxError>
194    where
195        K: Into<String>,
196        V: for<'de> serde::Deserialize<'de> + Serialize + Default,
197    {
198        let key = key.into();
199        self.entries
200            .entry(key.clone())
201            .or_try_insert_with(|| serde_json_bytes::to_value::<V>(Default::default()))?;
202        let mut result = Ok(());
203        self.entries
204            .alter(&key, |_, v| match serde_json_bytes::from_value(v.clone()) {
205                Ok(value) => match serde_json_bytes::to_value((upsert)(value)) {
206                    Ok(value) => value,
207                    Err(e) => {
208                        result = Err(e);
209                        v
210                    }
211                },
212                Err(e) => {
213                    result = Err(e);
214                    v
215                }
216            });
217        result.map_err(|e| e.into())
218    }
219
220    /// Upsert a JSON value in the context using the provided key and resolving
221    /// function.
222    ///
223    /// The resolving function must yield a value to be used in the context. It
224    /// is provided with the current value to use in evaluating which value to
225    /// yield.
226    pub(crate) fn upsert_json_value<K>(&self, key: K, upsert: impl FnOnce(Value) -> Value)
227    where
228        K: Into<String>,
229    {
230        let key = key.into();
231        self.entries.entry(key.clone()).or_insert(Value::Null);
232        self.entries.alter(&key, |_, v| upsert(v));
233    }
234
235    /// Convert the context into an iterator.
236    pub(crate) fn try_into_iter(
237        self,
238    ) -> Result<impl IntoIterator<Item = (String, Value)>, BoxError> {
239        Ok(Arc::try_unwrap(self.entries)
240            .map_err(|_e| anyhow::anyhow!("cannot take ownership of dashmap"))?
241            .into_iter())
242    }
243
244    /// Iterate over the entries.
245    pub fn iter(&self) -> impl Iterator<Item = RefMulti<'_, String, Value>> + '_ {
246        self.entries.iter()
247    }
248
249    /// Iterate mutably over the entries.
250    pub fn iter_mut(&self) -> impl Iterator<Item = RefMutMulti<'_, String, Value>> + '_ {
251        self.entries.iter_mut()
252    }
253
254    pub(crate) fn extend(&self, other: &Context) {
255        for kv in other.entries.iter() {
256            self.entries.insert(kv.key().clone(), kv.value().clone());
257        }
258    }
259
260    pub(crate) fn retain(&self, f: impl Fn(&String, &Value) -> bool) {
261        self.entries.retain(|k, v| f(k, v));
262    }
263
264    pub(crate) fn len(&self) -> usize {
265        self.entries.len()
266    }
267
268    /// Read only access to the executable document for internal router plugins.
269    pub(crate) fn executable_document(&self) -> Option<Arc<Valid<ExecutableDocument>>> {
270        self.extensions()
271            .with_lock(|lock| lock.get::<ParsedDocument>().map(|d| d.executable.clone()))
272    }
273}
274
275impl Default for Context {
276    fn default() -> Self {
277        Self::new()
278    }
279}
280
#[cfg(test)]
mod test {
    use crate::Configuration;
    use crate::Context;
    use crate::spec::Query;
    use crate::spec::Schema;

    /// A value stored with `insert` can be read back with `get`.
    #[test]
    fn test_context_insert() {
        let ctx = Context::new();
        assert!(ctx.insert("key1", 1).is_ok());
        assert_eq!(ctx.get("key1").unwrap(), Some(1));
    }

    /// Inserting twice under the same key keeps the latest value.
    #[test]
    fn test_context_overwrite() {
        let ctx = Context::new();
        assert!(ctx.insert("overwrite", 2).is_ok());
        assert!(ctx.insert("overwrite", 3).is_ok());
        assert_eq!(ctx.get("overwrite").unwrap(), Some(3));
    }

    /// `upsert` updates an existing value and starts from the default for a missing key.
    #[test]
    fn test_context_upsert() {
        let ctx = Context::new();
        assert!(ctx.insert("present", 1).is_ok());
        assert!(ctx.upsert("present", |v: usize| v + 1).is_ok());
        assert_eq!(ctx.get("present").unwrap(), Some(2));
        assert!(ctx.upsert("not_present", |v: usize| v + 1).is_ok());
        assert_eq!(ctx.get("not_present").unwrap(), Some(1));
    }

    /// `upsert` reports an error when the stored value has a different type.
    #[test]
    fn test_context_marshall_errors() {
        let ctx = Context::new();
        assert!(ctx.insert("string", "Some value".to_string()).is_ok());
        assert!(ctx.upsert("string", |v: usize| v + 1).is_err());
    }

    /// `iter` visits every entry.
    #[test]
    fn it_iterates_over_context() {
        let ctx = Context::new();
        assert!(ctx.insert("one", 1).is_ok());
        assert!(ctx.insert("two", 2).is_ok());
        assert_eq!(ctx.iter().count(), 2);
        // Fiddly because of the conversion from bytes to usize, but ...
        let total: usize = ctx
            .iter()
            .map(|entry| serde_json_bytes::from_value::<usize>(entry.value().clone()).unwrap())
            .sum();
        assert_eq!(total, 3);
    }

    /// `iter_mut` allows every entry to be modified in place.
    #[test]
    fn it_iterates_mutably_over_context() {
        let ctx = Context::new();
        assert!(ctx.insert("one", 1usize).is_ok());
        assert!(ctx.insert("two", 2usize).is_ok());
        assert_eq!(ctx.iter().count(), 2);
        for mut entry in ctx.iter_mut() {
            // Fiddly because of the conversion from bytes to usize, but ...
            let bumped: usize =
                serde_json_bytes::from_value::<usize>(entry.value().clone()).unwrap() + 1;
            *entry = bumped.into();
        }
        assert_eq!(ctx.get("one").unwrap(), Some(2));
        assert_eq!(ctx.get("two").unwrap(), Some(3));
    }

    /// Values stored in the extensions can be read back.
    #[test]
    fn context_extensions() {
        // This is mostly tested in the extensions module.
        let ctx = Context::new();
        ctx.extensions().with_lock(|lock| lock.insert(1usize));
        let stored = ctx
            .extensions()
            .with_lock(|lock| lock.get::<usize>().cloned());
        assert_eq!(stored, Some(1usize));
    }

    /// The executable document is only available once a parsed document has been stored.
    #[test]
    fn test_executable_document_access() {
        let ctx = Context::new();
        let sdl = include_str!("../testdata/minimal_supergraph.graphql");
        let schema = Schema::parse(sdl, &Default::default()).unwrap();
        let document =
            Query::parse_document("{ me }", None, &schema, &Configuration::default()).unwrap();
        assert!(ctx.executable_document().is_none());
        ctx.extensions().with_lock(|lock| lock.insert(document));
        assert!(ctx.executable_document().is_some());
    }
}