Skip to main content

apollo_federation/connectors/runtime/
inputs.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use apollo_compiler::collections::HashSet;
5use apollo_compiler::collections::IndexMap;
6use apollo_compiler::collections::IndexSet;
7use http::HeaderMap;
8use http::HeaderValue;
9use http::response::Parts;
10use serde_json::Value as JsonValue;
11use serde_json_bytes::ByteString;
12use serde_json_bytes::Map;
13use serde_json_bytes::Value;
14use serde_json_bytes::json;
15
16use crate::connectors::Namespace;
17
18pub trait ContextReader {
19    fn get_key(&self, key: &str) -> Option<Value>;
20}
21
22/// Convert a HeaderMap into a HashMap
23pub(crate) fn externalize_header_map(
24    input: &HeaderMap<HeaderValue>,
25) -> Result<HashMap<String, Vec<String>>, String> {
26    let mut output = HashMap::new();
27    for (k, v) in input {
28        let k = k.as_str().to_owned();
29        let v = String::from_utf8(v.as_bytes().to_vec()).map_err(|e| e.to_string())?;
30        output.entry(k).or_insert_with(Vec::new).push(v)
31    }
32    Ok(output)
33}
34
35#[derive(Clone, Default)]
36pub struct RequestInputs {
37    pub args: Map<ByteString, Value>,
38    pub this: Map<ByteString, Value>,
39    pub batch: Vec<Map<ByteString, Value>>,
40}
41
42impl RequestInputs {
43    /// Creates a map for use in JSONSelection::apply_with_vars. It only clones
44    /// values into the map if the variable namespaces (`$args`, `$this`, etc.)
45    /// are actually referenced in the expressions for URLs, headers, body, or selection.
46    pub fn merger(
47        self,
48        variables_used: &IndexMap<Namespace, IndexSet<String>>,
49    ) -> MappingContextMerger<'_> {
50        MappingContextMerger {
51            inputs: self,
52            variables_used,
53            config: None,
54            context: None,
55            status: None,
56            request: None,
57            response: None,
58            env: None,
59        }
60    }
61}
62
63impl std::fmt::Debug for RequestInputs {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        write!(
66            f,
67            "RequestInputs {{\n    args: {},\n    this: {},\n    batch: {}\n}}",
68            serde_json::to_string(&self.args).unwrap_or_else(|_| "<invalid JSON>".to_string()),
69            serde_json::to_string(&self.this).unwrap_or_else(|_| "<invalid JSON>".to_string()),
70            serde_json::to_string(&self.batch).unwrap_or_else(|_| "<invalid JSON>".to_string()),
71        )
72    }
73}
74
75pub struct MappingContextMerger<'merger> {
76    pub inputs: RequestInputs,
77    pub variables_used: &'merger IndexMap<Namespace, IndexSet<String>>,
78    pub config: Option<Value>,
79    pub context: Option<Value>,
80    pub status: Option<Value>,
81    pub request: Option<Value>,
82    pub response: Option<Value>,
83    pub env: Option<Value>,
84}
85
86impl MappingContextMerger<'_> {
87    pub fn merge(self) -> IndexMap<String, Value> {
88        let mut map =
89            IndexMap::with_capacity_and_hasher(self.variables_used.len(), Default::default());
90        // Not all connectors reference $args
91        if self.variables_used.contains_key(&Namespace::Args) {
92            map.insert(
93                Namespace::Args.as_str().into(),
94                Value::Object(self.inputs.args),
95            );
96        }
97
98        // $this only applies to fields on entity types (not Query or Mutation)
99        if self.variables_used.contains_key(&Namespace::This) {
100            map.insert(
101                Namespace::This.as_str().into(),
102                Value::Object(self.inputs.this),
103            );
104        }
105
106        // $batch only applies to entity resolvers on types
107        if self.variables_used.contains_key(&Namespace::Batch) {
108            map.insert(
109                Namespace::Batch.as_str().into(),
110                Value::Array(self.inputs.batch.into_iter().map(Value::Object).collect()),
111            );
112        }
113
114        if let Some(config) = self.config.iter().next() {
115            map.insert(Namespace::Config.as_str().into(), config.to_owned());
116        }
117
118        if let Some(context) = self.context.iter().next() {
119            map.insert(Namespace::Context.as_str().into(), context.to_owned());
120        }
121
122        if let Some(status) = self.status.iter().next() {
123            map.insert(Namespace::Status.as_str().into(), status.to_owned());
124        }
125
126        if let Some(request) = self.request.iter().next() {
127            map.insert(Namespace::Request.as_str().into(), request.to_owned());
128        }
129
130        if let Some(response) = self.response.iter().next() {
131            map.insert(Namespace::Response.as_str().into(), response.to_owned());
132        }
133
134        if let Some(env_vars_used) = self.variables_used.get(&Namespace::Env) {
135            let env_vars: Map<ByteString, Value> = env_vars_used
136                .iter()
137                .flat_map(|key| {
138                    std::env::var(key)
139                        .ok()
140                        .map(|value| (key.as_str().into(), Value::String(value.into())))
141                })
142                .collect();
143            map.insert(Namespace::Env.as_str().into(), Value::Object(env_vars));
144        }
145
146        map
147    }
148
149    pub fn context<'a>(mut self, context: impl ContextReader + 'a) -> Self {
150        // $context could be a large object, so we only convert it to JSON
151        // if it's used. It can also be mutated between requests, so we have
152        // to convert it each time.
153        if let Some(context_keys) = self.variables_used.get(&Namespace::Context) {
154            self.context = Some(Value::Object(
155                context_keys
156                    .iter()
157                    .filter_map(|key| {
158                        context
159                            .get_key(key)
160                            .map(|value| (key.as_str().into(), value))
161                    })
162                    .collect(),
163            ));
164        }
165        self
166    }
167
168    pub fn config(mut self, config: Option<&Arc<HashMap<String, JsonValue>>>) -> Self {
169        // $config doesn't change unless the schema reloads, but we can avoid
170        // the allocation if it's unused.
171        // We should always have a value for $config, even if it's an empty object, or we end up with "Variable $config not found" which is a confusing error for users
172        if self.variables_used.contains_key(&Namespace::Config) {
173            self.config = config.map(|c| json!(c)).or_else(|| Some(json!({})));
174        }
175        self
176    }
177
178    pub fn status(mut self, status: u16) -> Self {
179        // $status is available only for response mapping
180        if self.variables_used.contains_key(&Namespace::Status) {
181            self.status = Some(Value::Number(status.into()));
182        }
183        self
184    }
185
186    pub fn request(
187        mut self,
188        headers_used: &HashSet<String>,
189        headers: &HeaderMap<HeaderValue>,
190    ) -> Self {
191        // Add headers from the original router request.
192        // Only include headers that are actually referenced to save on passing around unused headers in memory.
193        if self.variables_used.contains_key(&Namespace::Request) {
194            let new_headers = externalize_header_map(headers)
195                .unwrap_or_default()
196                .iter()
197                .filter_map(|(key, value)| {
198                    headers_used.contains(key.as_str()).then_some((
199                        key.as_str().into(),
200                        value
201                            .iter()
202                            .map(|s| Value::String(s.as_str().into()))
203                            .collect(),
204                    ))
205                })
206                .collect();
207            let request_object = json!({
208                "headers": Value::Object(new_headers)
209            });
210            self.request = Some(request_object);
211        }
212        self
213    }
214
215    pub fn response(
216        mut self,
217        headers_used: &HashSet<String>,
218        response_parts: Option<&Parts>,
219    ) -> Self {
220        // Add headers from the connectors response
221        // Only include headers that are actually referenced to save on passing around unused headers in memory.
222        if let (true, Some(response_parts)) = (
223            self.variables_used.contains_key(&Namespace::Response),
224            response_parts,
225        ) {
226            let new_headers: Map<ByteString, Value> =
227                externalize_header_map(&response_parts.headers)
228                    .unwrap_or_default()
229                    .iter()
230                    .filter_map(|(key, value)| {
231                        headers_used.contains(key.as_str()).then_some((
232                            key.as_str().into(),
233                            value
234                                .iter()
235                                .map(|s| Value::String(s.as_str().into()))
236                                .collect(),
237                        ))
238                    })
239                    .collect();
240            let response_object = json!({
241                "headers": Value::Object(new_headers)
242            });
243            self.response = Some(response_object);
244        }
245        self
246    }
247}