Skip to main content

faucet_source_graphql/
config.rs

1//! GraphQL source configuration.
2
3use faucet_core::{AuthSpec, DEFAULT_BATCH_SIZE};
4use reqwest::header::HeaderMap;
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::collections::HashMap;
9
10/// Authentication for GraphQL endpoints.
11#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
12#[serde(tag = "type", content = "config", rename_all = "snake_case")]
13pub enum GraphqlAuth {
14    /// No authentication.
15    None,
16    /// Bearer token in the Authorization header.
17    Bearer { token: String },
18    /// Custom headers (e.g. API keys, cookies).
19    Custom { headers: HashMap<String, String> },
20}
21
22/// Cursor-based pagination configuration for GraphQL.
23///
24/// Most GraphQL APIs use the Relay cursor specification with
25/// `pageInfo { hasNextPage, endCursor }`.
26#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
27pub struct GraphqlPagination {
28    /// JSONPath to the `hasNextPage` boolean in the response.
29    pub has_next_page_path: String,
30    /// JSONPath to the `endCursor` string in the response.
31    pub cursor_path: String,
32    /// Name of the cursor variable in the GraphQL query (default: `"after"`).
33    pub cursor_variable: String,
34    /// Name of the page size variable (default: `"first"`).
35    ///
36    /// The per-page record count itself comes from
37    /// [`GraphqlStreamConfig::batch_size`] — the variable named here is the
38    /// GraphQL variable that the `batch_size` value is injected into on each
39    /// request. The plain `batch_size = 0` sentinel omits the variable so the
40    /// upstream uses its own default page size.
41    pub page_size_variable: String,
42}
43
44impl Default for GraphqlPagination {
45    fn default() -> Self {
46        Self {
47            has_next_page_path: "$.data.*.pageInfo.hasNextPage".into(),
48            cursor_path: "$.data.*.pageInfo.endCursor".into(),
49            cursor_variable: "after".into(),
50            page_size_variable: "first".into(),
51        }
52    }
53}
54
55/// Configuration for the GraphQL source.
56#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
57pub struct GraphqlStreamConfig {
58    /// GraphQL endpoint URL.
59    pub endpoint: String,
60    /// The GraphQL query string.
61    pub query: String,
62    /// Variables to pass with the query.
63    pub variables: Value,
64    /// Authentication: either inline (`{ type, config }`) or a `{ ref: <name> }`
65    /// pointer to a shared provider in the CLI's top-level `auth:` catalog.
66    pub auth: AuthSpec<GraphqlAuth>,
67    /// Additional request headers.
68    #[serde(skip, default)]
69    pub headers: HeaderMap,
70    /// JSONPath expression to extract records from the response.
71    pub records_path: Option<String>,
72    /// Pagination configuration. `None` for single-page queries.
73    pub pagination: Option<GraphqlPagination>,
74    /// Maximum number of pages to fetch.
75    pub max_pages: Option<usize>,
76    /// Records per emitted [`StreamPage`](faucet_core::StreamPage), and the
77    /// value injected as the GraphQL `first:` cursor argument (or whatever
78    /// variable name [`GraphqlPagination::page_size_variable`] specifies).
79    /// Defaults to [`DEFAULT_BATCH_SIZE`].
80    ///
81    /// `batch_size = 0` is the "no batching" sentinel: the page-size variable
82    /// is omitted from the request so the upstream uses its own default page
83    /// size, and the entire result set is emitted as a single page. If the
84    /// upstream schema requires a non-null `first:` argument this will
85    /// surface as `FaucetError::Config` at stream-time.
86    #[serde(default = "default_batch_size")]
87    pub batch_size: usize,
88}
89
90fn default_batch_size() -> usize {
91    DEFAULT_BATCH_SIZE
92}
93
94impl GraphqlStreamConfig {
95    /// Create a new config with an endpoint and query.
96    pub fn new(endpoint: impl Into<String>, query: impl Into<String>) -> Self {
97        Self {
98            endpoint: endpoint.into(),
99            query: query.into(),
100            variables: Value::Object(Default::default()),
101            auth: AuthSpec::Inline(GraphqlAuth::None),
102            headers: HeaderMap::new(),
103            records_path: None,
104            pagination: None,
105            max_pages: None,
106            batch_size: DEFAULT_BATCH_SIZE,
107        }
108    }
109
110    /// Set the GraphQL variables.
111    pub fn variables(mut self, vars: Value) -> Self {
112        self.variables = vars;
113        self
114    }
115
116    /// Set the authentication method.
117    pub fn auth(mut self, auth: GraphqlAuth) -> Self {
118        self.auth = AuthSpec::Inline(auth);
119        self
120    }
121
122    /// Set additional headers.
123    pub fn headers(mut self, headers: HeaderMap) -> Self {
124        self.headers = headers;
125        self
126    }
127
128    /// Set the JSONPath expression for record extraction.
129    pub fn records_path(mut self, path: impl Into<String>) -> Self {
130        self.records_path = Some(path.into());
131        self
132    }
133
134    /// Enable cursor-based pagination with the given configuration.
135    pub fn pagination(mut self, pagination: GraphqlPagination) -> Self {
136        self.pagination = Some(pagination);
137        self
138    }
139
140    /// Set the maximum number of pages to fetch.
141    pub fn max_pages(mut self, max: usize) -> Self {
142        self.max_pages = Some(max);
143        self
144    }
145
146    /// Set the per-page record count for [`Source::stream_pages`](faucet_core::Source::stream_pages)
147    /// and the GraphQL `first:` cursor argument.
148    ///
149    /// Pass `0` to opt out of batching — the page-size variable is omitted
150    /// from the request so the upstream uses its own default page size, and
151    /// the response is emitted as a single [`StreamPage`](faucet_core::StreamPage).
152    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
153        self.batch_size = batch_size;
154        self
155    }
156}
157
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::header::HeaderValue;
    use serde_json::json;

    const ENDPOINT: &str = "https://api.example.com/graphql";

    /// `new` must leave every optional knob at its baseline.
    #[test]
    fn default_config() {
        let config = GraphqlStreamConfig::new(ENDPOINT, "query { users { id name } }");
        assert_eq!(config.endpoint, ENDPOINT);
        assert_eq!(config.query, "query { users { id name } }");
        assert_eq!(config.variables, json!({}));
        assert!(config.headers.is_empty());
        assert!(config.records_path.is_none());
        assert!(config.pagination.is_none());
        assert!(config.max_pages.is_none());
    }

    #[test]
    fn builder_methods() {
        let config = GraphqlStreamConfig::new(ENDPOINT, "query { users { id } }")
            .variables(json!({"org": "acme"}))
            .records_path("$.data.users.edges[*].node")
            .max_pages(10)
            .auth(GraphqlAuth::Bearer {
                token: "token".into(),
            });
        assert_eq!(config.variables["org"], "acme");
        assert_eq!(
            config.records_path.as_deref(),
            Some("$.data.users.edges[*].node")
        );
        assert_eq!(config.max_pages, Some(10));
    }

    /// Covers the two builder setters the other tests never call.
    #[test]
    fn pagination_and_headers_builders() {
        let mut headers = HeaderMap::new();
        headers.insert("x-api-key", HeaderValue::from_static("secret"));

        let config = GraphqlStreamConfig::new(ENDPOINT, "query { x }")
            .pagination(GraphqlPagination::default())
            .headers(headers);

        let pagination = config.pagination.expect("pagination should be set");
        assert_eq!(pagination.cursor_variable, "after");
        assert_eq!(
            config
                .headers
                .get("x-api-key")
                .and_then(|value| value.to_str().ok()),
            Some("secret")
        );
    }

    #[test]
    fn default_pagination() {
        let pag = GraphqlPagination::default();
        assert_eq!(pag.has_next_page_path, "$.data.*.pageInfo.hasNextPage");
        assert_eq!(pag.cursor_path, "$.data.*.pageInfo.endCursor");
        assert_eq!(pag.cursor_variable, "after");
        assert_eq!(pag.page_size_variable, "first");
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = GraphqlStreamConfig::new(ENDPOINT, "query { x }");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = GraphqlStreamConfig::new(ENDPOINT, "query { x }").with_batch_size(250);
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = GraphqlStreamConfig::new(ENDPOINT, "query { x }").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = GraphqlStreamConfig::new(ENDPOINT, "query { x }")
            .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "endpoint": "https://api.example.com/graphql",
            "query": "query { x }",
            "variables": {},
            "auth": {"type": "none"},
            "records_path": null,
            "pagination": null,
            "max_pages": null,
            "batch_size": 500
        }"#;
        let config: GraphqlStreamConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 500);
    }

    /// Exercises the `#[serde(default = "default_batch_size")]` path, and the
    /// implicit `None` for absent `Option` fields.
    #[test]
    fn omitted_fields_fall_back_to_defaults_when_deserializing() {
        let json = r#"{
            "endpoint": "https://api.example.com/graphql",
            "query": "query { x }",
            "variables": {},
            "auth": {"type": "none"}
        }"#;
        let config: GraphqlStreamConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
        assert!(config.records_path.is_none());
        assert!(config.pagination.is_none());
        assert!(config.max_pages.is_none());
        assert!(config.headers.is_empty());
    }

    /// `headers` is `#[serde(skip)]`, so it must never reach serialized output.
    #[test]
    fn headers_are_not_serialized() {
        let mut headers = HeaderMap::new();
        headers.insert("x-api-key", HeaderValue::from_static("secret"));
        let config = GraphqlStreamConfig::new(ENDPOINT, "query { x }").headers(headers);

        let value = serde_json::to_value(&config).unwrap();
        assert!(value.get("headers").is_none());
        assert_eq!(value["endpoint"], ENDPOINT);
    }
}