1use std::sync::Arc;
7use std::time::Duration;
8use std::time::Instant;
9
10use apollo_compiler::ExecutableDocument;
11use apollo_compiler::validation::Valid;
12use dashmap::DashMap;
13use dashmap::mapref::multiple::RefMulti;
14use dashmap::mapref::multiple::RefMutMulti;
15use derivative::Derivative;
16use extensions::sync::ExtensionsMutex;
17use parking_lot::Mutex;
18use serde::Deserialize;
19use serde::Serialize;
20use tower::BoxError;
21
22use crate::json_ext::Value;
23use crate::services::layers::query_analysis::ParsedDocument;
24
25pub(crate) mod extensions;
26
/// The key string `"operation_name"`.
// NOTE(review): presumably the context entry under which the operation name is stored — confirm against callers.
pub(crate) const OPERATION_NAME: &str = "operation_name";
/// The key string `"operation_kind"`.
// NOTE(review): presumably the context entry under which the operation kind is stored — confirm against callers.
pub(crate) const OPERATION_KIND: &str = "operation_kind";
/// The key string `"apollo::telemetry::contains_graphql_error"`.
// NOTE(review): presumably a telemetry flag recording that a response contained GraphQL errors — confirm against callers.
pub(crate) const CONTAINS_GRAPHQL_ERROR: &str = "apollo::telemetry::contains_graphql_error";

/// Storage type behind [`Context`]: a reference-counted concurrent map from
/// string keys to JSON values.
pub(crate) type Entries = Arc<DashMap<String, Value>>;
36
/// State carried alongside a request: a string-keyed map of JSON values plus
/// a handful of fields that are excluded from (de)serialization.
///
/// `Context` is `Clone`; because `entries` and `busy_timer` are stored behind
/// `Arc`, clones share those values rather than copying them.
#[derive(Clone, Deserialize, Serialize, Derivative)]
#[serde(default)]
#[derivative(Debug)]
pub struct Context {
    /// Key/value entries. This is the only field that is not marked
    /// `#[serde(skip)]`.
    entries: Entries,

    /// Lock-protected extensions container, exposed through
    /// `Context::extensions`. Skipped by serde.
    #[serde(skip)]
    extensions: ExtensionsMutex,

    /// Instant captured when the context was built by `Context::new`.
    /// Skipped by serde.
    #[serde(skip)]
    pub(crate) created_at: Instant,

    /// Shared timer driven by `enter_active_request` and read by `busy_time`.
    /// Skipped by serde and left out of the `Debug` output.
    #[serde(skip)]
    #[derivative(Debug = "ignore")]
    busy_timer: Arc<Mutex<BusyTimer>>,

