faucet_source_graphql/
stream.rs1use crate::config::{GraphqlAuth, GraphqlStreamConfig};
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use faucet_core::util::{self, DEFAULT_ERROR_BODY_MAX_LEN};
7use jsonpath_rust::JsonPath;
8use reqwest::Client;
9use serde_json::{Value, json};
10
11pub struct GraphqlStream {
13 config: GraphqlStreamConfig,
14 client: Client,
15}
16
17impl GraphqlStream {
18 pub fn new(config: GraphqlStreamConfig) -> Self {
20 Self {
21 config,
22 client: Client::new(),
23 }
24 }
25
26 pub async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
28 let mut all_records = Vec::new();
29 let mut cursor: Option<String> = None;
30 let mut pages_fetched = 0usize;
31 let mut prev_cursor: Option<String> = None;
32
33 loop {
34 if let Some(max) = self.config.max_pages
35 && pages_fetched >= max
36 {
37 tracing::warn!("max pages ({max}) reached");
38 break;
39 }
40
41 let body = self.execute_query(&cursor).await?;
42 let records = self.extract_records(&body)?;
43 all_records.extend(records);
44 pages_fetched += 1;
45
46 match &self.config.pagination {
48 Some(pag) => {
49 let has_next = extract_bool(&body, &pag.has_next_page_path).unwrap_or(false);
50 if !has_next {
51 break;
52 }
53 let next_cursor = extract_string(&body, &pag.cursor_path);
54 if next_cursor.is_none() {
55 break;
56 }
57 if next_cursor == prev_cursor {
59 tracing::warn!("cursor loop detected, stopping pagination");
60 break;
61 }
62 prev_cursor = cursor;
63 cursor = next_cursor;
64 }
65 None => break,
66 }
67 }
68
69 tracing::info!(
70 records = all_records.len(),
71 pages = pages_fetched,
72 "GraphQL fetch complete"
73 );
74 Ok(all_records)
75 }
76
77 async fn execute_query(&self, cursor: &Option<String>) -> Result<Value, FaucetError> {
79 let mut variables = self.config.variables.clone();
80
81 if let (Some(pag), Some(cursor_val)) = (&self.config.pagination, cursor)
83 && let Value::Object(ref mut map) = variables
84 {
85 map.insert(pag.cursor_variable.clone(), json!(cursor_val));
86 }
87 if let Some(pag) = &self.config.pagination
88 && let (Some(size), Value::Object(map)) = (pag.page_size, &mut variables)
89 {
90 map.insert(pag.page_size_variable.clone(), json!(size));
91 }
92
93 let payload = json!({
94 "query": self.config.query,
95 "variables": variables,
96 });
97
98 let mut req = self
99 .client
100 .post(&self.config.endpoint)
101 .headers(self.config.headers.clone())
102 .json(&payload);
103
104 match &self.config.auth {
106 GraphqlAuth::None => {}
107 GraphqlAuth::Bearer(token) => {
108 req = req.bearer_auth(token);
109 }
110 GraphqlAuth::Custom(headers) => {
111 req = req.headers(headers.clone());
112 }
113 }
114
115 let resp = req.send().await.map_err(FaucetError::Http)?;
116 let resp = util::check_http_response(resp, DEFAULT_ERROR_BODY_MAX_LEN).await?;
117
118 let body: Value = resp.json().await.map_err(FaucetError::Http)?;
119
120 if let Some(errors) = body.get("errors")
122 && let Some(arr) = errors.as_array()
123 && !arr.is_empty()
124 {
125 let msg = arr
126 .iter()
127 .filter_map(|e| e.get("message").and_then(|m| m.as_str()))
128 .collect::<Vec<_>>()
129 .join("; ");
130 return Err(FaucetError::HttpStatus {
131 status: 200,
132 url: self.config.endpoint.clone(),
133 body: format!("GraphQL errors: {msg}"),
134 });
135 }
136
137 Ok(body)
138 }
139
140 fn extract_records(&self, body: &Value) -> Result<Vec<Value>, FaucetError> {
142 match &self.config.records_path {
143 Some(path) => util::extract_records(body, Some(path)),
144 None => {
145 match body.get("data") {
147 Some(data) => Ok(vec![data.clone()]),
148 None => Ok(vec![body.clone()]),
149 }
150 }
151 }
152 }
153}
154
155#[async_trait]
156impl faucet_core::Source for GraphqlStream {
157 async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
158 GraphqlStream::fetch_all(self).await
159 }
160
161 fn config_schema(&self) -> serde_json::Value {
162 serde_json::to_value(faucet_core::schema_for!(GraphqlStreamConfig))
163 .expect("schema serialization")
164 }
165}
166
167fn extract_string(body: &Value, path: &str) -> Option<String> {
168 let results = body.query(path).ok()?;
169 match results.first()? {
170 Value::String(s) => Some(s.clone()),
171 _ => None,
172 }
173}
174
175fn extract_bool(body: &Value, path: &str) -> Option<bool> {
176 let results = body.query(path).ok()?;
177 results.first()?.as_bool()
178}
179
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn extract_string_from_json() {
        let body = json!({"data": {"users": {"pageInfo": {"endCursor": "abc123"}}}});
        assert_eq!(
            extract_string(&body, "$.data.users.pageInfo.endCursor"),
            Some("abc123".into())
        );
    }

    /// Non-string values and unmatched paths must not yield a cursor.
    #[test]
    fn extract_string_rejects_non_string_and_missing() {
        let body = json!({"data": {"users": {"pageInfo": {"endCursor": 42}}}});
        assert_eq!(extract_string(&body, "$.data.users.pageInfo.endCursor"), None);
        assert_eq!(extract_string(&body, "$.data.users.pageInfo.missing"), None);
    }

    #[test]
    fn extract_bool_from_json() {
        let body = json!({"data": {"users": {"pageInfo": {"hasNextPage": true}}}});
        assert_eq!(
            extract_bool(&body, "$.data.users.pageInfo.hasNextPage"),
            Some(true)
        );
    }

    /// A string "true" is not a boolean, and unmatched paths yield nothing.
    #[test]
    fn extract_bool_rejects_non_bool_and_missing() {
        let body = json!({"data": {"users": {"pageInfo": {"hasNextPage": "true"}}}});
        assert_eq!(extract_bool(&body, "$.data.users.pageInfo.hasNextPage"), None);
        assert_eq!(extract_bool(&body, "$.data.users.pageInfo.missing"), None);
    }

    #[test]
    fn extract_records_with_path() {
        let config =
            GraphqlStreamConfig::new("https://api.example.com/graphql", "query { users { id } }")
                .records_path("$.data.users[*]");
        let stream = GraphqlStream::new(config);
        let body = json!({"data": {"users": [{"id": 1}, {"id": 2}]}});
        let records = stream.extract_records(&body).unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["id"], 1);
    }

    #[test]
    fn extract_records_without_path_returns_data() {
        let config =
            GraphqlStreamConfig::new("https://api.example.com/graphql", "query { user { id } }");
        let stream = GraphqlStream::new(config);
        let body = json!({"data": {"user": {"id": 1}}});
        let records = stream.extract_records(&body).unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0]["user"]["id"], 1);
    }

    /// Without a records path and without a `data` key, the whole body is the record.
    #[test]
    fn extract_records_without_path_or_data_returns_body() {
        let config =
            GraphqlStreamConfig::new("https://api.example.com/graphql", "query { user { id } }");
        let stream = GraphqlStream::new(config);
        let body = json!({"user": {"id": 1}});
        let records = stream.extract_records(&body).unwrap();
        assert_eq!(records, vec![body]);
    }
}