use anyhow::{Context, Result};
use async_trait::async_trait;
use chrono::Utc;
use log::{debug, error, info, warn};
use reqwest::Client;
use std::collections::HashSet;
use std::time::Duration;
use drasi_core::models::SourceChange;
use drasi_lib::bootstrap::{
BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
};
use drasi_lib::channels::BootstrapEvent;
use crate::auth::{self, ResolvedAuth};
use crate::config::{EndpointConfig, HttpBootstrapConfig, HttpMethod, OperationType};
use crate::content_parser::{self, ContentType};
use crate::pagination::{self, NextPage, Paginator};
use crate::response;
use crate::template_engine::TemplateEngine;
/// Per-endpoint page cap used when the configuration leaves `max_pages` unset.
const DEFAULT_MAX_PAGES: u64 = 10_000;
/// Ceiling on the exponential backoff delay between retry attempts (one minute).
const MAX_RETRY_DELAY: Duration = Duration::from_millis(60_000);
/// Returns the longest prefix of `s` that is at most `max_chars` *bytes* long
/// and ends on a UTF-8 character boundary.
///
/// Despite the parameter name, the limit is measured in bytes (as `str::len`
/// is). If the limit falls inside a multi-byte character, the cut moves back to
/// the start of that character, so slicing never panics.
fn safe_truncate(s: &str, max_chars: usize) -> &str {
    if max_chars >= s.len() {
        return s;
    }
    // Index 0 is always a char boundary, so the search always succeeds.
    let cut = (0..=max_chars)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
/// A configured endpoint paired with the authentication resolved for it
/// (`None` when the endpoint has no `auth` configuration).
struct ResolvedEndpoint {
    /// The endpoint configuration as supplied in the provider config.
    config: EndpointConfig,
    /// Authentication resolved from `config.auth`, if any.
    auth: Option<ResolvedAuth>,
}
/// Bootstrap provider that loads initial data by querying one or more HTTP
/// endpoints and mapping their responses to source changes.
pub struct HttpBootstrapProvider {
    /// Provider-wide settings (timeout, retries, page limit, endpoint list).
    config: HttpBootstrapConfig,
    /// HTTP client shared by every request the provider makes.
    client: Client,
    /// Configured endpoints together with their resolved authentication.
    endpoints: Vec<ResolvedEndpoint>,
    /// Template engine passed to the response-to-element mappers.
    engine: TemplateEngine,
}
impl HttpBootstrapProvider {
pub fn new(config: HttpBootstrapConfig) -> Result<Self> {
let timeout = Duration::from_secs(config.timeout_seconds);
let client = Client::builder()
.timeout(timeout)
.http1_only()
.build()
.context("Failed to build HTTP client")?;
let mut endpoints = Vec::new();
for endpoint in &config.endpoints {
let auth = match &endpoint.auth {
Some(auth_config) => Some(
auth::resolve_auth(auth_config, &client)
.context("Failed to resolve authentication")?,
),
None => None,
};
endpoints.push(ResolvedEndpoint {
config: endpoint.clone(),
auth,
});
}
let engine = TemplateEngine::new();
Ok(Self {
config,
client,
endpoints,
engine,
})
}
async fn fetch_endpoint(
&self,
endpoint: &EndpointConfig,
auth: &Option<ResolvedAuth>,
context: &BootstrapContext,
request: &BootstrapRequest,
event_tx: &drasi_lib::channels::BootstrapEventSender,
) -> Result<u64> {
let mut total_sent: u64 = 0;
let content_type_override = endpoint
.response
.content_type
.as_ref()
.map(ContentType::from_override);
let origin_host = pagination::extract_origin_host(&endpoint.url).unwrap_or_default();
let mut paginator: Option<Box<dyn Paginator>> = endpoint
.pagination
.as_ref()
.map(|p| pagination::create_paginator(p, origin_host));
let initial_params: Vec<(String, String)> = paginator
.as_ref()
.map(|p| p.initial_params())
.unwrap_or_default();
let mut current_url = endpoint.url.clone();
let mut current_params = initial_params;
let mut page_num = 0u64;
let mut seen_requests: HashSet<(String, Vec<(String, String)>)> = HashSet::new();
loop {
page_num += 1;
let max_pages = self.config.max_pages.unwrap_or(DEFAULT_MAX_PAGES);
if page_num > max_pages {
error!(
"Reached maximum page limit ({max_pages}) for endpoint '{}', stopping pagination. \
Configure 'maxPages' to increase this limit.",
endpoint.url
);
break;
}
let current_key = (current_url.clone(), current_params.clone());
if !seen_requests.insert(current_key) {
warn!(
"Pagination cycle detected for endpoint '{}', stopping",
endpoint.url
);
break;
}
debug!("Fetching page {page_num} from endpoint: {}", endpoint.url);
let (parsed_body, response_headers) = self
.fetch_and_parse_with_retry(
¤t_url,
endpoint,
auth,
¤t_params,
&content_type_override,
)
.await
.with_context(|| {
format!(
"Failed to fetch from endpoint '{}' (page {page_num})",
endpoint.url
)
})?;
let items = response::extract_items(&parsed_body, &endpoint.response.items_path)?;
let items_count = items.len();
debug!(
"Extracted {items_count} items from page {page_num} of {}",
endpoint.url
);
if items_count == 0 {
break;
}
let element_results = response::map_items_to_elements(
&items,
&endpoint.response.mappings,
&context.source_id,
&self.engine,
);
for result in element_results {
match result {
Ok(mapped) => {
if !should_include_change(&mapped, request) {
continue;
}
let source_change = match mapped {
response::MappedChange::Upsert { element, operation } => {
match operation {
OperationType::Insert => SourceChange::Insert { element },
OperationType::Update => SourceChange::Update { element },
OperationType::Delete => unreachable!(),
}
}
response::MappedChange::Delete { metadata } => {
SourceChange::Delete { metadata }
}
};
let sequence = context.next_sequence();
let bootstrap_event = BootstrapEvent {
source_id: context.source_id.clone(),
change: source_change,
timestamp: Utc::now(),
sequence,
};
event_tx
.send(bootstrap_event)
.await
.context("Failed to send bootstrap event")?;
total_sent += 1;
}
Err(e) => {
warn!("Failed to map item to element: {e}");
}
}
}
match paginator.as_mut() {
Some(ref mut pag) => {
match pag.next_page(&parsed_body, &response_headers, items_count)? {
Some(NextPage::QueryParams(params)) => {
current_params = params;
}
Some(NextPage::NewUrl(url)) => {
current_url = url;
current_params = Vec::new();
}
None => break,
}
}
None => break, }
}
info!(
"Completed fetching from endpoint '{}': {} pages, {} elements",
endpoint.url, page_num, total_sent
);
Ok(total_sent)
}
async fn fetch_and_parse_with_retry(
&self,
url: &str,
endpoint: &EndpointConfig,
auth: &Option<ResolvedAuth>,
query_params: &[(String, String)],
content_type_override: &Option<ContentType>,
) -> Result<(serde_json::Value, reqwest::header::HeaderMap)> {
let max_retries = self.config.max_retries;
let retry_delay = Duration::from_millis(self.config.retry_delay_ms);
let mut last_error = None;
for attempt in 0..=max_retries {
if attempt > 0 {
let factor = 1u64.checked_shl(attempt - 1).unwrap_or(u64::MAX);
let delay = retry_delay
.saturating_mul(factor.min(u32::MAX as u64) as u32)
.min(MAX_RETRY_DELAY);
debug!("Retry attempt {attempt} after {delay:?} delay");
tokio::time::sleep(delay).await;
}
let (response_text, response_headers) =
match self.make_request(url, endpoint, auth, query_params).await {
Ok(result) => result,
Err(e) => {
warn!(
"Request to endpoint failed (attempt {}/{}): {}",
attempt + 1,
max_retries + 1,
e
);
last_error = Some(e);
continue;
}
};
let ct = resolve_content_type(content_type_override, &response_headers);
match content_parser::parse_body(&response_text, &ct) {
Ok(parsed) => return Ok((parsed, response_headers)),
Err(e) => {
warn!(
"Response body parse failed (attempt {}/{}, possible truncation). \
Body length: {}, content-type: {ct:?}, error: {e}",
attempt + 1,
max_retries + 1,
response_text.len(),
);
last_error = Some(e.context(format!(
"Failed to parse response from '{}' as {ct:?} \
(body length: {}, possible truncation)",
url,
response_text.len()
)));
}
}
}
Err(last_error.unwrap_or_else(|| anyhow::anyhow!("Request failed with no error details")))
}
async fn make_request(
&self,
url: &str,
endpoint: &EndpointConfig,
auth: &Option<ResolvedAuth>,
query_params: &[(String, String)],
) -> Result<(String, reqwest::header::HeaderMap)> {
let mut builder = match endpoint.method {
HttpMethod::Get => self.client.get(url),
HttpMethod::Post => self.client.post(url),
HttpMethod::Put => self.client.put(url),
};
for (key, value) in &endpoint.headers {
builder = builder.header(key.as_str(), value.as_str());
}
if !query_params.is_empty() {
builder = builder.query(query_params);
}
if let Some(ref body) = endpoint.body {
builder = builder.json(body);
}
if let Some(ref resolved_auth) = auth {
builder = auth::apply_auth(builder, resolved_auth).await?;
}
let response = builder.send().await.context("HTTP request failed")?;
if !response.status().is_success() {
let status = response.status();
let body = response
.text()
.await
.unwrap_or_else(|_| "Unable to read error response".to_string());
let truncated = if body.len() > 256 {
format!("{}... (truncated)", safe_truncate(&body, 256))
} else {
body
};
return Err(anyhow::anyhow!(
"HTTP request returned error status {status}: {truncated}"
));
}
let headers = response.headers().clone();
let body_text = response
.text()
.await
.context("Failed to read response body")?;
Ok((body_text, headers))
}
}
/// Chooses how a response body should be parsed.
///
/// An explicit override from the endpoint configuration always wins. Otherwise
/// the response's `Content-Type` header is used. A missing header, or one that
/// is not valid visible ASCII, reaches `ContentType::from_header` as `None`.
fn resolve_content_type(
    override_ct: &Option<ContentType>,
    headers: &reqwest::header::HeaderMap,
) -> ContentType {
    if let Some(forced) = override_ct {
        return forced.clone();
    }
    let declared = match headers.get(reqwest::header::CONTENT_TYPE) {
        Some(raw) => raw.to_str().ok(),
        None => None,
    };
    ContentType::from_header(declared)
}
/// Decides whether a mapped change passes the bootstrap request's label filters.
///
/// Relation upserts are checked against `request.relation_labels`. Every other
/// change (node upserts and all deletes) is checked against
/// `request.node_labels`. An empty filter list admits everything. Otherwise the
/// change is kept when at least one of its labels appears in the list.
///
/// NOTE(review): a `Delete` carries only metadata, so a deleted relation is
/// filtered by node labels here. Confirm this is intended.
fn should_include_change(change: &response::MappedChange, request: &BootstrapRequest) -> bool {
    let (metadata, is_relation) = match change {
        response::MappedChange::Upsert { element, .. } => (
            element.get_metadata(),
            matches!(element, drasi_core::models::Element::Relation { .. }),
        ),
        response::MappedChange::Delete { metadata } => (metadata, false),
    };

    let wanted = if is_relation {
        &request.relation_labels
    } else {
        &request.node_labels
    };

    wanted.is_empty()
        || metadata
            .labels
            .iter()
            .any(|label| wanted.iter().any(|w| w.as_str() == &**label))
}
#[async_trait]
impl BootstrapProvider for HttpBootstrapProvider {
    /// Bootstraps a query by fetching every configured endpoint in order.
    ///
    /// Endpoints are processed one after another. The first endpoint that fails
    /// aborts the bootstrap: its error is logged and then returned. On success,
    /// the result carries the total number of events sent and no sequence or
    /// source-position information.
    async fn bootstrap(
        &self,
        request: BootstrapRequest,
        context: &BootstrapContext,
        event_tx: drasi_lib::channels::BootstrapEventSender,
        _settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
    ) -> Result<BootstrapResult> {
        info!(
            "Starting HTTP bootstrap for query {} from source {}",
            request.query_id, context.source_id
        );

        let mut event_total: u64 = 0;
        for endpoint in &self.endpoints {
            let sent = self
                .fetch_endpoint(&endpoint.config, &endpoint.auth, context, &request, &event_tx)
                .await
                .map_err(|err| {
                    error!(
                        "Failed to bootstrap from endpoint '{}': {}",
                        endpoint.config.url, err
                    );
                    err
                })?;
            event_total += sent;
        }

        info!(
            "Completed HTTP bootstrap for query {}: {} total elements",
            request.query_id, event_total
        );

        Ok(BootstrapResult {
            event_count: event_total as usize,
            last_sequence: None,
            sequences_aligned: false,
            source_position: None,
        })
    }
}