1use crate::config::{XmlAuth, XmlPagination, XmlStreamConfig};
4use crate::convert;
5use async_trait::async_trait;
6use faucet_core::util::{self, DEFAULT_ERROR_BODY_MAX_LEN};
7use faucet_core::{AuthSpec, Credential, FaucetError, SharedAuthProvider};
8use faucet_core::{Stream, StreamPage};
9use reqwest::Client;
10use serde_json::Value;
11use std::collections::HashMap;
12use std::pin::Pin;
13use std::time::Duration;
14
15fn page_fingerprint(records: &[Value]) -> u64 {
21 use std::hash::{Hash, Hasher};
22 let mut hasher = std::collections::hash_map::DefaultHasher::new();
24 records.len().hash(&mut hasher);
25 for r in records {
26 r.to_string().hash(&mut hasher);
27 }
28 hasher.finish()
29}
30
/// Attempt count handed to `faucet_core::execute_with_retry` for every HTTP request.
const RETRY_MAX_ATTEMPTS: u32 = 3;
/// Base backoff (500 ms) handed to `faucet_core::execute_with_retry`.
// NOTE(review): how the helper grows the delay between attempts is defined in
// `faucet_core`, not here — confirm before relying on a specific schedule.
const RETRY_BASE_BACKOFF: Duration = Duration::from_millis(500);
35
/// A source that issues HTTP requests described by an [`XmlStreamConfig`],
/// converts the XML responses to JSON values, and follows the configured
/// pagination scheme.
pub struct XmlStream {
    /// Request, pagination, and extraction settings for this stream.
    config: XmlStreamConfig,
    /// HTTP client used for every request made by this stream.
    client: Client,
    /// Optional shared credential provider; when set, requests take their
    /// credentials from it instead of from `config.auth`.
    auth_provider: Option<SharedAuthProvider>,
}
46
47fn credential_to_auth(cred: Credential) -> XmlAuth {
50 match cred {
51 Credential::Bearer(token) => XmlAuth::Bearer { token },
52 Credential::Token(token) => XmlAuth::Custom {
53 headers: std::iter::once(("Authorization".to_string(), token)).collect(),
54 },
55 Credential::Basic { username, password } => XmlAuth::Basic { username, password },
56 Credential::Header { name, value } => XmlAuth::Custom {
57 headers: std::iter::once((name, value)).collect(),
58 },
59 }
60}
61
62impl XmlStream {
63 pub fn new(config: XmlStreamConfig) -> Self {
65 Self {
66 config,
67 client: Client::new(),
68 auth_provider: None,
69 }
70 }
71
72 pub fn with_auth_provider(mut self, provider: SharedAuthProvider) -> Self {
79 self.auth_provider = Some(provider);
80 self
81 }
82
83 pub async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
85 self.fetch_all_with_context(&HashMap::new()).await
86 }
87
88 async fn fetch_all_with_context(
90 &self,
91 context: &HashMap<String, serde_json::Value>,
92 ) -> Result<Vec<Value>, FaucetError> {
93 let mut all_records = Vec::new();
94 let mut pages_fetched = 0usize;
95 let mut offset = 0usize;
96 let mut page_number = None;
97 let mut prev_fingerprint: Option<u64> = None;
98
99 if let Some(XmlPagination::PageNumber { start_page, .. }) = &self.config.pagination {
101 page_number = Some(*start_page);
102 }
103
104 loop {
105 if let Some(max) = self.config.max_pages
106 && pages_fetched >= max
107 {
108 tracing::warn!("max pages ({max}) reached");
109 break;
110 }
111
112 let mut params = self.config.query_params.clone();
113 self.apply_pagination_params(&mut params, page_number, offset);
114
115 let xml_text = self.execute_request(¶ms, context).await?;
116 let json = convert::xml_to_json(&xml_text)?;
117
118 let records = match &self.config.records_element_path {
119 Some(path) => convert::extract_at_path(&json, path),
120 None => vec![json],
121 };
122
123 let record_count = records.len();
124 let fingerprint = page_fingerprint(&records);
125 all_records.extend(records);
126 pages_fetched += 1;
127
128 if record_count > 0 && prev_fingerprint == Some(fingerprint) {
132 tracing::warn!(
133 "XML pagination returned an identical page; stopping to avoid an infinite loop"
134 );
135 break;
136 }
137 prev_fingerprint = Some(fingerprint);
138
139 match &self.config.pagination {
141 Some(XmlPagination::PageNumber { page_size, .. }) => {
142 if record_count == 0 {
143 break;
144 }
145 if let Some(size) = page_size
147 && record_count < *size
148 {
149 break;
150 }
151 page_number = page_number.map(|p| p + 1);
152 }
153 Some(XmlPagination::Offset { limit, .. }) => {
154 if record_count < *limit {
155 break;
156 }
157 offset += record_count;
158 }
159 None => break,
160 }
161 }
162
163 tracing::info!(
164 records = all_records.len(),
165 pages = pages_fetched,
166 "XML fetch complete"
167 );
168 Ok(all_records)
169 }
170
171 fn apply_pagination_params(
172 &self,
173 params: &mut HashMap<String, String>,
174 page_number: Option<usize>,
175 offset: usize,
176 ) {
177 match &self.config.pagination {
178 Some(XmlPagination::PageNumber {
179 param_name,
180 page_size,
181 page_size_param,
182 ..
183 }) => {
184 if let Some(page) = page_number {
185 params.insert(param_name.clone(), page.to_string());
186 }
187 if let (Some(size), Some(param)) = (page_size, page_size_param) {
188 params.insert(param.clone(), size.to_string());
189 }
190 }
191 Some(XmlPagination::Offset {
192 offset_param,
193 limit_param,
194 limit,
195 }) => {
196 params.insert(offset_param.clone(), offset.to_string());
197 params.insert(limit_param.clone(), limit.to_string());
198 }
199 None => {}
200 }
201 }
202
203 async fn execute_request(
204 &self,
205 params: &HashMap<String, String>,
206 context: &HashMap<String, serde_json::Value>,
207 ) -> Result<String, FaucetError> {
208 let path = if context.is_empty() {
209 self.config.path.clone()
210 } else {
211 faucet_core::util::substitute_context(&self.config.path, context)
212 };
213
214 let url = format!("{}/{}", self.config.base_url, path.trim_start_matches('/'));
215
216 let resolved_params: HashMap<String, String> = if context.is_empty() {
218 params.clone()
219 } else {
220 params
221 .iter()
222 .map(|(k, v)| (k.clone(), faucet_core::util::substitute_context(v, context)))
223 .collect()
224 };
225
226 let mut req = self
227 .client
228 .request(self.config.method.clone(), &url)
229 .headers(self.config.headers.clone())
230 .query(&resolved_params);
231
232 let effective_auth: XmlAuth = if let Some(provider) = &self.auth_provider {
236 credential_to_auth(provider.credential().await?)
237 } else {
238 match &self.config.auth {
239 AuthSpec::Inline(a) => a.clone(),
240 AuthSpec::Reference(r) => {
241 return Err(FaucetError::Auth(format!(
242 "auth references provider '{}' but no provider was supplied; \
243 set one via the CLI `auth:` catalog or `with_auth_provider`",
244 r.name
245 )));
246 }
247 }
248 };
249
250 match &effective_auth {
252 XmlAuth::None => {}
253 XmlAuth::Bearer { token } => {
254 req = req.bearer_auth(token);
255 }
256 XmlAuth::Basic { username, password } => {
257 req = req.basic_auth(username, Some(password));
258 }
259 XmlAuth::Custom { headers } => {
260 let mut hm = reqwest::header::HeaderMap::new();
261 for (name, value) in headers {
262 let n =
263 reqwest::header::HeaderName::from_bytes(name.as_bytes()).map_err(|e| {
264 FaucetError::Auth(format!("invalid custom header name {name:?}: {e}"))
265 })?;
266 let v = reqwest::header::HeaderValue::from_str(value).map_err(|e| {
267 FaucetError::Auth(format!("invalid custom header value for {name:?}: {e}"))
268 })?;
269 hm.insert(n, v);
270 }
271 req = req.headers(hm);
272 }
273 }
274
275 if let Some(body) = &self.config.body {
277 let resolved_body = if context.is_empty() {
278 body.clone()
279 } else {
280 faucet_core::util::substitute_context(body, context)
281 };
282 req = req
283 .header("Content-Type", "text/xml; charset=utf-8")
284 .body(resolved_body);
285 }
286
287 faucet_core::execute_with_retry(RETRY_MAX_ATTEMPTS, RETRY_BASE_BACKOFF, || {
291 let attempt = req.try_clone();
292 async move {
293 let req = attempt.ok_or_else(|| {
294 FaucetError::Source("xml: request is not cloneable for retry".into())
295 })?;
296 let resp = req.send().await.map_err(FaucetError::Http)?;
297 let resp = util::check_http_response(resp, DEFAULT_ERROR_BODY_MAX_LEN).await?;
298 resp.text().await.map_err(FaucetError::Http)
299 }
300 })
301 .await
302 }
303}
304
305#[async_trait]
306impl faucet_core::Source for XmlStream {
307 async fn fetch_with_context(
308 &self,
309 context: &std::collections::HashMap<String, serde_json::Value>,
310 ) -> Result<Vec<Value>, FaucetError> {
311 self.fetch_all_with_context(context).await
312 }
313
314 fn stream_pages<'a>(
337 &'a self,
338 context: &'a HashMap<String, Value>,
339 _batch_size: usize,
340 ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
341 let batch_size = self.config.batch_size;
342 let owned_context = context.clone();
343
344 Box::pin(async_stream::try_stream! {
345 let chunk = if batch_size == 0 { usize::MAX } else { batch_size };
346 let initial_capacity = if batch_size == 0 { 1024 } else { batch_size };
347 let mut buffer: Vec<Value> = Vec::with_capacity(initial_capacity);
348 let mut total = 0usize;
349 let mut pages_fetched = 0usize;
350 let mut offset = 0usize;
351 let mut page_number = None;
352 let mut prev_fingerprint: Option<u64> = None;
353
354 if let Some(XmlPagination::PageNumber { start_page, .. }) =
355 &self.config.pagination
356 {
357 page_number = Some(*start_page);
358 }
359
360 loop {
361 if let Some(max) = self.config.max_pages
362 && pages_fetched >= max
363 {
364 tracing::warn!("max pages ({max}) reached");
365 break;
366 }
367
368 let mut params = self.config.query_params.clone();
369 self.apply_pagination_params(&mut params, page_number, offset);
370
371 let xml_text = self.execute_request(¶ms, &owned_context).await?;
372
373 let mut page_records: Vec<Value> = Vec::new();
380 convert::stream_extract(
381 &xml_text,
382 self.config.records_element_path.as_deref(),
383 |rec| page_records.push(rec),
384 )?;
385
386 let record_count = page_records.len();
387 let fingerprint = page_fingerprint(&page_records);
388
389 for rec in page_records.drain(..) {
390 buffer.push(rec);
391 if buffer.len() >= chunk {
392 let flush = std::mem::replace(&mut buffer, Vec::with_capacity(initial_capacity));
393 total += flush.len();
394 yield StreamPage { records: flush, bookmark: None };
395 }
396 }
397 pages_fetched += 1;
398
399 if record_count > 0 && prev_fingerprint == Some(fingerprint) {
403 tracing::warn!(
404 "XML pagination returned an identical page; stopping to avoid an infinite loop"
405 );
406 break;
407 }
408 prev_fingerprint = Some(fingerprint);
409
410 match &self.config.pagination {
413 Some(XmlPagination::PageNumber { page_size, .. }) => {
414 if record_count == 0 {
415 break;
416 }
417 if let Some(size) = page_size
418 && record_count < *size
419 {
420 break;
421 }
422 page_number = page_number.map(|p| p + 1);
423 }
424 Some(XmlPagination::Offset { limit, .. }) => {
425 if record_count < *limit {
426 break;
427 }
428 offset += record_count;
429 }
430 None => break,
431 }
432 }
433
434 if !buffer.is_empty() {
435 total += buffer.len();
436 yield StreamPage { records: buffer, bookmark: None };
437 }
438
439 tracing::info!(
440 records = total,
441 pages = pages_fetched,
442 batch_size,
443 "XML source stream complete",
444 );
445 })
446 }
447
448 fn config_schema(&self) -> serde_json::Value {
449 serde_json::to_value(faucet_core::schema_for!(XmlStreamConfig))
450 .expect("schema serialization")
451 }
452}