Skip to main content

drasi_bootstrap_http/
provider.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! HTTP Bootstrap Provider implementation.
16//!
17//! Fetches initial state from HTTP REST APIs and emits graph elements
18//! through the bootstrap event channel.
19
20use anyhow::{Context, Result};
21use async_trait::async_trait;
22use chrono::Utc;
23use log::{debug, error, info, warn};
24use reqwest::Client;
25use std::collections::HashSet;
26use std::time::Duration;
27
28use drasi_core::models::SourceChange;
29use drasi_lib::bootstrap::{
30    BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
31};
32use drasi_lib::channels::BootstrapEvent;
33
34use crate::auth::{self, ResolvedAuth};
35use crate::config::{EndpointConfig, HttpBootstrapConfig, HttpMethod, OperationType};
36use crate::content_parser::{self, ContentType};
37use crate::pagination::{self, NextPage, Paginator};
38use crate::response;
39use crate::template_engine::TemplateEngine;
40
/// Default maximum number of pages to fetch per endpoint to prevent infinite loops.
///
/// Applied when the provider configuration leaves `max_pages` unset.
const DEFAULT_MAX_PAGES: u64 = 10_000;

/// Maximum backoff delay (60 seconds) to prevent unbounded sleep.
///
/// The exponentially growing delay between retry attempts is clamped to this value.
const MAX_RETRY_DELAY: Duration = Duration::from_secs(60);
46
/// Return the longest prefix of `s` that fits in `max_chars` bytes and ends on a
/// UTF-8 character boundary.
///
/// Despite the parameter name, the budget is measured in bytes (it is compared
/// against `str::len`). Because the cut is moved back to the nearest character
/// boundary, slicing never panics and the result never holds more than
/// `max_chars` characters. Strings that already fit are returned unchanged.
fn safe_truncate(s: &str, max_chars: usize) -> &str {
    if max_chars >= s.len() {
        return s;
    }
    // Walk back from the byte budget to the closest character boundary.
    // Index 0 is always a boundary, so the search cannot come up empty.
    let cut = (0..=max_chars)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
59
/// A resolved endpoint bundling config with its resolved auth.
///
/// One value is built per configured endpoint in `HttpBootstrapProvider::new`.
struct ResolvedEndpoint {
    /// Clone of the endpoint's configuration.
    config: EndpointConfig,
    /// Resolved authentication, or `None` when the endpoint config has no `auth`.
    auth: Option<ResolvedAuth>,
}
65
/// HTTP Bootstrap Provider that fetches data from REST APIs.
pub struct HttpBootstrapProvider {
    /// Provider-wide settings (timeout, retry and page limits, endpoint list).
    config: HttpBootstrapConfig,
    /// HTTP client used for every request this provider sends.
    client: Client,
    /// Endpoints paired with their resolved authentication, in configuration order.
    endpoints: Vec<ResolvedEndpoint>,
    /// Template engine handed to the response mapper when items are turned into elements.
    engine: TemplateEngine,
}
73
74impl HttpBootstrapProvider {
75    /// Create a new HTTP bootstrap provider from configuration.
76    pub fn new(config: HttpBootstrapConfig) -> Result<Self> {
77        let timeout = Duration::from_secs(config.timeout_seconds);
78        let client = Client::builder()
79            .timeout(timeout)
80            // Force HTTP/1.1 to avoid HTTP/2 multiplexing issues when multiple
81            // bootstrap calls share this client concurrently. H2 frame reassembly
82            // under heavy multiplexing can corrupt large response bodies.
83            .http1_only()
84            .build()
85            .context("Failed to build HTTP client")?;
86
87        // Resolve authentication for each endpoint and bundle them together
88        let mut endpoints = Vec::new();
89        for endpoint in &config.endpoints {
90            let auth = match &endpoint.auth {
91                Some(auth_config) => Some(
92                    auth::resolve_auth(auth_config, &client)
93                        .context("Failed to resolve authentication")?,
94                ),
95                None => None,
96            };
97            endpoints.push(ResolvedEndpoint {
98                config: endpoint.clone(),
99                auth,
100            });
101        }
102
103        let engine = TemplateEngine::new();
104
105        Ok(Self {
106            config,
107            client,
108            endpoints,
109            engine,
110        })
111    }
112
113    /// Fetch all pages from a single endpoint and emit bootstrap events.
114    async fn fetch_endpoint(
115        &self,
116        endpoint: &EndpointConfig,
117        auth: &Option<ResolvedAuth>,
118        context: &BootstrapContext,
119        request: &BootstrapRequest,
120        event_tx: &drasi_lib::channels::BootstrapEventSender,
121    ) -> Result<u64> {
122        let mut total_sent: u64 = 0;
123
124        // Determine content type override
125        let content_type_override = endpoint
126            .response
127            .content_type
128            .as_ref()
129            .map(ContentType::from_override);
130
131        // Set up pagination with SSRF protection (origin host validation)
132        let origin_host = pagination::extract_origin_host(&endpoint.url).unwrap_or_default();
133        let mut paginator: Option<Box<dyn Paginator>> = endpoint
134            .pagination
135            .as_ref()
136            .map(|p| pagination::create_paginator(p, origin_host));
137
138        // Get initial pagination params
139        let initial_params: Vec<(String, String)> = paginator
140            .as_ref()
141            .map(|p| p.initial_params())
142            .unwrap_or_default();
143
144        let mut current_url = endpoint.url.clone();
145        let mut current_params = initial_params;
146        let mut page_num = 0u64;
147        let mut seen_requests: HashSet<(String, Vec<(String, String)>)> = HashSet::new();
148
149        loop {
150            page_num += 1;
151
152            // Prevent infinite loops
153            let max_pages = self.config.max_pages.unwrap_or(DEFAULT_MAX_PAGES);
154            if page_num > max_pages {
155                error!(
156                    "Reached maximum page limit ({max_pages}) for endpoint '{}', stopping pagination. \
157                     Configure 'maxPages' to increase this limit.",
158                    endpoint.url
159                );
160                break;
161            }
162
163            // Detect cycles: same URL + same params seen before
164            let current_key = (current_url.clone(), current_params.clone());
165            if !seen_requests.insert(current_key) {
166                warn!(
167                    "Pagination cycle detected for endpoint '{}', stopping",
168                    endpoint.url
169                );
170                break;
171            }
172
173            debug!("Fetching page {page_num} from endpoint: {}", endpoint.url);
174
175            // Fetch and parse with retry: both transport errors AND body
176            // parsing failures (truncation) trigger retries.
177            let (parsed_body, response_headers) = self
178                .fetch_and_parse_with_retry(
179                    &current_url,
180                    endpoint,
181                    auth,
182                    &current_params,
183                    &content_type_override,
184                )
185                .await
186                .with_context(|| {
187                    format!(
188                        "Failed to fetch from endpoint '{}' (page {page_num})",
189                        endpoint.url
190                    )
191                })?;
192
193            // Extract items
194            let items = response::extract_items(&parsed_body, &endpoint.response.items_path)?;
195            let items_count = items.len();
196
197            debug!(
198                "Extracted {items_count} items from page {page_num} of {}",
199                endpoint.url
200            );
201
202            // If no items, we're done
203            if items_count == 0 {
204                break;
205            }
206
207            // Map items to elements and emit events
208            let element_results = response::map_items_to_elements(
209                &items,
210                &endpoint.response.mappings,
211                &context.source_id,
212                &self.engine,
213            );
214
215            for result in element_results {
216                match result {
217                    Ok(mapped) => {
218                        // Filter by requested labels
219                        if !should_include_change(&mapped, request) {
220                            continue;
221                        }
222
223                        let source_change = match mapped {
224                            response::MappedChange::Upsert { element, operation } => {
225                                match operation {
226                                    OperationType::Insert => SourceChange::Insert { element },
227                                    OperationType::Update => SourceChange::Update { element },
228                                    OperationType::Delete => unreachable!(),
229                                }
230                            }
231                            response::MappedChange::Delete { metadata } => {
232                                SourceChange::Delete { metadata }
233                            }
234                        };
235                        let sequence = context.next_sequence();
236
237                        let bootstrap_event = BootstrapEvent {
238                            source_id: context.source_id.clone(),
239                            change: source_change,
240                            timestamp: Utc::now(),
241                            sequence,
242                        };
243
244                        event_tx
245                            .send(bootstrap_event)
246                            .await
247                            .context("Failed to send bootstrap event")?;
248
249                        total_sent += 1;
250                    }
251                    Err(e) => {
252                        warn!("Failed to map item to element: {e}");
253                    }
254                }
255            }
256
257            // Check pagination for next page
258            match paginator.as_mut() {
259                Some(ref mut pag) => {
260                    match pag.next_page(&parsed_body, &response_headers, items_count)? {
261                        Some(NextPage::QueryParams(params)) => {
262                            current_params = params;
263                        }
264                        Some(NextPage::NewUrl(url)) => {
265                            current_url = url;
266                            current_params = Vec::new();
267                        }
268                        None => break,
269                    }
270                }
271                None => break, // No pagination, single page only
272            }
273        }
274
275        info!(
276            "Completed fetching from endpoint '{}': {} pages, {} elements",
277            endpoint.url, page_num, total_sent
278        );
279
280        Ok(total_sent)
281    }
282
283    /// Fetch a URL with retry logic, including body parsing validation.
284    /// Both transport errors AND body parsing failures (e.g., truncated responses)
285    /// are treated as retriable errors. This handles the case where an intermediary
286    /// (proxy, load balancer, CDN) returns a properly-terminated HTTP response with
287    /// incomplete body content.
288    async fn fetch_and_parse_with_retry(
289        &self,
290        url: &str,
291        endpoint: &EndpointConfig,
292        auth: &Option<ResolvedAuth>,
293        query_params: &[(String, String)],
294        content_type_override: &Option<ContentType>,
295    ) -> Result<(serde_json::Value, reqwest::header::HeaderMap)> {
296        let max_retries = self.config.max_retries;
297        let retry_delay = Duration::from_millis(self.config.retry_delay_ms);
298
299        let mut last_error = None;
300
301        for attempt in 0..=max_retries {
302            if attempt > 0 {
303                let factor = 1u64.checked_shl(attempt - 1).unwrap_or(u64::MAX);
304                let delay = retry_delay
305                    .saturating_mul(factor.min(u32::MAX as u64) as u32)
306                    .min(MAX_RETRY_DELAY);
307                debug!("Retry attempt {attempt} after {delay:?} delay");
308                tokio::time::sleep(delay).await;
309            }
310
311            // Fetch
312            let (response_text, response_headers) =
313                match self.make_request(url, endpoint, auth, query_params).await {
314                    Ok(result) => result,
315                    Err(e) => {
316                        warn!(
317                            "Request to endpoint failed (attempt {}/{}): {}",
318                            attempt + 1,
319                            max_retries + 1,
320                            e
321                        );
322                        last_error = Some(e);
323                        continue;
324                    }
325                };
326
327            // Determine content type
328            let ct = resolve_content_type(content_type_override, &response_headers);
329
330            // Validate: parse response body
331            match content_parser::parse_body(&response_text, &ct) {
332                Ok(parsed) => return Ok((parsed, response_headers)),
333                Err(e) => {
334                    // Body parsing failed — likely truncated response.
335                    // Treat as retriable: the next attempt may get a complete response.
336                    warn!(
337                        "Response body parse failed (attempt {}/{}, possible truncation). \
338                         Body length: {}, content-type: {ct:?}, error: {e}",
339                        attempt + 1,
340                        max_retries + 1,
341                        response_text.len(),
342                    );
343                    last_error = Some(e.context(format!(
344                        "Failed to parse response from '{}' as {ct:?} \
345                         (body length: {}, possible truncation)",
346                        url,
347                        response_text.len()
348                    )));
349                }
350            }
351        }
352
353        Err(last_error.unwrap_or_else(|| anyhow::anyhow!("Request failed with no error details")))
354    }
355
356    /// Make a single HTTP request.
357    /// Returns raw response text and headers for proper content-type handling.
358    async fn make_request(
359        &self,
360        url: &str,
361        endpoint: &EndpointConfig,
362        auth: &Option<ResolvedAuth>,
363        query_params: &[(String, String)],
364    ) -> Result<(String, reqwest::header::HeaderMap)> {
365        // Build request
366        let mut builder = match endpoint.method {
367            HttpMethod::Get => self.client.get(url),
368            HttpMethod::Post => self.client.post(url),
369            HttpMethod::Put => self.client.put(url),
370        };
371
372        // Add headers
373        for (key, value) in &endpoint.headers {
374            builder = builder.header(key.as_str(), value.as_str());
375        }
376
377        // Add query parameters
378        if !query_params.is_empty() {
379            builder = builder.query(query_params);
380        }
381
382        // Add request body
383        if let Some(ref body) = endpoint.body {
384            builder = builder.json(body);
385        }
386
387        // Apply auth
388        if let Some(ref resolved_auth) = auth {
389            builder = auth::apply_auth(builder, resolved_auth).await?;
390        }
391
392        // Send request
393        let response = builder.send().await.context("HTTP request failed")?;
394
395        if !response.status().is_success() {
396            let status = response.status();
397            let body = response
398                .text()
399                .await
400                .unwrap_or_else(|_| "Unable to read error response".to_string());
401            let truncated = if body.len() > 256 {
402                format!("{}... (truncated)", safe_truncate(&body, 256))
403            } else {
404                body
405            };
406            return Err(anyhow::anyhow!(
407                "HTTP request returned error status {status}: {truncated}"
408            ));
409        }
410
411        let headers = response.headers().clone();
412        let body_text = response
413            .text()
414            .await
415            .context("Failed to read response body")?;
416
417        Ok((body_text, headers))
418    }
419}
420
421/// Resolve the content type from the override or response headers.
422fn resolve_content_type(
423    override_ct: &Option<ContentType>,
424    headers: &reqwest::header::HeaderMap,
425) -> ContentType {
426    override_ct.clone().unwrap_or_else(|| {
427        let header_value = headers
428            .get(reqwest::header::CONTENT_TYPE)
429            .and_then(|v| v.to_str().ok());
430        ContentType::from_header(header_value)
431    })
432}
433
434/// Check if an element should be included based on the bootstrap request's label filters.
435fn should_include_change(change: &response::MappedChange, request: &BootstrapRequest) -> bool {
436    let metadata = match change {
437        response::MappedChange::Upsert { element, .. } => element.get_metadata(),
438        response::MappedChange::Delete { metadata } => metadata,
439    };
440
441    // Determine if this is a relation by checking element type for Upsert,
442    // or fall back to checking relation_labels for Delete
443    let is_relation = matches!(
444        change,
445        response::MappedChange::Upsert {
446            element: drasi_core::models::Element::Relation { .. },
447            ..
448        }
449    );
450
451    if is_relation {
452        if request.relation_labels.is_empty() {
453            return true;
454        }
455        metadata
456            .labels
457            .iter()
458            .any(|l| request.relation_labels.iter().any(|rl| rl.as_str() == &**l))
459    } else {
460        if request.node_labels.is_empty() {
461            return true;
462        }
463        metadata
464            .labels
465            .iter()
466            .any(|l| request.node_labels.iter().any(|nl| nl.as_str() == &**l))
467    }
468}
469
470#[async_trait]
471impl BootstrapProvider for HttpBootstrapProvider {
472    async fn bootstrap(
473        &self,
474        request: BootstrapRequest,
475        context: &BootstrapContext,
476        event_tx: drasi_lib::channels::BootstrapEventSender,
477        _settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
478    ) -> Result<BootstrapResult> {
479        info!(
480            "Starting HTTP bootstrap for query {} from source {}",
481            request.query_id, context.source_id
482        );
483
484        let mut total_events: u64 = 0;
485
486        for resolved in &self.endpoints {
487            match self
488                .fetch_endpoint(
489                    &resolved.config,
490                    &resolved.auth,
491                    context,
492                    &request,
493                    &event_tx,
494                )
495                .await
496            {
497                Ok(count) => {
498                    total_events += count;
499                }
500                Err(e) => {
501                    error!(
502                        "Failed to bootstrap from endpoint '{}': {}",
503                        resolved.config.url, e
504                    );
505                    return Err(e);
506                }
507            }
508        }
509
510        info!(
511            "Completed HTTP bootstrap for query {}: {} total elements",
512            request.query_id, total_events
513        );
514
515        Ok(BootstrapResult {
516            event_count: total_events as usize,
517            source_position: None,
518        })
519    }
520}