faucet_source_xml/
stream.rs1use crate::config::{XmlAuth, XmlPagination, XmlStreamConfig};
4use crate::convert;
5use async_trait::async_trait;
6use faucet_core::FaucetError;
7use faucet_core::util::{self, DEFAULT_ERROR_BODY_MAX_LEN};
8use reqwest::Client;
9use serde_json::Value;
10use std::collections::HashMap;
11
12pub struct XmlStream {
14 config: XmlStreamConfig,
15 client: Client,
16}
17
18impl XmlStream {
19 pub fn new(config: XmlStreamConfig) -> Self {
21 Self {
22 config,
23 client: Client::new(),
24 }
25 }
26
27 pub async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
29 let mut all_records = Vec::new();
30 let mut pages_fetched = 0usize;
31 let mut offset = 0usize;
32 let mut page_number = None;
33 let mut prev_record_count: Option<usize> = None;
34
35 if let Some(XmlPagination::PageNumber { start_page, .. }) = &self.config.pagination {
37 page_number = Some(*start_page);
38 }
39
40 loop {
41 if let Some(max) = self.config.max_pages
42 && pages_fetched >= max
43 {
44 tracing::warn!("max pages ({max}) reached");
45 break;
46 }
47
48 let mut params = self.config.query_params.clone();
49 self.apply_pagination_params(&mut params, page_number, offset);
50
51 let xml_text = self.execute_request(¶ms).await?;
52 let json = convert::xml_to_json(&xml_text)?;
53
54 let records = match &self.config.records_element_path {
55 Some(path) => convert::extract_at_path(&json, path),
56 None => vec![json],
57 };
58
59 let record_count = records.len();
60 all_records.extend(records);
61 pages_fetched += 1;
62
63 match &self.config.pagination {
65 Some(XmlPagination::PageNumber { page_size, .. }) => {
66 if record_count == 0 {
67 break;
68 }
69 if let Some(size) = page_size
71 && record_count < *size
72 {
73 break;
74 }
75 page_number = page_number.map(|p| p + 1);
76 }
77 Some(XmlPagination::Offset { limit, .. }) => {
78 if record_count < *limit {
79 break;
80 }
81 if prev_record_count == Some(record_count) && record_count == 0 {
83 tracing::warn!("offset pagination loop detected, stopping");
84 break;
85 }
86 offset += record_count;
87 }
88 None => break,
89 }
90 prev_record_count = Some(record_count);
91 }
92
93 tracing::info!(
94 records = all_records.len(),
95 pages = pages_fetched,
96 "XML fetch complete"
97 );
98 Ok(all_records)
99 }
100
101 fn apply_pagination_params(
102 &self,
103 params: &mut HashMap<String, String>,
104 page_number: Option<usize>,
105 offset: usize,
106 ) {
107 match &self.config.pagination {
108 Some(XmlPagination::PageNumber {
109 param_name,
110 page_size,
111 page_size_param,
112 ..
113 }) => {
114 if let Some(page) = page_number {
115 params.insert(param_name.clone(), page.to_string());
116 }
117 if let (Some(size), Some(param)) = (page_size, page_size_param) {
118 params.insert(param.clone(), size.to_string());
119 }
120 }
121 Some(XmlPagination::Offset {
122 offset_param,
123 limit_param,
124 limit,
125 }) => {
126 params.insert(offset_param.clone(), offset.to_string());
127 params.insert(limit_param.clone(), limit.to_string());
128 }
129 None => {}
130 }
131 }
132
133 async fn execute_request(
134 &self,
135 params: &HashMap<String, String>,
136 ) -> Result<String, FaucetError> {
137 let url = format!(
138 "{}/{}",
139 self.config.base_url,
140 self.config.path.trim_start_matches('/')
141 );
142
143 let mut req = self
144 .client
145 .request(self.config.method.clone(), &url)
146 .headers(self.config.headers.clone())
147 .query(params);
148
149 match &self.config.auth {
151 XmlAuth::None => {}
152 XmlAuth::Bearer(token) => {
153 req = req.bearer_auth(token);
154 }
155 XmlAuth::Basic { username, password } => {
156 req = req.basic_auth(username, Some(password));
157 }
158 XmlAuth::Custom(headers) => {
159 req = req.headers(headers.clone());
160 }
161 }
162
163 if let Some(body) = &self.config.body {
165 req = req
166 .header("Content-Type", "text/xml; charset=utf-8")
167 .body(body.clone());
168 }
169
170 let resp = req.send().await.map_err(FaucetError::Http)?;
171 let resp = util::check_http_response(resp, DEFAULT_ERROR_BODY_MAX_LEN).await?;
172 resp.text().await.map_err(FaucetError::Http)
173 }
174}
175
176#[async_trait]
177impl faucet_core::Source for XmlStream {
178 async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
179 XmlStream::fetch_all(self).await
180 }
181
182 fn config_schema(&self) -> serde_json::Value {
183 serde_json::to_value(faucet_core::schema_for!(XmlStreamConfig))
184 .expect("schema serialization")
185 }
186}