use std::{
borrow::Cow,
ops::{Deref, DerefMut},
};
use bytes::Bytes;
use futures_util::{StreamExt as _, stream::BoxStream};
use http::HeaderValue;
use http_body_util::BodyExt as _;
use serde::de::DeserializeOwned;
/// A GraphQL request under construction: a `reqwest` request builder paired with the GraphQL payload.
///
/// It is consumed either by `send` (HTTP request, JSON response) or by `ws_stream`
/// (subscription over WebSocket).
pub struct GraphqlRequest {
/// Underlying HTTP request builder; headers are added to it before the request is sent.
pub(super) builder: reqwest::RequestBuilder,
/// GraphQL payload: sent as the JSON request body by `send`, or used as the subscription operation by `ws_stream`.
pub(super) body: Body,
}
impl GraphqlRequest {
    /// Adds a single header to the request.
    ///
    /// # Panics
    ///
    /// Panics if `name` or `value` cannot be converted into a header name or header value.
    pub fn header<Name, Value>(mut self, name: Name, value: Value) -> Self
    where
        Name: TryInto<http::HeaderName, Error: std::fmt::Debug>,
        Value: TryInto<http::HeaderValue, Error: std::fmt::Debug>,
    {
        let name: http::HeaderName = name.try_into().expect("Valid header name");
        let value: http::HeaderValue = value.try_into().expect("Valid header value");
        self.builder = self.builder.header(name, value);
        self
    }

    /// Hands `headers` to the underlying request builder's `headers` method.
    pub fn headers(mut self, headers: http::HeaderMap) -> Self {
        self.builder = self.builder.headers(headers);
        self
    }

    /// Sets the GraphQL variables, replacing any previously set ones.
    ///
    /// # Panics
    ///
    /// Panics if `variables` cannot be serialized into a JSON value.
    pub fn variables(mut self, variables: impl serde::Serialize) -> Self {
        self.body.variables = Some(serde_json::to_value(variables).expect("variables to be serializable"));
        self
    }

    /// Sends the request over HTTP and buffers the whole response.
    ///
    /// The payload is sent as the JSON body with an `Accept: application/json` header. The
    /// response status, headers and fully collected body are wrapped into a [`GraphqlResponse`].
    ///
    /// # Panics
    ///
    /// Panics if sending the request fails or the response body cannot be read.
    pub async fn send(self) -> GraphqlResponse {
        let response = self
            .builder
            .header(http::header::ACCEPT, "application/json")
            .json(&self.body)
            .send()
            .await
            .expect("Request succeeded");
        let (parts, body) = http::Response::from(response).into_parts();
        let bytes = body.collect().await.expect("Could retrieve response body").to_bytes();
        // The `TryFrom` impl for `GraphqlResponse` in this file always returns `Ok`: an invalid
        // JSON body is replaced by a string value describing the error.
        http::Response::from_parts(parts, bytes)
            .try_into()
            .expect("Conversion into GraphqlResponse succeeded")
    }

    /// Opens a WebSocket connection and subscribes with the request payload.
    ///
    /// The built request's URL is rewritten to the `ws` scheme and the `/ws` path, its headers
    /// are copied onto the WebSocket handshake request, and `Sec-WebSocket-Protocol` is set to
    /// `graphql-transport-ws`. The connection actor is spawned onto the Tokio runtime. The
    /// returned status and headers are those of the WebSocket handshake response.
    ///
    /// # Panics
    ///
    /// Panics if the request cannot be built or converted, if the connection, client setup or
    /// subscription fails, and, while the stream is polled, if a received item is an error.
    pub async fn ws_stream(self) -> GraphqlStreamingResponse {
        use async_tungstenite::tungstenite::client::IntoClientRequest as _;

        let mut req = self.builder.build().expect("Valid request");
        // NOTE(review): the scheme is forced to plain `ws` whatever the original scheme was;
        // confirm that TLS (`wss`) endpoints never need to be reached through this helper.
        req.url_mut().set_scheme("ws").expect("Valid URL scheme");
        req.url_mut().set_path("/ws");

        // Only the URI and headers are carried over to the handshake request.
        let (parts, _) = http::Request::try_from(req).expect("Valid HTTP request").into_parts();
        let mut request = parts.uri.into_client_request().expect("Valid WebSocket client request");
        request.headers_mut().extend(parts.headers);
        request.headers_mut().insert(
            http::header::SEC_WEBSOCKET_PROTOCOL,
            // A constant value needs no fallible runtime parsing.
            HeaderValue::from_static("graphql-transport-ws"),
        );

        let (connection, response) = async_tungstenite::tokio::connect_async(request)
            .await
            .expect("Request succeeded");
        let (parts, _) = response.into_parts();

        let (client, actor) = graphql_ws_client::Client::build(connection)
            .await
            .expect("Client build succeeded");
        tokio::spawn(actor.into_future());

        let stream: BoxStream<'_, _> = Box::pin(
            client
                .subscribe(self.body)
                .await
                .expect("Subscription succeeded")
                .map(|item| item.expect("Subscription item received")),
        );

