Skip to main content

faucet_source_graphql/
stream.rs

1//! GraphQL stream executor.
2
3use crate::config::{GraphqlAuth, GraphqlStreamConfig};
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use faucet_core::util::{self, DEFAULT_ERROR_BODY_MAX_LEN};
7use jsonpath_rust::JsonPath;
8use reqwest::Client;
9use serde_json::{Value, json};
10
11/// A configured GraphQL source that handles pagination and extraction.
12pub struct GraphqlStream {
13    config: GraphqlStreamConfig,
14    client: Client,
15}
16
17impl GraphqlStream {
18    /// Create a new GraphQL stream from the given configuration.
19    pub fn new(config: GraphqlStreamConfig) -> Self {
20        Self {
21            config,
22            client: Client::new(),
23        }
24    }
25
26    /// Fetch all records across all pages.
27    pub async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
28        let mut all_records = Vec::new();
29        let mut cursor: Option<String> = None;
30        let mut pages_fetched = 0usize;
31        let mut prev_cursor: Option<String> = None;
32
33        loop {
34            if let Some(max) = self.config.max_pages
35                && pages_fetched >= max
36            {
37                tracing::warn!("max pages ({max}) reached");
38                break;
39            }
40
41            let body = self.execute_query(&cursor).await?;
42            let records = self.extract_records(&body)?;
43            all_records.extend(records);
44            pages_fetched += 1;
45
46            // Check pagination.
47            match &self.config.pagination {
48                Some(pag) => {
49                    let has_next = extract_bool(&body, &pag.has_next_page_path).unwrap_or(false);
50                    if !has_next {
51                        break;
52                    }
53                    let next_cursor = extract_string(&body, &pag.cursor_path);
54                    if next_cursor.is_none() {
55                        break;
56                    }
57                    // Loop detection: stop if cursor hasn't changed.
58                    if next_cursor == prev_cursor {
59                        tracing::warn!("cursor loop detected, stopping pagination");
60                        break;
61                    }
62                    prev_cursor = cursor;
63                    cursor = next_cursor;
64                }
65                None => break,
66            }
67        }
68
69        tracing::info!(
70            records = all_records.len(),
71            pages = pages_fetched,
72            "GraphQL fetch complete"
73        );
74        Ok(all_records)
75    }
76
77    /// Execute a single GraphQL query.
78    async fn execute_query(&self, cursor: &Option<String>) -> Result<Value, FaucetError> {
79        let mut variables = self.config.variables.clone();
80
81        // Inject cursor and page size into variables.
82        if let (Some(pag), Some(cursor_val)) = (&self.config.pagination, cursor)
83            && let Value::Object(ref mut map) = variables
84        {
85            map.insert(pag.cursor_variable.clone(), json!(cursor_val));
86        }
87        if let Some(pag) = &self.config.pagination
88            && let (Some(size), Value::Object(map)) = (pag.page_size, &mut variables)
89        {
90            map.insert(pag.page_size_variable.clone(), json!(size));
91        }
92
93        let payload = json!({
94            "query": self.config.query,
95            "variables": variables,
96        });
97
98        let mut req = self
99            .client
100            .post(&self.config.endpoint)
101            .headers(self.config.headers.clone())
102            .json(&payload);
103
104        // Apply auth.
105        match &self.config.auth {
106            GraphqlAuth::None => {}
107            GraphqlAuth::Bearer(token) => {
108                req = req.bearer_auth(token);
109            }
110            GraphqlAuth::Custom(headers) => {
111                req = req.headers(headers.clone());
112            }
113        }
114
115        let resp = req.send().await.map_err(FaucetError::Http)?;
116        let resp = util::check_http_response(resp, DEFAULT_ERROR_BODY_MAX_LEN).await?;
117
118        let body: Value = resp.json().await.map_err(FaucetError::Http)?;
119
120        // Check for GraphQL-level errors.
121        if let Some(errors) = body.get("errors")
122            && let Some(arr) = errors.as_array()
123            && !arr.is_empty()
124        {
125            let msg = arr
126                .iter()
127                .filter_map(|e| e.get("message").and_then(|m| m.as_str()))
128                .collect::<Vec<_>>()
129                .join("; ");
130            return Err(FaucetError::HttpStatus {
131                status: 200,
132                url: self.config.endpoint.clone(),
133                body: format!("GraphQL errors: {msg}"),
134            });
135        }
136
137        Ok(body)
138    }
139
140    /// Extract records from a GraphQL response using the configured JSONPath.
141    fn extract_records(&self, body: &Value) -> Result<Vec<Value>, FaucetError> {
142        match &self.config.records_path {
143            Some(path) => util::extract_records(body, Some(path)),
144            None => {
145                // GraphQL-specific: return the `data` field as a single record.
146                match body.get("data") {
147                    Some(data) => Ok(vec![data.clone()]),
148                    None => Ok(vec![body.clone()]),
149                }
150            }
151        }
152    }
153}
154
155#[async_trait]
156impl faucet_core::Source for GraphqlStream {
157    async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
158        GraphqlStream::fetch_all(self).await
159    }
160
161    fn config_schema(&self) -> serde_json::Value {
162        serde_json::to_value(faucet_core::schema_for!(GraphqlStreamConfig))
163            .expect("schema serialization")
164    }
165}
166
167fn extract_string(body: &Value, path: &str) -> Option<String> {
168    let results = body.query(path).ok()?;
169    match results.first()? {
170        Value::String(s) => Some(s.clone()),
171        _ => None,
172    }
173}
174
175fn extract_bool(body: &Value, path: &str) -> Option<bool> {
176    let results = body.query(path).ok()?;
177    results.first()?.as_bool()
178}
179
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn extract_string_from_json() {
        let body = json!({"data": {"users": {"pageInfo": {"endCursor": "abc123"}}}});
        assert_eq!(
            extract_string(&body, "$.data.users.pageInfo.endCursor"),
            Some("abc123".into())
        );
    }

    // A non-string or absent cursor must read as "no cursor", which ends pagination.
    #[test]
    fn extract_string_non_string_or_missing_is_none() {
        let body = json!({"data": {"users": {"pageInfo": {"endCursor": 42}}}});
        assert_eq!(extract_string(&body, "$.data.users.pageInfo.endCursor"), None);
        assert_eq!(extract_string(&body, "$.data.users.pageInfo.missing"), None);
    }

    #[test]
    fn extract_bool_from_json() {
        let body = json!({"data": {"users": {"pageInfo": {"hasNextPage": true}}}});
        assert_eq!(
            extract_bool(&body, "$.data.users.pageInfo.hasNextPage"),
            Some(true)
        );
    }

    // Only a real JSON boolean counts; a string "true" or a missing field does not.
    #[test]
    fn extract_bool_non_bool_or_missing_is_none() {
        let body = json!({"data": {"users": {"pageInfo": {"hasNextPage": "true"}}}});
        assert_eq!(extract_bool(&body, "$.data.users.pageInfo.hasNextPage"), None);
        assert_eq!(extract_bool(&body, "$.data.users.pageInfo.missing"), None);
    }

    #[test]
    fn extract_records_with_path() {
        let config =
            GraphqlStreamConfig::new("https://api.example.com/graphql", "query { users { id } }")
                .records_path("$.data.users[*]");
        let stream = GraphqlStream::new(config);
        let body = json!({"data": {"users": [{"id": 1}, {"id": 2}]}});
        let records = stream.extract_records(&body).unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["id"], 1);
    }

    #[test]
    fn extract_records_without_path_returns_data() {
        let config =
            GraphqlStreamConfig::new("https://api.example.com/graphql", "query { user { id } }");
        let stream = GraphqlStream::new(config);
        let body = json!({"data": {"user": {"id": 1}}});
        let records = stream.extract_records(&body).unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0]["user"]["id"], 1);
    }

    // Without `records_path` and without a `data` field, the whole body is the record.
    #[test]
    fn extract_records_without_path_or_data_returns_body() {
        let config =
            GraphqlStreamConfig::new("https://api.example.com/graphql", "query { user { id } }");
        let stream = GraphqlStream::new(config);
        let body = json!({"other": 1});
        let records = stream.extract_records(&body).unwrap();
        assert_eq!(records, vec![body]);
    }
}