Skip to main content

faucet_source_xml/
config.rs

1//! XML source configuration.
2
3use faucet_core::{AuthSpec, DEFAULT_BATCH_SIZE};
4use reqwest::header::HeaderMap;
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9/// Authentication for XML API endpoints.
10#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
11#[serde(tag = "type", content = "config", rename_all = "snake_case")]
12pub enum XmlAuth {
13    /// No authentication.
14    None,
15    /// Bearer token.
16    Bearer { token: String },
17    /// Basic authentication.
18    Basic { username: String, password: String },
19    /// Custom headers (e.g. SOAP action headers, API keys).
20    Custom { headers: HashMap<String, String> },
21}
22
// Wire format: internally tagged — the variant name is stored under the
// `type` key alongside the variant's own fields. No `rename_all` is set, so
// the tag values are the Rust variant names (`PageNumber`, `Offset`).
// The `///` comments below are picked up by the `JsonSchema` derive as schema
// descriptions, so maintainer notes are kept in plain `//` comments.
/// Pagination configuration for XML APIs.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "type")]
pub enum XmlPagination {
    /// Page-number pagination with a query parameter.
    PageNumber {
        // NOTE(review): presumably the name of the query parameter that
        // carries the page number — confirm against the request builder.
        param_name: String,
        // NOTE(review): presumably the first page number to request (APIs
        // differ on 0- vs 1-based) — confirm against the request builder.
        start_page: usize,
        // Optional; may be omitted from the serialized config.
        // NOTE(review): presumably sent via `page_size_param` when both are
        // set — confirm.
        page_size: Option<usize>,
        // Optional; may be omitted from the serialized config.
        page_size_param: Option<String>,
    },
    /// Offset/limit pagination.
    Offset {
        // NOTE(review): presumably the query parameter names for the offset
        // and the limit, and the limit value sent with each request —
        // confirm against the request builder.
        offset_param: String,
        limit_param: String,
        limit: usize,
    },
}
41
42/// Configuration for the XML source.
43#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
44pub struct XmlStreamConfig {
45    /// Base URL of the API.
46    pub base_url: String,
47    /// Request path (appended to base_url).
48    pub path: String,
49    /// HTTP method (GET or POST for SOAP).
50    #[serde(with = "crate::serde_helpers::http_method")]
51    #[schemars(with = "String")]
52    pub method: reqwest::Method,
53    /// Authentication: either inline (`{ type, config }`) or a `{ ref: <name> }`
54    /// pointer to a shared provider in the CLI's top-level `auth:` catalog.
55    pub auth: AuthSpec<XmlAuth>,
56    /// Additional request headers.
57    #[serde(skip, default)]
58    pub headers: HeaderMap,
59    /// Optional request body (e.g. SOAP envelope).
60    pub body: Option<String>,
61    /// Dot-separated path to the repeating element in the XML response
62    /// (e.g. `"Envelope.Body.GetUsersResponse.Users.User"`).
63    pub records_element_path: Option<String>,
64    /// Pagination configuration.
65    pub pagination: Option<XmlPagination>,
66    /// Maximum number of pages to fetch.
67    pub max_pages: Option<usize>,
68    /// Query parameters to include in every request.
69    pub query_params: std::collections::HashMap<String, String>,
70    /// Records per emitted [`StreamPage`](faucet_core::StreamPage). The
71    /// event-driven XML parser accumulates matched subtrees into a buffer
72    /// and yields whenever the buffer reaches this size. Defaults to
73    /// [`DEFAULT_BATCH_SIZE`].
74    ///
75    /// `batch_size = 0` is the "no batching" sentinel: the document is
76    /// drained end-to-end and the entire result set is emitted in a single
77    /// page. Useful for small lookup payloads or for sinks (e.g. SQL `COPY`,
78    /// BigQuery load jobs) that prefer one large request to many small ones.
79    #[serde(default = "default_batch_size")]
80    pub batch_size: usize,
81}
82
/// Serde default for [`XmlStreamConfig::batch_size`]: returns
/// [`DEFAULT_BATCH_SIZE`].
// A function is required because `#[serde(default = "...")]` takes a function
// path rather than a constant.
fn default_batch_size() -> usize {
    DEFAULT_BATCH_SIZE
}
86
87impl XmlStreamConfig {
88    /// Create a new config with required fields.
89    pub fn new(base_url: impl Into<String>, path: impl Into<String>) -> Self {
90        Self {
91            base_url: base_url.into(),
92            path: path.into(),
93            method: reqwest::Method::GET,
94            auth: AuthSpec::Inline(XmlAuth::None),
95            headers: HeaderMap::new(),
96            body: None,
97            records_element_path: None,
98            pagination: None,
99            max_pages: None,
100            query_params: std::collections::HashMap::new(),
101            batch_size: DEFAULT_BATCH_SIZE,
102        }
103    }
104
105    /// Set the HTTP method (default: GET).
106    pub fn method(mut self, method: reqwest::Method) -> Self {
107        self.method = method;
108        self
109    }
110
111    /// Set the authentication method.
112    pub fn auth(mut self, auth: XmlAuth) -> Self {
113        self.auth = AuthSpec::Inline(auth);
114        self
115    }
116
117    /// Set additional headers.
118    pub fn headers(mut self, headers: HeaderMap) -> Self {
119        self.headers = headers;
120        self
121    }
122
123    /// Set a SOAP or XML request body.
124    pub fn body(mut self, body: impl Into<String>) -> Self {
125        self.body = Some(body.into());
126        self
127    }
128
129    /// Set the dot-separated path to the repeating element.
130    pub fn records_element_path(mut self, path: impl Into<String>) -> Self {
131        self.records_element_path = Some(path.into());
132        self
133    }
134
135    /// Set pagination configuration.
136    pub fn pagination(mut self, pagination: XmlPagination) -> Self {
137        self.pagination = Some(pagination);
138        self
139    }
140
141    /// Set the maximum number of pages.
142    pub fn max_pages(mut self, max: usize) -> Self {
143        self.max_pages = Some(max);
144        self
145    }
146
147    /// Add a query parameter.
148    pub fn query_param(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
149        self.query_params.insert(key.into(), value.into());
150        self
151    }
152
153    /// Set the per-page record count for
154    /// [`Source::stream_pages`](faucet_core::Source::stream_pages).
155    ///
156    /// Pass `0` to opt out of batching — the entire document is drained and
157    /// emitted in a single [`StreamPage`](faucet_core::StreamPage).
158    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
159        self.batch_size = batch_size;
160        self
161    }
162}
163
#[cfg(test)]
mod tests {
    use super::*;