    /// Identifier assigned in `Context::new` (a lowercase hyphenated UUID v4).
    /// Skipped by serde.
    #[serde(skip)]
    pub(crate) id: String,
}
69
70impl Context {
71 pub fn new() -> Self {
73 let id = uuid::Uuid::new_v4()
74 .as_hyphenated()
75 .encode_lower(&mut uuid::Uuid::encode_buffer())
76 .to_string();
77 Context {
78 entries: Default::default(),
79 extensions: ExtensionsMutex::default(),
80 created_at: Instant::now(),
81 busy_timer: Arc::new(Mutex::new(BusyTimer::new())),
82 id,
83 }
84 }
85}
86
87impl Context {
88 pub fn extensions(&self) -> &ExtensionsMutex {
97 &self.extensions
98 }
99
100 pub fn contains_key<K>(&self, key: K) -> bool
102 where
103 K: Into<String>,
104 {
105 self.entries.contains_key(&key.into())
106 }
107
108 pub fn get<K, V>(&self, key: K) -> Result<Option<V>, BoxError>
114 where
115 K: Into<String>,
116 V: for<'de> serde::Deserialize<'de>,
117 {
118 self.entries
119 .get(&key.into())
120 .map(|v| serde_json_bytes::from_value(v.value().clone()))
121 .transpose()
122 .map_err(|e| e.into())
123 }
124
125 pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<V>, BoxError>
131 where
132 K: Into<String>,
133 V: for<'de> serde::Deserialize<'de> + Serialize,
134 {
135 match serde_json_bytes::to_value(value) {
136 Ok(value) => self
137 .entries
138 .insert(key.into(), value)
139 .map(|v| serde_json_bytes::from_value(v))
140 .transpose()
141 .map_err(|e| e.into()),
142 Err(e) => Err(e.into()),
143 }
144 }
145
146 pub fn insert_json_value<K>(&self, key: K, value: Value) -> Option<Value>
150 where
151 K: Into<String>,
152 {
153 self.entries.insert(key.into(), value)
154 }
155
156 pub fn get_json_value<K>(&self, key: K) -> Option<Value>
158 where
159 K: Into<String>,
160 {
161 self.entries.get(&key.into()).map(|v| v.value().clone())
162 }
163
164 pub fn upsert<K, V>(&self, key: K, upsert: impl FnOnce(V) -> V) -> Result<(), BoxError>
177 where
178 K: Into<String>,
179 V: for<'de> serde::Deserialize<'de> + Serialize + Default,
180 {
181 let key = key.into();
182 self.entries
183 .entry(key.clone())
184 .or_try_insert_with(|| serde_json_bytes::to_value::<V>(Default::default()))?;
185 let mut result = Ok(());
186 self.entries
187 .alter(&key, |_, v| match serde_json_bytes::from_value(v.clone()) {
188 Ok(value) => match serde_json_bytes::to_value((upsert)(value)) {
189 Ok(value) => value,
190 Err(e) => {
191 result = Err(e);
192 v
193 }
194 },
195 Err(e) => {
196 result = Err(e);
197 v
198 }
199 });
200 result.map_err(|e| e.into())
201 }
202
203 pub(crate) fn upsert_json_value<K>(&self, key: K, upsert: impl FnOnce(Value) -> Value)
210 where
211 K: Into<String>,
212 {
213 let key = key.into();
214 self.entries.entry(key.clone()).or_insert(Value::Null);
215 self.entries.alter(&key, |_, v| upsert(v));
216 }
217
218 pub(crate) fn try_into_iter(
220 self,
221 ) -> Result<impl IntoIterator<Item = (String, Value)>, BoxError> {
222 Ok(Arc::try_unwrap(self.entries)
223 .map_err(|_e| anyhow::anyhow!("cannot take ownership of dashmap"))?
224 .into_iter())
225 }
226
227 pub fn iter(&self) -> impl Iterator<Item = RefMulti<'_, String, Value>> + '_ {
229 self.entries.iter()
230 }
231
232 pub fn iter_mut(&self) -> impl Iterator<Item = RefMutMulti<'_, String, Value>> + '_ {
234 self.entries.iter_mut()
235 }
236
237 pub fn enter_active_request(&self) -> BusyTimerGuard {
245 self.busy_timer.lock().increment_active_requests();
246 BusyTimerGuard {
247 busy_timer: self.busy_timer.clone(),
248 }
249 }
250
251 pub fn busy_time(&self) -> Duration {
260 self.busy_timer.lock().current()
261 }
262
263 pub(crate) fn extend(&self, other: &Context) {
264 for kv in other.entries.iter() {
265 self.entries.insert(kv.key().clone(), kv.value().clone());
266 }
267 }
268
269 #[doc(hidden)]
272 pub fn unsupported_executable_document(&self) -> Option<Arc<Valid<ExecutableDocument>>> {
273 self.extensions()
274 .with_lock(|lock| lock.get::<ParsedDocument>().map(|d| d.executable.clone()))
275 }
276}
277
278pub struct BusyTimerGuard {
279 busy_timer: Arc<Mutex<BusyTimer>>,
280}
281
282impl Drop for BusyTimerGuard {
283 fn drop(&mut self) {
284 self.busy_timer.lock().decrement_active_requests()
285 }
286}
287
288impl Default for Context {
289 fn default() -> Self {
290 Self::new()
291 }
292}
293
/// Accumulates elapsed time, but only while no request is marked active.
///
/// Invariant maintained by the methods below: `start` is `Some` exactly when
/// `active_requests` is zero.
pub(crate) struct BusyTimer {
    /// Number of requests currently marked active.
    active_requests: u32,
    /// Time accumulated so far (a `Duration`, despite the `_ns` suffix).
    busy_ns: Duration,
    /// Start of the interval currently being measured, if the timer is running.
    start: Option<Instant>,
}

impl BusyTimer {
    /// Creates a timer that is already running, with nothing accumulated.
    fn new() -> Self {
        BusyTimer::default()
    }

    /// Marks one more request as active.
    ///
    /// When this is the first active request, the interval being measured is
    /// closed and added to the accumulated total, which pauses the timer.
    fn increment_active_requests(&mut self) {
        if self.active_requests == 0 {
            // `take` both reads the start instant and clears it.
            if let Some(start) = self.start.take() {
                self.busy_ns += start.elapsed();
            }
        }

        self.active_requests += 1;
    }

