grafbase_sdk/test/
request.rs

1use std::{
2    borrow::Cow,
3    ops::{Deref, DerefMut},
4};
5
6use bytes::Bytes;
7use futures_util::{StreamExt as _, stream::BoxStream};
8use http::HeaderValue;
9use http_body_util::BodyExt as _;
10use serde::de::DeserializeOwned;
11
/// Represents a GraphQL request that has been prepared but not yet sent.
///
/// Pairs a [`reqwest::RequestBuilder`] with the GraphQL [`Body`] (query text and optional
/// variables). Headers and variables are added through the builder-style methods of this type.
pub struct GraphqlRequest {
    /// HTTP request builder on which headers are accumulated before sending.
    pub(super) builder: reqwest::RequestBuilder,
    /// GraphQL payload: sent as the JSON body by `send`, or used as the subscription
    /// operation by `ws_stream`.
    pub(super) body: Body,
}
17
18impl GraphqlRequest {
19    /// Add a header to the request.
20    pub fn header<Name, Value>(mut self, name: Name, value: Value) -> Self
21    where
22        Name: TryInto<http::HeaderName, Error: std::fmt::Debug>,
23        Value: TryInto<http::HeaderValue, Error: std::fmt::Debug>,
24    {
25        self.builder = self.builder.header(name.try_into().unwrap(), value.try_into().unwrap());
26        self
27    }
28
29    /// Add a set of Headers to the existing ones on this Request.
30    ///
31    /// The headers will be merged in to any already set.
32    pub fn headers(mut self, headers: http::HeaderMap) -> Self {
33        self.builder = self.builder.headers(headers);
34        self
35    }
36
37    /// Add the GraphQL variables to the request.
38    pub fn variables(mut self, variables: impl serde::Serialize) -> Self {
39        self.body.variables = Some(serde_json::to_value(variables).expect("variables to be serializable"));
40        self
41    }
42
43    /// Send the GraphQL request to the gateway
44    pub async fn send(self) -> GraphqlResponse {
45        let response = self
46            .builder
47            .header(http::header::ACCEPT, "application/json")
48            .json(&self.body)
49            .send()
50            .await
51            .expect("Request suceeded");
52        let (parts, body) = http::Response::from(response).into_parts();
53        let bytes = body.collect().await.expect("Could retrieve response body").to_bytes();
54        http::Response::from_parts(parts, bytes).try_into().unwrap()
55    }
56
57    /// Send the GraphQL request to the gateway and return a streaming response through a
58    /// websocket.
59    pub async fn ws_stream(self) -> GraphqlStreamingResponse {
60        use async_tungstenite::tungstenite::client::IntoClientRequest as _;
61        use futures_util::StreamExt;
62
63        let mut req = self.builder.build().expect("Valid request");
64        req.url_mut().set_scheme("ws").expect("Valid URL scheme");
65        req.url_mut().set_path("/ws");
66        let (parts, _) = http::Request::try_from(req).expect("Valid HTTP request").into_parts();
67
68        let mut request = parts.uri.into_client_request().unwrap();
69
70        request.headers_mut().extend(parts.headers);
71        request.headers_mut().insert(
72            http::header::SEC_WEBSOCKET_PROTOCOL,
73            HeaderValue::from_str("graphql-transport-ws").unwrap(),
74        );
75
76        let (connection, response) = async_tungstenite::tokio::connect_async(request)
77            .await
78            .expect("Request suceeded");
79        let (parts, _) = response.into_parts();
80
81        let (client, actor) = graphql_ws_client::Client::build(connection)
82            .await
83            .expect("Client build succeeded");
84
85        tokio::spawn(actor.into_future());
86
87        let stream: BoxStream<'_, _> = Box::pin(
88            client
89                .subscribe(self.body)
90                .await
91                .expect("Subscription succeeded")
92                .map(move |item| item.unwrap()),
93        );
94
95        GraphqlStreamingResponse {
96            status: parts.status,
97            headers: parts.headers,
98            stream,
99        }
100    }
101}
102
/// Represents the body of a GraphQL request.
///
/// Fields that are `None` are left out of the serialized output.
#[derive(serde::Serialize)]
pub struct Body {
    /// The GraphQL document text, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) query: Option<String>,
    /// The GraphQL variables, stored as a JSON value.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) variables: Option<serde_json::Value>,
}
111
112impl<'a> From<&'a str> for Body {
113    fn from(value: &'a str) -> Self {
114        value.to_string().into()
115    }
116}
117
118impl<'a> From<&'a String> for Body {
119    fn from(value: &'a String) -> Self {
120        value.clone().into()
121    }
122}
123
124impl From<String> for Body {
125    fn from(query: String) -> Self {
126        Body {
127            query: Some(query),
128            variables: None,
129        }
130    }
131}
132
/// Represents a GraphQL response.
///
/// Only the body takes part in (de)serialization; the status and headers are skipped.
// NOTE(review): `#[serde(flatten)]` is documented for structs and maps — confirm that
// serializing a response whose body is not a JSON object is never needed.
#[derive(serde::Serialize, Debug, serde::Deserialize)]
pub struct GraphqlResponse {
    /// The HTTP status code of the response.
    #[serde(skip)]
    status: http::StatusCode,
    /// The HTTP headers of the response.
    #[serde(skip)]
    headers: http::HeaderMap,
    /// The body of the response, which contains the GraphQL data.
    #[serde(flatten)]
    body: serde_json::Value,
}
146
147impl TryFrom<http::Response<Bytes>> for GraphqlResponse {
148    type Error = serde_json::Error;
149
150    fn try_from(response: http::Response<Bytes>) -> Result<Self, Self::Error> {
151        let (parts, body) = response.into_parts();
152
153        Ok(GraphqlResponse {
154            status: parts.status,
155            body: serde_json::from_slice(body.as_ref())
156                .unwrap_or_else(|err| serde_json::Value::String(format!("Could not deserialize JSON data: {err}"))),
157            headers: parts.headers,
158        })
159    }
160}
161
162impl std::fmt::Display for GraphqlResponse {
163    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164        write!(f, "{}", serde_json::to_string_pretty(&self.body).unwrap())
165    }
166}
167
168impl Deref for GraphqlResponse {
169    type Target = serde_json::Value;
170
171    fn deref(&self) -> &Self::Target {
172        &self.body
173    }
174}
175
176impl DerefMut for GraphqlResponse {
177    fn deref_mut(&mut self) -> &mut Self::Target {
178        &mut self.body
179    }
180}
181
182impl GraphqlResponse {
183    /// Returns the HTTP status code of the response.
184    pub fn status(&self) -> http::StatusCode {
185        self.status
186    }
187
188    /// Returns the HTTP headers of the response.
189    pub fn headers(&self) -> &http::HeaderMap {
190        &self.headers
191    }
192
193    /// Consumes the response and returns the body as a JSON value.
194    pub fn into_body(self) -> serde_json::Value {
195        self.body
196    }
197
198    /// Deserializes the response body
199    pub fn deserialize<T: DeserializeOwned>(self) -> anyhow::Result<T> {
200        serde_json::from_value(self.body).map_err(Into::into)
201    }
202
203    /// Extracts the `data` field from the response body, if it exists.
204    #[track_caller]
205    pub fn into_data(self) -> serde_json::Value {
206        assert!(self.errors().is_empty(), "{self:#?}");
207
208        match self.body {
209            serde_json::Value::Object(mut value) => value.remove("data"),
210            _ => None,
211        }
212        .unwrap_or_default()
213    }
214
215    /// Returns the `errors` field from the response body, if it exists.
216    pub fn errors(&self) -> Cow<'_, Vec<serde_json::Value>> {
217        self.body["errors"]
218            .as_array()
219            .map(Cow::Borrowed)
220            .unwrap_or_else(|| Cow::Owned(Vec::new()))
221    }
222}
223
/// Represents a GraphQL subscription response.
///
/// Holds the status and headers of the initial response alongside the stream of subscription
/// messages, each of which is a JSON value.
pub struct GraphqlStreamingResponse {
    /// The HTTP status code of the response.
    status: http::StatusCode,
    /// The HTTP headers of the response.
    headers: http::HeaderMap,
    /// The stream of messages from the subscription.
    stream: BoxStream<'static, serde_json::Value>,
}
233
234impl std::ops::Deref for GraphqlStreamingResponse {
235    type Target = BoxStream<'static, serde_json::Value>;
236    fn deref(&self) -> &Self::Target {
237        &self.stream
238    }
239}
240
241impl std::ops::DerefMut for GraphqlStreamingResponse {
242    fn deref_mut(&mut self) -> &mut Self::Target {
243        &mut self.stream
244    }
245}
246
247impl GraphqlStreamingResponse {
248    /// Returns the HTTP status code of the response.
249    pub fn status(&self) -> http::StatusCode {
250        self.status
251    }
252
253    /// Returns the HTTP headers of the response.
254    pub fn headers(&self) -> &http::HeaderMap {
255        &self.headers
256    }
257
258    /// Consumes the response and returns the underlying stream.
259    pub fn into_stream(self) -> BoxStream<'static, serde_json::Value> {
260        self.stream
261    }
262
263    /// Consumes the response and returns the first `n` messages.
264    pub async fn take(self, n: usize) -> GraphqlCollectedStreamingResponse {
265        let messages = self.stream.take(n).collect().await;
266        GraphqlCollectedStreamingResponse {
267            status: self.status,
268            headers: self.headers,
269            messages,
270        }
271    }
272
273    /// Collect all messages from the subscription stream.
274    pub async fn collect(self) -> GraphqlCollectedStreamingResponse {
275        let messages = self.stream.collect().await;
276        GraphqlCollectedStreamingResponse {
277            status: self.status,
278            headers: self.headers,
279            messages,
280        }
281    }
282}
283
/// Represents a collected GraphQL subscription response.
///
/// Produced by `GraphqlStreamingResponse::take` and `GraphqlStreamingResponse::collect`, which
/// gather the streamed messages into a vector.
#[derive(Debug)]
pub struct GraphqlCollectedStreamingResponse {
    /// The HTTP status code of the response.
    status: http::StatusCode,
    /// The HTTP headers of the response.
    headers: http::HeaderMap,
    /// The collected messages from the subscription.
    messages: Vec<serde_json::Value>,
}
294
295impl GraphqlCollectedStreamingResponse {
296    /// Returns the HTTP status code of the response.
297    pub fn status(&self) -> http::StatusCode {
298        self.status
299    }
300    /// Returns the HTTP headers of the response.
301    pub fn headers(&self) -> &http::HeaderMap {
302        &self.headers
303    }
304    /// Returns the collected messages from the subscription.
305    pub fn messages(&self) -> &Vec<serde_json::Value> {
306        &self.messages
307    }
308    /// Consumes the response and returns the collected messages.
309    pub fn into_messages(self) -> Vec<serde_json::Value> {
310        self.messages
311    }
312}
313
314impl graphql_ws_client::graphql::GraphqlOperation for Body {
315    type Response = serde_json::Value;
316    type Error = serde_json::Error;
317
318    fn decode(&self, data: serde_json::Value) -> Result<Self::Response, Self::Error> {
319        Ok(data)
320    }
321}
322
323impl serde::Serialize for GraphqlCollectedStreamingResponse {
324    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
325    where
326        S: serde::Serializer,
327    {
328        self.messages.serialize(serializer)
329    }
330}
331
/// Represents a GraphQL introspection request.
///
/// Wraps a [`GraphqlRequest`]; its `send` method returns the introspected schema rendered as
/// SDL instead of the raw response.
pub struct IntrospectionRequest(pub(super) GraphqlRequest);
333
334impl IntrospectionRequest {
335    /// Add a header to the request.
336    pub fn header<Name, Value>(mut self, name: Name, value: Value) -> Self
337    where
338        Name: TryInto<http::HeaderName, Error: std::fmt::Debug>,
339        Value: TryInto<http::HeaderValue, Error: std::fmt::Debug>,
340    {
341        self.0 = self.0.header(name, value);
342        self
343    }
344
345    /// Add a set of Headers to the existing ones on this Request.
346    ///
347    /// The headers will be merged in to any already set.
348    pub fn headers(mut self, headers: http::HeaderMap) -> Self {
349        self.0 = self.0.headers(headers);
350        self
351    }
352
353    /// Send the GraphQL request to the gateway
354    pub async fn send(self) -> String {
355        let response = self.0.send().await;
356        serde_json::from_value::<cynic_introspection::IntrospectionQuery>(response.into_data())
357            .expect("valid response")
358            .into_schema()
359            .expect("valid schema")
360            .to_sdl()
361    }
362}