apollo_federation/connectors/runtime/
debug.rs1use std::collections::HashMap;
2
3use itertools::Itertools;
4use serde::Deserialize;
5use serde::Serialize;
6use serde_json_bytes::json;
7
8use crate::connectors::ConnectorErrorsSettings;
9use crate::connectors::HeaderSource;
10use crate::connectors::HttpJsonTransport;
11use crate::connectors::OriginatingDirective;
12use crate::connectors::runtime::mapping::Problem;
13
14#[derive(Debug, Clone, Default)]
15pub struct ConnectorContext {
16 items: Vec<ConnectorContextItem>,
17}
18
19#[derive(Debug, Clone)]
20pub struct ConnectorContextItem {
21 problems: Vec<Problem>,
22 request: ConnectorDebugHttpRequest,
23 response: ConnectorDebugHttpResponse,
24}
25
26impl ConnectorContext {
27 pub fn push_response(
28 &mut self,
29 request: Option<Box<ConnectorDebugHttpRequest>>,
30 parts: &http::response::Parts,
31 json_body: &serde_json_bytes::Value,
32 selection_data: Option<SelectionData>,
33 error_settings: &ConnectorErrorsSettings,
34 problems: Vec<Problem>,
35 ) {
36 if let Some(request) = request {
37 self.items.push(ConnectorContextItem {
38 request: *request,
39 response: ConnectorDebugHttpResponse::new(
40 parts,
41 json_body,
42 selection_data,
43 error_settings,
44 ),
45 problems,
46 });
47 } else {
48 tracing::warn!(
49 "connectors debugging: couldn't find a matching request for the response"
50 );
51 }
52 }
53
54 pub fn push_invalid_response(
55 &mut self,
56 request: Option<Box<ConnectorDebugHttpRequest>>,
57 parts: &http::response::Parts,
58 body: &[u8],
59 error_settings: &ConnectorErrorsSettings,
60 problems: Vec<Problem>,
61 ) {
62 if let Some(request) = request {
63 self.items.push(ConnectorContextItem {
64 request: *request,
65 response: ConnectorDebugHttpResponse {
66 status: parts.status.as_u16(),
67 headers: parts
68 .headers
69 .iter()
70 .map(|(name, value)| {
71 (
72 name.as_str().to_string(),
73 value.to_str().unwrap_or_default().to_string(),
74 )
75 })
76 .collect(),
77 body: ConnectorDebugBody {
78 kind: "invalid".to_string(),
79 content: format!("{body:?}").into(),
80 selection: None,
81 },
82 errors: if error_settings.message.is_some()
83 || error_settings.connect_extensions.is_some()
84 || error_settings.source_extensions.is_some()
85 {
86 Some(ConnectorDebugErrors {
87 message: error_settings.message.as_ref().map(|m| m.to_string()),
88 source_extensions: error_settings
89 .source_extensions
90 .as_ref()
91 .map(|m| m.to_string()),
92 connect_extensions: error_settings
93 .connect_extensions
94 .as_ref()
95 .map(|m| m.to_string()),
96 })
97 } else {
98 None
99 },
100 },
101 problems,
102 });
103 } else {
104 tracing::warn!(
105 "connectors debugging: couldn't find a matching request for the response"
106 );
107 }
108 }
109
110 pub fn serialize(self) -> serde_json_bytes::Value {
111 json!(
112 self.items
113 .iter()
114 .map(|item| {
115 let problems = item
117 .problems
118 .iter()
119 .sorted_by_key(|problem| problem.location)
120 .map(
121 |Problem {
122 message,
123 path,
124 count,
125 location,
126 }| {
127 json!({
130 "location": location,
131 "details": {
132 "message": message,
133 "path": path,
134 "count": count,
135 },
136 })
137 },
138 )
139 .collect_vec();
140
141 json!({
142 "request": item.request,
143 "response": item.response,
144 "problems": problems
145 })
146 })
147 .collect::<Vec<_>>()
148 )
149 }
150
151 pub fn problems(&self) -> Vec<serde_json_bytes::Value> {
152 self.items
153 .iter()
154 .flat_map(|item| item.problems.iter())
155 .map(|problem| json!({ "message": problem.message, "path": problem.path }))
156 .collect()
157 }
158}
159
160pub struct SelectionData {
164 pub source: String,
166
167 pub transformed: String,
172
173 pub result: Option<serde_json_bytes::Value>,
176}
177
178#[derive(Debug, Clone, Serialize, Deserialize)]
179struct ConnectorDebugBody {
180 kind: String,
181 content: serde_json_bytes::Value,
182 selection: Option<ConnectorDebugSelection>,
183}
184
185#[derive(Debug, Clone, Serialize, Deserialize)]
186struct ConnectorDebugSelection {
187 source: String,
188 transformed: String,
189 result: Option<serde_json_bytes::Value>,
190}
191
192#[derive(Debug, Clone, Serialize, Deserialize)]
193#[serde(rename_all = "camelCase")]
194pub struct ConnectorDebugUri {
195 base: Option<String>,
196 path: Option<String>,
197 query_params: Option<String>,
198}
199
200#[derive(Debug, Clone, Serialize, Deserialize)]
201#[serde(rename_all = "camelCase")]
202struct ConnectorDebugErrors {
203 message: Option<String>,
204 source_extensions: Option<String>,
205 connect_extensions: Option<String>,
206}
207
208pub type DebugRequest = (Option<Box<ConnectorDebugHttpRequest>>, Vec<Problem>);
209
210#[derive(Debug, Clone, Serialize, Deserialize)]
211#[serde(rename_all = "camelCase")]
212pub struct ConnectorDebugHttpRequest {
213 url: String,
214 method: String,
215 headers: Vec<(String, String)>,
216 body: Option<ConnectorDebugBody>,
217 source_url: Option<ConnectorDebugUri>,
218 connect_url: ConnectorDebugUri,
219 source_headers: Option<Vec<(String, String)>>,
220 connect_headers: Option<Vec<(String, String)>>,
221}
222
223impl ConnectorDebugHttpRequest {
224 pub fn new(
225 req: &http::Request<String>,
226 kind: String,
227 json_body: Option<&serde_json_bytes::Value>,
228 selection_data: Option<SelectionData>,
229 transport: &HttpJsonTransport,
230 ) -> Self {
231 let headers = transport.headers.iter().fold(
232 HashMap::new(),
233 |mut acc: HashMap<OriginatingDirective, Vec<(String, String)>>, header| {
234 if let HeaderSource::Value(value) = &header.source {
235 acc.entry(header.originating_directive)
236 .or_default()
237 .push((header.name.to_string(), value.to_string()));
238 }
239 acc
240 },
241 );
242
243 ConnectorDebugHttpRequest {
244 url: req.uri().to_string(),
245 method: req.method().to_string(),
246 headers: req
247 .headers()
248 .iter()
249 .map(|(name, value)| {
250 (
251 name.as_str().to_string(),
252 value.to_str().unwrap_or_default().to_string(),
253 )
254 })
255 .collect(),
256 body: json_body.map(|body| ConnectorDebugBody {
257 kind,
258 content: body.clone(),
259 selection: selection_data.map(|selection| ConnectorDebugSelection {
260 source: selection.source,
261 transformed: selection.transformed,
262 result: selection.result,
263 }),
264 }),
265 source_url: if transport.source_template.is_some()
266 || transport.source_path.is_some()
267 || transport.source_query_params.is_some()
268 {
269 Some(ConnectorDebugUri {
270 base: transport.source_template.clone().map(|u| u.to_string()),
271 path: transport.source_path.clone().map(|u| u.to_string()),
272 query_params: transport.source_query_params.clone().map(|u| u.to_string()),
273 })
274 } else {
275 None
276 },
277 connect_url: ConnectorDebugUri {
278 base: Some(transport.connect_template.clone().to_string()),
279 path: transport.connect_path.clone().map(|u| u.to_string()),
280 query_params: transport
281 .connect_query_params
282 .clone()
283 .map(|u| u.to_string()),
284 },
285 connect_headers: headers.get(&OriginatingDirective::Connect).cloned(),
286 source_headers: headers.get(&OriginatingDirective::Source).cloned(),
287 }
288 }
289}
290
291#[derive(Debug, Clone, Serialize, Deserialize)]
292pub struct ConnectorDebugHttpResponse {
293 status: u16,
294 headers: Vec<(String, String)>,
295 body: ConnectorDebugBody,
296 errors: Option<ConnectorDebugErrors>,
297}
298
299impl ConnectorDebugHttpResponse {
300 pub fn new(
301 parts: &http::response::Parts,
302 json_body: &serde_json_bytes::Value,
303 selection_data: Option<SelectionData>,
304 error_settings: &ConnectorErrorsSettings,
305 ) -> Self {
306 ConnectorDebugHttpResponse {
307 status: parts.status.as_u16(),
308 headers: parts
309 .headers
310 .iter()
311 .map(|(name, value)| {
312 (
313 name.as_str().to_string(),
314 value.to_str().unwrap_or_default().to_string(),
315 )
316 })
317 .collect(),
318 body: ConnectorDebugBody {
319 kind: "json".to_string(),
320 content: json_body.clone(),
321 selection: selection_data.map(|selection| ConnectorDebugSelection {
322 source: selection.source,
323 transformed: selection.transformed,
324 result: selection.result,
325 }),
326 },
327 errors: if error_settings.message.is_some()
328 || error_settings.connect_extensions.is_some()
329 || error_settings.source_extensions.is_some()
330 {
331 Some(ConnectorDebugErrors {
332 message: error_settings.message.as_ref().map(|m| m.to_string()),
333 source_extensions: error_settings
334 .source_extensions
335 .as_ref()
336 .map(|m| m.to_string()),
337 connect_extensions: error_settings
338 .connect_extensions
339 .as_ref()
340 .map(|m| m.to_string()),
341 })
342 } else {
343 None
344 },
345 }
346 }
347}