graphgate_handler/
shared_route_table.rs

1use std::sync::Arc;
2
3use anyhow::{Context, Error, Result};
4use graphgate_planner::{PlanBuilder, Request, Response, ServerError};
5use graphgate_schema::ComposedSchema;
6use opentelemetry::trace::{TraceContextExt, Tracer};
7use opentelemetry::{global, Context as OpenTelemetryContext};
8use serde::Deserialize;
9use tokio::sync::{mpsc, RwLock};
10use tokio::time::{Duration, Instant};
11use value::ConstValue;
12use warp::http::{HeaderMap, Response as HttpResponse, StatusCode};
13
14use crate::executor::Executor;
15use crate::fetcher::HttpFetcher;
16use crate::service_route::ServiceRouteTable;
17
18enum Command {
19    Change(ServiceRouteTable),
20}
21
22struct Inner {
23    schema: Option<Arc<ComposedSchema>>,
24    route_table: Option<Arc<ServiceRouteTable>>,
25}
26
27#[derive(Clone)]
28pub struct SharedRouteTable {
29    inner: Arc<RwLock<Inner>>,
30    tx: mpsc::UnboundedSender<Command>,
31}
32
33impl Default for SharedRouteTable {
34    fn default() -> Self {
35        let (tx, rx) = mpsc::unbounded_channel();
36        let shared_route_table = Self {
37            inner: Arc::new(RwLock::new(Inner {
38                schema: None,
39                route_table: None,
40            })),
41            tx,
42        };
43        tokio::spawn({
44            let shared_route_table = shared_route_table.clone();
45            async move { shared_route_table.update_loop(rx).await }
46        });
47        shared_route_table
48    }
49}
50
51impl SharedRouteTable {
52    async fn update_loop(self, mut rx: mpsc::UnboundedReceiver<Command>) {
53        let mut update_interval = tokio::time::interval_at(
54            Instant::now() + Duration::from_secs(3),
55            Duration::from_secs(30),
56        );
57
58        loop {
59            tokio::select! {
60                _ = update_interval.tick() => {
61                    if let Err(err) = self.update().await {
62                        tracing::error!(error = %err, "Failed to update schema.");
63                    }
64                }
65                command = rx.recv() => {
66                    if let Some(command) = command {
67                        match command {
68                            Command::Change(route_table) => {
69                                let mut inner = self.inner.write().await;
70                                inner.route_table = Some(Arc::new(route_table));
71                                inner.schema = None;
72                            }
73                        }
74                    }
75                }
76            }
77        }
78    }
79
80    async fn update(&self) -> Result<()> {
81        const QUERY_SDL: &str = "{ _service { sdl }}";
82
83        #[derive(Deserialize)]
84        struct ResponseQuery {
85            #[serde(rename = "_service")]
86            service: ResponseService,
87        }
88
89        #[derive(Deserialize)]
90        struct ResponseService {
91            sdl: String,
92        }
93
94        let route_table = match self.inner.read().await.route_table.clone() {
95            Some(route_table) => route_table,
96            None => return Ok(()),
97        };
98
99        let resp = futures_util::future::try_join_all(route_table.keys().map(|service| {
100            let route_table = route_table.clone();
101            async move {
102                let resp = route_table
103                    .query(service, Request::new(QUERY_SDL), None)
104                    .await
105                    .with_context(|| format!("Failed to fetch SDL from '{}'.", service))?;
106                let resp: ResponseQuery =
107                    value::from_value(resp.data).context("Failed to parse response.")?;
108                let document = parser::parse_schema(resp.service.sdl)
109                    .with_context(|| format!("Invalid SDL from '{}'.", service))?;
110                Ok::<_, Error>((service.to_string(), document))
111            }
112        }))
113        .await?;
114
115        let schema = ComposedSchema::combine(resp)?;
116        self.inner.write().await.schema = Some(Arc::new(schema));
117        Ok(())
118    }
119
120    pub fn set_route_table(&self, route_table: ServiceRouteTable) {
121        self.tx.send(Command::Change(route_table)).ok();
122    }
123
124    pub async fn get(&self) -> Option<(Arc<ComposedSchema>, Arc<ServiceRouteTable>)> {
125        let (composed_schema, route_table) = {
126            let inner = self.inner.read().await;
127            (inner.schema.clone(), inner.route_table.clone())
128        };
129        composed_schema.zip(route_table)
130    }
131
132    pub async fn query(&self, request: Request, header_map: HeaderMap) -> HttpResponse<String> {
133        let tracer = global::tracer("graphql");
134
135        let document = match tracer.in_span("parse", |_| parser::parse_query(&request.query)) {
136            Ok(document) => document,
137            Err(err) => {
138                return HttpResponse::builder()
139                    .status(StatusCode::BAD_REQUEST)
140                    .body(err.to_string())
141                    .unwrap();
142            }
143        };
144
145        let (composed_schema, route_table) = match self.get().await {
146            Some((composed_schema, route_table)) => (composed_schema, route_table),
147            _ => {
148                return HttpResponse::builder()
149                    .status(StatusCode::BAD_REQUEST)
150                    .body(
151                        serde_json::to_string(&Response {
152                            data: ConstValue::Null,
153                            errors: vec![ServerError::new("Not ready.")],
154                            extensions: Default::default(),
155                        })
156                        .unwrap(),
157                    )
158                    .unwrap();
159            }
160        };
161
162        let mut plan_builder =
163            PlanBuilder::new(&composed_schema, document).variables(request.variables);
164        if let Some(operation) = request.operation {
165            plan_builder = plan_builder.operation_name(operation);
166        }
167
168        let plan = match tracer.in_span("plan", |_| plan_builder.plan()) {
169            Ok(plan) => plan,
170            Err(response) => {
171                return HttpResponse::builder()
172                    .status(StatusCode::OK)
173                    .body(serde_json::to_string(&response).unwrap())
174                    .unwrap();
175            }
176        };
177
178        let executor = Executor::new(&composed_schema);
179        let resp = opentelemetry::trace::FutureExt::with_context(
180            executor.execute_query(&HttpFetcher::new(&*route_table, &header_map), &plan),
181            OpenTelemetryContext::current_with_span(tracer.span_builder("execute").start(&tracer)),
182        )
183        .await;
184        HttpResponse::builder()
185            .status(StatusCode::OK)
186            .body(serde_json::to_string(&resp).unwrap())
187            .unwrap()
188    }
189}