1use anyhow::{Context, Result};
21use async_trait::async_trait;
22use chrono::Utc;
23use log::{debug, error, info, warn};
24use reqwest::Client;
25use std::collections::HashSet;
26use std::time::Duration;
27
28use drasi_core::models::SourceChange;
29use drasi_lib::bootstrap::{
30 BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
31};
32use drasi_lib::channels::BootstrapEvent;
33
34use crate::auth::{self, ResolvedAuth};
35use crate::config::{EndpointConfig, HttpBootstrapConfig, HttpMethod, OperationType};
36use crate::content_parser::{self, ContentType};
37use crate::pagination::{self, NextPage, Paginator};
38use crate::response;
39use crate::template_engine::TemplateEngine;
40
/// Per-endpoint page limit used when the configuration does not set `max_pages`.
const DEFAULT_MAX_PAGES: u64 = 10_000;

/// Upper bound applied to the exponential backoff delay between retry attempts.
const MAX_RETRY_DELAY: Duration = Duration::from_secs(60);
46
/// Returns the longest prefix of `s` whose length is at most `max_bytes` **bytes**
/// and which ends on a UTF-8 character boundary, so the slice can never panic.
///
/// Strings that already fit are returned unchanged. The limit is measured in bytes,
/// not characters, so multi-byte characters may shorten the result further.
fn safe_truncate(s: &str, max_bytes: usize) -> &str {
    if s.len() <= max_bytes {
        return s;
    }
    // Walk backwards from the limit to the nearest char boundary; index 0 is
    // always a boundary, so the search cannot come up empty.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
59
/// An endpoint from the bootstrap configuration paired with its authentication,
/// resolved once when the provider is constructed.
struct ResolvedEndpoint {
    /// Endpoint configuration, cloned from `HttpBootstrapConfig::endpoints`.
    config: EndpointConfig,
    /// Resolved authentication, or `None` when the endpoint has no `auth` section.
    auth: Option<ResolvedAuth>,
}
65
/// Bootstrap provider that loads initial data by calling the configured HTTP endpoints.
///
/// Each endpoint is fetched in turn (with pagination and retries) when `bootstrap`
/// runs, and every mapped item is sent as a bootstrap event.
pub struct HttpBootstrapProvider {
    /// Provider settings: timeout, page limit, retry policy and endpoint list.
    config: HttpBootstrapConfig,
    /// HTTP client built with the configured timeout and restricted to HTTP/1.
    client: Client,
    /// Endpoints whose authentication was resolved up front in `new`.
    endpoints: Vec<ResolvedEndpoint>,
    /// Template engine passed to the item-to-element mapping.
    engine: TemplateEngine,
}
73
74impl HttpBootstrapProvider {
75 pub fn new(config: HttpBootstrapConfig) -> Result<Self> {
77 let timeout = Duration::from_secs(config.timeout_seconds);
78 let client = Client::builder()
79 .timeout(timeout)
80 .http1_only()
84 .build()
85 .context("Failed to build HTTP client")?;
86
87 let mut endpoints = Vec::new();
89 for endpoint in &config.endpoints {
90 let auth = match &endpoint.auth {
91 Some(auth_config) => Some(
92 auth::resolve_auth(auth_config, &client)
93 .context("Failed to resolve authentication")?,
94 ),
95 None => None,
96 };
97 endpoints.push(ResolvedEndpoint {
98 config: endpoint.clone(),
99 auth,
100 });
101 }
102
103 let engine = TemplateEngine::new();
104
105 Ok(Self {
106 config,
107 client,
108 endpoints,
109 engine,
110 })
111 }
112
113 async fn fetch_endpoint(
115 &self,
116 endpoint: &EndpointConfig,
117 auth: &Option<ResolvedAuth>,
118 context: &BootstrapContext,
119 request: &BootstrapRequest,
120 event_tx: &drasi_lib::channels::BootstrapEventSender,
121 ) -> Result<u64> {
122 let mut total_sent: u64 = 0;
123
124 let content_type_override = endpoint
126 .response
127 .content_type
128 .as_ref()
129 .map(ContentType::from_override);
130
131 let origin_host = pagination::extract_origin_host(&endpoint.url).unwrap_or_default();
133 let mut paginator: Option<Box<dyn Paginator>> = endpoint
134 .pagination
135 .as_ref()
136 .map(|p| pagination::create_paginator(p, origin_host));
137
138 let initial_params: Vec<(String, String)> = paginator
140 .as_ref()
141 .map(|p| p.initial_params())
142 .unwrap_or_default();
143
144 let mut current_url = endpoint.url.clone();
145 let mut current_params = initial_params;
146 let mut page_num = 0u64;
147 let mut seen_requests: HashSet<(String, Vec<(String, String)>)> = HashSet::new();
148
149 loop {
150 page_num += 1;
151
152 let max_pages = self.config.max_pages.unwrap_or(DEFAULT_MAX_PAGES);
154 if page_num > max_pages {
155 error!(
156 "Reached maximum page limit ({max_pages}) for endpoint '{}', stopping pagination. \
157 Configure 'maxPages' to increase this limit.",
158 endpoint.url
159 );
160 break;
161 }
162
163 let current_key = (current_url.clone(), current_params.clone());
165 if !seen_requests.insert(current_key) {
166 warn!(
167 "Pagination cycle detected for endpoint '{}', stopping",
168 endpoint.url
169 );
170 break;
171 }
172
173 debug!("Fetching page {page_num} from endpoint: {}", endpoint.url);
174
175 let (parsed_body, response_headers) = self
178 .fetch_and_parse_with_retry(
179 ¤t_url,
180 endpoint,
181 auth,
182 ¤t_params,
183 &content_type_override,
184 )
185 .await
186 .with_context(|| {
187 format!(
188 "Failed to fetch from endpoint '{}' (page {page_num})",
189 endpoint.url
190 )
191 })?;
192
193 let items = response::extract_items(&parsed_body, &endpoint.response.items_path)?;
195 let items_count = items.len();
196
197 debug!(
198 "Extracted {items_count} items from page {page_num} of {}",
199 endpoint.url
200 );
201
202 if items_count == 0 {
204 break;
205 }
206
207 let element_results = response::map_items_to_elements(
209 &items,
210 &endpoint.response.mappings,
211 &context.source_id,
212 &self.engine,
213 );
214
215 for result in element_results {
216 match result {
217 Ok(mapped) => {
218 if !should_include_change(&mapped, request) {
220 continue;
221 }
222
223 let source_change = match mapped {
224 response::MappedChange::Upsert { element, operation } => {
225 match operation {
226 OperationType::Insert => SourceChange::Insert { element },
227 OperationType::Update => SourceChange::Update { element },
228 OperationType::Delete => unreachable!(),
229 }
230 }
231 response::MappedChange::Delete { metadata } => {
232 SourceChange::Delete { metadata }
233 }
234 };
235 let sequence = context.next_sequence();
236
237 let bootstrap_event = BootstrapEvent {
238 source_id: context.source_id.clone(),
239 change: source_change,
240 timestamp: Utc::now(),
241 sequence,
242 };
243
244 event_tx
245 .send(bootstrap_event)
246 .await
247 .context("Failed to send bootstrap event")?;
248
249 total_sent += 1;
250 }
251 Err(e) => {
252 warn!("Failed to map item to element: {e}");
253 }
254 }
255 }
256
257 match paginator.as_mut() {
259 Some(ref mut pag) => {
260 match pag.next_page(&parsed_body, &response_headers, items_count)? {
261 Some(NextPage::QueryParams(params)) => {
262 current_params = params;
263 }
264 Some(NextPage::NewUrl(url)) => {
265 current_url = url;
266 current_params = Vec::new();
267 }
268 None => break,
269 }
270 }
271 None => break, }
273 }
274
275 info!(
276 "Completed fetching from endpoint '{}': {} pages, {} elements",
277 endpoint.url, page_num, total_sent
278 );
279
280 Ok(total_sent)
281 }
282
283 async fn fetch_and_parse_with_retry(
289 &self,
290 url: &str,
291 endpoint: &EndpointConfig,
292 auth: &Option<ResolvedAuth>,
293 query_params: &[(String, String)],
294 content_type_override: &Option<ContentType>,
295 ) -> Result<(serde_json::Value, reqwest::header::HeaderMap)> {
296 let max_retries = self.config.max_retries;
297 let retry_delay = Duration::from_millis(self.config.retry_delay_ms);
298
299 let mut last_error = None;
300
301 for attempt in 0..=max_retries {
302 if attempt > 0 {
303 let factor = 1u64.checked_shl(attempt - 1).unwrap_or(u64::MAX);
304 let delay = retry_delay
305 .saturating_mul(factor.min(u32::MAX as u64) as u32)
306 .min(MAX_RETRY_DELAY);
307 debug!("Retry attempt {attempt} after {delay:?} delay");
308 tokio::time::sleep(delay).await;
309 }
310
311 let (response_text, response_headers) =
313 match self.make_request(url, endpoint, auth, query_params).await {
314 Ok(result) => result,
315 Err(e) => {
316 warn!(
317 "Request to endpoint failed (attempt {}/{}): {}",
318 attempt + 1,
319 max_retries + 1,
320 e
321 );
322 last_error = Some(e);
323 continue;
324 }
325 };
326
327 let ct = resolve_content_type(content_type_override, &response_headers);
329
330 match content_parser::parse_body(&response_text, &ct) {
332 Ok(parsed) => return Ok((parsed, response_headers)),
333 Err(e) => {
334 warn!(
337 "Response body parse failed (attempt {}/{}, possible truncation). \
338 Body length: {}, content-type: {ct:?}, error: {e}",
339 attempt + 1,
340 max_retries + 1,
341 response_text.len(),
342 );
343 last_error = Some(e.context(format!(
344 "Failed to parse response from '{}' as {ct:?} \
345 (body length: {}, possible truncation)",
346 url,
347 response_text.len()
348 )));
349 }
350 }
351 }
352
353 Err(last_error.unwrap_or_else(|| anyhow::anyhow!("Request failed with no error details")))
354 }
355
356 async fn make_request(
359 &self,
360 url: &str,
361 endpoint: &EndpointConfig,
362 auth: &Option<ResolvedAuth>,
363 query_params: &[(String, String)],
364 ) -> Result<(String, reqwest::header::HeaderMap)> {
365 let mut builder = match endpoint.method {
367 HttpMethod::Get => self.client.get(url),
368 HttpMethod::Post => self.client.post(url),
369 HttpMethod::Put => self.client.put(url),
370 };
371
372 for (key, value) in &endpoint.headers {
374 builder = builder.header(key.as_str(), value.as_str());
375 }
376
377 if !query_params.is_empty() {
379 builder = builder.query(query_params);
380 }
381
382 if let Some(ref body) = endpoint.body {
384 builder = builder.json(body);
385 }
386
387 if let Some(ref resolved_auth) = auth {
389 builder = auth::apply_auth(builder, resolved_auth).await?;
390 }
391
392 let response = builder.send().await.context("HTTP request failed")?;
394
395 if !response.status().is_success() {
396 let status = response.status();
397 let body = response
398 .text()
399 .await
400 .unwrap_or_else(|_| "Unable to read error response".to_string());
401 let truncated = if body.len() > 256 {
402 format!("{}... (truncated)", safe_truncate(&body, 256))
403 } else {
404 body
405 };
406 return Err(anyhow::anyhow!(
407 "HTTP request returned error status {status}: {truncated}"
408 ));
409 }
410
411 let headers = response.headers().clone();
412 let body_text = response
413 .text()
414 .await
415 .context("Failed to read response body")?;
416
417 Ok((body_text, headers))
418 }
419}
420
421fn resolve_content_type(
423 override_ct: &Option<ContentType>,
424 headers: &reqwest::header::HeaderMap,
425) -> ContentType {
426 override_ct.clone().unwrap_or_else(|| {
427 let header_value = headers
428 .get(reqwest::header::CONTENT_TYPE)
429 .and_then(|v| v.to_str().ok());
430 ContentType::from_header(header_value)
431 })
432}
433
434fn should_include_change(change: &response::MappedChange, request: &BootstrapRequest) -> bool {
436 let metadata = match change {
437 response::MappedChange::Upsert { element, .. } => element.get_metadata(),
438 response::MappedChange::Delete { metadata } => metadata,
439 };
440
441 let is_relation = matches!(
444 change,
445 response::MappedChange::Upsert {
446 element: drasi_core::models::Element::Relation { .. },
447 ..
448 }
449 );
450
451 if is_relation {
452 if request.relation_labels.is_empty() {
453 return true;
454 }
455 metadata
456 .labels
457 .iter()
458 .any(|l| request.relation_labels.iter().any(|rl| rl.as_str() == &**l))
459 } else {
460 if request.node_labels.is_empty() {
461 return true;
462 }
463 metadata
464 .labels
465 .iter()
466 .any(|l| request.node_labels.iter().any(|nl| nl.as_str() == &**l))
467 }
468}
469
470#[async_trait]
471impl BootstrapProvider for HttpBootstrapProvider {
472 async fn bootstrap(
473 &self,
474 request: BootstrapRequest,
475 context: &BootstrapContext,
476 event_tx: drasi_lib::channels::BootstrapEventSender,
477 _settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
478 ) -> Result<BootstrapResult> {
479 info!(
480 "Starting HTTP bootstrap for query {} from source {}",
481 request.query_id, context.source_id
482 );
483
484 let mut total_events: u64 = 0;
485
486 for resolved in &self.endpoints {
487 match self
488 .fetch_endpoint(
489 &resolved.config,
490 &resolved.auth,
491 context,
492 &request,
493 &event_tx,
494 )
495 .await
496 {
497 Ok(count) => {
498 total_events += count;
499 }
500 Err(e) => {
501 error!(
502 "Failed to bootstrap from endpoint '{}': {}",
503 resolved.config.url, e
504 );
505 return Err(e);
506 }
507 }
508 }
509
510 info!(
511 "Completed HTTP bootstrap for query {}: {} total elements",
512 request.query_id, total_events
513 );
514
515 Ok(BootstrapResult {
516 event_count: total_events as usize,
517 source_position: None,
518 })
519 }
520}