        GraphqlStreamingResponse {
            status: parts.status,
            headers: parts.headers,
            stream,
        }
    }
}
/// GraphQL request payload; fields that are `None` are left out of the serialized output.
#[derive(serde::Serialize)]
pub struct Body {
/// GraphQL document text, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) query: Option<String>,
/// Variables for the operation as a JSON value, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) variables: Option<serde_json::Value>,
}
/// Builds a [`Body`] from a borrowed query string by copying it into an owned `String`.
impl From<&str> for Body {
    fn from(query: &str) -> Self {
        Self::from(String::from(query))
    }
}
/// Builds a [`Body`] from a borrowed `String` by cloning it.
impl From<&String> for Body {
    fn from(query: &String) -> Self {
        let owned: String = query.to_owned();
        Self::from(owned)
    }
}
impl From<String> for Body {
fn from(query: String) -> Self {
Body {
query: Some(query),
variables: None,
}
}
}
/// Response to a GraphQL request sent over HTTP: status, headers and the JSON body.
///
/// For serde, only `body` takes part (flattened into the top level); `status` and `headers`
/// are skipped.
#[derive(serde::Serialize, Debug, serde::Deserialize)]
pub struct GraphqlResponse {
/// HTTP status code of the response; skipped by serde.
#[serde(skip)]
status: http::StatusCode,
/// HTTP response headers; skipped by serde.
#[serde(skip)]
headers: http::HeaderMap,
/// Response body as a JSON value; flattened when (de)serialized.
#[serde(flatten)]
body: serde_json::Value,
}
/// Converts a fully buffered HTTP response into a [`GraphqlResponse`].
///
/// A body that is not valid JSON does not produce an error: it is replaced by a JSON string
/// describing the deserialization failure, so this conversion always returns `Ok`.
impl TryFrom<http::Response<Bytes>> for GraphqlResponse {
    type Error = serde_json::Error;

    fn try_from(response: http::Response<Bytes>) -> Result<Self, Self::Error> {
        let (head, payload) = response.into_parts();

        let body = match serde_json::from_slice::<serde_json::Value>(payload.as_ref()) {
            Ok(json) => json,
            Err(err) => serde_json::Value::String(format!("Could not deserialize JSON data: {err}")),
        };

        Ok(Self {
            status: head.status,
            headers: head.headers,
            body,
        })
    }
}
/// Displays the response as its pretty-printed JSON body; status and headers are not shown.
impl std::fmt::Display for GraphqlResponse {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let pretty = serde_json::to_string_pretty(&self.body).unwrap();
        f.write_str(&pretty)
    }
}
/// Gives read access to the JSON body directly through the response.
impl Deref for GraphqlResponse {
    type Target = serde_json::Value;

    fn deref(&self) -> &serde_json::Value {
        let Self { body, .. } = self;
        body
    }
}
impl DerefMut for GraphqlResponse {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.body
}
}
impl GraphqlResponse {
    /// HTTP status code of the response.
    pub fn status(&self) -> http::StatusCode {
        self.status
    }

    /// HTTP headers of the response.
    pub fn headers(&self) -> &http::HeaderMap {
        &self.headers
    }

    /// Consumes the response and returns the whole JSON body.
    pub fn into_body(self) -> serde_json::Value {
        self.body
    }

    /// Consumes the response and deserializes the whole JSON body into `T`.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error, converted to `anyhow::Error`, if the body does not
    /// deserialize into `T`.
    pub fn deserialize<T: DeserializeOwned>(self) -> anyhow::Result<T> {
        let value = serde_json::from_value(self.body)?;
        Ok(value)
    }

    /// Consumes the response and returns the value under the top-level `"data"` key.
    ///
    /// Yields the default JSON value when the body is not an object or has no `"data"` entry.
    ///
    /// # Panics
    ///
    /// Panics, printing the whole response, if [`errors`](Self::errors) is not empty.
    #[track_caller]
    pub fn into_data(self) -> serde_json::Value {
        assert!(self.errors().is_empty(), "{self:#?}");
        if let serde_json::Value::Object(mut fields) = self.body {
            fields.remove("data").unwrap_or_default()
        } else {
            serde_json::Value::default()
        }
    }

