hive_router/
schema_state.rs

1use arc_swap::{ArcSwap, Guard};
2use async_trait::async_trait;
3use graphql_tools::validation::utils::ValidationError;
4use hive_router_config::{supergraph::SupergraphSource, HiveRouterConfig};
5use hive_router_plan_executor::{
6    executors::error::SubgraphExecutorError,
7    introspection::schema::{SchemaMetadata, SchemaWithMetadata},
8    SubgraphExecutorMap,
9};
10use hive_router_query_planner::planner::plan_nodes::QueryPlan;
11use hive_router_query_planner::{
12    planner::{Planner, PlannerError},
13    state::supergraph_state::SupergraphState,
14    utils::parsing::parse_schema,
15};
16use moka::future::Cache;
17use std::sync::Arc;
18use tokio::sync::mpsc;
19use tokio_util::sync::CancellationToken;
20use tracing::{debug, error, trace};
21
22use crate::{
23    background_tasks::{BackgroundTask, BackgroundTasksManager},
24    pipeline::{
25        authorization::{AuthorizationMetadata, AuthorizationMetadataError},
26        normalize::GraphQLNormalizationPayload,
27    },
28    supergraph::{
29        base::{LoadSupergraphError, ReloadSupergraphResult, SupergraphLoader},
30        resolve_from_config,
31    },
32};
33
34pub struct SchemaState {
35    current_swapable: Arc<ArcSwap<Option<SupergraphData>>>,
36    pub plan_cache: Cache<u64, Arc<QueryPlan>>,
37    pub validate_cache: Cache<u64, Arc<Vec<ValidationError>>>,
38    pub normalize_cache: Cache<u64, Arc<GraphQLNormalizationPayload>>,
39}
40
41pub struct SupergraphData {
42    pub metadata: SchemaMetadata,
43    pub planner: Planner,
44    pub authorization: AuthorizationMetadata,
45    pub subgraph_executor_map: SubgraphExecutorMap,
46}
47
48#[derive(Debug, thiserror::Error)]
49pub enum SupergraphManagerError {
50    #[error("Failed to load supergraph: {0}")]
51    LoadSupergraphError(#[from] LoadSupergraphError),
52
53    #[error("Failed to build planner: {0}")]
54    PlannerBuilderError(#[from] PlannerError),
55    #[error("Failed to build authorization: {0}")]
56    AuthorizationMetadataError(#[from] AuthorizationMetadataError),
57    #[error(transparent)]
58    ExecutorInitError(#[from] SubgraphExecutorError),
59    #[error("Unexpected: failed to load initial supergraph")]
60    FailedToLoadInitialSupergraph,
61}
62
63impl SchemaState {
64    pub fn current_supergraph(&self) -> Guard<Arc<Option<SupergraphData>>> {
65        self.current_swapable.load()
66    }
67
68    pub fn is_ready(&self) -> bool {
69        self.current_supergraph().is_some()
70    }
71
72    pub async fn new_from_config(
73        bg_tasks_manager: &mut BackgroundTasksManager,
74        router_config: Arc<HiveRouterConfig>,
75    ) -> Result<Self, SupergraphManagerError> {
76        let (tx, mut rx) = mpsc::channel::<String>(1);
77        let background_loader = SupergraphBackgroundLoader::new(&router_config.supergraph, tx)?;
78        bg_tasks_manager.register_task(Arc::new(background_loader));
79
80        let swappable_data = Arc::new(ArcSwap::from(Arc::new(None)));
81        let swappable_data_spawn_clone = swappable_data.clone();
82        let plan_cache = Cache::new(1000);
83        let validate_cache = Cache::new(1000);
84        let normalize_cache = Cache::new(1000);
85
86        // This is cheap clone, as Cache is thread-safe and can be cloned without any performance penalty.
87        let task_plan_cache = plan_cache.clone();
88        let validate_cache_cache = validate_cache.clone();
89        let normalize_cache_cache = normalize_cache.clone();
90
91        bg_tasks_manager.register_handle(async move {
92            while let Some(new_sdl) = rx.recv().await {
93                debug!("Received new supergraph SDL, building new supergraph state...");
94
95                match Self::build_data(router_config.clone(), &new_sdl) {
96                    Ok(new_data) => {
97                        swappable_data_spawn_clone.store(Arc::new(Some(new_data)));
98                        debug!("Supergraph updated successfully");
99
100                        task_plan_cache.invalidate_all();
101                        validate_cache_cache.invalidate_all();
102                        normalize_cache_cache.invalidate_all();
103                        debug!("Schema-associated caches cleared successfully");
104                    }
105                    Err(e) => {
106                        error!("Failed to build new supergraph data: {}", e);
107                    }
108                }
109            }
110        });
111
112        Ok(Self {
113            current_swapable: swappable_data,
114            plan_cache,
115            validate_cache,
116            normalize_cache,
117        })
118    }
119
120    fn build_data(
121        router_config: Arc<HiveRouterConfig>,
122        supergraph_sdl: &str,
123    ) -> Result<SupergraphData, SupergraphManagerError> {
124        let parsed_supergraph_sdl = parse_schema(supergraph_sdl);
125        let supergraph_state = SupergraphState::new(&parsed_supergraph_sdl);
126        let planner = Planner::new_from_supergraph(&parsed_supergraph_sdl)?;
127        let metadata = planner.consumer_schema.schema_metadata();
128        let authorization = AuthorizationMetadata::build(&planner.supergraph, &metadata)?;
129        let subgraph_executor_map = SubgraphExecutorMap::from_http_endpoint_map(
130            supergraph_state.subgraph_endpoint_map,
131            router_config,
132        )?;
133
134        Ok(SupergraphData {
135            metadata,
136            planner,
137            authorization,
138            subgraph_executor_map,
139        })
140    }
141}
142
143pub struct SupergraphBackgroundLoader {
144    loader: Box<dyn SupergraphLoader + Send + Sync>,
145    sender: Arc<mpsc::Sender<String>>,
146}
147
148impl SupergraphBackgroundLoader {
149    pub fn new(
150        config: &SupergraphSource,
151        sender: mpsc::Sender<String>,
152    ) -> Result<Self, LoadSupergraphError> {
153        let loader = resolve_from_config(config)?;
154
155        Ok(Self {
156            loader,
157            sender: Arc::new(sender),
158        })
159    }
160}
161
162#[async_trait]
163impl BackgroundTask for SupergraphBackgroundLoader {
164    fn id(&self) -> &str {
165        "supergraph-background-loader"
166    }
167
168    async fn run(&self, token: CancellationToken) {
169        loop {
170            if token.is_cancelled() {
171                trace!("Background task cancelled");
172
173                break;
174            }
175
176            match self.loader.load().await {
177                Ok(ReloadSupergraphResult::Unchanged) => {
178                    debug!("Supergraph fetched successfully with no changes");
179                }
180                Ok(ReloadSupergraphResult::Changed { new_sdl }) => {
181                    debug!("Supergraph loaded successfully with changes, updating...");
182
183                    if self.sender.clone().send(new_sdl).await.is_err() {
184                        error!("Failed to send new supergraph SDL: receiver dropped.");
185                        break;
186                    }
187                }
188                Err(err) => {
189                    error!("Failed to load supergraph: {}", err);
190                }
191            }
192
193            if let Some(interval) = self.loader.reload_interval() {
194                debug!(
195                    "waiting for {:?}ms before checking again for supergraph changes",
196                    interval.as_millis()
197                );
198
199                ntex::time::sleep(*interval).await;
200            } else {
201                debug!("poll interval not configured for supergraph changes, breaking");
202
203                break;
204            }
205        }
206    }
207}