grafbase_sdk/test/
request.rs1use std::{
2 borrow::Cow,
3 ops::{Deref, DerefMut},
4};
5
6use bytes::Bytes;
7use futures_util::{StreamExt as _, stream::BoxStream};
8use http::HeaderValue;
9use http_body_util::BodyExt as _;
10use serde::de::DeserializeOwned;
11
12pub struct GraphqlRequest {
14 pub(super) builder: reqwest::RequestBuilder,
15 pub(super) body: Body,
16}
17
18impl GraphqlRequest {
19 pub fn header<Name, Value>(mut self, name: Name, value: Value) -> Self
21 where
22 Name: TryInto<http::HeaderName, Error: std::fmt::Debug>,
23 Value: TryInto<http::HeaderValue, Error: std::fmt::Debug>,
24 {
25 self.builder = self.builder.header(name.try_into().unwrap(), value.try_into().unwrap());
26 self
27 }
28
29 pub fn headers(mut self, headers: http::HeaderMap) -> Self {
33 self.builder = self.builder.headers(headers);
34 self
35 }
36
37 pub fn variables(mut self, variables: impl serde::Serialize) -> Self {
39 self.body.variables = Some(serde_json::to_value(variables).expect("variables to be serializable"));
40 self
41 }
42
43 pub async fn send(self) -> GraphqlResponse {
45 let response = self
46 .builder
47 .header(http::header::ACCEPT, "application/json")
48 .json(&self.body)
49 .send()
50 .await
51 .expect("Request suceeded");
52 let (parts, body) = http::Response::from(response).into_parts();
53 let bytes = body.collect().await.expect("Could retrieve response body").to_bytes();
54 http::Response::from_parts(parts, bytes).try_into().unwrap()
55 }
56
57 pub async fn ws_stream(self) -> GraphqlStreamingResponse {
60 use async_tungstenite::tungstenite::client::IntoClientRequest as _;
61 use futures_util::StreamExt;
62
63 let mut req = self.builder.build().expect("Valid request");
64 req.url_mut().set_scheme("ws").expect("Valid URL scheme");
65 req.url_mut().set_path("/ws");
66 let (parts, _) = http::Request::try_from(req).expect("Valid HTTP request").into_parts();
67
68 let mut request = parts.uri.into_client_request().unwrap();
69
70 request.headers_mut().extend(parts.headers);
71 request.headers_mut().insert(
72 http::header::SEC_WEBSOCKET_PROTOCOL,
73 HeaderValue::from_str("graphql-transport-ws").unwrap(),
74 );
75
76 let (connection, response) = async_tungstenite::tokio::connect_async(request)
77 .await
78 .expect("Request suceeded");
79 let (parts, _) = response.into_parts();
80
81 let (client, actor) = graphql_ws_client::Client::build(connection)
82 .await
83 .expect("Client build succeeded");
84
85 tokio::spawn(actor.into_future());
86
87 let stream: BoxStream<'_, _> = Box::pin(
88 client
89 .subscribe(self.body)
90 .await
91 .expect("Subscription succeeded")
92 .map(move |item| item.unwrap()),
93 );
94
95 GraphqlStreamingResponse {
96 status: parts.status,
97 headers: parts.headers,
98 stream,
99 }
100 }
101}
102
103#[derive(serde::Serialize)]
105pub struct Body {
106 #[serde(skip_serializing_if = "Option::is_none")]
107 pub(super) query: Option<String>,
108 #[serde(skip_serializing_if = "Option::is_none")]
109 pub(super) variables: Option<serde_json::Value>,
110}
111
112impl<'a> From<&'a str> for Body {
113 fn from(value: &'a str) -> Self {
114 value.to_string().into()
115 }
116}
117
118impl<'a> From<&'a String> for Body {
119 fn from(value: &'a String) -> Self {
120 value.clone().into()
121 }
122}
123
124impl From<String> for Body {
125 fn from(query: String) -> Self {
126 Body {
127 query: Some(query),
128 variables: None,
129 }
130 }
131}
132
133#[derive(serde::Serialize, Debug, serde::Deserialize)]
135pub struct GraphqlResponse {
136 #[serde(skip)]
138 status: http::StatusCode,
139 #[serde(skip)]
141 headers: http::HeaderMap,
142 #[serde(flatten)]
144 body: serde_json::Value,
145}
146
147impl TryFrom<http::Response<Bytes>> for GraphqlResponse {
148 type Error = serde_json::Error;
149
150 fn try_from(response: http::Response<Bytes>) -> Result<Self, Self::Error> {
151 let (parts, body) = response.into_parts();
152
153 Ok(GraphqlResponse {
154 status: parts.status,
155 body: serde_json::from_slice(body.as_ref())
156 .unwrap_or_else(|err| serde_json::Value::String(format!("Could not deserialize JSON data: {err}"))),
157 headers: parts.headers,
158 })
159 }
160}
161
162impl std::fmt::Display for GraphqlResponse {
163 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164 write!(f, "{}", serde_json::to_string_pretty(&self.body).unwrap())
165 }
166}
167
168impl Deref for GraphqlResponse {
169 type Target = serde_json::Value;
170
171 fn deref(&self) -> &Self::Target {
172 &self.body
173 }
174}
175
176impl DerefMut for GraphqlResponse {
177 fn deref_mut(&mut self) -> &mut Self::Target {
178 &mut self.body
179 }
180}
181
182impl GraphqlResponse {
183 pub fn status(&self) -> http::StatusCode {
185 self.status
186 }
187
188 pub fn headers(&self) -> &http::HeaderMap {
190 &self.headers
191 }
192
193 pub fn into_body(self) -> serde_json::Value {
195 self.body
196 }
197
198 pub fn deserialize<T: DeserializeOwned>(self) -> anyhow::Result<T> {
200 serde_json::from_value(self.body).map_err(Into::into)
201 }
202
203 #[track_caller]
205 pub fn into_data(self) -> serde_json::Value {
206 assert!(self.errors().is_empty(), "{self:#?}");
207
208 match self.body {
209 serde_json::Value::Object(mut value) => value.remove("data"),
210 _ => None,
211 }
212 .unwrap_or_default()
213 }
214
215 pub fn errors(&self) -> Cow<'_, Vec<serde_json::Value>> {
217 self.body["errors"]
218 .as_array()
219 .map(Cow::Borrowed)
220 .unwrap_or_else(|| Cow::Owned(Vec::new()))
221 }
222}
223
224pub struct GraphqlStreamingResponse {
226 status: http::StatusCode,
228 headers: http::HeaderMap,
230 stream: BoxStream<'static, serde_json::Value>,
232}
233
234impl std::ops::Deref for GraphqlStreamingResponse {
235 type Target = BoxStream<'static, serde_json::Value>;
236 fn deref(&self) -> &Self::Target {
237 &self.stream
238 }
239}
240
241impl std::ops::DerefMut for GraphqlStreamingResponse {
242 fn deref_mut(&mut self) -> &mut Self::Target {
243 &mut self.stream
244 }
245}
246
247impl GraphqlStreamingResponse {
248 pub fn status(&self) -> http::StatusCode {
250 self.status
251 }
252
253 pub fn headers(&self) -> &http::HeaderMap {
255 &self.headers
256 }
257
258 pub fn into_stream(self) -> BoxStream<'static, serde_json::Value> {
260 self.stream
261 }
262
263 pub async fn take(self, n: usize) -> GraphqlCollectedStreamingResponse {
265 let messages = self.stream.take(n).collect().await;
266 GraphqlCollectedStreamingResponse {
267 status: self.status,
268 headers: self.headers,
269 messages,
270 }
271 }
272
273 pub async fn collect(self) -> GraphqlCollectedStreamingResponse {
275 let messages = self.stream.collect().await;
276 GraphqlCollectedStreamingResponse {
277 status: self.status,
278 headers: self.headers,
279 messages,
280 }
281 }
282}
283
284#[derive(Debug)]
286pub struct GraphqlCollectedStreamingResponse {
287 status: http::StatusCode,
289 headers: http::HeaderMap,
291 messages: Vec<serde_json::Value>,
293}
294
295impl GraphqlCollectedStreamingResponse {
296 pub fn status(&self) -> http::StatusCode {
298 self.status
299 }
300 pub fn headers(&self) -> &http::HeaderMap {
302 &self.headers
303 }
304 pub fn messages(&self) -> &Vec<serde_json::Value> {
306 &self.messages
307 }
308 pub fn into_messages(self) -> Vec<serde_json::Value> {
310 self.messages
311 }
312}
313
314impl graphql_ws_client::graphql::GraphqlOperation for Body {
315 type Response = serde_json::Value;
316 type Error = serde_json::Error;
317
318 fn decode(&self, data: serde_json::Value) -> Result<Self::Response, Self::Error> {
319 Ok(data)
320 }
321}
322
323impl serde::Serialize for GraphqlCollectedStreamingResponse {
324 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
325 where
326 S: serde::Serializer,
327 {
328 self.messages.serialize(serializer)
329 }
330}
331
332pub struct IntrospectionRequest(pub(super) GraphqlRequest);
333
334impl IntrospectionRequest {
335 pub fn header<Name, Value>(mut self, name: Name, value: Value) -> Self
337 where
338 Name: TryInto<http::HeaderName, Error: std::fmt::Debug>,
339 Value: TryInto<http::HeaderValue, Error: std::fmt::Debug>,
340 {
341 self.0 = self.0.header(name, value);
342 self
343 }
344
345 pub fn headers(mut self, headers: http::HeaderMap) -> Self {
349 self.0 = self.0.headers(headers);
350 self
351 }
352
353 pub async fn send(self) -> String {
355 let response = self.0.send().await;
356 serde_json::from_value::<cynic_introspection::IntrospectionQuery>(response.into_data())
357 .expect("valid response")
358 .into_schema()
359 .expect("valid schema")
360 .to_sdl()
361 }
362}