    /// Marks one active request as finished.
    ///
    /// When the count drops to zero a new measured interval starts. A call
    /// with no active request is ignored: subtracting would underflow the
    /// counter, and restarting would discard the interval already running.
    fn decrement_active_requests(&mut self) {
        if self.active_requests == 0 {
            return;
        }

        self.active_requests -= 1;

        if self.active_requests == 0 {
            self.start = Some(Instant::now());
        }
    }

    /// Returns the accumulated time, including the interval currently being
    /// measured when the timer is running.
    fn current(&self) -> Duration {
        match self.start {
            Some(start) => self.busy_ns + start.elapsed(),
            None => self.busy_ns,
        }
    }
}

impl Default for BusyTimer {
    /// A timer with no active requests, nothing accumulated, and the measured
    /// interval starting now.
    fn default() -> Self {
        Self {
            active_requests: 0,
            busy_ns: Duration::new(0, 0),
            start: Some(Instant::now()),
        }
    }
}
348
#[cfg(test)]
mod test {
    use crate::Configuration;
    use crate::Context;
    use crate::spec::Query;
    use crate::spec::Schema;

    /// A value stored with `insert` can be read back with `get`.
    #[test]
    fn test_context_insert() {
        let context = Context::new();
        assert!(context.insert("key1", 1).is_ok());
        let stored = context.get("key1").unwrap();
        assert_eq!(stored, Some(1));
    }

    /// A second `insert` under the same key replaces the first value.
    #[test]
    fn test_context_overwrite() {
        let context = Context::new();
        assert!(context.insert("overwrite", 2).is_ok());
        assert!(context.insert("overwrite", 3).is_ok());
        let stored = context.get("overwrite").unwrap();
        assert_eq!(stored, Some(3));
    }

    /// `upsert` updates an existing value and starts from the default when
    /// the key is missing.
    #[test]
    fn test_context_upsert() {
        let context = Context::new();
        assert!(context.insert("present", 1).is_ok());
        assert!(context.upsert("present", |v: usize| v + 1).is_ok());
        let present = context.get("present").unwrap();
        assert_eq!(present, Some(2));
        assert!(context.upsert("not_present", |v: usize| v + 1).is_ok());
        let not_present = context.get("not_present").unwrap();
        assert_eq!(not_present, Some(1));
    }

    /// `upsert` reports an error when the stored value has a different type.
    #[test]
    fn test_context_marshall_errors() {
        let context = Context::new();
        assert!(context.insert("string", "Some value".to_string()).is_ok());
        assert!(context.upsert("string", |v: usize| v + 1).is_err());
    }

    /// `iter` visits every entry.
    #[test]
    fn it_iterates_over_context() {
        let context = Context::new();
        assert!(context.insert("one", 1).is_ok());
        assert!(context.insert("two", 2).is_ok());
        assert_eq!(context.iter().count(), 2);
        let total = context
            .iter()
            .map(|entry| serde_json_bytes::from_value::<usize>(entry.value().clone()).unwrap())
            .sum::<usize>();
        assert_eq!(total, 3);
    }

    /// `iter_mut` allows every entry to be modified in place.
    #[test]
    fn it_iterates_mutably_over_context() {
        let context = Context::new();
        assert!(context.insert("one", 1usize).is_ok());
        assert!(context.insert("two", 2usize).is_ok());
        assert_eq!(context.iter().count(), 2);
        for mut entry in context.iter_mut() {
            let bumped: usize =
                serde_json_bytes::from_value::<usize>(entry.value().clone()).unwrap() + 1;
            *entry = bumped.into();
        }
        assert_eq!(context.get("one").unwrap(), Some(2));
        assert_eq!(context.get("two").unwrap(), Some(3));
    }

    /// A value placed in the extensions can be read back by type.
    #[test]
    fn context_extensions() {
        let context = Context::new();
        context
            .extensions()
            .with_lock(|mut guard| guard.insert(1usize));
        let found = context
            .extensions()
            .with_lock(|guard| guard.get::<usize>().cloned());
        assert_eq!(found, Some(1usize));
    }

    /// The executable document is only available once a parsed document has
    /// been stored in the extensions.
    #[test]
    fn test_executable_document_access() {
        let context = Context::new();
        let sdl = include_str!("../testdata/minimal_supergraph.graphql");
        let schema = Schema::parse(sdl, &Default::default()).unwrap();
        let parsed =
            Query::parse_document("{ me }", None, &schema, &Configuration::default()).unwrap();
        assert!(context.unsupported_executable_document().is_none());
        context
            .extensions()
            .with_lock(|mut guard| guard.insert(parsed));
        assert!(context.unsupported_executable_document().is_some());
    }
}