    /// Host shared by every builder-based test.
    const BASE_URL: &str = "https://api.example.com";

    /// Builder-constructed config for the `/users` endpoint with all defaults.
    fn users_config() -> XmlStreamConfig {
        XmlStreamConfig::new(BASE_URL, "/users")
    }

    /// Deserialize a JSON fixture, panicking if it is not a valid config.
    fn parse(json: &str) -> XmlStreamConfig {
        serde_json::from_str(json).unwrap()
    }

    #[test]
    fn default_config() {
        let cfg = users_config();
        assert_eq!(cfg.base_url, "https://api.example.com");
        assert_eq!(cfg.path, "/users");
        assert_eq!(cfg.method, reqwest::Method::GET);
        assert!(cfg.records_element_path.is_none());
    }

    #[test]
    fn soap_config() {
        let element_path = "Envelope.Body.GetUsersResponse.Users.User";
        let cfg = XmlStreamConfig::new(BASE_URL, "/soap")
            .method(reqwest::Method::POST)
            .body("<Envelope><Body><GetUsers/></Body></Envelope>")
            .records_element_path(element_path);

        assert_eq!(cfg.method, reqwest::Method::POST);
        assert!(cfg.body.is_some());
        assert_eq!(cfg.records_element_path.as_deref(), Some(element_path));
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        assert_eq!(users_config().batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let cfg = users_config().with_batch_size(500);
        assert_eq!(cfg.batch_size, 500);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let cfg = users_config().with_batch_size(0);
        assert_eq!(cfg.batch_size, 0);
        assert!(faucet_core::validate_batch_size(cfg.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let too_large = faucet_core::MAX_BATCH_SIZE + 1;
        let cfg = users_config().with_batch_size(too_large);
        assert!(faucet_core::validate_batch_size(cfg.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let cfg = parse(
            r#"{
            "base_url": "https://api.example.com",
            "path": "/users.xml",
            "method": "GET",
            "auth": { "type": "none" },
            "body": null,
            "records_element_path": "root.user",
            "pagination": null,
            "max_pages": null,
            "query_params": {},
            "batch_size": 250
        }"#,
        );
        assert_eq!(cfg.batch_size, 250);
    }

    #[test]
    fn batch_size_defaults_when_missing_from_json() {
        // `#[serde(default = "default_batch_size")]` is a user-facing
        // promise: configs written before `batch_size` existed must still
        // load and pick up the library default.
        let cfg = parse(
            r#"{
            "base_url": "https://api.example.com",
            "path": "/users.xml",
            "method": "GET",
            "auth": { "type": "none" },
            "body": null,
            "records_element_path": null,
            "pagination": null,
            "max_pages": null,
            "query_params": {}
        }"#,
        );
        assert_eq!(cfg.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }
}