    /// Returns the array under the top-level `"errors"` key.
    ///
    /// Borrows the array when present; otherwise returns an owned empty vector.
    pub fn errors(&self) -> Cow<'_, Vec<serde_json::Value>> {
        match self.body["errors"].as_array() {
            Some(list) => Cow::Borrowed(list),
            None => Cow::Owned(Vec::new()),
        }
    }
}
/// A streaming GraphQL response: a status code and headers together with a stream of JSON messages.
pub struct GraphqlStreamingResponse {
/// Status code associated with the response.
status: http::StatusCode,
/// Headers associated with the response.
headers: http::HeaderMap,
/// Boxed stream yielding one JSON value per received message.
stream: BoxStream<'static, serde_json::Value>,
}
impl std::ops::Deref for GraphqlStreamingResponse {
type Target = BoxStream<'static, serde_json::Value>;
fn deref(&self) -> &Self::Target {
&self.stream
}
}
impl std::ops::DerefMut for GraphqlStreamingResponse {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.stream
}
}
impl GraphqlStreamingResponse {
    /// Status code stored with the response.
    pub fn status(&self) -> http::StatusCode {
        self.status
    }

    /// Headers stored with the response.
    pub fn headers(&self) -> &http::HeaderMap {
        &self.headers
    }

    /// Consumes the response and returns the message stream, dropping status and headers.
    pub fn into_stream(self) -> BoxStream<'static, serde_json::Value> {
        let Self { stream, .. } = self;
        stream
    }

    /// Consumes the response, gathering at most `n` messages from the stream.
    pub async fn take(self, n: usize) -> GraphqlCollectedStreamingResponse {
        let Self {
            status,
            headers,
            stream,
        } = self;
        let messages: Vec<serde_json::Value> = stream.take(n).collect().await;
        GraphqlCollectedStreamingResponse {
            status,
            headers,
            messages,
        }
    }

    /// Consumes the response, gathering every message until the stream ends.
    pub async fn collect(self) -> GraphqlCollectedStreamingResponse {
        let Self {
            status,
            headers,
            stream,
        } = self;
        let messages: Vec<serde_json::Value> = stream.collect().await;
        GraphqlCollectedStreamingResponse {
            status,
            headers,
            messages,
        }
    }
}
/// A streaming response whose messages have been gathered into a vector.
#[derive(Debug)]
pub struct GraphqlCollectedStreamingResponse {
/// Status code associated with the response.
status: http::StatusCode,
/// Headers associated with the response.
headers: http::HeaderMap,
/// Collected messages, one JSON value each.
messages: Vec<serde_json::Value>,
}
impl GraphqlCollectedStreamingResponse {
    /// Status code stored with the response.
    pub fn status(&self) -> http::StatusCode {
        let Self { status, .. } = self;
        *status
    }

    /// Headers stored with the response.
    pub fn headers(&self) -> &http::HeaderMap {
        let Self { headers, .. } = self;
        headers
    }

    /// Borrows the collected messages.
    pub fn messages(&self) -> &Vec<serde_json::Value> {
        let Self { messages, .. } = self;
        messages
    }

    /// Consumes the response and returns the collected messages, dropping status and headers.
    pub fn into_messages(self) -> Vec<serde_json::Value> {
        let Self { messages, .. } = self;
        messages
    }
}
impl graphql_ws_client::graphql::GraphqlOperation for Body {
type Response = serde_json::Value;
type Error = serde_json::Error;
fn decode(&self, data: serde_json::Value) -> Result<Self::Response, Self::Error> {
Ok(data)
}
}
impl serde::Serialize for GraphqlCollectedStreamingResponse {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.messages.serialize(serializer)
}
}
/// Wrapper around a [`GraphqlRequest`] whose `send` converts the response into the schema text produced by `to_sdl()`.
pub struct IntrospectionRequest(pub(super) GraphqlRequest);
impl IntrospectionRequest {
pub fn header<Name, Value>(mut self, name: Name, value: Value) -> Self
where
Name: TryInto<http::HeaderName, Error: std::fmt::Debug>,
Value: TryInto<http::HeaderValue, Error: std::fmt::Debug>,
{
self.0 = self.0.header(name, value);
self
}
pub fn headers(mut self, headers: http::HeaderMap) -> Self {
self.0 = self.0.headers(headers);
self
}
pub async fn send(self) -> String {
let response = self.0.send().await;
serde_json::from_value::<cynic_introspection::IntrospectionQuery>(response.into_data())
.expect("valid response")
.into_schema()
.expect("valid schema")
.to_sdl()
}
}