Skip to main content

stygian_graph/adapters/
graphql.rs

1//! GraphQL API adapter — a generic [`ScrapingService`](crate::ports::ScrapingService) for any spec-compliant
2//! GraphQL endpoint.
3//!
4//! Handles the full request/response lifecycle: query execution, variable
5//! injection, GraphQL error-envelope parsing, Jobber-style cost/throttle
6//! metadata, cursor-based pagination, and pluggable auth strategies.
7//!
8//! Target-specific knowledge (endpoint URL, version headers, default auth) is
9//! supplied by a [`GraphQlTargetPlugin`](crate::ports::graphql_plugin::GraphQlTargetPlugin)
10//! resolved from an optional [`GraphQlPluginRegistry`](crate::application::graphql_plugin_registry::GraphQlPluginRegistry).
11
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use async_trait::async_trait;
17use serde_json::{Value, json};
18
19use crate::application::graphql_plugin_registry::GraphQlPluginRegistry;
20use crate::application::pipeline_parser::expand_template;
21use crate::domain::error::{StygianError, Result, ServiceError};
22use crate::ports::{GraphQlAuth, GraphQlAuthKind, ScrapingService, ServiceInput, ServiceOutput};
23
24// ─────────────────────────────────────────────────────────────────────────────
25// Configuration
26// ─────────────────────────────────────────────────────────────────────────────
27
28/// Configuration for [`GraphQlService`].
29///
30/// # Example
31///
32/// ```rust
33/// use stygian_graph::adapters::graphql::GraphQlConfig;
34///
35/// let config = GraphQlConfig {
36///     timeout_secs: 30,
37///     max_pages: 1000,
38///     user_agent: "stygian-graph/1.0".to_string(),
39/// };
40/// ```
41#[derive(Debug, Clone)]
42pub struct GraphQlConfig {
43    /// Request timeout in seconds (default: 30)
44    pub timeout_secs: u64,
45    /// Maximum number of pages for cursor-paginated queries (default: 1000)
46    pub max_pages: usize,
47    /// User-Agent header sent with every request
48    pub user_agent: String,
49}
50
51impl Default for GraphQlConfig {
52    fn default() -> Self {
53        Self {
54            timeout_secs: 30,
55            max_pages: 1000,
56            user_agent: "stygian-graph/1.0".to_string(),
57        }
58    }
59}
60
61// ─────────────────────────────────────────────────────────────────────────────
62// Adapter
63// ─────────────────────────────────────────────────────────────────────────────
64
65/// `ScrapingService` adapter for GraphQL APIs.
66///
67/// Implement any spec-compliant GraphQL endpoint by constructing a
68/// [`GraphQlService`] with a config and an optional plugin registry. Target
69/// specifics (endpoint, version headers, auth) are supplied either via
70/// `ServiceInput.params` directly or through a registered
71/// [`GraphQlTargetPlugin`](crate::ports::graphql_plugin::GraphQlTargetPlugin).
72///
73/// # Example
74///
75/// ```no_run
76/// use stygian_graph::adapters::graphql::{GraphQlService, GraphQlConfig};
77/// use stygian_graph::ports::{ScrapingService, ServiceInput};
78/// use serde_json::json;
79///
80/// #[tokio::main]
81/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
82///     let service = GraphQlService::new(GraphQlConfig::default(), None);
83///     let input = ServiceInput {
84///         url: "https://countries.trevorblades.com/".to_string(),
85///         params: json!({
86///             "query": "{ countries { code name } }"
87///         }),
88///     };
89///     let output = service.execute(input).await?;
90///     println!("{}", output.data);
91///     Ok(())
92/// }
93/// ```
94pub struct GraphQlService {
95    client: reqwest::Client,
96    config: GraphQlConfig,
97    plugins: Option<Arc<GraphQlPluginRegistry>>,
98}
99
100impl GraphQlService {
101    /// Create a new `GraphQlService`.
102    ///
103    /// `plugins` may be `None` for raw-params mode (no plugin resolution).
104    ///
105    /// # Example
106    ///
107    /// ```no_run
108    /// use stygian_graph::adapters::graphql::{GraphQlService, GraphQlConfig};
109    /// use stygian_graph::ports::ScrapingService;
110    ///
111    /// let service = GraphQlService::new(GraphQlConfig::default(), None);
112    /// assert_eq!(service.name(), "graphql");
113    /// ```
114    pub fn new(config: GraphQlConfig, plugins: Option<Arc<GraphQlPluginRegistry>>) -> Self {
115        let client = reqwest::Client::builder()
116            .timeout(Duration::from_secs(config.timeout_secs))
117            .user_agent(&config.user_agent)
118            .build()
119            .unwrap_or_default();
120        Self {
121            client,
122            config,
123            plugins,
124        }
125    }
126
127    // ── Internal helpers ─────────────────────────────────────────────────────
128
129    /// Apply auth to the request builder.
130    fn apply_auth(builder: reqwest::RequestBuilder, auth: &GraphQlAuth) -> reqwest::RequestBuilder {
131        let token = expand_template(&auth.token);
132        match auth.kind {
133            GraphQlAuthKind::Bearer => builder.header("Authorization", format!("Bearer {token}")),
134            GraphQlAuthKind::ApiKey => builder.header("X-Api-Key", token),
135            GraphQlAuthKind::Header => {
136                let name = auth.header_name.as_deref().unwrap_or("X-Api-Key");
137                builder.header(name, token)
138            }
139            GraphQlAuthKind::None => builder,
140        }
141    }
142
143    /// Parse `GraphQlAuth` from a JSON object like `{"kind":"bearer","token":"..."}`.
144    fn parse_auth(val: &Value) -> Option<GraphQlAuth> {
145        let kind_str = val["kind"].as_str().unwrap_or("none");
146        let kind = match kind_str {
147            "bearer" => GraphQlAuthKind::Bearer,
148            "api_key" => GraphQlAuthKind::ApiKey,
149            "header" => GraphQlAuthKind::Header,
150            _ => GraphQlAuthKind::None,
151        };
152        if kind == GraphQlAuthKind::None {
153            return None;
154        }
155        let token = val["token"].as_str()?.to_string();
156        let header_name = val["header_name"].as_str().map(str::to_string);
157        Some(GraphQlAuth {
158            kind,
159            token,
160            header_name,
161        })
162    }
163
164    /// Check whether the response body indicates throttling.
165    ///
166    /// Returns `Some(retry_after_ms)` on throttle detection via any of:
167    /// 1. `extensions.cost.throttleStatus == "THROTTLED"`
168    /// 2. Any error entry with `extensions.code == "THROTTLED"`
169    /// 3. Any error message containing "throttled" (case-insensitive)
170    #[allow(clippy::indexing_slicing)]
171    fn detect_throttle(body: &Value) -> Option<u64> {
172        // 1. extensions.cost.throttleStatus
173        if body["extensions"]["cost"]["throttleStatus"]
174            .as_str()
175            .is_some_and(|s| s.eq_ignore_ascii_case("THROTTLED"))
176        {
177            return Some(Self::throttle_backoff(body));
178        }
179
180        // 2 & 3. errors array
181        if let Some(errors) = body["errors"].as_array() {
182            for err in errors {
183                if err["extensions"]["code"]
184                    .as_str()
185                    .is_some_and(|c| c.eq_ignore_ascii_case("THROTTLED"))
186                {
187                    return Some(Self::throttle_backoff(body));
188                }
189                if err["message"]
190                    .as_str()
191                    .is_some_and(|m| m.to_ascii_lowercase().contains("throttled"))
192                {
193                    return Some(Self::throttle_backoff(body));
194                }
195            }
196        }
197
198        None
199    }
200
201    /// Calculate retry back-off from `extensions.cost`.
202    ///
203    /// ```text
204    /// deficit = maximumAvailable − currentlyAvailable
205    /// ms      = (deficit / restoreRate * 1000).clamp(500, 2000)
206    /// ```
207    #[allow(
208        clippy::indexing_slicing,
209        clippy::cast_possible_truncation,
210        clippy::cast_sign_loss
211    )]
212    fn throttle_backoff(body: &Value) -> u64 {
213        let cost = &body["extensions"]["cost"];
214        let max_avail = cost["maximumAvailable"].as_f64().unwrap_or(10_000.0);
215        let cur_avail = cost["currentlyAvailable"].as_f64().unwrap_or(0.0);
216        let restore_rate = cost["restoreRate"].as_f64().unwrap_or(500.0);
217        let deficit = (max_avail - cur_avail).max(0.0);
218        let ms = if restore_rate > 0.0 {
219            (deficit / restore_rate * 1000.0) as u64
220        } else {
221            2_000
222        };
223        ms.clamp(500, 2_000)
224    }
225
226    /// Extract the `extensions.cost` object into a metadata-compatible [`Value`].
227    #[allow(clippy::indexing_slicing)]
228    fn extract_cost_metadata(body: &Value) -> Option<Value> {
229        let cost = &body["extensions"]["cost"];
230        if cost.is_null() || cost.is_object() && cost.as_object()?.is_empty() {
231            return None;
232        }
233        Some(cost.clone())
234    }
235
236    /// Navigate a dot-separated JSON path like `"data.clients.pageInfo"`.
237    #[allow(clippy::indexing_slicing)]
238    fn json_path<'v>(root: &'v Value, path: &str) -> &'v Value {
239        let mut cur = root;
240        for key in path.split('.') {
241            cur = &cur[key];
242        }
243        cur
244    }
245
246    /// Execute one GraphQL POST and return the parsed JSON body or an error.
247    #[allow(clippy::indexing_slicing)]
248    async fn post_query(
249        &self,
250        url: &str,
251        query: &str,
252        variables: &Value,
253        operation_name: Option<&str>,
254        auth: Option<&GraphQlAuth>,
255        extra_headers: &HashMap<String, String>,
256    ) -> Result<Value> {
257        let mut body = json!({ "query": query, "variables": variables });
258        if let Some(op) = operation_name {
259            body["operationName"] = json!(op);
260        }
261
262        let mut builder = self
263            .client
264            .post(url)
265            .header("Content-Type", "application/json")
266            .header("Accept", "application/json");
267
268        for (k, v) in extra_headers {
269            builder = builder.header(k.as_str(), v.as_str());
270        }
271
272        if let Some(a) = auth {
273            builder = Self::apply_auth(builder, a);
274        }
275
276        let resp = builder
277            .json(&body)
278            .send()
279            .await
280            .map_err(|e| StygianError::Service(ServiceError::Unavailable(e.to_string())))?;
281
282        let status = resp.status();
283        let text = resp
284            .text()
285            .await
286            .map_err(|e| StygianError::Service(ServiceError::Unavailable(e.to_string())))?;
287
288        if status.as_u16() >= 400 {
289            return Err(StygianError::Service(ServiceError::Unavailable(format!(
290                "HTTP {status}: {text}"
291            ))));
292        }
293
294        serde_json::from_str::<Value>(&text).map_err(|e| {
295            StygianError::Service(ServiceError::InvalidResponse(format!("invalid JSON: {e}")))
296        })
297    }
298
299    /// Validate a parsed GraphQL body (errors array, missing `data` key, throttle).
300    #[allow(clippy::indexing_slicing)]
301    fn validate_body(body: &Value) -> Result<()> {
302        // Throttle check takes priority so callers can retry with backoff.
303        if let Some(retry_after_ms) = Self::detect_throttle(body) {
304            return Err(StygianError::Service(ServiceError::RateLimited {
305                retry_after_ms,
306            }));
307        }
308
309        if let Some(errors) = body["errors"].as_array()
310            && !errors.is_empty()
311        {
312            let msg = errors[0]["message"]
313                .as_str()
314                .unwrap_or("unknown GraphQL error")
315                .to_string();
316            return Err(StygianError::Service(ServiceError::InvalidResponse(msg)));
317        }
318
319        // `data` key is missing — explicitly null with no errors is allowed (partial response)
320        if body.get("data").is_none() {
321            return Err(StygianError::Service(ServiceError::InvalidResponse(
322                "missing 'data' key in GraphQL response".to_string(),
323            )));
324        }
325
326        Ok(())
327    }
328}
329
330// ─────────────────────────────────────────────────────────────────────────────
331// ScrapingService impl
332// ─────────────────────────────────────────────────────────────────────────────
333
334#[async_trait]
335impl ScrapingService for GraphQlService {
336    fn name(&self) -> &'static str {
337        "graphql"
338    }
339
340    /// Execute a GraphQL query.
341    ///
342    /// Reads `ServiceInput.params` for:
343    /// - `query` (required) — the GraphQL query string
344    /// - `variables` — optional JSON object
345    /// - `operation_name` — optional string
346    /// - `auth` — optional `{"kind": "bearer"|"api_key"|"header"|"none", "token": "..."}`
347    /// - `headers` — optional extra headers object
348    /// - `plugin` — optional plugin name to resolve from the registry
349    /// - `pagination` — optional `{"strategy": "cursor", "page_info_path": "...", "edges_path": "...", "page_size": 50}`
350    ///
351    /// # Errors
352    ///
353    /// Returns `Err` for HTTP ≥ 400, invalid JSON, GraphQL `errors[]`, missing
354    /// `data` key, throttle detection, or pagination runaway.
355    #[allow(clippy::too_many_lines, clippy::indexing_slicing)]
356    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
357        let params = &input.params;
358
359        // ── 1. Resolve plugin (if any) ────────────────────────────────────
360        let plugin_name = params["plugin"].as_str();
361        let plugin = if let (Some(name), Some(registry)) = (plugin_name, &self.plugins) {
362            Some(registry.get(name)?)
363        } else {
364            None
365        };
366
367        // ── 2. Resolve URL ────────────────────────────────────────────────
368        let url = if !input.url.is_empty() {
369            input.url.clone()
370        } else if let Some(ref p) = plugin {
371            p.endpoint().to_string()
372        } else {
373            return Err(StygianError::Service(ServiceError::Unavailable(
374                "no URL provided and no plugin endpoint available".to_string(),
375            )));
376        };
377
378        // ── 3. Resolve query ──────────────────────────────────────────────
379        let query = params["query"].as_str().ok_or_else(|| {
380            StygianError::Service(ServiceError::InvalidResponse(
381                "params.query is required".to_string(),
382            ))
383        })?;
384
385        let operation_name = params["operation_name"].as_str();
386        let mut variables = params["variables"].clone();
387        if variables.is_null() {
388            variables = json!({});
389        }
390
391        // ── 4. Resolve auth ───────────────────────────────────────────────
392        let auth: Option<GraphQlAuth> = if !params["auth"].is_null() && params["auth"].is_object() {
393            Self::parse_auth(&params["auth"])
394        } else {
395            plugin.as_ref().and_then(|p| p.default_auth())
396        };
397
398        // ── 5. Build headers (extra + plugin version headers) ─────────────
399        let mut extra_headers: HashMap<String, String> = params["headers"]
400            .as_object()
401            .map(|obj| {
402                obj.iter()
403                    .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
404                    .collect()
405            })
406            .unwrap_or_default();
407
408        // Plugin version headers override ad-hoc ones for the same key
409        if let Some(ref p) = plugin {
410            for (k, v) in p.version_headers() {
411                extra_headers.insert(k, v);
412            }
413        }
414
415        // ── 6. Resolve pagination config ──────────────────────────────────
416        let pag = &params["pagination"];
417        let use_cursor = pag["strategy"].as_str() == Some("cursor");
418        let page_info_path = pag["page_info_path"]
419            .as_str()
420            .unwrap_or("data.pageInfo")
421            .to_string();
422        let edges_path = pag["edges_path"]
423            .as_str()
424            .unwrap_or("data.edges")
425            .to_string();
426        let page_size: u64 = pag["page_size"]
427            .as_u64()
428            .unwrap_or_else(|| plugin.as_ref().map_or(50, |p| p.default_page_size() as u64));
429
430        // ── 7. Execute (with optional cursor pagination) ───────────────────
431        if use_cursor {
432            // Inject the initial `first`/page-size variable and null cursor
433            variables["first"] = json!(page_size);
434            variables["after"] = json!(null);
435
436            let mut all_edges: Vec<Value> = Vec::new();
437            let mut page = 0usize;
438            let mut cost_meta = json!(null);
439
440            loop {
441                if page >= self.config.max_pages {
442                    return Err(StygianError::Service(ServiceError::InvalidResponse(
443                        format!("pagination exceeded max_pages ({})", self.config.max_pages),
444                    )));
445                }
446
447                let body = self
448                    .post_query(
449                        &url,
450                        query,
451                        &variables,
452                        operation_name,
453                        auth.as_ref(),
454                        &extra_headers,
455                    )
456                    .await?;
457
458                Self::validate_body(&body)?;
459
460                // Accumulate edges
461                let edges = Self::json_path(&body, &edges_path);
462                if let Some(arr) = edges.as_array() {
463                    all_edges.extend(arr.iter().cloned());
464                }
465
466                // Check for next page
467                let page_info = Self::json_path(&body, &page_info_path);
468                let has_next = page_info["hasNextPage"].as_bool().unwrap_or(false);
469                let end_cursor = page_info["endCursor"].clone();
470
471                cost_meta = Self::extract_cost_metadata(&body).unwrap_or(json!(null));
472                page += 1;
473
474                if !has_next || end_cursor.is_null() {
475                    break;
476                }
477                variables["after"] = end_cursor;
478            }
479
480            let metadata = json!({ "cost": cost_meta, "pages_fetched": page });
481            Ok(ServiceOutput {
482                data: serde_json::to_string(&all_edges).unwrap_or_default(),
483                metadata,
484            })
485        } else {
486            // Single-request mode
487            let body = self
488                .post_query(
489                    &url,
490                    query,
491                    &variables,
492                    operation_name,
493                    auth.as_ref(),
494                    &extra_headers,
495                )
496                .await?;
497
498            Self::validate_body(&body)?;
499
500            let cost_meta = Self::extract_cost_metadata(&body).unwrap_or(json!(null));
501            let metadata = json!({ "cost": cost_meta });
502
503            Ok(ServiceOutput {
504                data: serde_json::to_string(&body["data"]).unwrap_or_default(),
505                metadata,
506            })
507        }
508    }
509}
510
511// ─────────────────────────────────────────────────────────────────────────────
512// Tests
513// ─────────────────────────────────────────────────────────────────────────────
514
515#[cfg(test)]
516#[allow(
517    clippy::unwrap_used,
518    clippy::indexing_slicing,
519    clippy::needless_pass_by_value,
520    clippy::field_reassign_with_default,
521    clippy::unnecessary_literal_bound
522)]
523mod tests {
524    use super::*;
525    use std::collections::HashMap;
526    use std::io::Write;
527    use std::sync::Arc;
528
529    use serde_json::json;
530    use tokio::io::{AsyncReadExt, AsyncWriteExt};
531    use tokio::net::TcpListener;
532
533    use crate::application::graphql_plugin_registry::GraphQlPluginRegistry;
534    use crate::ports::graphql_plugin::GraphQlTargetPlugin;
535
536    // ── Mock server ──────────────────────────────────────────────────────────
537
538    /// Minimal HTTP/1.1 mock server that serves one fixed JSON response body.
539    ///
540    /// The server listens on a random port, serves one request, then stops.
541    struct MockGraphQlServer;
542
543    impl MockGraphQlServer {
544        /// Spawn a server that returns HTTP `status` with `body` and run `f`.
545        ///
546        /// The closure receives the base URL `"http://127.0.0.1:<port>"`.
547        async fn run_with<F, Fut>(status: u16, body: impl Into<Vec<u8>>, f: F)
548        where
549            F: FnOnce(String) -> Fut,
550            Fut: std::future::Future<Output = ()>,
551        {
552            let body_bytes: Vec<u8> = body.into();
553            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
554            let addr = listener.local_addr().unwrap();
555            let url = format!("http://{addr}");
556
557            let body_clone = body_bytes.clone();
558            tokio::spawn(async move {
559                if let Ok((mut stream, _)) = listener.accept().await {
560                    let mut buf = [0u8; 4096];
561                    let _ = stream.read(&mut buf).await;
562                    // Build a minimal HTTP/1.1 response
563                    let mut response = Vec::new();
564                    write!(
565                        response,
566                        "HTTP/1.1 {status} OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
567                        body_clone.len()
568                    ).unwrap();
569                    response.extend_from_slice(&body_clone);
570                    let _ = stream.write_all(&response).await;
571                }
572            });
573
574            f(url).await;
575        }
576
577        /// Variant that captures the received request headers for assertion.
578        async fn run_capturing_request<F, Fut>(body: impl Into<Vec<u8>>, f: F) -> Vec<u8>
579        where
580            F: FnOnce(String) -> Fut,
581            Fut: std::future::Future<Output = ()>,
582        {
583            let body_bytes: Vec<u8> = body.into();
584            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
585            let addr = listener.local_addr().unwrap();
586            let url = format!("http://{addr}");
587
588            let body_clone = body_bytes.clone();
589            let (tx, mut rx) = tokio::sync::oneshot::channel::<Vec<u8>>();
590            tokio::spawn(async move {
591                if let Ok((mut stream, _)) = listener.accept().await {
592                    let mut buf = vec![0u8; 8192];
593                    let n = stream.read(&mut buf).await.unwrap_or(0);
594                    let request = buf[..n].to_vec();
595                    let _ = tx.send(request);
596
597                    let mut response = Vec::new();
598                    write!(
599                        response,
600                        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
601                        body_clone.len()
602                    ).unwrap();
603                    response.extend_from_slice(&body_clone);
604                    let _ = stream.write_all(&response).await;
605                }
606            });
607
608            f(url).await;
609
610            rx.try_recv().unwrap_or_default()
611        }
612    }
613
614    fn make_service(plugins: Option<Arc<GraphQlPluginRegistry>>) -> GraphQlService {
615        let mut config = GraphQlConfig::default();
616        config.max_pages = 5; // keep tests fast
617        GraphQlService::new(config, plugins)
618    }
619
620    fn simple_query_body(data: Value) -> Vec<u8> {
621        serde_json::to_vec(&json!({ "data": data })).unwrap()
622    }
623
624    // ── Tests ────────────────────────────────────────────────────────────────
625
626    #[tokio::test]
627    async fn execute_simple_query() {
628        let body = simple_query_body(json!({ "users": [{ "id": 1 }] }));
629        MockGraphQlServer::run_with(200, body, |url| async move {
630            let svc = make_service(None);
631            let input = ServiceInput {
632                url,
633                params: json!({ "query": "{ users { id } }" }),
634            };
635            let output = svc.execute(input).await.unwrap();
636            let data: Value = serde_json::from_str(&output.data).unwrap();
637            assert_eq!(data["users"][0]["id"], 1);
638        })
639        .await;
640    }
641
642    #[tokio::test]
643    async fn graphql_errors_in_200_response() {
644        let body =
645            serde_json::to_vec(&json!({ "errors": [{ "message": "not found" }], "data": null }))
646                .unwrap();
647        MockGraphQlServer::run_with(200, body, |url| async move {
648            let svc = make_service(None);
649            let input = ServiceInput {
650                url,
651                params: json!({ "query": "{ missing }" }),
652            };
653            let err = svc.execute(input).await.unwrap_err();
654            assert!(
655                matches!(
656                    err,
657                    StygianError::Service(ServiceError::InvalidResponse(_))
658                ),
659                "expected InvalidResponse, got {err:?}"
660            );
661        })
662        .await;
663    }
664
665    #[tokio::test]
666    async fn http_error_returns_unavailable() {
667        let body = b"Internal Server Error".to_vec();
668        MockGraphQlServer::run_with(500, body, |url| async move {
669            let svc = make_service(None);
670            let input = ServiceInput {
671                url,
672                params: json!({ "query": "{ x }" }),
673            };
674            let err = svc.execute(input).await.unwrap_err();
675            assert!(
676                matches!(err, StygianError::Service(ServiceError::Unavailable(_))),
677                "expected Unavailable, got {err:?}"
678            );
679        })
680        .await;
681    }
682
683    #[tokio::test]
684    async fn missing_data_key() {
685        let body = serde_json::to_vec(&json!({ "extensions": {} })).unwrap();
686        MockGraphQlServer::run_with(200, body, |url| async move {
687            let svc = make_service(None);
688            let input = ServiceInput {
689                url,
690                params: json!({ "query": "{ x }" }),
691            };
692            let err = svc.execute(input).await.unwrap_err();
693            assert!(
694                matches!(
695                    err,
696                    StygianError::Service(ServiceError::InvalidResponse(_))
697                ),
698                "expected InvalidResponse, got {err:?}"
699            );
700        })
701        .await;
702    }
703
704    #[tokio::test]
705    async fn bearer_auth_header_set() {
706        let body = simple_query_body(json!({}));
707        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
708            let svc = make_service(None);
709            let input = ServiceInput {
710                url,
711                params: json!({
712                    "query": "{ x }",
713                    "auth": { "kind": "bearer", "token": "test-token-123" }
714                }),
715            };
716            let _ = svc.execute(input).await;
717        })
718        .await;
719
720        let request_str = String::from_utf8_lossy(&request_bytes);
721        assert!(
722            request_str.contains("authorization: Bearer test-token-123"),
723            "auth header not found in request:\n{request_str}"
724        );
725    }
726
727    #[tokio::test]
728    async fn plugin_version_headers_merged() {
729        struct V1Plugin;
730        impl GraphQlTargetPlugin for V1Plugin {
731            fn name(&self) -> &str {
732                "v1"
733            }
734            fn endpoint(&self) -> &str {
735                "unused"
736            }
737            fn version_headers(&self) -> HashMap<String, String> {
738                [("X-TEST-VERSION".to_string(), "2025-01-01".to_string())].into()
739            }
740        }
741
742        let mut registry = GraphQlPluginRegistry::new();
743        registry.register(Arc::new(V1Plugin));
744
745        let body = simple_query_body(json!({}));
746        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
747            let svc = make_service(Some(Arc::new(registry)));
748            let input = ServiceInput {
749                url,
750                params: json!({
751                    "query": "{ x }",
752                    "plugin": "v1"
753                }),
754            };
755            let _ = svc.execute(input).await;
756        })
757        .await;
758
759        let request_str = String::from_utf8_lossy(&request_bytes);
760        assert!(
761            request_str.contains("x-test-version: 2025-01-01"),
762            "version header not found:\n{request_str}"
763        );
764    }
765
766    #[tokio::test]
767    async fn plugin_default_auth_used_when_params_auth_absent() {
768        use crate::ports::{GraphQlAuth, GraphQlAuthKind};
769
770        struct TokenPlugin;
771        impl GraphQlTargetPlugin for TokenPlugin {
772            fn name(&self) -> &str {
773                "tokenplugin"
774            }
775            fn endpoint(&self) -> &str {
776                "unused"
777            }
778            fn default_auth(&self) -> Option<GraphQlAuth> {
779                Some(GraphQlAuth {
780                    kind: GraphQlAuthKind::Bearer,
781                    token: "plugin-default-token".to_string(),
782                    header_name: None,
783                })
784            }
785        }
786
787        let mut registry = GraphQlPluginRegistry::new();
788        registry.register(Arc::new(TokenPlugin));
789
790        let body = simple_query_body(json!({}));
791        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
792            let svc = make_service(Some(Arc::new(registry)));
793            let input = ServiceInput {
794                url,
795                // No `auth` field — plugin should supply it
796                params: json!({
797                    "query": "{ x }",
798                    "plugin": "tokenplugin"
799                }),
800            };
801            let _ = svc.execute(input).await;
802        })
803        .await;
804
805        let request_str = String::from_utf8_lossy(&request_bytes);
806        assert!(
807            request_str.contains("Bearer plugin-default-token"),
808            "plugin default auth not applied:\n{request_str}"
809        );
810    }
811
812    #[tokio::test]
813    async fn throttle_response_returns_rate_limited() {
814        let body = serde_json::to_vec(&json!({
815            "data": null,
816            "extensions": {
817                "cost": {
818                    "throttleStatus": "THROTTLED",
819                    "maximumAvailable": 10000,
820                    "currentlyAvailable": 0,
821                    "restoreRate": 500
822                }
823            }
824        }))
825        .unwrap();
826
827        MockGraphQlServer::run_with(200, body, |url| async move {
828            let svc = make_service(None);
829            let input = ServiceInput {
830                url,
831                params: json!({ "query": "{ x }" }),
832            };
833            let err = svc.execute(input).await.unwrap_err();
834            assert!(
835                matches!(
836                    err,
837                    StygianError::Service(ServiceError::RateLimited { retry_after_ms })
838                    if retry_after_ms > 0
839                ),
840                "expected RateLimited, got {err:?}"
841            );
842        })
843        .await;
844    }
845
846    #[tokio::test]
847    async fn cost_metadata_surfaced() {
848        let body = serde_json::to_vec(&json!({
849            "data": { "items": [] },
850            "extensions": {
851                "cost": {
852                    "throttleStatus": "PASS",
853                    "maximumAvailable": 10000,
854                    "currentlyAvailable": 9800,
855                    "actualQueryCost": 42,
856                    "restoreRate": 500
857                }
858            }
859        }))
860        .unwrap();
861
862        MockGraphQlServer::run_with(200, body, |url| async move {
863            let svc = make_service(None);
864            let input = ServiceInput {
865                url,
866                params: json!({ "query": "{ items { id } }" }),
867            };
868            let output = svc.execute(input).await.unwrap();
869            let cost = &output.metadata["cost"];
870            assert_eq!(cost["actualQueryCost"], 42);
871            assert_eq!(cost["throttleStatus"], "PASS");
872        })
873        .await;
874    }
875
876    #[tokio::test]
877    async fn cursor_pagination_accumulates_pages() {
878        // Two-page scenario: page 1 has next page, page 2 does not.
879        // We need two independent servers (one per page).
880        let listener1 = TcpListener::bind("127.0.0.1:0").await.unwrap();
881        let addr1 = listener1.local_addr().unwrap();
882        let listener2 = TcpListener::bind("127.0.0.1:0").await.unwrap();
883        let addr2 = listener2.local_addr().unwrap();
884
885        // Both pages go to the same host:port — use a single server that handles
886        // two sequential connections.
887        let page1_body = serde_json::to_vec(&json!({
888            "data": {
889                "items": {
890                    "edges": [{"node": {"id": 1}}, {"node": {"id": 2}}],
891                    "pageInfo": { "hasNextPage": true, "endCursor": "cursor1" }
892                }
893            }
894        }))
895        .unwrap();
896
897        let page2_body = serde_json::to_vec(&json!({
898            "data": {
899                "items": {
900                    "edges": [{"node": {"id": 3}}],
901                    "pageInfo": { "hasNextPage": false, "endCursor": null }
902                }
903            }
904        }))
905        .unwrap();
906
907        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
908        let addr = listener.local_addr().unwrap();
909        let url = format!("http://{addr}");
910
911        let bodies = vec![page1_body, page2_body];
912        tokio::spawn(async move {
913            for response_body in bodies {
914                if let Ok((mut stream, _)) = listener.accept().await {
915                    let mut buf = [0u8; 8192];
916                    let _ = stream.read(&mut buf).await;
917                    let mut resp = Vec::new();
918                    write!(
919                        resp,
920                        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
921                        response_body.len()
922                    ).unwrap();
923                    resp.extend_from_slice(&response_body);
924                    let _ = stream.write_all(&resp).await;
925                }
926            }
927            // suppress unused warnings — listener1/2 and addr1/2 were created to
928            // demonstrate the two-listener approach; the actual test uses a single listener
929            let _ = listener1;
930            let _ = listener2;
931            let _ = addr1;
932            let _ = addr2;
933        });
934
935        let svc = make_service(None);
936        let input = ServiceInput {
937            url,
938            params: json!({
939                "query": "query($first:Int,$after:String){ items(first:$first,after:$after){ edges{node{id}} pageInfo{hasNextPage endCursor} } }",
940                "pagination": {
941                    "strategy": "cursor",
942                    "page_info_path": "data.items.pageInfo",
943                    "edges_path": "data.items.edges",
944                    "page_size": 2
945                }
946            }),
947        };
948
949        let output = svc.execute(input).await.unwrap();
950        let edges: Vec<Value> = serde_json::from_str(&output.data).unwrap();
951        assert_eq!(edges.len(), 3, "expected 3 accumulated edges");
952        assert_eq!(edges[0]["node"]["id"], 1);
953        assert_eq!(edges[2]["node"]["id"], 3);
954    }
955
956    #[tokio::test]
957    async fn pagination_cap_prevents_infinite_loop() {
958        // Every page reports hasNextPage=true — the cap should kick in.
959        let page_body = serde_json::to_vec(&json!({
960            "data": {
961                "rows": {
962                    "edges": [{"node": {"id": 1}}],
963                    "pageInfo": { "hasNextPage": true, "endCursor": "always-more" }
964                }
965            }
966        }))
967        .unwrap();
968
969        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
970        let addr = listener.local_addr().unwrap();
971        let url = format!("http://{addr}");
972
973        let page_body_clone = page_body.clone();
974        tokio::spawn(async move {
975            while let Ok((mut stream, _)) = listener.accept().await {
976                let mut buf = [0u8; 8192];
977                let _ = stream.read(&mut buf).await;
978                let mut resp = Vec::new();
979                write!(
980                    resp,
981                    "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
982                    page_body_clone.len()
983                )
984                .unwrap();
985                resp.extend_from_slice(&page_body_clone);
986                let _ = stream.write_all(&resp).await;
987            }
988        });
989
990        // max_pages = 5 from make_service
991        let svc = make_service(None);
992        let input = ServiceInput {
993            url,
994            params: json!({
995                "query": "{ rows { edges{node{id}} pageInfo{hasNextPage endCursor} } }",
996                "pagination": {
997                    "strategy": "cursor",
998                    "page_info_path": "data.rows.pageInfo",
999                    "edges_path": "data.rows.edges",
1000                    "page_size": 1
1001                }
1002            }),
1003        };
1004
1005        let err = svc.execute(input).await.unwrap_err();
1006        assert!(
1007            matches!(err, StygianError::Service(ServiceError::InvalidResponse(ref msg)) if msg.contains("max_pages")),
1008            "expected pagination cap error, got {err:?}"
1009        );
1010    }
1011}