apollo_federation/connectors/runtime/
inputs.rs1use std::collections::HashMap;
2use std::sync::Arc;
3
4use apollo_compiler::collections::HashSet;
5use apollo_compiler::collections::IndexMap;
6use apollo_compiler::collections::IndexSet;
7use http::HeaderMap;
8use http::HeaderValue;
9use http::response::Parts;
10use serde_json::Value as JsonValue;
11use serde_json_bytes::ByteString;
12use serde_json_bytes::Map;
13use serde_json_bytes::Value;
14use serde_json_bytes::json;
15
16use crate::connectors::Namespace;
17
18pub trait ContextReader {
19 fn get_key(&self, key: &str) -> Option<Value>;
20}
21
22pub(crate) fn externalize_header_map(
24 input: &HeaderMap<HeaderValue>,
25) -> Result<HashMap<String, Vec<String>>, String> {
26 let mut output = HashMap::new();
27 for (k, v) in input {
28 let k = k.as_str().to_owned();
29 let v = String::from_utf8(v.as_bytes().to_vec()).map_err(|e| e.to_string())?;
30 output.entry(k).or_insert_with(Vec::new).push(v)
31 }
32 Ok(output)
33}
34
35#[derive(Clone, Default)]
36pub struct RequestInputs {
37 pub args: Map<ByteString, Value>,
38 pub this: Map<ByteString, Value>,
39 pub batch: Vec<Map<ByteString, Value>>,
40}
41
42impl RequestInputs {
43 pub fn merger(
47 self,
48 variables_used: &IndexMap<Namespace, IndexSet<String>>,
49 ) -> MappingContextMerger<'_> {
50 MappingContextMerger {
51 inputs: self,
52 variables_used,
53 config: None,
54 context: None,
55 status: None,
56 request: None,
57 response: None,
58 env: None,
59 }
60 }
61}
62
63impl std::fmt::Debug for RequestInputs {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 write!(
66 f,
67 "RequestInputs {{\n args: {},\n this: {},\n batch: {}\n}}",
68 serde_json::to_string(&self.args).unwrap_or_else(|_| "<invalid JSON>".to_string()),
69 serde_json::to_string(&self.this).unwrap_or_else(|_| "<invalid JSON>".to_string()),
70 serde_json::to_string(&self.batch).unwrap_or_else(|_| "<invalid JSON>".to_string()),
71 )
72 }
73}
74
75pub struct MappingContextMerger<'merger> {
76 pub inputs: RequestInputs,
77 pub variables_used: &'merger IndexMap<Namespace, IndexSet<String>>,
78 pub config: Option<Value>,
79 pub context: Option<Value>,
80 pub status: Option<Value>,
81 pub request: Option<Value>,
82 pub response: Option<Value>,
83 pub env: Option<Value>,
84}
85
86impl MappingContextMerger<'_> {
87 pub fn merge(self) -> IndexMap<String, Value> {
88 let mut map =
89 IndexMap::with_capacity_and_hasher(self.variables_used.len(), Default::default());
90 if self.variables_used.contains_key(&Namespace::Args) {
92 map.insert(
93 Namespace::Args.as_str().into(),
94 Value::Object(self.inputs.args),
95 );
96 }
97
98 if self.variables_used.contains_key(&Namespace::This) {
100 map.insert(
101 Namespace::This.as_str().into(),
102 Value::Object(self.inputs.this),
103 );
104 }
105
106 if self.variables_used.contains_key(&Namespace::Batch) {
108 map.insert(
109 Namespace::Batch.as_str().into(),
110 Value::Array(self.inputs.batch.into_iter().map(Value::Object).collect()),
111 );
112 }
113
114 if let Some(config) = self.config.iter().next() {
115 map.insert(Namespace::Config.as_str().into(), config.to_owned());
116 }
117
118 if let Some(context) = self.context.iter().next() {
119 map.insert(Namespace::Context.as_str().into(), context.to_owned());
120 }
121
122 if let Some(status) = self.status.iter().next() {
123 map.insert(Namespace::Status.as_str().into(), status.to_owned());
124 }
125
126 if let Some(request) = self.request.iter().next() {
127 map.insert(Namespace::Request.as_str().into(), request.to_owned());
128 }
129
130 if let Some(response) = self.response.iter().next() {
131 map.insert(Namespace::Response.as_str().into(), response.to_owned());
132 }
133
134 if let Some(env_vars_used) = self.variables_used.get(&Namespace::Env) {
135 let env_vars: Map<ByteString, Value> = env_vars_used
136 .iter()
137 .flat_map(|key| {
138 std::env::var(key)
139 .ok()
140 .map(|value| (key.as_str().into(), Value::String(value.into())))
141 })
142 .collect();
143 map.insert(Namespace::Env.as_str().into(), Value::Object(env_vars));
144 }
145
146 map
147 }
148
149 pub fn context<'a>(mut self, context: impl ContextReader + 'a) -> Self {
150 if let Some(context_keys) = self.variables_used.get(&Namespace::Context) {
154 self.context = Some(Value::Object(
155 context_keys
156 .iter()
157 .filter_map(|key| {
158 context
159 .get_key(key)
160 .map(|value| (key.as_str().into(), value))
161 })
162 .collect(),
163 ));
164 }
165 self
166 }
167
168 pub fn config(mut self, config: Option<&Arc<HashMap<String, JsonValue>>>) -> Self {
169 if self.variables_used.contains_key(&Namespace::Config) {
173 self.config = config.map(|c| json!(c)).or_else(|| Some(json!({})));
174 }
175 self
176 }
177
178 pub fn status(mut self, status: u16) -> Self {
179 if self.variables_used.contains_key(&Namespace::Status) {
181 self.status = Some(Value::Number(status.into()));
182 }
183 self
184 }
185
186 pub fn request(
187 mut self,
188 headers_used: &HashSet<String>,
189 headers: &HeaderMap<HeaderValue>,
190 ) -> Self {
191 if self.variables_used.contains_key(&Namespace::Request) {
194 let new_headers = externalize_header_map(headers)
195 .unwrap_or_default()
196 .iter()
197 .filter_map(|(key, value)| {
198 headers_used.contains(key.as_str()).then_some((
199 key.as_str().into(),
200 value
201 .iter()
202 .map(|s| Value::String(s.as_str().into()))
203 .collect(),
204 ))
205 })
206 .collect();
207 let request_object = json!({
208 "headers": Value::Object(new_headers)
209 });
210 self.request = Some(request_object);
211 }
212 self
213 }
214
215 pub fn response(
216 mut self,
217 headers_used: &HashSet<String>,
218 response_parts: Option<&Parts>,
219 ) -> Self {
220 if let (true, Some(response_parts)) = (
223 self.variables_used.contains_key(&Namespace::Response),
224 response_parts,
225 ) {
226 let new_headers: Map<ByteString, Value> =
227 externalize_header_map(&response_parts.headers)
228 .unwrap_or_default()
229 .iter()
230 .filter_map(|(key, value)| {
231 headers_used.contains(key.as_str()).then_some((
232 key.as_str().into(),
233 value
234 .iter()
235 .map(|s| Value::String(s.as_str().into()))
236 .collect(),
237 ))
238 })
239 .collect();
240 let response_object = json!({
241 "headers": Value::Object(new_headers)
242 });
243 self.response = Some(response_object);
244 }
245 self
246 }
247}