Skip to main content

stygian_graph/adapters/
rest_api.rs

1//! REST API scraping adapter with authentication and pagination support.
2//!
3//! Implements [`crate::ports::ScrapingService`] for structured REST JSON APIs. Supports:
4//!
5//! - HTTP methods: `GET`, `POST`, `PUT`, `PATCH`, `DELETE`, `HEAD`
6//! - Authentication: Bearer token, HTTP Basic, API key (header or query param)
7//! - Automatic pagination: offset/page, cursor, or RFC 8288 `Link` header
8//! - JSON response data extraction via dot-separated path
9//! - Custom request headers and query string parameters
10//! - Configurable retries with exponential backoff
11//!
12//! All per-request options live in `ServiceInput::params`; see the
13//! `RestApiAdapter::execute` docs for the full contract.
14//!
15//! # Example
16//!
17//! ```no_run
18//! use stygian_graph::adapters::rest_api::{RestApiAdapter, RestApiConfig};
19//! use stygian_graph::ports::{ScrapingService, ServiceInput};
20//! use serde_json::json;
21//! use std::time::Duration;
22//!
23//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
24//! let adapter = RestApiAdapter::with_config(RestApiConfig {
25//!     timeout:      Duration::from_secs(20),
26//!     max_retries:  2,
27//!     ..Default::default()
28//! });
29//!
30//! let input = ServiceInput {
31//!     url: "https://api.github.com/repos/rust-lang/rust/issues".to_string(),
32//!     params: json!({
33//!         "auth": { "type": "bearer", "token": "ghp_..." },
34//!         "query": { "state": "open", "per_page": "30" },
35//!         "pagination": { "strategy": "link_header", "max_pages": 5 },
36//!         "response": { "data_path": "" }
37//!     }),
38//! };
39//! // let output = adapter.execute(input).await.unwrap();
40//! # });
41//! ```
42
43use std::collections::HashMap;
44use std::time::Duration;
45
46use async_trait::async_trait;
47use reqwest::{Client, Method, Proxy, header};
48use serde_json::{Value, json};
49use tracing::{debug, info, warn};
50
51use crate::domain::error::{Result, ServiceError, StygianError};
52use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
53
54// ─── Config ───────────────────────────────────────────────────────────────────
55
/// Adapter-wide settings for [`RestApiAdapter`].
///
/// These values apply to every request the adapter sends; anything that varies
/// per call is read from `ServiceInput.params` instead.
///
/// # Example
///
/// ```
/// use stygian_graph::adapters::rest_api::RestApiConfig;
/// use std::time::Duration;
///
/// let cfg = RestApiConfig {
///     timeout:          Duration::from_secs(20),
///     max_retries:      2,
///     retry_base_delay: Duration::from_millis(500),
///     proxy_url:        None,
/// };
/// ```
#[derive(Debug, Clone)]
pub struct RestApiConfig {
    /// Timeout handed to the HTTP client builder (default: 30 s).
    pub timeout: Duration,
    /// Number of additional attempts made for a single page request after a
    /// retryable failure (default: 3).
    pub max_retries: u32,
    /// Delay before the first retry; doubled for each further retry
    /// (default: 1 s).
    pub retry_base_delay: Duration,
    /// Optional proxy URL applied to all traffic. A value that cannot be parsed
    /// as a proxy is not applied.
    pub proxy_url: Option<String>,
}
84
85impl Default for RestApiConfig {
86    fn default() -> Self {
87        Self {
88            timeout: Duration::from_secs(30),
89            max_retries: 3,
90            retry_base_delay: Duration::from_secs(1),
91            proxy_url: None,
92        }
93    }
94}
95
96// ─── Internal request model ───────────────────────────────────────────────────
97
/// How a request authenticates itself; built from `params.auth`.
#[derive(Debug, Clone)]
enum AuthScheme {
    /// Send no credentials.
    None,
    /// Send the token as `Authorization: Bearer <token>`.
    Bearer(String),
    /// Send username and password using HTTP Basic authentication.
    Basic { username: String, password: String },
    /// Send the key in a caller-chosen header: `<header>: <key>`.
    ApiKeyHeader { header: String, key: String },
    /// Send the key as the query parameter `<param>=<key>`.
    ApiKeyQuery { param: String, key: String },
}
112
113/// Request body variant.
114#[derive(Debug, Clone)]
115enum RequestBody {
116    Json(Value),
117    Raw(String),
118}
119
/// Rule used to move from one page of results to the next.
#[derive(Debug, Clone)]
enum PaginationStrategy {
    /// Issue exactly one request.
    None,
    /// Send a page number in a query parameter and increment it per page.
    Offset {
        /// Query parameter that carries the page number.
        page_param: String,
        /// Query parameter that carries the page size, if one should be sent.
        page_size_param: Option<String>,
        /// Page size value; only sent when `page_size_param` is also set.
        page_size: Option<u64>,
        /// Page number to send with the next request.
        current_page: u64,
    },
    /// Read the next cursor out of each response body.
    Cursor {
        /// Query parameter that carries the cursor on follow-up requests.
        cursor_param: String,
        /// Dot-separated path to the next cursor inside the response JSON.
        cursor_field: String,
    },
    /// Follow the RFC 8288 `Link: <URL>; rel="next"` response header.
    LinkHeader,
}
142
143/// Fully-parsed per-request specification, derived from `ServiceInput.params`.
144#[derive(Debug, Clone)]
145struct RequestSpec {
146    method: Method,
147    extra_headers: HashMap<String, String>,
148    query_params: HashMap<String, String>,
149    body: Option<RequestBody>,
150    auth: AuthScheme,
151    accept: String,
152    /// Dot-separated path into the JSON response to extract as data.
153    /// `None` means use the full response body.
154    data_path: Option<String>,
155    /// Return paged data as a flat JSON array even when only one page was fetched.
156    collect_as_array: bool,
157    pagination: PaginationStrategy,
158    max_pages: usize,
159}
160
161// ─── Adapter ──────────────────────────────────────────────────────────────────
162
163/// REST API scraping adapter.
164///
165/// Thread-safe and cheaply cloneable — the inner `reqwest::Client` uses `Arc`
166/// internally. Build once, share across tasks.
167///
168/// # Example
169///
170/// ```
171/// use stygian_graph::adapters::rest_api::RestApiAdapter;
172///
173/// let adapter = RestApiAdapter::new();
174/// ```
175#[derive(Clone)]
176pub struct RestApiAdapter {
177    client: Client,
178    config: RestApiConfig,
179}
180
181impl RestApiAdapter {
182    /// Create a new adapter with default configuration.
183    ///
184    /// # Example
185    ///
186    /// ```
187    /// use stygian_graph::adapters::rest_api::RestApiAdapter;
188    /// let adapter = RestApiAdapter::new();
189    /// ```
190    #[must_use]
191    pub fn new() -> Self {
192        Self::with_config(RestApiConfig::default())
193    }
194
195    /// Create an adapter with custom configuration.
196    ///
197    /// # Panics
198    ///
199    /// Panics only if TLS is unavailable on the host (extremely rare).
200    ///
201    /// # Example
202    ///
203    /// ```
204    /// use stygian_graph::adapters::rest_api::{RestApiAdapter, RestApiConfig};
205    /// use std::time::Duration;
206    ///
207    /// let adapter = RestApiAdapter::with_config(RestApiConfig {
208    ///     timeout: Duration::from_secs(10),
209    ///     ..Default::default()
210    /// });
211    /// ```
212    #[must_use]
213    pub fn with_config(config: RestApiConfig) -> Self {
214        let mut builder = Client::builder()
215            .timeout(config.timeout)
216            .gzip(true)
217            .brotli(true)
218            .use_rustls_tls();
219
220        if let Some(ref proxy_url) = config.proxy_url
221            && let Ok(proxy) = Proxy::all(proxy_url)
222        {
223            builder = builder.proxy(proxy);
224        }
225
226        // SAFETY: TLS via rustls is always available; build() can only fail if the
227        // TLS backend is completely absent, which cannot happen with use_rustls_tls().
228        #[allow(clippy::expect_used)]
229        let client = builder.build().expect("TLS backend unavailable");
230
231        Self { client, config }
232    }
233
234    /// Resolve a dot-separated path into a JSON [`Value`].
235    ///
236    /// Returns `None` if any path segment is missing.
237    ///
238    /// # Example
239    ///
240    /// ```
241    /// use serde_json::json;
242    /// use stygian_graph::adapters::rest_api::RestApiAdapter;
243    ///
244    /// let v = json!({"meta": {"next": "abc123"}});
245    /// assert_eq!(
246    ///     RestApiAdapter::extract_path(&v, "meta.next"),
247    ///     Some(&json!("abc123"))
248    /// );
249    /// assert!(RestApiAdapter::extract_path(&v, "meta.gone").is_none());
250    /// ```
251    #[must_use]
252    pub fn extract_path<'a>(value: &'a Value, path: &str) -> Option<&'a Value> {
253        let mut current = value;
254        for segment in path.split('.') {
255            current = current.get(segment)?;
256        }
257        Some(current)
258    }
259
260    /// Parse an RFC 8288 `Link` header and return the `rel="next"` URL, if any.
261    ///
262    /// # Example
263    ///
264    /// ```
265    /// use stygian_graph::adapters::rest_api::RestApiAdapter;
266    ///
267    /// let link = r#"<https://api.example.com/items?page=2>; rel="next", <https://api.example.com/items?page=1>; rel="prev""#;
268    /// assert_eq!(
269    ///     RestApiAdapter::parse_link_next(link),
270    ///     Some("https://api.example.com/items?page=2".to_owned())
271    /// );
272    /// ```
273    #[must_use]
274    pub fn parse_link_next(link_header: &str) -> Option<String> {
275        for part in link_header.split(',') {
276            let part = part.trim();
277            let mut url: Option<String> = None;
278            let mut is_next = false;
279            for segment in part.split(';') {
280                let segment = segment.trim();
281                if segment.starts_with('<') && segment.ends_with('>') {
282                    url = Some(segment[1..segment.len() - 1].to_owned());
283                } else if segment.trim_start_matches("rel=").trim_matches('"') == "next" {
284                    is_next = true;
285                }
286            }
287            if is_next {
288                return url;
289            }
290        }
291        None
292    }
293
294    /// Parse `ServiceInput.params` into a `RequestSpec`.
295    #[allow(clippy::indexing_slicing)]
296    fn parse_spec(params: &Value) -> Result<RequestSpec> {
297        let method_str = params["method"].as_str().unwrap_or("GET").to_uppercase();
298        let method = match method_str.as_str() {
299            "GET" => Method::GET,
300            "POST" => Method::POST,
301            "PUT" => Method::PUT,
302            "PATCH" => Method::PATCH,
303            "DELETE" => Method::DELETE,
304            "HEAD" => Method::HEAD,
305            other => {
306                return Err(StygianError::from(ServiceError::Unavailable(format!(
307                    "unknown HTTP method: {other}"
308                ))));
309            }
310        };
311
312        let extra_headers = params["headers"]
313            .as_object()
314            .map(|obj| {
315                obj.iter()
316                    .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_owned())))
317                    .collect()
318            })
319            .unwrap_or_default();
320
321        let query_params = params["query"]
322            .as_object()
323            .map(|obj| {
324                obj.iter()
325                    .filter_map(|(k, v)| {
326                        let s = if v.is_string() {
327                            v.as_str().map(ToOwned::to_owned)
328                        } else {
329                            Some(v.to_string())
330                        };
331                        s.map(|val| (k.clone(), val))
332                    })
333                    .collect()
334            })
335            .unwrap_or_default();
336
337        // body_raw takes precedence over body (raw string vs structured JSON).
338        let body = match params["body_raw"].as_str().filter(|s| !s.is_empty()) {
339            Some(raw) => Some(RequestBody::Raw(raw.to_owned())),
340            None if !params["body"].is_null() => Some(RequestBody::Json(params["body"].clone())),
341            None => None,
342        };
343
344        let accept = params["accept"]
345            .as_str()
346            .unwrap_or("application/json")
347            .to_owned();
348
349        let auth = Self::parse_auth(&params["auth"]);
350
351        let data_path = match params["response"]["data_path"].as_str() {
352            Some("") | None => None,
353            Some(p) => Some(p.to_owned()),
354        };
355        let collect_as_array = params["response"]["collect_as_array"]
356            .as_bool()
357            .unwrap_or(false);
358
359        let max_pages = params["pagination"]["max_pages"]
360            .as_u64()
361            .map_or(1, |n| usize::try_from(n).unwrap_or(usize::MAX));
362
363        let pagination = Self::parse_pagination(&params["pagination"]);
364
365        Ok(RequestSpec {
366            method,
367            extra_headers,
368            query_params,
369            body,
370            auth,
371            accept,
372            data_path,
373            collect_as_array,
374            pagination,
375            max_pages,
376        })
377    }
378
379    /// Parse `params.auth` into an [`AuthScheme`].
380    #[allow(clippy::indexing_slicing)]
381    fn parse_auth(auth: &Value) -> AuthScheme {
382        match auth["type"].as_str().unwrap_or("none") {
383            "bearer" | "oauth2" => auth["token"]
384                .as_str()
385                .map_or(AuthScheme::None, |t| AuthScheme::Bearer(t.to_owned())),
386            "basic" => AuthScheme::Basic {
387                username: auth["username"].as_str().unwrap_or("").to_owned(),
388                password: auth["password"].as_str().unwrap_or("").to_owned(),
389            },
390            "api_key_header" => AuthScheme::ApiKeyHeader {
391                header: auth["header"].as_str().unwrap_or("X-Api-Key").to_owned(),
392                key: auth["key"].as_str().unwrap_or("").to_owned(),
393            },
394            "api_key_query" => AuthScheme::ApiKeyQuery {
395                param: auth["param"].as_str().unwrap_or("api_key").to_owned(),
396                key: auth["key"].as_str().unwrap_or("").to_owned(),
397            },
398            _ => AuthScheme::None,
399        }
400    }
401
402    /// Parse `params.pagination` into a [`PaginationStrategy`].
403    #[allow(clippy::indexing_slicing)]
404    fn parse_pagination(pag: &Value) -> PaginationStrategy {
405        match pag["strategy"].as_str().unwrap_or("none") {
406            "offset" => PaginationStrategy::Offset {
407                page_param: pag["page_param"].as_str().unwrap_or("page").to_owned(),
408                page_size_param: pag["page_size_param"].as_str().map(ToOwned::to_owned),
409                page_size: pag["page_size"].as_u64(),
410                current_page: pag["start_page"].as_u64().unwrap_or(1),
411            },
412            "cursor" => PaginationStrategy::Cursor {
413                cursor_param: pag["cursor_param"].as_str().unwrap_or("cursor").to_owned(),
414                cursor_field: pag["cursor_field"]
415                    .as_str()
416                    .unwrap_or("next_cursor")
417                    .to_owned(),
418            },
419            "link_header" => PaginationStrategy::LinkHeader,
420            _ => PaginationStrategy::None,
421        }
422    }
423
424    /// Extract the data portion of a parsed response using `spec.data_path`.
425    fn extract_data(response: &Value, spec: &RequestSpec) -> Value {
426        spec.data_path
427            .as_deref()
428            .and_then(|path| Self::extract_path(response, path))
429            .cloned()
430            .unwrap_or_else(|| response.clone())
431    }
432
433    /// Execute a single HTTP request, retrying on transient failures.
434    async fn send_one(
435        &self,
436        url: &str,
437        spec: &RequestSpec,
438        extra_query: &HashMap<String, String>,
439    ) -> Result<(Value, Option<String>)> {
440        let mut last_err: Option<StygianError> = None;
441
442        for attempt in 0..=self.config.max_retries {
443            if attempt > 0 {
444                // Honour server Retry-After when available; otherwise exponential backoff.
445                let delay = match &last_err {
446                    Some(StygianError::Service(ServiceError::RateLimited { retry_after_ms })) => {
447                        Duration::from_millis(*retry_after_ms)
448                    }
449                    _ => self.config.retry_base_delay * 2u32.saturating_pow(attempt - 1),
450                };
451                tokio::time::sleep(delay).await;
452                debug!(url, attempt, ?delay, "REST API retry");
453            }
454
455            match self.do_send(url, spec, extra_query).await {
456                Ok(r) => return Ok(r),
457                Err(e) if is_retryable(&e) && attempt < self.config.max_retries => {
458                    last_err = Some(e);
459                }
460                Err(e) => return Err(e),
461            }
462        }
463
464        Err(last_err.unwrap_or_else(|| {
465            StygianError::from(ServiceError::Unavailable("max retries exceeded".into()))
466        }))
467    }
468
469    /// Perform exactly one HTTP round-trip (no retry).
470    ///
471    /// Returns the parsed JSON response body and the raw `Link` header value (if present).
472    async fn do_send(
473        &self,
474        url: &str,
475        spec: &RequestSpec,
476        extra_query: &HashMap<String, String>,
477    ) -> Result<(Value, Option<String>)> {
478        let mut req = self.client.request(spec.method.clone(), url);
479
480        // Accept header
481        req = req.header(header::ACCEPT, spec.accept.as_str());
482
483        // Auth — header-based schemes
484        req = match &spec.auth {
485            AuthScheme::Bearer(token) => req.bearer_auth(token),
486            AuthScheme::Basic { username, password } => req.basic_auth(username, Some(password)),
487            AuthScheme::ApiKeyHeader { header: hdr, key } => req.header(hdr.as_str(), key.as_str()),
488            AuthScheme::ApiKeyQuery { .. } | AuthScheme::None => req,
489        };
490
491        // Custom headers
492        for (k, v) in &spec.extra_headers {
493            req = req.header(k.as_str(), v.as_str());
494        }
495
496        // Merge query params: static + per-page extra + API key query (if applicable)
497        let mut merged: HashMap<String, String> = spec.query_params.clone();
498        merged.extend(extra_query.iter().map(|(k, v)| (k.clone(), v.clone())));
499        if let AuthScheme::ApiKeyQuery { param, key } = &spec.auth {
500            merged.insert(param.clone(), key.clone());
501        }
502        if !merged.is_empty() {
503            let pairs: Vec<(&String, &String)> = merged.iter().collect();
504            req = req.query(&pairs);
505        }
506
507        // Body
508        req = match &spec.body {
509            Some(RequestBody::Json(v)) => req.json(v),
510            Some(RequestBody::Raw(s)) => req.body(s.clone()),
511            None => req,
512        };
513
514        let response = req
515            .send()
516            .await
517            .map_err(|e| StygianError::from(ServiceError::Unavailable(e.to_string())))?;
518
519        let status = response.status();
520
521        // Capture Link header before consuming the response
522        let link_header = response
523            .headers()
524            .get("link")
525            .and_then(|v| v.to_str().ok())
526            .map(ToOwned::to_owned);
527
528        // 429 — honour server Retry-After hint via dedicated error variant.
529        if status.as_u16() == 429 {
530            let retry_after_secs = response
531                .headers()
532                .get("retry-after")
533                .and_then(|v| v.to_str().ok())
534                .and_then(|s| s.parse::<u64>().ok())
535                .unwrap_or(5);
536            warn!(url, retry_after_secs, "REST API rate-limited (429)");
537            return Err(StygianError::from(ServiceError::RateLimited {
538                retry_after_ms: retry_after_secs.saturating_mul(1000),
539            }));
540        }
541
542        if !status.is_success() {
543            let snippet: String = response
544                .text()
545                .await
546                .unwrap_or_default()
547                .chars()
548                .take(200)
549                .collect();
550            return Err(StygianError::from(ServiceError::Unavailable(format!(
551                "HTTP {status}: {snippet}"
552            ))));
553        }
554
555        let body = response
556            .text()
557            .await
558            .map_err(|e| StygianError::from(ServiceError::Unavailable(e.to_string())))?;
559
560        // Parse as JSON when possible; wrap plain text as a JSON string otherwise.
561        let parsed: Value = serde_json::from_str(&body).unwrap_or(Value::String(body));
562
563        Ok((parsed, link_header))
564    }
565}
566
567impl Default for RestApiAdapter {
568    fn default() -> Self {
569        Self::new()
570    }
571}
572
573// ─── Helpers ──────────────────────────────────────────────────────────────────
574
575/// Returns `true` for transient errors that are worth retrying.
576fn is_retryable(err: &StygianError) -> bool {
577    match err {
578        StygianError::Service(ServiceError::RateLimited { .. }) => true,
579        StygianError::Service(ServiceError::Unavailable(msg)) => {
580            msg.contains("429")
581                || msg.contains("500")
582                || msg.contains("502")
583                || msg.contains("503")
584                || msg.contains("504")
585                || msg.contains("connection")
586                || msg.contains("timed out")
587        }
588        _ => false,
589    }
590}
591
592// ─── ScrapingService ──────────────────────────────────────────────────────────
593
594#[async_trait]
595impl ScrapingService for RestApiAdapter {
596    /// Execute one or more REST API requests and return the aggregated result.
597    ///
598    /// # `ServiceInput.url`
599    ///
600    /// Base URL of the REST endpoint (including path; query string is optional).
601    ///
602    /// # `ServiceInput.params` contract
603    ///
604    /// ```json
605    /// {
606    ///   "method":   "GET",
607    ///   "body":     { "key": "value" },
608    ///   "body_raw": "raw body string",
609    ///   "headers":  { "X-Custom-Header": "value" },
610    ///   "query":    { "state": "open", "per_page": "30" },
611    ///   "accept":   "application/json",
612    ///
613    ///   "auth": {
614    ///     "type":     "bearer",
615    ///     "token":    "...",
616    ///     "username": "user",
617    ///     "password": "pass",
618    ///     "header":   "X-Api-Key",
619    ///     "param":    "api_key",
620    ///     "key":      "sk-..."
621    ///   },
622    ///
623    ///   "response": {
624    ///     "data_path":        "items",
625    ///     "collect_as_array": true
626    ///   },
627    ///
628    ///   "pagination": {
629    ///     "strategy":        "link_header",
630    ///     "max_pages":       10,
631    ///     "page_param":      "page",
632    ///     "page_size_param": "per_page",
633    ///     "page_size":       100,
634    ///     "start_page":      1,
635    ///     "cursor_param":    "cursor",
636    ///     "cursor_field":    "meta.next_cursor"
637    ///   }
638    /// }
639    /// ```
640    ///
641    /// # Auth `type` values
642    ///
643    /// | `type` | Required fields | Description |
644    /// | --- | --- | --- |
645    /// | `"bearer"` / `"oauth2"` | `token` | `Authorization: Bearer <token>` |
646    /// | `"basic"` | `username`, `password` | HTTP Basic |
647    /// | `"api_key_header"` | `header`, `key` | Custom header |
648    /// | `"api_key_query"` | `param`, `key` | Query string |
649    /// | `"none"` or absent | — | No auth |
650    ///
651    /// # Pagination strategies
652    ///
653    /// | `strategy` | Description |
654    /// | --- | --- |
655    /// | `"none"` | Single request (default) |
656    /// | `"offset"` | Increment `page_param` from `start_page` |
657    /// | `"cursor"` | Extract next cursor at `cursor_field` in each response; pass it as `cursor_param` |
658    /// | `"link_header"` | Follow RFC 8288 `Link: <url>; rel="next"` header |
659    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
660        let spec = Self::parse_spec(&input.params)?;
661
662        let mut accumulated: Vec<Value> = Vec::new();
663        let mut page_count: usize = 0;
664        let mut current_url = input.url.clone();
665        let mut pagination = spec.pagination.clone();
666        let mut extra_query: HashMap<String, String> = HashMap::new();
667
668        // Cursor state lives outside the loop so it persists across pages.
669        let mut cursor_state: Option<String> = None;
670
671        info!(url = %input.url, "REST API execute start");
672
673        loop {
674            if page_count >= spec.max_pages {
675                debug!(%current_url, page_count, "REST API: max_pages reached");
676                break;
677            }
678
679            // Build per-page query additions
680            extra_query.clear();
681            match &pagination {
682                PaginationStrategy::Offset {
683                    page_param,
684                    page_size_param,
685                    page_size,
686                    current_page,
687                } => {
688                    extra_query.insert(page_param.clone(), current_page.to_string());
689                    if let (Some(size_param), Some(size)) = (page_size_param, page_size) {
690                        extra_query.insert(size_param.clone(), size.to_string());
691                    }
692                }
693                PaginationStrategy::Cursor { cursor_param, .. } => {
694                    if let Some(ref cursor) = cursor_state {
695                        extra_query.insert(cursor_param.clone(), cursor.clone());
696                    }
697                }
698                PaginationStrategy::None | PaginationStrategy::LinkHeader => {}
699            }
700
701            let (response, link_header) = self.send_one(&current_url, &spec, &extra_query).await?;
702
703            let page_data = Self::extract_data(&response, &spec);
704
705            // Accumulate — empty array responses signal end-of-pagination.
706            match &page_data {
707                Value::Array(items) => {
708                    if items.is_empty() {
709                        debug!("REST API: empty page, stopping pagination");
710                        break;
711                    }
712                    accumulated.extend(items.iter().cloned());
713                }
714                other => {
715                    accumulated.push(other.clone());
716                }
717            }
718            page_count += 1;
719
720            // Advance pagination state
721            let stop = match &mut pagination {
722                PaginationStrategy::None => true,
723                PaginationStrategy::Offset { current_page, .. } => {
724                    *current_page += 1;
725                    false
726                }
727                PaginationStrategy::Cursor { cursor_field, .. } => {
728                    Self::extract_path(&response, cursor_field.as_str())
729                        .and_then(Value::as_str)
730                        .filter(|s| !s.is_empty())
731                        .map(ToOwned::to_owned)
732                        .is_none_or(|cursor| {
733                            cursor_state = Some(cursor);
734                            false
735                        })
736                }
737                PaginationStrategy::LinkHeader => link_header
738                    .as_deref()
739                    .and_then(Self::parse_link_next)
740                    .is_none_or(|next_url| {
741                        current_url = next_url;
742                        false
743                    }),
744            };
745            if stop {
746                break;
747            }
748        }
749
750        // Serialise accumulated results
751        let data_value = if spec.collect_as_array || accumulated.len() > 1 {
752            Value::Array(accumulated)
753        } else {
754            accumulated.into_iter().next().unwrap_or(Value::Null)
755        };
756
757        let data_str = match &data_value {
758            Value::String(s) => s.clone(),
759            other => serde_json::to_string_pretty(other).unwrap_or_default(),
760        };
761
762        let metadata = json!({
763            "url":        input.url,
764            "page_count": page_count,
765        });
766
767        info!(%input.url, page_count, "REST API execute done");
768
769        Ok(ServiceOutput {
770            data: data_str,
771            metadata,
772        })
773    }
774
775    fn name(&self) -> &'static str {
776        "rest-api"
777    }
778}
779
780// ─── Tests ────────────────────────────────────────────────────────────────────
781
782#[cfg(test)]
783#[allow(clippy::unwrap_used, clippy::panic, clippy::indexing_slicing)]
784mod tests {
785    use super::*;
786    use serde_json::json;
787
788    // ── parse_auth ─────────────────────────────────────────────────────────────
789
790    #[test]
791    fn parse_auth_bearer() {
792        let auth = json!({"type": "bearer", "token": "tok123"});
793        match RestApiAdapter::parse_auth(&auth) {
794            AuthScheme::Bearer(t) => assert_eq!(t, "tok123"),
795            other => panic!("unexpected: {other:?}"),
796        }
797    }
798
799    #[test]
800    fn parse_auth_oauth2_alias() {
801        let auth = json!({"type": "oauth2", "token": "oauth_tok"});
802        match RestApiAdapter::parse_auth(&auth) {
803            AuthScheme::Bearer(t) => assert_eq!(t, "oauth_tok"),
804            other => panic!("unexpected: {other:?}"),
805        }
806    }
807
808    #[test]
809    fn parse_auth_basic() {
810        let auth = json!({"type": "basic", "username": "alice", "password": "s3cr3t"});
811        match RestApiAdapter::parse_auth(&auth) {
812            AuthScheme::Basic { username, password } => {
813                assert_eq!(username, "alice");
814                assert_eq!(password, "s3cr3t");
815            }
816            other => panic!("unexpected: {other:?}"),
817        }
818    }
819
820    #[test]
821    fn parse_auth_api_key_header() {
822        let auth = json!({"type": "api_key_header", "header": "X-Token", "key": "k123"});
823        match RestApiAdapter::parse_auth(&auth) {
824            AuthScheme::ApiKeyHeader { header, key } => {
825                assert_eq!(header, "X-Token");
826                assert_eq!(key, "k123");
827            }
828            other => panic!("unexpected: {other:?}"),
829        }
830    }
831
832    #[test]
833    fn parse_auth_api_key_query() {
834        let auth = json!({"type": "api_key_query", "param": "api_key", "key": "qk"});
835        match RestApiAdapter::parse_auth(&auth) {
836            AuthScheme::ApiKeyQuery { param, key } => {
837                assert_eq!(param, "api_key");
838                assert_eq!(key, "qk");
839            }
840            other => panic!("unexpected: {other:?}"),
841        }
842    }
843
844    #[test]
845    fn parse_auth_none_default() {
846        let auth = json!(null);
847        assert!(matches!(
848            RestApiAdapter::parse_auth(&auth),
849            AuthScheme::None
850        ));
851    }
852
853    // ── extract_path ───────────────────────────────────────────────────────────
854
855    #[test]
856    fn extract_path_top_level() {
857        let v = json!({"items": [1, 2, 3]});
858        assert_eq!(
859            RestApiAdapter::extract_path(&v, "items"),
860            Some(&json!([1, 2, 3]))
861        );
862    }
863
864    #[test]
865    fn extract_path_nested() {
866        let v = json!({"meta": {"next_cursor": "abc"}});
867        assert_eq!(
868            RestApiAdapter::extract_path(&v, "meta.next_cursor"),
869            Some(&json!("abc"))
870        );
871    }
872
873    #[test]
874    fn extract_path_missing() {
875        let v = json!({"a": {"b": 1}});
876        assert!(RestApiAdapter::extract_path(&v, "a.c").is_none());
877    }
878
879    // ── parse_link_next ────────────────────────────────────────────────────────
880
881    #[test]
882    fn parse_link_next_present() {
883        let h = r#"<https://api.example.com/items?page=2>; rel="next", <https://api.example.com/items?page=1>; rel="prev""#;
884        assert_eq!(
885            RestApiAdapter::parse_link_next(h),
886            Some("https://api.example.com/items?page=2".to_owned())
887        );
888    }
889
890    #[test]
891    fn parse_link_next_absent() {
892        let h = r#"<https://api.example.com/items?page=1>; rel="prev""#;
893        assert!(RestApiAdapter::parse_link_next(h).is_none());
894    }
895
896    #[test]
897    fn parse_link_next_single() {
898        let h = r#"<https://api.example.com/items?page=3>; rel="next""#;
899        assert_eq!(
900            RestApiAdapter::parse_link_next(h),
901            Some("https://api.example.com/items?page=3".to_owned())
902        );
903    }
904
905    // ── parse_spec ─────────────────────────────────────────────────────────────
906
907    #[test]
908    fn parse_spec_defaults() {
909        let spec = RestApiAdapter::parse_spec(&json!({})).unwrap();
910        assert_eq!(spec.method, Method::GET);
911        assert_eq!(spec.accept, "application/json");
912        assert_eq!(spec.max_pages, 1);
913        assert!(spec.data_path.is_none());
914        assert!(!spec.collect_as_array);
915        assert!(matches!(spec.pagination, PaginationStrategy::None));
916    }
917
918    #[test]
919    fn parse_spec_post_with_body_and_headers() {
920        let params = json!({
921            "method":  "POST",
922            "body":    { "key": "value" },
923            "headers": { "X-Foo": "bar" },
924            "query":   { "limit": "10" }
925        });
926        let spec = RestApiAdapter::parse_spec(&params).unwrap();
927        assert_eq!(spec.method, Method::POST);
928        assert_eq!(spec.extra_headers.get("X-Foo"), Some(&"bar".to_string()));
929        assert_eq!(spec.query_params.get("limit"), Some(&"10".to_string()));
930        assert!(matches!(spec.body, Some(RequestBody::Json(_))));
931    }
932
933    #[test]
934    fn parse_spec_unknown_method_returns_error() {
935        let result = RestApiAdapter::parse_spec(&json!({"method": "BREW"}));
936        assert!(result.is_err());
937    }
938
939    #[test]
940    fn parse_spec_cursor_pagination() {
941        let params = json!({
942            "pagination": {
943                "strategy":     "cursor",
944                "cursor_param": "after",
945                "cursor_field": "page_info.end_cursor",
946                "max_pages":    10
947            }
948        });
949        let spec = RestApiAdapter::parse_spec(&params).unwrap();
950        assert_eq!(spec.max_pages, 10);
951        match spec.pagination {
952            PaginationStrategy::Cursor {
953                cursor_param,
954                cursor_field,
955            } => {
956                assert_eq!(cursor_param, "after");
957                assert_eq!(cursor_field, "page_info.end_cursor");
958            }
959            other => panic!("unexpected: {other:?}"),
960        }
961    }
962
963    #[test]
964    fn parse_spec_offset_pagination() {
965        let params = json!({
966            "pagination": {
967                "strategy":        "offset",
968                "page_param":      "page",
969                "page_size_param": "per_page",
970                "page_size":       50,
971                "start_page":      1,
972                "max_pages":       3
973            }
974        });
975        let spec = RestApiAdapter::parse_spec(&params).unwrap();
976        assert_eq!(spec.max_pages, 3);
977        match spec.pagination {
978            PaginationStrategy::Offset {
979                page_size,
980                current_page,
981                page_param,
982                ..
983            } => {
984                assert_eq!(page_size, Some(50));
985                assert_eq!(current_page, 1);
986                assert_eq!(page_param, "page");
987            }
988            other => panic!("unexpected: {other:?}"),
989        }
990    }
991
992    #[test]
993    fn parse_spec_link_header_pagination() {
994        let params = json!({
995            "pagination": { "strategy": "link_header", "max_pages": 5 }
996        });
997        let spec = RestApiAdapter::parse_spec(&params).unwrap();
998        assert_eq!(spec.max_pages, 5);
999        assert!(matches!(spec.pagination, PaginationStrategy::LinkHeader));
1000    }
1001
1002    #[test]
1003    fn parse_spec_data_path_and_collect_as_array() {
1004        let params = json!({
1005            "response": { "data_path": "data.items", "collect_as_array": true }
1006        });
1007        let spec = RestApiAdapter::parse_spec(&params).unwrap();
1008        assert_eq!(spec.data_path, Some("data.items".to_owned()));
1009        assert!(spec.collect_as_array);
1010    }
1011
1012    #[test]
1013    fn parse_spec_empty_data_path_is_none() {
1014        let params = json!({ "response": { "data_path": "" } });
1015        let spec = RestApiAdapter::parse_spec(&params).unwrap();
1016        assert!(spec.data_path.is_none());
1017    }
1018
1019    // ── adapter_name ───────────────────────────────────────────────────────────
1020
1021    #[test]
1022    fn adapter_name() {
1023        assert_eq!(RestApiAdapter::new().name(), "rest-api");
1024    }
1025
1026    // ── is_retryable ────────────────────────────────────────────────────────────
1027
1028    #[test]
1029    fn is_retryable_429() {
1030        let e = StygianError::from(ServiceError::Unavailable(
1031            "HTTP 429 rate-limited".to_string(),
1032        ));
1033        assert!(is_retryable(&e));
1034    }
1035
1036    #[test]
1037    fn is_retryable_503() {
1038        let e = StygianError::from(ServiceError::Unavailable(
1039            "HTTP 503 Service Unavailable".to_string(),
1040        ));
1041        assert!(is_retryable(&e));
1042    }
1043
1044    #[test]
1045    fn is_retryable_404_not_retryable() {
1046        let e = StygianError::from(ServiceError::Unavailable("HTTP 404 Not Found".to_string()));
1047        assert!(!is_retryable(&e));
1048    }
1049
1050    // ── integration ────────────────────────────────────────────────────────────
1051
1052    /// Real HTTP integration test — requires `REST_API_TEST_URL` env var.
1053    ///
1054    /// Run with: `REST_API_TEST_URL=https://httpbin.org/get cargo test -- --ignored`
1055    #[tokio::test]
1056    #[ignore = "requires live REST API endpoint; set REST_API_TEST_URL env var"]
1057    async fn integration_get_httpbin() {
1058        let url = std::env::var("REST_API_TEST_URL")
1059            .unwrap_or_else(|_| "https://httpbin.org/get".to_string());
1060
1061        let adapter = RestApiAdapter::new();
1062        let input = ServiceInput {
1063            url,
1064            params: json!({}),
1065        };
1066        let output = adapter.execute(input).await.unwrap();
1067        assert!(!output.data.is_empty());
1068        assert_eq!(output.metadata["page_count"], 1);
1069    }
1070}