1pub mod background_tasks;
2mod consts;
3pub mod error;
4mod http_utils;
5mod jwt;
6pub mod pipeline;
7mod schema_state;
8mod shared_state;
9mod supergraph;
10pub mod telemetry;
11mod utils;
12
13use std::sync::Arc;
14
15use crate::{
16 background_tasks::BackgroundTasksManager,
17 consts::ROUTER_VERSION,
18 error::RouterInitError,
19 http_utils::{
20 landing_page::landing_page_handler,
21 probes::{health_check_handler, readiness_check_handler},
22 },
23 jwt::JwtAuthRuntime,
24 pipeline::{
25 error::PipelineError,
26 graphql_request_handler,
27 header::{RequestAccepts, ResponseMode, TEXT_HTML_MIME},
28 usage_reporting::init_hive_usage_agent,
29 validation::{
30 max_aliases_rule::MaxAliasesRule, max_depth_rule::MaxDepthRule,
31 max_directives_rule::MaxDirectivesRule,
32 },
33 },
34 telemetry::HeaderExtractor,
35};
36
37pub use crate::{schema_state::SchemaState, shared_state::RouterSharedState};
38
39use graphql_tools::validation::rules::default_rules_validation_plan;
40use hive_router_config::{load_config, HiveRouterConfig};
41use hive_router_internal::telemetry::{
42 otel::tracing_opentelemetry::OpenTelemetrySpanExt,
43 traces::spans::http_request::HttpServerRequestSpan, TelemetryContext,
44};
45use http::header::{CONTENT_TYPE, RETRY_AFTER};
46use ntex::util::{select, Either};
47use ntex::{
48 time::sleep,
49 web::{self, HttpRequest},
50};
51use tracing::{info, warn, Instrument};
52
53static GRAPHIQL_HTML: &str = include_str!("../static/graphiql.html");
54
55async fn graphql_endpoint_handler(
56 request: HttpRequest,
57 body_stream: web::types::Payload,
58 schema_state: web::types::State<Arc<SchemaState>>,
59 app_state: web::types::State<Arc<RouterSharedState>>,
60) -> impl web::Responder {
61 if let Some(supergraph) = schema_state.current_supergraph().as_ref() {
62 if let Some(early_response) = app_state
64 .cors_runtime
65 .as_ref()
66 .and_then(|cors| cors.get_early_response(&request))
67 {
68 return early_response;
69 }
70
71 let response_mode = match request.negotiate() {
74 Ok(response_mode) => response_mode,
75 Err(err) => return err.into_response(None),
76 };
77
78 if response_mode == ResponseMode::GraphiQL {
79 if app_state.router_config.graphiql.enabled {
80 return web::HttpResponse::Ok()
81 .header(CONTENT_TYPE, TEXT_HTML_MIME)
82 .body(GRAPHIQL_HTML);
83 } else {
84 return web::HttpResponse::NotFound().into();
85 }
86 }
87
88 let parent_ctx = app_state
89 .telemetry_context
90 .extract_context(&HeaderExtractor(request.headers()));
91 let root_http_request_span = HttpServerRequestSpan::from_request(&request);
92 let _ = root_http_request_span.set_parent(parent_ctx);
93
94 async {
95 let timeout_fut = sleep(
96 app_state
97 .router_config
98 .traffic_shaping
99 .router
100 .request_timeout,
101 );
102 let req_handler_fut = graphql_request_handler(
103 &request,
104 body_stream,
105 &response_mode,
106 supergraph,
107 app_state.get_ref(),
108 schema_state.get_ref(),
109 &root_http_request_span,
110 );
111 let mut res = match select(timeout_fut, req_handler_fut).await {
112 Either::Left(_) => {
114 let err = PipelineError::TimeoutError;
115 return {
116 tracing::error!("{}", err);
117 err.into_response(Some(response_mode))
118 };
119 }
120 Either::Right(Ok(response)) => response,
122 Either::Right(Err(err)) => {
124 return {
125 tracing::error!("{}", err);
126 err.into_response(Some(response_mode))
127 }
128 }
129 };
130
131 if let Some(cors) = app_state.cors_runtime.as_ref() {
133 cors.set_headers(&request, res.headers_mut());
134 }
135
136 root_http_request_span.record_response(&res);
137
138 res
139 }
140 .instrument(root_http_request_span.clone())
141 .await
142 } else {
143 warn!("No supergraph available yet, unable to process request");
144
145 web::HttpResponse::ServiceUnavailable()
146 .header(RETRY_AFTER, 10)
147 .finish()
148 }
149}
150
151pub async fn router_entrypoint() -> Result<(), RouterInitError> {
152 let config_path = std::env::var("ROUTER_CONFIG_FILE_PATH").ok();
153 let router_config = load_config(config_path)?;
154 let telemetry = telemetry::Telemetry::init_global(&router_config)?;
155 info!("hive-router@{} starting...", ROUTER_VERSION);
156 let http_config = router_config.http.clone();
157 let addr = router_config.http.address();
158 let mut bg_tasks_manager = BackgroundTasksManager::new();
159 let (shared_state, schema_state) = configure_app_from_config(
160 router_config,
161 telemetry.context.clone(),
162 &mut bg_tasks_manager,
163 )
164 .await?;
165
166 let maybe_error = web::HttpServer::new(async move || {
167 let lp_gql_path = http_config.graphql_endpoint().to_string();
168 web::App::new()
169 .state(shared_state.clone())
170 .state(schema_state.clone())
171 .configure(|m| configure_ntex_app(m, http_config.graphql_endpoint()))
172 .default_service(web::to(move || landing_page_handler(lp_gql_path.clone())))
173 })
174 .bind(&addr)
175 .map_err(|err| RouterInitError::HttpServerBindError(addr, err))?
176 .run()
177 .await
178 .map_err(RouterInitError::HttpServerStartError);
179
180 info!("server stopped, clearning background tasks");
181 bg_tasks_manager.shutdown();
182 telemetry.graceful_shutdown().await;
183
184 maybe_error
185}
186
187pub async fn configure_app_from_config(
188 router_config: HiveRouterConfig,
189 telemetry_context: TelemetryContext,
190 bg_tasks_manager: &mut BackgroundTasksManager,
191) -> Result<(Arc<RouterSharedState>, Arc<SchemaState>), RouterInitError> {
192 let jwt_runtime = match router_config.jwt.is_jwt_auth_enabled() {
193 true => Some(JwtAuthRuntime::init(bg_tasks_manager, &router_config.jwt).await?),
194 false => None,
195 };
196
197 let hive_usage_agent = match router_config.telemetry.hive.as_ref() {
198 Some(hive_config) if hive_config.usage_reporting.enabled => {
199 Some(init_hive_usage_agent(bg_tasks_manager, hive_config)?)
200 }
201 _ => None,
202 };
203
204 let router_config_arc = Arc::new(router_config);
205 let telemetry_context_arc = Arc::new(telemetry_context);
206 let schema_state = SchemaState::new_from_config(
207 bg_tasks_manager,
208 telemetry_context_arc.clone(),
209 router_config_arc.clone(),
210 )
211 .await?;
212 let schema_state_arc = Arc::new(schema_state);
213 let mut validation_plan = default_rules_validation_plan();
214 if let Some(max_depth_config) = &router_config_arc.limits.max_depth {
215 validation_plan.add_rule(Box::new(MaxDepthRule {
216 config: max_depth_config.clone(),
217 }));
218 }
219 if let Some(max_directives_config) = &router_config_arc.limits.max_directives {
220 validation_plan.add_rule(Box::new(MaxDirectivesRule {
221 config: max_directives_config.clone(),
222 }));
223 }
224 if let Some(max_aliases_config) = &router_config_arc.limits.max_aliases {
225 validation_plan.add_rule(Box::new(MaxAliasesRule {
226 config: max_aliases_config.clone(),
227 }));
228 }
229 let shared_state = Arc::new(RouterSharedState::new(
230 router_config_arc,
231 jwt_runtime,
232 hive_usage_agent,
233 validation_plan,
234 telemetry_context_arc,
235 )?);
236
237 Ok((shared_state, schema_state_arc))
238}
239
240pub fn configure_ntex_app(cfg: &mut web::ServiceConfig, graphql_path: &str) {
241 cfg.route(graphql_path, web::to(graphql_endpoint_handler))
242 .route("/health", web::to(health_check_handler))
243 .route("/readiness", web::to(readiness_check_handler));
244}
245
246pub fn init_rustls_crypto_provider() {
265 if rustls::crypto::aws_lc_rs::default_provider()
266 .install_default()
267 .is_err()
268 {
269 warn!("Rustls crypto provider already installed");
270 }
271}