hive_router_plan_executor/executors/
http.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use crate::executors::dedupe::unique_leader_fingerprint;
5use crate::executors::map::InflightRequestsMap;
6use crate::hooks::on_subgraph_http_request::{
7 OnSubgraphHttpRequestHookPayload, OnSubgraphHttpResponseHookPayload,
8};
9use crate::plugin_context::PluginRequestState;
10use crate::plugin_trait::{EndControlFlow, StartControlFlow};
11use crate::response::subgraph_response::SubgraphResponse;
12use hive_router_config::HiveRouterConfig;
13use hive_router_internal::telemetry::TelemetryContext;
14
15use async_trait::async_trait;
16
17use bytes::{BufMut, Bytes};
18use http::HeaderMap;
19use http::HeaderValue;
20use http::StatusCode;
21use http_body_util::BodyExt;
22use http_body_util::Full;
23use hyper::Version;
24use hyper_rustls::HttpsConnector;
25use hyper_util::client::legacy::{connect::HttpConnector, Client};
26use tokio::sync::Semaphore;
27use tracing::debug;
28
29use crate::executors::common::SubgraphExecutionRequest;
30use crate::executors::error::SubgraphExecutorError;
31use crate::utils::consts::CLOSE_BRACE;
32use crate::utils::consts::COLON;
33use crate::utils::consts::COMMA;
34use crate::utils::consts::QUOTE;
35use crate::{executors::common::SubgraphExecutor, json_writer::write_and_escape_string};
36use hive_router_internal::telemetry::traces::spans::http_request::HttpClientRequestSpan;
37use hive_router_internal::telemetry::traces::spans::http_request::HttpInflightRequestSpan;
38use hive_router_internal::telemetry::Injector;
39use http::HeaderName;
40use tracing::Instrument;
41
42pub struct HTTPSubgraphExecutor {
43 pub subgraph_name: String,
44 pub endpoint: http::Uri,
45 pub http_client: Arc<HttpClient>,
46 pub header_map: HeaderMap,
47 pub semaphore: Arc<Semaphore>,
48 pub dedupe_enabled: bool,
49 pub in_flight_requests: InflightRequestsMap,
50 pub telemetry_context: Arc<TelemetryContext>,
51 pub config: Arc<HiveRouterConfig>,
52}
53
54const FIRST_VARIABLE_STR: &[u8] = b",\"variables\":{";
55const FIRST_QUOTE_STR: &[u8] = b"{\"query\":";
56
57pub type HttpClient = Client<HttpsConnector<HttpConnector>, Full<Bytes>>;
58
59impl HTTPSubgraphExecutor {
60 #[allow(clippy::too_many_arguments)]
61 pub fn new(
62 subgraph_name: String,
63 endpoint: http::Uri,
64 http_client: Arc<HttpClient>,
65 semaphore: Arc<Semaphore>,
66 dedupe_enabled: bool,
67 in_flight_requests: InflightRequestsMap,
68 telemetry_context: Arc<TelemetryContext>,
69 config: Arc<HiveRouterConfig>,
70 ) -> Self {
71 let mut header_map = HeaderMap::new();
72 header_map.insert(
73 http::header::CONTENT_TYPE,
74 HeaderValue::from_static("application/json; charset=utf-8"),
75 );
76 header_map.insert(
77 http::header::CONNECTION,
78 HeaderValue::from_static("keep-alive"),
79 );
80
81 Self {
82 subgraph_name,
83 endpoint,
84 http_client,
85 header_map,
86 semaphore,
87 dedupe_enabled,
88 in_flight_requests,
89 telemetry_context,
90 config,
91 }
92 }
93
94 fn build_request_body<'a>(
95 &self,
96 execution_request: &SubgraphExecutionRequest<'a>,
97 ) -> Result<Vec<u8>, SubgraphExecutorError> {
98 let mut body = Vec::with_capacity(4096);
99 body.put(FIRST_QUOTE_STR);
100 write_and_escape_string(&mut body, execution_request.query);
101 let mut first_variable = true;
102 if let Some(variables) = &execution_request.variables {
103 for (variable_name, variable_value) in variables {
104 if first_variable {
105 body.put(FIRST_VARIABLE_STR);
106 first_variable = false;
107 } else {
108 body.put(COMMA);
109 }
110 body.put(QUOTE);
111 body.put(variable_name.as_bytes());
112 body.put(QUOTE);
113 body.put(COLON);
114 let value_str = sonic_rs::to_string(variable_value).map_err(|err| {
115 SubgraphExecutorError::VariablesSerializationFailure(
116 variable_name.to_string(),
117 err,
118 )
119 })?;
120 body.put(value_str.as_bytes());
121 }
122 }
123 if let Some(representations) = &execution_request.representations {
124 if first_variable {
125 body.put(FIRST_VARIABLE_STR);
126 first_variable = false;
127 } else {
128 body.put(COMMA);
129 }
130 body.put("\"representations\":".as_bytes());
131 body.extend_from_slice(representations);
132 }
133 if !first_variable {
135 body.put(CLOSE_BRACE);
136 }
137
138 if let Some(extensions) = &execution_request.extensions {
139 if !extensions.is_empty() {
140 let as_value = sonic_rs::to_value(extensions).unwrap();
141
142 body.put(COMMA);
143 body.put("\"extensions\":".as_bytes());
144 body.extend_from_slice(as_value.to_string().as_bytes());
145 }
146 }
147
148 body.put(CLOSE_BRACE);
149
150 Ok(body)
151 }
152}
153
154pub struct SendRequestOpts<'a> {
155 pub http_client: &'a HttpClient,
156 pub endpoint: &'a http::Uri,
157 pub method: http::Method,
158 pub body: Vec<u8>,
159 pub headers: HeaderMap,
160 pub timeout: Option<Duration>,
161 pub telemetry_context: Arc<TelemetryContext>,
162}
163
164async fn send_request<'a>(
165 opts: SendRequestOpts<'a>,
166) -> Result<SubgraphHttpResponse, SubgraphExecutorError> {
167 let SendRequestOpts {
168 http_client,
169 endpoint,
170 method,
171 body,
172 headers,
173 timeout,
174 telemetry_context,
175 } = opts;
176
177 let mut req = hyper::Request::builder()
178 .method(method)
179 .uri(endpoint)
180 .version(Version::HTTP_11)
181 .body(Full::new(Bytes::from(body)))?;
182
183 *req.headers_mut() = headers;
184
185 debug!("making http request to {}", endpoint.to_string());
186
187 let http_request_span = HttpClientRequestSpan::from_request(&req);
188
189 async {
190 telemetry_context.inject_context(&mut TraceHeaderInjector(req.headers_mut()));
193
194 let res_fut = http_client.request(req);
195
196 let res = if let Some(timeout_duration) = timeout {
197 tokio::time::timeout(timeout_duration, res_fut)
198 .await
199 .map_err(|_| SubgraphExecutorError::RequestTimeout(timeout_duration.as_millis()))?
200 } else {
201 res_fut.await
202 }?;
203
204 http_request_span.record_response(&res);
205
206 debug!(
207 "http request to {} completed, status: {}",
208 endpoint.to_string(),
209 res.status()
210 );
211
212 let (parts, body) = res.into_parts();
213 let body = body.collect().await?.to_bytes();
214
215 if body.is_empty() {
216 return Err(SubgraphExecutorError::EmptyResponseBody);
217 }
218
219 Ok(SubgraphHttpResponse {
220 status: parts.status,
221 body,
222 headers: parts.headers.into(),
223 })
224 }
225 .instrument(http_request_span.clone())
226 .await
227}
228
229pub enum DeduplicationHint {
230 Deduped {
231 fingerprint: u64,
232 leader_id: u64,
233 is_leader: bool,
234 },
235 NotDeduped,
236}
237
238#[async_trait]
239impl SubgraphExecutor for HTTPSubgraphExecutor {
240 fn endpoint(&self) -> &http::Uri {
241 &self.endpoint
242 }
243 async fn execute<'a>(
244 &self,
245 mut execution_request: SubgraphExecutionRequest<'a>,
246 timeout: Option<Duration>,
247 plugin_req_state: &'a Option<PluginRequestState<'a>>,
248 ) -> Result<SubgraphResponse<'a>, SubgraphExecutorError> {
249 let mut body = self.build_request_body(&execution_request)?;
250
251 self.header_map.iter().for_each(|(key, value)| {
252 execution_request.headers.insert(key, value.clone());
253 });
254
255 let mut method = http::Method::POST;
256 let mut deduplicate_request = !self.dedupe_enabled || !execution_request.dedupe;
257
258 let mut on_end_callbacks = vec![];
259 let mut response = None;
260
261 if let Some(plugin_req_state) = plugin_req_state.as_ref() {
262 let mut start_payload = OnSubgraphHttpRequestHookPayload {
263 subgraph_name: &self.subgraph_name,
264 endpoint: &self.endpoint,
265 method,
266 body,
267 execution_request,
268 deduplicate_request,
269 context: &plugin_req_state.context,
270 };
271 for plugin in plugin_req_state.plugins.as_ref() {
272 let result = plugin.on_subgraph_http_request(start_payload).await;
273 start_payload = result.payload;
274 match result.control_flow {
275 StartControlFlow::Proceed => { }
276 StartControlFlow::EndWithResponse(early_response) => {
277 response = Some(early_response);
278 break;
280 }
281 StartControlFlow::OnEnd(callback) => {
282 on_end_callbacks.push(callback);
283 }
284 }
285 }
286 method = start_payload.method;
288 execution_request = start_payload.execution_request;
289 body = start_payload.body;
290 deduplicate_request = start_payload.deduplicate_request;
291 }
292
293 let mut deduplication_hint = DeduplicationHint::NotDeduped;
294
295 let mut response = match response {
296 Some(resp) => resp,
297 None => {
298 let send_request_opts = SendRequestOpts {
299 http_client: &self.http_client,
300 endpoint: &self.endpoint,
301 method,
302 body,
303 headers: execution_request.headers,
304 timeout,
305 telemetry_context: self.telemetry_context.clone(),
306 };
307
308 if deduplicate_request {
309 let _permit = self.semaphore.acquire().await.unwrap();
312 send_request(send_request_opts).await?
313 } else {
314 let fingerprint = send_request_opts.fingerprint();
315
316 let inflight_span = HttpInflightRequestSpan::new(
317 &send_request_opts.method,
318 send_request_opts.endpoint,
319 &send_request_opts.headers,
320 &send_request_opts.body,
321 );
322
323 let result: Result<SubgraphHttpResponse, SubgraphExecutorError> = async {
324 let cell = self
327 .in_flight_requests
328 .entry(fingerprint)
329 .or_default()
330 .clone();
331 let mut is_leader = false;
333 let (shared_response, leader_id) = cell
334 .get_or_try_init(|| async {
335 is_leader = true;
337 let res = {
338 let _permit = self.semaphore.acquire().await.unwrap();
341 send_request(send_request_opts).await
342 };
343 self.in_flight_requests.remove(&fingerprint);
347 res.map(|res| (res, unique_leader_fingerprint()))
348 })
349 .await?;
350
351 if is_leader {
352 inflight_span.record_as_leader(leader_id);
353 } else {
354 inflight_span.record_as_joiner(leader_id);
355 }
356
357 inflight_span
358 .record_response(&shared_response.body, &shared_response.status);
359
360 deduplication_hint = DeduplicationHint::Deduped {
361 fingerprint,
362 leader_id: *leader_id,
363 is_leader,
364 };
365
366 Ok(shared_response.clone())
367 }
368 .instrument(inflight_span.clone())
369 .await;
370
371 result?
372 }
373 }
374 };
375
376 if !on_end_callbacks.is_empty() {
377 let mut end_payload = OnSubgraphHttpResponseHookPayload {
378 context: &plugin_req_state.as_ref().unwrap().context,
379 response,
380 deduplication_hint,
381 };
382 for callback in on_end_callbacks {
383 let result = callback(end_payload);
384 end_payload = result.payload;
385 match result.control_flow {
386 EndControlFlow::Proceed => { }
387 EndControlFlow::EndWithResponse(early_response) => {
388 end_payload.response = early_response;
389 break;
391 }
392 }
393 }
394 response = end_payload.response;
396 }
397
398 response.deserialize_http_response()
399 }
400}
401
402#[derive(Default, Clone)]
403pub struct SubgraphHttpResponse {
404 pub status: StatusCode,
405 pub headers: Arc<HeaderMap>,
406 pub body: Bytes,
407}
408
409impl SubgraphHttpResponse {
410 fn deserialize_http_response<'a>(self) -> Result<SubgraphResponse<'a>, SubgraphExecutorError> {
411 let bytes_ref: &[u8] = &self.body;
412
413 let bytes_ref: &'a [u8] = unsafe { std::mem::transmute(bytes_ref) };
420
421 sonic_rs::from_slice(bytes_ref)
422 .map_err(SubgraphExecutorError::ResponseDeserializationFailure)
423 .map(|mut resp: SubgraphResponse<'a>| {
424 resp.headers = Some(self.headers);
426 resp.bytes = Some(self.body);
428 resp
429 })
430 }
431}
432
433struct TraceHeaderInjector<'a>(pub &'a mut HeaderMap);
434
435impl<'a> Injector for TraceHeaderInjector<'a> {
436 fn set(&mut self, key: &str, value: String) {
437 let Ok(name) = HeaderName::from_bytes(key.as_bytes()) else {
438 return;
439 };
440
441 let Ok(val) = HeaderValue::from_str(&value) else {
442 return;
443 };
444
445 self.0.insert(name, val);
446 }
447}