Skip to main content

faucet_source_xml/
stream.rs

1//! XML stream executor.
2
3use crate::config::{XmlAuth, XmlPagination, XmlStreamConfig};
4use crate::convert;
5use async_trait::async_trait;
6use faucet_core::util::{self, DEFAULT_ERROR_BODY_MAX_LEN};
7use faucet_core::{AuthSpec, Credential, FaucetError, SharedAuthProvider};
8use faucet_core::{Stream, StreamPage};
9use reqwest::Client;
10use serde_json::Value;
11use std::collections::HashMap;
12use std::pin::Pin;
13use std::time::Duration;
14
15/// Content fingerprint of a fetched page, used as a pagination loop guard: a
16/// server that ignores the page/offset parameter (or clamps to the last page)
17/// returns the same non-empty page on every request, which would otherwise loop
18/// forever. Stopping when two consecutive pages fingerprint identically mirrors
19/// the REST source's body-fingerprint guard (audit #146 H4/H5).
20fn page_fingerprint(records: &[Value]) -> u64 {
21    use std::hash::{Hash, Hasher};
22    // `serde_json::Value` is not `Hash`; hash its canonical string form.
23    let mut hasher = std::collections::hash_map::DefaultHasher::new();
24    records.len().hash(&mut hasher);
25    for r in records {
26        r.to_string().hash(&mut hasher);
27    }
28    hasher.finish()
29}
30
/// Starting delay for the exponential backoff applied between retry attempts.
const RETRY_BASE_BACKOFF: Duration = Duration::from_millis(500);
/// Attempt budget for requests that fail transiently (5xx responses or
/// connection errors) before the error is surfaced to the caller.
/// NOTE(review): whether this count includes the first try is decided by
/// `faucet_core::execute_with_retry` — confirm there.
const RETRY_MAX_ATTEMPTS: u32 = 3;
35
36/// A configured XML API source that handles pagination and extraction.
37pub struct XmlStream {
38    config: XmlStreamConfig,
39    client: Client,
40    /// Optional shared auth provider. When present it takes precedence over
41    /// inline auth, so several sources can share one token with single-flight
42    /// refresh. Used by the CLI to resolve `auth: { ref }`, and by library
43    /// callers who construct one provider and inject it into many sources.
44    auth_provider: Option<SharedAuthProvider>,
45}
46
47/// Map a [`Credential`] from a shared provider onto the XML [`XmlAuth`]
48/// representation so the existing header-application path can be reused.
49fn credential_to_auth(cred: Credential) -> XmlAuth {
50    match cred {
51        Credential::Bearer(token) => XmlAuth::Bearer { token },
52        Credential::Token(token) => XmlAuth::Custom {
53            headers: std::iter::once(("Authorization".to_string(), token)).collect(),
54        },
55        Credential::Basic { username, password } => XmlAuth::Basic { username, password },
56        Credential::Header { name, value } => XmlAuth::Custom {
57            headers: std::iter::once((name, value)).collect(),
58        },
59    }
60}
61
62impl XmlStream {
63    /// Create a new XML stream from the given configuration.
64    pub fn new(config: XmlStreamConfig) -> Self {
65        Self {
66            config,
67            client: Client::new(),
68            auth_provider: None,
69        }
70    }
71
72    /// Attach a shared [`AuthProvider`](faucet_core::AuthProvider). When set,
73    /// the provider supplies the credential for every request (taking precedence
74    /// over inline auth), so several sources can share one token with
75    /// single-flight refresh. Used by the CLI to resolve `auth: { ref }`, and by
76    /// library callers who construct one provider and inject it into many
77    /// sources.
78    pub fn with_auth_provider(mut self, provider: SharedAuthProvider) -> Self {
79        self.auth_provider = Some(provider);
80        self
81    }
82
83    /// Fetch all records across all pages.
84    pub async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
85        self.fetch_all_with_context(&HashMap::new()).await
86    }
87
88    /// Fetch all records, substituting parent context into path, query_params, and body.
89    async fn fetch_all_with_context(
90        &self,
91        context: &HashMap<String, serde_json::Value>,
92    ) -> Result<Vec<Value>, FaucetError> {
93        let mut all_records = Vec::new();
94        let mut pages_fetched = 0usize;
95        let mut offset = 0usize;
96        let mut page_number = None;
97        let mut prev_fingerprint: Option<u64> = None;
98
99        // Initialize pagination state.
100        if let Some(XmlPagination::PageNumber { start_page, .. }) = &self.config.pagination {
101            page_number = Some(*start_page);
102        }
103
104        loop {
105            if let Some(max) = self.config.max_pages
106                && pages_fetched >= max
107            {
108                tracing::warn!("max pages ({max}) reached");
109                break;
110            }
111
112            let mut params = self.config.query_params.clone();
113            self.apply_pagination_params(&mut params, page_number, offset);
114
115            let xml_text = self.execute_request(&params, context).await?;
116            let json = convert::xml_to_json(&xml_text)?;
117
118            let records = match &self.config.records_element_path {
119                Some(path) => convert::extract_at_path(&json, path),
120                None => vec![json],
121            };
122
123            let record_count = records.len();
124            let fingerprint = page_fingerprint(&records);
125            all_records.extend(records);
126            pages_fetched += 1;
127
128            // Loop guard: a server that ignores the page/offset parameter (or
129            // clamps to the last page) returns the same non-empty page forever.
130            // Stop when two consecutive pages are identical (audit #146 H4/H5).
131            if record_count > 0 && prev_fingerprint == Some(fingerprint) {
132                tracing::warn!(
133                    "XML pagination returned an identical page; stopping to avoid an infinite loop"
134                );
135                break;
136            }
137            prev_fingerprint = Some(fingerprint);
138
139            // Advance pagination or stop.
140            match &self.config.pagination {
141                Some(XmlPagination::PageNumber { page_size, .. }) => {
142                    if record_count == 0 {
143                        break;
144                    }
145                    // Stop if page_size is set and we got fewer records than the page size.
146                    if let Some(size) = page_size
147                        && record_count < *size
148                    {
149                        break;
150                    }
151                    page_number = page_number.map(|p| p + 1);
152                }
153                Some(XmlPagination::Offset { limit, .. }) => {
154                    if record_count < *limit {
155                        break;
156                    }
157                    offset += record_count;
158                }
159                None => break,
160            }
161        }
162
163        tracing::info!(
164            records = all_records.len(),
165            pages = pages_fetched,
166            "XML fetch complete"
167        );
168        Ok(all_records)
169    }
170
171    fn apply_pagination_params(
172        &self,
173        params: &mut HashMap<String, String>,
174        page_number: Option<usize>,
175        offset: usize,
176    ) {
177        match &self.config.pagination {
178            Some(XmlPagination::PageNumber {
179                param_name,
180                page_size,
181                page_size_param,
182                ..
183            }) => {
184                if let Some(page) = page_number {
185                    params.insert(param_name.clone(), page.to_string());
186                }
187                if let (Some(size), Some(param)) = (page_size, page_size_param) {
188                    params.insert(param.clone(), size.to_string());
189                }
190            }
191            Some(XmlPagination::Offset {
192                offset_param,
193                limit_param,
194                limit,
195            }) => {
196                params.insert(offset_param.clone(), offset.to_string());
197                params.insert(limit_param.clone(), limit.to_string());
198            }
199            None => {}
200        }
201    }
202
203    async fn execute_request(
204        &self,
205        params: &HashMap<String, String>,
206        context: &HashMap<String, serde_json::Value>,
207    ) -> Result<String, FaucetError> {
208        let path = if context.is_empty() {
209            self.config.path.clone()
210        } else {
211            faucet_core::util::substitute_context(&self.config.path, context)
212        };
213
214        let url = format!("{}/{}", self.config.base_url, path.trim_start_matches('/'));
215
216        // Substitute context into query parameter values.
217        let resolved_params: HashMap<String, String> = if context.is_empty() {
218            params.clone()
219        } else {
220            params
221                .iter()
222                .map(|(k, v)| (k.clone(), faucet_core::util::substitute_context(v, context)))
223                .collect()
224        };
225
226        let mut req = self
227            .client
228            .request(self.config.method.clone(), &url)
229            .headers(self.config.headers.clone())
230            .query(&resolved_params);
231
232        // Resolve credentials to concrete auth. A shared auth provider
233        // (from `auth: { ref }` or injected by a library caller) takes
234        // precedence; otherwise inline auth is used.
235        let effective_auth: XmlAuth = if let Some(provider) = &self.auth_provider {
236            credential_to_auth(provider.credential().await?)
237        } else {
238            match &self.config.auth {
239                AuthSpec::Inline(a) => a.clone(),
240                AuthSpec::Reference(r) => {
241                    return Err(FaucetError::Auth(format!(
242                        "auth references provider '{}' but no provider was supplied; \
243                         set one via the CLI `auth:` catalog or `with_auth_provider`",
244                        r.name
245                    )));
246                }
247            }
248        };
249
250        // Apply auth.
251        match &effective_auth {
252            XmlAuth::None => {}
253            XmlAuth::Bearer { token } => {
254                req = req.bearer_auth(token);
255            }
256            XmlAuth::Basic { username, password } => {
257                req = req.basic_auth(username, Some(password));
258            }
259            XmlAuth::Custom { headers } => {
260                let mut hm = reqwest::header::HeaderMap::new();
261                for (name, value) in headers {
262                    let n =
263                        reqwest::header::HeaderName::from_bytes(name.as_bytes()).map_err(|e| {
264                            FaucetError::Auth(format!("invalid custom header name {name:?}: {e}"))
265                        })?;
266                    let v = reqwest::header::HeaderValue::from_str(value).map_err(|e| {
267                        FaucetError::Auth(format!("invalid custom header value for {name:?}: {e}"))
268                    })?;
269                    hm.insert(n, v);
270                }
271                req = req.headers(hm);
272            }
273        }
274
275        // Set body for POST requests (SOAP), with context substitution.
276        if let Some(body) = &self.config.body {
277            let resolved_body = if context.is_empty() {
278                body.clone()
279            } else {
280                faucet_core::util::substitute_context(body, context)
281            };
282            req = req
283                .header("Content-Type", "text/xml; charset=utf-8")
284                .body(resolved_body);
285        }
286
287        // Retry transient failures (5xx / connection resets) with jittered
288        // backoff, matching the REST source's reliability layer (#78/#16).
289        // The request body is a String, so `try_clone` always succeeds.
290        faucet_core::execute_with_retry(RETRY_MAX_ATTEMPTS, RETRY_BASE_BACKOFF, || {
291            let attempt = req.try_clone();
292            async move {
293                let req = attempt.ok_or_else(|| {
294                    FaucetError::Source("xml: request is not cloneable for retry".into())
295                })?;
296                let resp = req.send().await.map_err(FaucetError::Http)?;
297                let resp = util::check_http_response(resp, DEFAULT_ERROR_BODY_MAX_LEN).await?;
298                resp.text().await.map_err(FaucetError::Http)
299            }
300        })
301        .await
302    }
303}
304
305#[async_trait]
306impl faucet_core::Source for XmlStream {
307    async fn fetch_with_context(
308        &self,
309        context: &std::collections::HashMap<String, serde_json::Value>,
310    ) -> Result<Vec<Value>, FaucetError> {
311        self.fetch_all_with_context(context).await
312    }
313
314    /// Stream records from the XML response without materialising the whole
315    /// document tree. The event-driven parser only builds JSON values for
316    /// elements matching [`XmlStreamConfig::records_element_path`]; other
317    /// elements are observed and discarded, so client-side memory is bounded
318    /// at `O(batch_size * record_size)` regardless of how large the document
319    /// is.
320    ///
321    /// Records are accumulated into a buffer of
322    /// [`XmlStreamConfig::batch_size`] entries and yielded as a
323    /// [`StreamPage`] once the buffer is full. The trailing partial buffer
324    /// (if any) is emitted after the parser hits EOF and all pagination
325    /// rounds drain.
326    ///
327    /// The trait-level `batch_size` argument is intentionally ignored in
328    /// favour of the config field — the config is the user-facing knob the
329    /// README documents, and routing the pipeline-supplied hint through it
330    /// would silently override an explicit config value. `batch_size = 0`
331    /// drains every page into a single emitted page.
332    ///
333    /// Bookmarks are always `None` — the XML source has no
334    /// incremental-replication mode today; pagination only walks the
335    /// API's own page-number / offset cursor.
336    fn stream_pages<'a>(
337        &'a self,
338        context: &'a HashMap<String, Value>,
339        _batch_size: usize,
340    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
341        let batch_size = self.config.batch_size;
342        let owned_context = context.clone();
343
344        Box::pin(async_stream::try_stream! {
345            let chunk = if batch_size == 0 { usize::MAX } else { batch_size };
346            let initial_capacity = if batch_size == 0 { 1024 } else { batch_size };
347            let mut buffer: Vec<Value> = Vec::with_capacity(initial_capacity);
348            let mut total = 0usize;
349            let mut pages_fetched = 0usize;
350            let mut offset = 0usize;
351            let mut page_number = None;
352            let mut prev_fingerprint: Option<u64> = None;
353
354            if let Some(XmlPagination::PageNumber { start_page, .. }) =
355                &self.config.pagination
356            {
357                page_number = Some(*start_page);
358            }
359
360            loop {
361                if let Some(max) = self.config.max_pages
362                    && pages_fetched >= max
363                {
364                    tracing::warn!("max pages ({max}) reached");
365                    break;
366                }
367
368                let mut params = self.config.query_params.clone();
369                self.apply_pagination_params(&mut params, page_number, offset);
370
371                let xml_text = self.execute_request(&params, &owned_context).await?;
372
373                // Event-driven extraction: only the matched subtree is
374                // ever materialised. The closure pushes into the local
375                // buffer; once it crosses `chunk`, the surrounding loop
376                // can flush, but we can't `yield` from inside the closure,
377                // so we collect this HTTP page's records into a scratch
378                // Vec and then iterate them after.
379                let mut page_records: Vec<Value> = Vec::new();
380                convert::stream_extract(
381                    &xml_text,
382                    self.config.records_element_path.as_deref(),
383                    |rec| page_records.push(rec),
384                )?;
385
386                let record_count = page_records.len();
387                let fingerprint = page_fingerprint(&page_records);
388
389                for rec in page_records.drain(..) {
390                    buffer.push(rec);
391                    if buffer.len() >= chunk {
392                        let flush = std::mem::replace(&mut buffer, Vec::with_capacity(initial_capacity));
393                        total += flush.len();
394                        yield StreamPage { records: flush, bookmark: None };
395                    }
396                }
397                pages_fetched += 1;
398
399                // Loop guard: stop when two consecutive pages are identical — a
400                // server ignoring the page/offset parameter (or clamping to the
401                // last page) returns the same non-empty page forever (#146 H4/H5).
402                if record_count > 0 && prev_fingerprint == Some(fingerprint) {
403                    tracing::warn!(
404                        "XML pagination returned an identical page; stopping to avoid an infinite loop"
405                    );
406                    break;
407                }
408                prev_fingerprint = Some(fingerprint);
409
410                // Advance pagination using the same rules as
411                // `fetch_all_with_context`.
412                match &self.config.pagination {
413                    Some(XmlPagination::PageNumber { page_size, .. }) => {
414                        if record_count == 0 {
415                            break;
416                        }
417                        if let Some(size) = page_size
418                            && record_count < *size
419                        {
420                            break;
421                        }
422                        page_number = page_number.map(|p| p + 1);
423                    }
424                    Some(XmlPagination::Offset { limit, .. }) => {
425                        if record_count < *limit {
426                            break;
427                        }
428                        offset += record_count;
429                    }
430                    None => break,
431                }
432            }
433
434            if !buffer.is_empty() {
435                total += buffer.len();
436                yield StreamPage { records: buffer, bookmark: None };
437            }
438
439            tracing::info!(
440                records = total,
441                pages = pages_fetched,
442                batch_size,
443                "XML source stream complete",
444            );
445        })
446    }
447
448    fn config_schema(&self) -> serde_json::Value {
449        serde_json::to_value(faucet_core::schema_for!(XmlStreamConfig))
450            .expect("schema serialization")
451    }
452}