hive_router/
schema_state.rs1use arc_swap::{ArcSwap, Guard};
2use async_trait::async_trait;
3use graphql_tools::validation::utils::ValidationError;
4use hive_router_config::{supergraph::SupergraphSource, HiveRouterConfig};
5use hive_router_plan_executor::{
6 executors::error::SubgraphExecutorError,
7 introspection::schema::{SchemaMetadata, SchemaWithMetadata},
8 SubgraphExecutorMap,
9};
10use hive_router_query_planner::planner::plan_nodes::QueryPlan;
11use hive_router_query_planner::{
12 planner::{Planner, PlannerError},
13 state::supergraph_state::SupergraphState,
14 utils::parsing::parse_schema,
15};
16use moka::future::Cache;
17use std::sync::Arc;
18use tokio::sync::mpsc;
19use tokio_util::sync::CancellationToken;
20use tracing::{debug, error, trace};
21
22use crate::{
23 background_tasks::{BackgroundTask, BackgroundTasksManager},
24 pipeline::{
25 authorization::{AuthorizationMetadata, AuthorizationMetadataError},
26 normalize::GraphQLNormalizationPayload,
27 },
28 supergraph::{
29 base::{LoadSupergraphError, ReloadSupergraphResult, SupergraphLoader},
30 resolve_from_config,
31 },
32};
33
34pub struct SchemaState {
35 current_swapable: Arc<ArcSwap<Option<SupergraphData>>>,
36 pub plan_cache: Cache<u64, Arc<QueryPlan>>,
37 pub validate_cache: Cache<u64, Arc<Vec<ValidationError>>>,
38 pub normalize_cache: Cache<u64, Arc<GraphQLNormalizationPayload>>,
39}
40
41pub struct SupergraphData {
42 pub metadata: SchemaMetadata,
43 pub planner: Planner,
44 pub authorization: AuthorizationMetadata,
45 pub subgraph_executor_map: SubgraphExecutorMap,
46}
47
48#[derive(Debug, thiserror::Error)]
49pub enum SupergraphManagerError {
50 #[error("Failed to load supergraph: {0}")]
51 LoadSupergraphError(#[from] LoadSupergraphError),
52
53 #[error("Failed to build planner: {0}")]
54 PlannerBuilderError(#[from] PlannerError),
55 #[error("Failed to build authorization: {0}")]
56 AuthorizationMetadataError(#[from] AuthorizationMetadataError),
57 #[error(transparent)]
58 ExecutorInitError(#[from] SubgraphExecutorError),
59 #[error("Unexpected: failed to load initial supergraph")]
60 FailedToLoadInitialSupergraph,
61}
62
63impl SchemaState {
64 pub fn current_supergraph(&self) -> Guard<Arc<Option<SupergraphData>>> {
65 self.current_swapable.load()
66 }
67
68 pub fn is_ready(&self) -> bool {
69 self.current_supergraph().is_some()
70 }
71
72 pub async fn new_from_config(
73 bg_tasks_manager: &mut BackgroundTasksManager,
74 router_config: Arc<HiveRouterConfig>,
75 ) -> Result<Self, SupergraphManagerError> {
76 let (tx, mut rx) = mpsc::channel::<String>(1);
77 let background_loader = SupergraphBackgroundLoader::new(&router_config.supergraph, tx)?;
78 bg_tasks_manager.register_task(Arc::new(background_loader));
79
80 let swappable_data = Arc::new(ArcSwap::from(Arc::new(None)));
81 let swappable_data_spawn_clone = swappable_data.clone();
82 let plan_cache = Cache::new(1000);
83 let validate_cache = Cache::new(1000);
84 let normalize_cache = Cache::new(1000);
85
86 let task_plan_cache = plan_cache.clone();
88 let validate_cache_cache = validate_cache.clone();
89 let normalize_cache_cache = normalize_cache.clone();
90
91 bg_tasks_manager.register_handle(async move {
92 while let Some(new_sdl) = rx.recv().await {
93 debug!("Received new supergraph SDL, building new supergraph state...");
94
95 match Self::build_data(router_config.clone(), &new_sdl) {
96 Ok(new_data) => {
97 swappable_data_spawn_clone.store(Arc::new(Some(new_data)));
98 debug!("Supergraph updated successfully");
99
100 task_plan_cache.invalidate_all();
101 validate_cache_cache.invalidate_all();
102 normalize_cache_cache.invalidate_all();
103 debug!("Schema-associated caches cleared successfully");
104 }
105 Err(e) => {
106 error!("Failed to build new supergraph data: {}", e);
107 }
108 }
109 }
110 });
111
112 Ok(Self {
113 current_swapable: swappable_data,
114 plan_cache,
115 validate_cache,
116 normalize_cache,
117 })
118 }
119
120 fn build_data(
121 router_config: Arc<HiveRouterConfig>,
122 supergraph_sdl: &str,
123 ) -> Result<SupergraphData, SupergraphManagerError> {
124 let parsed_supergraph_sdl = parse_schema(supergraph_sdl);
125 let supergraph_state = SupergraphState::new(&parsed_supergraph_sdl);
126 let planner = Planner::new_from_supergraph(&parsed_supergraph_sdl)?;
127 let metadata = planner.consumer_schema.schema_metadata();
128 let authorization = AuthorizationMetadata::build(&planner.supergraph, &metadata)?;
129 let subgraph_executor_map = SubgraphExecutorMap::from_http_endpoint_map(
130 supergraph_state.subgraph_endpoint_map,
131 router_config,
132 )?;
133
134 Ok(SupergraphData {
135 metadata,
136 planner,
137 authorization,
138 subgraph_executor_map,
139 })
140 }
141}
142
143pub struct SupergraphBackgroundLoader {
144 loader: Box<dyn SupergraphLoader + Send + Sync>,
145 sender: Arc<mpsc::Sender<String>>,
146}
147
148impl SupergraphBackgroundLoader {
149 pub fn new(
150 config: &SupergraphSource,
151 sender: mpsc::Sender<String>,
152 ) -> Result<Self, LoadSupergraphError> {
153 let loader = resolve_from_config(config)?;
154
155 Ok(Self {
156 loader,
157 sender: Arc::new(sender),
158 })
159 }
160}
161
162#[async_trait]
163impl BackgroundTask for SupergraphBackgroundLoader {
164 fn id(&self) -> &str {
165 "supergraph-background-loader"
166 }
167
168 async fn run(&self, token: CancellationToken) {
169 loop {
170 if token.is_cancelled() {
171 trace!("Background task cancelled");
172
173 break;
174 }
175
176 match self.loader.load().await {
177 Ok(ReloadSupergraphResult::Unchanged) => {
178 debug!("Supergraph fetched successfully with no changes");
179 }
180 Ok(ReloadSupergraphResult::Changed { new_sdl }) => {
181 debug!("Supergraph loaded successfully with changes, updating...");
182
183 if self.sender.clone().send(new_sdl).await.is_err() {
184 error!("Failed to send new supergraph SDL: receiver dropped.");
185 break;
186 }
187 }
188 Err(err) => {
189 error!("Failed to load supergraph: {}", err);
190 }
191 }
192
193 if let Some(interval) = self.loader.reload_interval() {
194 debug!(
195 "waiting for {:?}ms before checking again for supergraph changes",
196 interval.as_millis()
197 );
198
199 ntex::time::sleep(*interval).await;
200 } else {
201 debug!("poll interval not configured for supergraph changes, breaking");
202
203 break;
204 }
205 }
206 }
207}