Skip to main content

heliosdb_proxy/
graphql_gateway.rs

//! GraphQL-to-SQL gateway — HTTP listener.
//!
//! When `[graphql_gateway] enabled = true`, the proxy exposes an HTTP endpoint
//! that accepts a GraphQL query (`POST` with a JSON body `{"query": "..."}`),
//! generates SQL from the configured schema, executes it over the backend
//! PG-wire client, and returns a GraphQL JSON response (`{"data": {...}}`).
//! `GET /health` (or `GET /`) answers `{"status":"ok"}`. When an auth token is
//! configured, every other request must carry `Authorization: Bearer <token>`.
//! Only flat top-level selections are supported; nested-relationship shaping
//! is planned as follow-on work.
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use serde_json::{json, Value};
13use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
14use tokio::net::TcpListener;
15
16use crate::backend::{tls::default_client_config, BackendConfig, TlsMode};
17use crate::config::GraphqlGatewayConfig;
18use crate::graphql::introspector::{ColumnDefinition, TableDefinition};
19use crate::graphql::{GraphQLConfig, GraphQLEngine, GraphQLRequest, SchemaIntrospector};
20use crate::{ProxyError, Result};
21
22pub struct GraphqlGateway {
23    config: Arc<GraphqlGatewayConfig>,
24    engine: Arc<GraphQLEngine>,
25}
26
27impl GraphqlGateway {
28    pub fn new(config: GraphqlGatewayConfig) -> Self {
29        // Build the GraphQL schema from the configured tables.
30        let tabledefs: Vec<TableDefinition> = config
31            .tables
32            .iter()
33            .map(|t| TableDefinition {
34                name: t.name.clone(),
35                schema: "public".to_string(),
36                columns: t
37                    .columns
38                    .iter()
39                    .map(|c| ColumnDefinition {
40                        name: c.clone(),
41                        data_type: "text".to_string(),
42                        nullable: true,
43                        is_primary_key: c == "id",
44                        has_default: false,
45                    })
46                    .collect(),
47                foreign_keys: Vec::new(),
48            })
49            .collect();
50        let schema = SchemaIntrospector::new().build_schema(&tabledefs);
51
52        let bcfg = BackendConfig {
53            host: config.backend_host.clone(),
54            port: config.backend_port,
55            user: config.backend_user.clone(),
56            password: config.backend_password.clone(),
57            database: config.backend_database.clone(),
58            application_name: Some("heliosproxy-graphql".to_string()),
59            tls_mode: TlsMode::Disable,
60            connect_timeout: Duration::from_secs(5),
61            query_timeout: Duration::from_secs(30),
62            tls_config: default_client_config(),
63        };
64        let engine = GraphQLEngine::new(GraphQLConfig::default(), schema).with_backend(bcfg);
65
66        Self {
67            config: Arc::new(config),
68            engine: Arc::new(engine),
69        }
70    }
71
72    pub async fn run(self) -> Result<()> {
73        let listener = TcpListener::bind(&self.config.listen_address)
74            .await
75            .map_err(|e| {
76                ProxyError::Network(format!(
77                    "GraphQL gateway bind {}: {}",
78                    self.config.listen_address, e
79                ))
80            })?;
81        tracing::info!(addr = %self.config.listen_address, "GraphQL gateway listening");
82        let config = self.config.clone();
83        let engine = self.engine.clone();
84        loop {
85            let (stream, peer) = match listener.accept().await {
86                Ok(x) => x,
87                Err(e) => {
88                    tracing::warn!("GraphQL gateway accept error: {}", e);
89                    continue;
90                }
91            };
92            let config = config.clone();
93            let engine = engine.clone();
94            tokio::spawn(async move {
95                if let Err(e) = Self::handle(stream, config, engine).await {
96                    tracing::debug!(%peer, "GraphQL gateway error: {}", e);
97                }
98            });
99        }
100    }
101
102    async fn handle(
103        mut stream: tokio::net::TcpStream,
104        cfg: Arc<GraphqlGatewayConfig>,
105        engine: Arc<GraphQLEngine>,
106    ) -> Result<()> {
107        use tokio::io::AsyncBufReadExt;
108        let (reader, mut writer) = stream.split();
109        let mut reader = BufReader::new(reader);
110        let mut line = String::new();
111        let mut content_length = 0usize;
112        let mut method = String::new();
113        let mut path = String::new();
114        let mut authorized = cfg.auth_token.is_none();
115        let mut first = true;
116        loop {
117            line.clear();
118            let n = reader
119                .read_line(&mut line)
120                .await
121                .map_err(|e| ProxyError::Network(format!("GraphQL gw read: {}", e)))?;
122            if n == 0 || line == "\r\n" {
123                break;
124            }
125            if first {
126                let mut parts = line.split_whitespace();
127                method = parts.next().unwrap_or("").to_string();
128                path = parts.next().unwrap_or("").to_string();
129                first = false;
130                continue;
131            }
132            let lower = line.to_ascii_lowercase();
133            if lower.starts_with("content-length:") {
134                content_length = line
135                    .split(':')
136                    .nth(1)
137                    .and_then(|v| v.trim().parse().ok())
138                    .unwrap_or(0);
139            } else if lower.starts_with("authorization:") {
140                if let Some(tok) = cfg.auth_token.as_ref() {
141                    let v = line.split_once(':').map(|x| x.1).unwrap_or("").trim();
142                    authorized = v == format!("Bearer {}", tok);
143                }
144            }
145        }
146
147        if method == "GET" && (path == "/health" || path == "/") {
148            return Self::respond(&mut writer, 200, &json!({"status":"ok"})).await;
149        }
150        if !authorized {
151            return Self::respond(&mut writer, 401, &json!({"error":"unauthorized"})).await;
152        }
153        if method != "POST" {
154            return Self::respond(
155                &mut writer,
156                405,
157                &json!({"error":"use POST with a GraphQL query"}),
158            )
159            .await;
160        }
161
162        let mut body_buf = vec![0u8; content_length];
163        if content_length > 0 {
164            reader
165                .read_exact(&mut body_buf)
166                .await
167                .map_err(|e| ProxyError::Network(format!("GraphQL gw body: {}", e)))?;
168        }
169        let req: Value = match serde_json::from_slice(&body_buf) {
170            Ok(v) => v,
171            Err(e) => {
172                return Self::respond(
173                    &mut writer,
174                    400,
175                    &json!({"errors":[{"message": format!("invalid JSON: {}", e)}]}),
176                )
177                .await
178            }
179        };
180        let query = req
181            .get("query")
182            .and_then(|q| q.as_str())
183            .unwrap_or("")
184            .trim();
185        if query.is_empty() {
186            return Self::respond(
187                &mut writer,
188                400,
189                &json!({"errors":[{"message":"missing 'query'"}]}),
190            )
191            .await;
192        }
193
194        let response = engine.execute(GraphQLRequest::new(query)).await;
195        let errors = response.errors.map(|errs| {
196            errs.iter()
197                .map(|e| json!({ "message": e.to_string() }))
198                .collect::<Vec<_>>()
199        });
200        let body = json!({ "data": response.data, "errors": errors });
201        Self::respond(&mut writer, 200, &body).await
202    }
203
204    async fn respond<W: AsyncWriteExt + Unpin>(
205        writer: &mut W,
206        status: u16,
207        body: &Value,
208    ) -> Result<()> {
209        let payload = serde_json::to_vec(body).unwrap_or_else(|_| b"{}".to_vec());
210        let reason = match status {
211            200 => "OK",
212            400 => "Bad Request",
213            401 => "Unauthorized",
214            405 => "Method Not Allowed",
215            _ => "OK",
216        };
217        let head = format!(
218            "HTTP/1.1 {} {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
219            status,
220            reason,
221            payload.len()
222        );
223        writer
224            .write_all(head.as_bytes())
225            .await
226            .map_err(|e| ProxyError::Network(format!("GraphQL gw write: {}", e)))?;
227        writer
228            .write_all(&payload)
229            .await
230            .map_err(|e| ProxyError::Network(format!("GraphQL gw write: {}", e)))?;
231        let _ = writer.flush().await;
232        Ok(())
233    }
234}