Skip to main content

faucet_source_xml/
stream.rs

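//! XML stream executor: requests XML from an HTTP API page by page, converts each response to JSON, and extracts the records.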
2
3use crate::config::{XmlAuth, XmlPagination, XmlStreamConfig};
4use crate::convert;
5use async_trait::async_trait;
6use faucet_core::FaucetError;
7use faucet_core::util::{self, DEFAULT_ERROR_BODY_MAX_LEN};
8use reqwest::Client;
9use serde_json::Value;
10use std::collections::HashMap;
11
12/// A configured XML API source that handles pagination and extraction.
13pub struct XmlStream {
14    config: XmlStreamConfig,
15    client: Client,
16}
17
18impl XmlStream {
19    /// Create a new XML stream from the given configuration.
20    pub fn new(config: XmlStreamConfig) -> Self {
21        Self {
22            config,
23            client: Client::new(),
24        }
25    }
26
27    /// Fetch all records across all pages.
28    pub async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
29        let mut all_records = Vec::new();
30        let mut pages_fetched = 0usize;
31        let mut offset = 0usize;
32        let mut page_number = None;
33        let mut prev_record_count: Option<usize> = None;
34
35        // Initialize pagination state.
36        if let Some(XmlPagination::PageNumber { start_page, .. }) = &self.config.pagination {
37            page_number = Some(*start_page);
38        }
39
40        loop {
41            if let Some(max) = self.config.max_pages
42                && pages_fetched >= max
43            {
44                tracing::warn!("max pages ({max}) reached");
45                break;
46            }
47
48            let mut params = self.config.query_params.clone();
49            self.apply_pagination_params(&mut params, page_number, offset);
50
51            let xml_text = self.execute_request(&params).await?;
52            let json = convert::xml_to_json(&xml_text)?;
53
54            let records = match &self.config.records_element_path {
55                Some(path) => convert::extract_at_path(&json, path),
56                None => vec![json],
57            };
58
59            let record_count = records.len();
60            all_records.extend(records);
61            pages_fetched += 1;
62
63            // Advance pagination or stop.
64            match &self.config.pagination {
65                Some(XmlPagination::PageNumber { page_size, .. }) => {
66                    if record_count == 0 {
67                        break;
68                    }
69                    // Stop if page_size is set and we got fewer records than the page size.
70                    if let Some(size) = page_size
71                        && record_count < *size
72                    {
73                        break;
74                    }
75                    page_number = page_number.map(|p| p + 1);
76                }
77                Some(XmlPagination::Offset { limit, .. }) => {
78                    if record_count < *limit {
79                        break;
80                    }
81                    // Loop detection: stop if record count is identical and offset hasn't advanced.
82                    if prev_record_count == Some(record_count) && record_count == 0 {
83                        tracing::warn!("offset pagination loop detected, stopping");
84                        break;
85                    }
86                    offset += record_count;
87                }
88                None => break,
89            }
90            prev_record_count = Some(record_count);
91        }
92
93        tracing::info!(
94            records = all_records.len(),
95            pages = pages_fetched,
96            "XML fetch complete"
97        );
98        Ok(all_records)
99    }
100
101    fn apply_pagination_params(
102        &self,
103        params: &mut HashMap<String, String>,
104        page_number: Option<usize>,
105        offset: usize,
106    ) {
107        match &self.config.pagination {
108            Some(XmlPagination::PageNumber {
109                param_name,
110                page_size,
111                page_size_param,
112                ..
113            }) => {
114                if let Some(page) = page_number {
115                    params.insert(param_name.clone(), page.to_string());
116                }
117                if let (Some(size), Some(param)) = (page_size, page_size_param) {
118                    params.insert(param.clone(), size.to_string());
119                }
120            }
121            Some(XmlPagination::Offset {
122                offset_param,
123                limit_param,
124                limit,
125            }) => {
126                params.insert(offset_param.clone(), offset.to_string());
127                params.insert(limit_param.clone(), limit.to_string());
128            }
129            None => {}
130        }
131    }
132
133    async fn execute_request(
134        &self,
135        params: &HashMap<String, String>,
136    ) -> Result<String, FaucetError> {
137        let url = format!(
138            "{}/{}",
139            self.config.base_url,
140            self.config.path.trim_start_matches('/')
141        );
142
143        let mut req = self
144            .client
145            .request(self.config.method.clone(), &url)
146            .headers(self.config.headers.clone())
147            .query(params);
148
149        // Apply auth.
150        match &self.config.auth {
151            XmlAuth::None => {}
152            XmlAuth::Bearer(token) => {
153                req = req.bearer_auth(token);
154            }
155            XmlAuth::Basic { username, password } => {
156                req = req.basic_auth(username, Some(password));
157            }
158            XmlAuth::Custom(headers) => {
159                req = req.headers(headers.clone());
160            }
161        }
162
163        // Set body for POST requests (SOAP).
164        if let Some(body) = &self.config.body {
165            req = req
166                .header("Content-Type", "text/xml; charset=utf-8")
167                .body(body.clone());
168        }
169
170        let resp = req.send().await.map_err(FaucetError::Http)?;
171        let resp = util::check_http_response(resp, DEFAULT_ERROR_BODY_MAX_LEN).await?;
172        resp.text().await.map_err(FaucetError::Http)
173    }
174}
175
176#[async_trait]
177impl faucet_core::Source for XmlStream {
178    async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
179        XmlStream::fetch_all(self).await
180    }
181
182    fn config_schema(&self) -> serde_json::Value {
183        serde_json::to_value(faucet_core::schema_for!(XmlStreamConfig))
184            .expect("schema serialization")
185    }
186}