Skip to main content

hive_router_plan_executor/executors/
http.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use crate::executors::dedupe::unique_leader_fingerprint;
5use crate::executors::map::InflightRequestsMap;
6use crate::hooks::on_subgraph_http_request::{
7    OnSubgraphHttpRequestHookPayload, OnSubgraphHttpResponseHookPayload,
8};
9use crate::plugin_context::PluginRequestState;
10use crate::plugin_trait::{EndControlFlow, StartControlFlow};
11use crate::response::subgraph_response::SubgraphResponse;
12use hive_router_config::HiveRouterConfig;
13use hive_router_internal::telemetry::TelemetryContext;
14
15use async_trait::async_trait;
16
17use bytes::{BufMut, Bytes};
18use http::HeaderMap;
19use http::HeaderValue;
20use http::StatusCode;
21use http_body_util::BodyExt;
22use http_body_util::Full;
23use hyper::Version;
24use hyper_rustls::HttpsConnector;
25use hyper_util::client::legacy::{connect::HttpConnector, Client};
26use tokio::sync::Semaphore;
27use tracing::debug;
28
29use crate::executors::common::SubgraphExecutionRequest;
30use crate::executors::error::SubgraphExecutorError;
31use crate::utils::consts::CLOSE_BRACE;
32use crate::utils::consts::COLON;
33use crate::utils::consts::COMMA;
34use crate::utils::consts::QUOTE;
35use crate::{executors::common::SubgraphExecutor, json_writer::write_and_escape_string};
36use hive_router_internal::telemetry::traces::spans::http_request::HttpClientRequestSpan;
37use hive_router_internal::telemetry::traces::spans::http_request::HttpInflightRequestSpan;
38use hive_router_internal::telemetry::Injector;
39use http::HeaderName;
40use tracing::Instrument;
41
42pub struct HTTPSubgraphExecutor {
43    pub subgraph_name: String,
44    pub endpoint: http::Uri,
45    pub http_client: Arc<HttpClient>,
46    pub header_map: HeaderMap,
47    pub semaphore: Arc<Semaphore>,
48    pub dedupe_enabled: bool,
49    pub in_flight_requests: InflightRequestsMap,
50    pub telemetry_context: Arc<TelemetryContext>,
51    pub config: Arc<HiveRouterConfig>,
52}
53
/// Separator written after the query string to open the `variables` object.
const FIRST_VARIABLE_STR: &[u8] = br#","variables":{"#;
/// Opening bytes of every request body, up to the query string itself.
const FIRST_QUOTE_STR: &[u8] = br#"{"query":"#;
56
57pub type HttpClient = Client<HttpsConnector<HttpConnector>, Full<Bytes>>;
58
59impl HTTPSubgraphExecutor {
60    #[allow(clippy::too_many_arguments)]
61    pub fn new(
62        subgraph_name: String,
63        endpoint: http::Uri,
64        http_client: Arc<HttpClient>,
65        semaphore: Arc<Semaphore>,
66        dedupe_enabled: bool,
67        in_flight_requests: InflightRequestsMap,
68        telemetry_context: Arc<TelemetryContext>,
69        config: Arc<HiveRouterConfig>,
70    ) -> Self {
71        let mut header_map = HeaderMap::new();
72        header_map.insert(
73            http::header::CONTENT_TYPE,
74            HeaderValue::from_static("application/json; charset=utf-8"),
75        );
76        header_map.insert(
77            http::header::CONNECTION,
78            HeaderValue::from_static("keep-alive"),
79        );
80
81        Self {
82            subgraph_name,
83            endpoint,
84            http_client,
85            header_map,
86            semaphore,
87            dedupe_enabled,
88            in_flight_requests,
89            telemetry_context,
90            config,
91        }
92    }
93
94    fn build_request_body<'a>(
95        &self,
96        execution_request: &SubgraphExecutionRequest<'a>,
97    ) -> Result<Vec<u8>, SubgraphExecutorError> {
98        let mut body = Vec::with_capacity(4096);
99        body.put(FIRST_QUOTE_STR);
100        write_and_escape_string(&mut body, execution_request.query);
101        let mut first_variable = true;
102        if let Some(variables) = &execution_request.variables {
103            for (variable_name, variable_value) in variables {
104                if first_variable {
105                    body.put(FIRST_VARIABLE_STR);
106                    first_variable = false;
107                } else {
108                    body.put(COMMA);
109                }
110                body.put(QUOTE);
111                body.put(variable_name.as_bytes());
112                body.put(QUOTE);
113                body.put(COLON);
114                let value_str = sonic_rs::to_string(variable_value).map_err(|err| {
115                    SubgraphExecutorError::VariablesSerializationFailure(
116                        variable_name.to_string(),
117                        err,
118                    )
119                })?;
120                body.put(value_str.as_bytes());
121            }
122        }
123        if let Some(representations) = &execution_request.representations {
124            if first_variable {
125                body.put(FIRST_VARIABLE_STR);
126                first_variable = false;
127            } else {
128                body.put(COMMA);
129            }
130            body.put("\"representations\":".as_bytes());
131            body.extend_from_slice(representations);
132        }
133        // "first_variable" should be still true if there are no variables
134        if !first_variable {
135            body.put(CLOSE_BRACE);
136        }
137
138        if let Some(extensions) = &execution_request.extensions {
139            if !extensions.is_empty() {
140                let as_value = sonic_rs::to_value(extensions).unwrap();
141
142                body.put(COMMA);
143                body.put("\"extensions\":".as_bytes());
144                body.extend_from_slice(as_value.to_string().as_bytes());
145            }
146        }
147
148        body.put(CLOSE_BRACE);
149
150        Ok(body)
151    }
152}
153
154pub struct SendRequestOpts<'a> {
155    pub http_client: &'a HttpClient,
156    pub endpoint: &'a http::Uri,
157    pub method: http::Method,
158    pub body: Vec<u8>,
159    pub headers: HeaderMap,
160    pub timeout: Option<Duration>,
161    pub telemetry_context: Arc<TelemetryContext>,
162}
163
164async fn send_request<'a>(
165    opts: SendRequestOpts<'a>,
166) -> Result<SubgraphHttpResponse, SubgraphExecutorError> {
167    let SendRequestOpts {
168        http_client,
169        endpoint,
170        method,
171        body,
172        headers,
173        timeout,
174        telemetry_context,
175    } = opts;
176
177    let mut req = hyper::Request::builder()
178        .method(method)
179        .uri(endpoint)
180        .version(Version::HTTP_11)
181        .body(Full::new(Bytes::from(body)))?;
182
183    *req.headers_mut() = headers;
184
185    debug!("making http request to {}", endpoint.to_string());
186
187    let http_request_span = HttpClientRequestSpan::from_request(&req);
188
189    async {
190        // TODO: let's decide at some point if the tracing headers
191        //       should be part of the fingerprint or not.
192        telemetry_context.inject_context(&mut TraceHeaderInjector(req.headers_mut()));
193
194        let res_fut = http_client.request(req);
195
196        let res = if let Some(timeout_duration) = timeout {
197            tokio::time::timeout(timeout_duration, res_fut)
198                .await
199                .map_err(|_| SubgraphExecutorError::RequestTimeout(timeout_duration.as_millis()))?
200        } else {
201            res_fut.await
202        }?;
203
204        http_request_span.record_response(&res);
205
206        debug!(
207            "http request to {} completed, status: {}",
208            endpoint.to_string(),
209            res.status()
210        );
211
212        let (parts, body) = res.into_parts();
213        let body = body.collect().await?.to_bytes();
214
215        if body.is_empty() {
216            return Err(SubgraphExecutorError::EmptyResponseBody);
217        }
218
219        Ok(SubgraphHttpResponse {
220            status: parts.status,
221            body,
222            headers: parts.headers.into(),
223        })
224    }
225    .instrument(http_request_span.clone())
226    .await
227}
228
/// Describes how a subgraph request was handled with respect to in-flight
/// deduplication; handed to the response-phase plugin callbacks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeduplicationHint {
    /// The request went through the in-flight deduplication map.
    Deduped {
        /// Fingerprint identifying the request in the in-flight map.
        fingerprint: u64,
        /// Identifier generated by the leader that produced the shared response.
        leader_id: u64,
        /// `true` if this request is the one that actually sent the HTTP request.
        is_leader: bool,
    },
    /// The request was sent directly, or the response was supplied by a plugin.
    NotDeduped,
}
237
238#[async_trait]
239impl SubgraphExecutor for HTTPSubgraphExecutor {
240    fn endpoint(&self) -> &http::Uri {
241        &self.endpoint
242    }
243    async fn execute<'a>(
244        &self,
245        mut execution_request: SubgraphExecutionRequest<'a>,
246        timeout: Option<Duration>,
247        plugin_req_state: &'a Option<PluginRequestState<'a>>,
248    ) -> Result<SubgraphResponse<'a>, SubgraphExecutorError> {
249        let mut body = self.build_request_body(&execution_request)?;
250
251        self.header_map.iter().for_each(|(key, value)| {
252            execution_request.headers.insert(key, value.clone());
253        });
254
255        let mut method = http::Method::POST;
256        let mut deduplicate_request = !self.dedupe_enabled || !execution_request.dedupe;
257
258        let mut on_end_callbacks = vec![];
259        let mut response = None;
260
261        if let Some(plugin_req_state) = plugin_req_state.as_ref() {
262            let mut start_payload = OnSubgraphHttpRequestHookPayload {
263                subgraph_name: &self.subgraph_name,
264                endpoint: &self.endpoint,
265                method,
266                body,
267                execution_request,
268                deduplicate_request,
269                context: &plugin_req_state.context,
270            };
271            for plugin in plugin_req_state.plugins.as_ref() {
272                let result = plugin.on_subgraph_http_request(start_payload).await;
273                start_payload = result.payload;
274                match result.control_flow {
275                    StartControlFlow::Proceed => { /* continue to next plugin */ }
276                    StartControlFlow::EndWithResponse(early_response) => {
277                        response = Some(early_response);
278                        // Break so other plugins are not called
279                        break;
280                    }
281                    StartControlFlow::OnEnd(callback) => {
282                        on_end_callbacks.push(callback);
283                    }
284                }
285            }
286            // Give the ownership back to variables
287            method = start_payload.method;
288            execution_request = start_payload.execution_request;
289            body = start_payload.body;
290            deduplicate_request = start_payload.deduplicate_request;
291        }
292
293        let mut deduplication_hint = DeduplicationHint::NotDeduped;
294
295        let mut response = match response {
296            Some(resp) => resp,
297            None => {
298                let send_request_opts = SendRequestOpts {
299                    http_client: &self.http_client,
300                    endpoint: &self.endpoint,
301                    method,
302                    body,
303                    headers: execution_request.headers,
304                    timeout,
305                    telemetry_context: self.telemetry_context.clone(),
306                };
307
308                if deduplicate_request {
309                    // This unwrap is safe because the semaphore is never closed during the application's lifecycle.
310                    // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`.
311                    let _permit = self.semaphore.acquire().await.unwrap();
312                    send_request(send_request_opts).await?
313                } else {
314                    let fingerprint = send_request_opts.fingerprint();
315
316                    let inflight_span = HttpInflightRequestSpan::new(
317                        &send_request_opts.method,
318                        send_request_opts.endpoint,
319                        &send_request_opts.headers,
320                        &send_request_opts.body,
321                    );
322
323                    let result: Result<SubgraphHttpResponse, SubgraphExecutorError> = async {
324                        // Clone the cell from the map, dropping the lock from the DashMap immediately.
325                        // Prevents any deadlocks.
326                        let cell = self
327                            .in_flight_requests
328                            .entry(fingerprint)
329                            .or_default()
330                            .clone();
331                        // Mark it as a joiner span by default.
332                        let mut is_leader = false;
333                        let (shared_response, leader_id) = cell
334                            .get_or_try_init(|| async {
335                                // Override the span to be a leader span for this request.
336                                is_leader = true;
337                                let res = {
338                                    // This unwrap is safe because the semaphore is never closed during the application's lifecycle.
339                                    // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`.
340                                    let _permit = self.semaphore.acquire().await.unwrap();
341                                    send_request(send_request_opts).await
342                                };
343                                // It's important to remove the entry from the map before returning the result.
344                                // This ensures that once the OnceCell is set, no future requests can join it.
345                                // The cache is for the lifetime of the in-flight request only.
346                                self.in_flight_requests.remove(&fingerprint);
347                                res.map(|res| (res, unique_leader_fingerprint()))
348                            })
349                            .await?;
350
351                        if is_leader {
352                            inflight_span.record_as_leader(leader_id);
353                        } else {
354                            inflight_span.record_as_joiner(leader_id);
355                        }
356
357                        inflight_span
358                            .record_response(&shared_response.body, &shared_response.status);
359
360                        deduplication_hint = DeduplicationHint::Deduped {
361                            fingerprint,
362                            leader_id: *leader_id,
363                            is_leader,
364                        };
365
366                        Ok(shared_response.clone())
367                    }
368                    .instrument(inflight_span.clone())
369                    .await;
370
371                    result?
372                }
373            }
374        };
375
376        if !on_end_callbacks.is_empty() {
377            let mut end_payload = OnSubgraphHttpResponseHookPayload {
378                context: &plugin_req_state.as_ref().unwrap().context,
379                response,
380                deduplication_hint,
381            };
382            for callback in on_end_callbacks {
383                let result = callback(end_payload);
384                end_payload = result.payload;
385                match result.control_flow {
386                    EndControlFlow::Proceed => { /* continue to next plugin */ }
387                    EndControlFlow::EndWithResponse(early_response) => {
388                        end_payload.response = early_response;
389                        // Break so other plugins are not called
390                        break;
391                    }
392                }
393            }
394            // Give the ownership back to variables
395            response = end_payload.response;
396        }
397
398        response.deserialize_http_response()
399    }
400}
401
402#[derive(Default, Clone)]
403pub struct SubgraphHttpResponse {
404    pub status: StatusCode,
405    pub headers: Arc<HeaderMap>,
406    pub body: Bytes,
407}
408
409impl SubgraphHttpResponse {
410    fn deserialize_http_response<'a>(self) -> Result<SubgraphResponse<'a>, SubgraphExecutorError> {
411        let bytes_ref: &[u8] = &self.body;
412
413        // SAFETY: The byte slice `bytes_ref` is transmuted to have lifetime `'a`.
414        // This is safe because the returned `SubgraphResponse` contains a clone of `self.body`
415        // in its `bytes` field. `Bytes` is a reference-counted buffer, so this ensures the
416        // underlying data remains alive as long as the `SubgraphResponse` does.
417        // The `data` field of `SubgraphResponse` contains values that borrow from this buffer,
418        // creating a self-referential struct, which is why `unsafe` is required.
419        let bytes_ref: &'a [u8] = unsafe { std::mem::transmute(bytes_ref) };
420
421        sonic_rs::from_slice(bytes_ref)
422            .map_err(SubgraphExecutorError::ResponseDeserializationFailure)
423            .map(|mut resp: SubgraphResponse<'a>| {
424                // This is Arc
425                resp.headers = Some(self.headers);
426                // Zero cost of cloning Bytes
427                resp.bytes = Some(self.body);
428                resp
429            })
430    }
431}
432
433struct TraceHeaderInjector<'a>(pub &'a mut HeaderMap);
434
435impl<'a> Injector for TraceHeaderInjector<'a> {
436    fn set(&mut self, key: &str, value: String) {
437        let Ok(name) = HeaderName::from_bytes(key.as_bytes()) else {
438            return;
439        };
440
441        let Ok(val) = HeaderValue::from_str(&value) else {
442            return;
443        };
444
445        self.0.insert(name, val);
446    }
447}