Skip to main content

stygian_graph/adapters/
graphql.rs

1//! GraphQL API adapter — a generic [`ScrapingService`](crate::ports::ScrapingService) for any spec-compliant
2//! GraphQL endpoint.
3//!
4//! Handles the full request/response lifecycle: query execution, variable
5//! injection, GraphQL error-envelope parsing, Jobber-style cost/throttle
6//! metadata, cursor-based pagination, and pluggable auth strategies.
7//!
8//! Target-specific knowledge (endpoint URL, version headers, default auth) is
9//! supplied by a [`GraphQlTargetPlugin`](crate::ports::graphql_plugin::GraphQlTargetPlugin)
10//! resolved from an optional [`GraphQlPluginRegistry`](crate::application::graphql_plugin_registry::GraphQlPluginRegistry).
11
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use async_trait::async_trait;
17use serde_json::{Value, json};
18
19use crate::application::graphql_plugin_registry::GraphQlPluginRegistry;
20use crate::application::pipeline_parser::expand_template;
21use crate::domain::error::{Result, ServiceError, StygianError};
22use crate::ports::{GraphQlAuth, GraphQlAuthKind, ScrapingService, ServiceInput, ServiceOutput};
23
24// ─────────────────────────────────────────────────────────────────────────────
25// Configuration
26// ─────────────────────────────────────────────────────────────────────────────
27
/// Tunable settings for [`GraphQlService`].
///
/// # Example
///
/// ```rust
/// use stygian_graph::adapters::graphql::GraphQlConfig;
///
/// let config = GraphQlConfig {
///     timeout_secs: 30,
///     max_pages: 1000,
///     user_agent: "stygian-graph/1.0".to_string(),
/// };
/// ```
#[derive(Debug, Clone)]
pub struct GraphQlConfig {
    /// Per-request timeout, in seconds (default: 30).
    pub timeout_secs: u64,
    /// Upper bound on the number of pages fetched by a cursor-paginated query (default: 1000).
    pub max_pages: usize,
    /// Value of the `User-Agent` header sent with every request.
    pub user_agent: String,
}

impl Default for GraphQlConfig {
    /// Stock configuration: 30-second timeout, 1000 pages, `stygian-graph/1.0` user agent.
    fn default() -> Self {
        let timeout_secs = 30;
        let max_pages = 1000;
        let user_agent = String::from("stygian-graph/1.0");
        Self {
            timeout_secs,
            max_pages,
            user_agent,
        }
    }
}
60
61// ─────────────────────────────────────────────────────────────────────────────
62// Adapter
63// ─────────────────────────────────────────────────────────────────────────────
64
/// `ScrapingService` adapter for GraphQL APIs.
///
/// Target any spec-compliant GraphQL endpoint by constructing a
/// [`GraphQlService`] with a config and an optional plugin registry. Target
/// specifics (endpoint, version headers, auth) are supplied either via
/// `ServiceInput.params` directly or through a registered
/// [`GraphQlTargetPlugin`](crate::ports::graphql_plugin::GraphQlTargetPlugin).
///
/// # Example
///
/// ```no_run
/// use stygian_graph::adapters::graphql::{GraphQlService, GraphQlConfig};
/// use stygian_graph::ports::{ScrapingService, ServiceInput};
/// use serde_json::json;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let service = GraphQlService::new(GraphQlConfig::default(), None);
///     let input = ServiceInput {
///         url: "https://countries.trevorblades.com/".to_string(),
///         params: json!({
///             "query": "{ countries { code name } }"
///         }),
///     };
///     let output = service.execute(input).await?;
///     println!("{}", output.data);
///     Ok(())
/// }
/// ```
pub struct GraphQlService {
    /// HTTP client used for every GraphQL POST issued by this service.
    client: reqwest::Client,
    /// Settings supplied at construction time (timeout, page cap, user agent).
    config: GraphQlConfig,
    /// Registry used to resolve `params.plugin` names; `None` disables plugin lookup.
    plugins: Option<Arc<GraphQlPluginRegistry>>,
}
99
100impl GraphQlService {
101    /// Create a new `GraphQlService`.
102    ///
103    /// `plugins` may be `None` for raw-params mode (no plugin resolution).
104    ///
105    /// # Example
106    ///
107    /// ```no_run
108    /// use stygian_graph::adapters::graphql::{GraphQlService, GraphQlConfig};
109    /// use stygian_graph::ports::ScrapingService;
110    ///
111    /// let service = GraphQlService::new(GraphQlConfig::default(), None);
112    /// assert_eq!(service.name(), "graphql");
113    /// ```
114    pub fn new(config: GraphQlConfig, plugins: Option<Arc<GraphQlPluginRegistry>>) -> Self {
115        let client = reqwest::Client::builder()
116            .timeout(Duration::from_secs(config.timeout_secs))
117            .user_agent(&config.user_agent)
118            .build()
119            .unwrap_or_default();
120        Self {
121            client,
122            config,
123            plugins,
124        }
125    }
126
127    // ── Internal helpers ─────────────────────────────────────────────────────
128
129    /// Apply auth to the request builder.
130    fn apply_auth(builder: reqwest::RequestBuilder, auth: &GraphQlAuth) -> reqwest::RequestBuilder {
131        let token = expand_template(&auth.token);
132        match auth.kind {
133            GraphQlAuthKind::Bearer => builder.header("Authorization", format!("Bearer {token}")),
134            GraphQlAuthKind::ApiKey => builder.header("X-Api-Key", token),
135            GraphQlAuthKind::Header => {
136                let name = auth.header_name.as_deref().unwrap_or("X-Api-Key");
137                builder.header(name, token)
138            }
139            GraphQlAuthKind::None => builder,
140        }
141    }
142
143    /// Parse `GraphQlAuth` from a JSON object like `{"kind":"bearer","token":"..."}`.
144    fn parse_auth(val: &Value) -> Option<GraphQlAuth> {
145        let kind_str = val["kind"].as_str().unwrap_or("none");
146        let kind = match kind_str {
147            "bearer" => GraphQlAuthKind::Bearer,
148            "api_key" => GraphQlAuthKind::ApiKey,
149            "header" => GraphQlAuthKind::Header,
150            _ => GraphQlAuthKind::None,
151        };
152        if kind == GraphQlAuthKind::None {
153            return None;
154        }
155        let token = val["token"].as_str()?.to_string();
156        let header_name = val["header_name"].as_str().map(str::to_string);
157        Some(GraphQlAuth {
158            kind,
159            token,
160            header_name,
161        })
162    }
163
164    /// Check whether the response body indicates throttling.
165    ///
166    /// Returns `Some(retry_after_ms)` on throttle detection via any of:
167    /// 1. `extensions.cost.throttleStatus == "THROTTLED"`
168    /// 2. Any error entry with `extensions.code == "THROTTLED"`
169    /// 3. Any error message containing "throttled" (case-insensitive)
170    #[allow(clippy::indexing_slicing)]
171    fn detect_throttle(body: &Value) -> Option<u64> {
172        // 1. extensions.cost.throttleStatus
173        if body["extensions"]["cost"]["throttleStatus"]
174            .as_str()
175            .is_some_and(|s| s.eq_ignore_ascii_case("THROTTLED"))
176        {
177            return Some(Self::throttle_backoff(body));
178        }
179
180        // 2 & 3. errors array
181        if let Some(errors) = body["errors"].as_array() {
182            for err in errors {
183                if err["extensions"]["code"]
184                    .as_str()
185                    .is_some_and(|c| c.eq_ignore_ascii_case("THROTTLED"))
186                {
187                    return Some(Self::throttle_backoff(body));
188                }
189                if err["message"]
190                    .as_str()
191                    .is_some_and(|m| m.to_ascii_lowercase().contains("throttled"))
192                {
193                    return Some(Self::throttle_backoff(body));
194                }
195            }
196        }
197
198        None
199    }
200
201    /// Calculate retry back-off from `extensions.cost`.
202    ///
203    /// ```text
204    /// deficit = maximumAvailable − currentlyAvailable
205    /// ms      = (deficit / restoreRate * 1000).clamp(500, 2000)
206    /// ```
207    #[allow(
208        clippy::indexing_slicing,
209        clippy::cast_possible_truncation,
210        clippy::cast_sign_loss
211    )]
212    fn throttle_backoff(body: &Value) -> u64 {
213        let cost = &body["extensions"]["cost"];
214        let max_avail = cost["maximumAvailable"].as_f64().unwrap_or(10_000.0);
215        let cur_avail = cost["currentlyAvailable"].as_f64().unwrap_or(0.0);
216        let restore_rate = cost["restoreRate"].as_f64().unwrap_or(500.0);
217        let deficit = (max_avail - cur_avail).max(0.0);
218        let ms = if restore_rate > 0.0 {
219            (deficit / restore_rate * 1000.0) as u64
220        } else {
221            2_000
222        };
223        ms.clamp(500, 2_000)
224    }
225
226    /// Extract the `extensions.cost` object into a metadata-compatible [`Value`].
227    #[allow(clippy::indexing_slicing)]
228    fn extract_cost_metadata(body: &Value) -> Option<Value> {
229        let cost = &body["extensions"]["cost"];
230        if cost.is_null() || cost.is_object() && cost.as_object()?.is_empty() {
231            return None;
232        }
233        Some(cost.clone())
234    }
235
236    /// Navigate a dot-separated JSON path like `"data.clients.pageInfo"`.
237    #[allow(clippy::indexing_slicing)]
238    fn json_path<'v>(root: &'v Value, path: &str) -> &'v Value {
239        let mut cur = root;
240        for key in path.split('.') {
241            cur = &cur[key];
242        }
243        cur
244    }
245
246    /// Execute one GraphQL POST and return the parsed JSON body or an error.
247    #[allow(clippy::indexing_slicing)]
248    async fn post_query(
249        &self,
250        url: &str,
251        query: &str,
252        variables: &Value,
253        operation_name: Option<&str>,
254        auth: Option<&GraphQlAuth>,
255        extra_headers: &HashMap<String, String>,
256    ) -> Result<Value> {
257        let mut body = json!({ "query": query, "variables": variables });
258        if let Some(op) = operation_name {
259            body["operationName"] = json!(op);
260        }
261
262        let mut builder = self
263            .client
264            .post(url)
265            .header("Content-Type", "application/json")
266            .header("Accept", "application/json");
267
268        for (k, v) in extra_headers {
269            builder = builder.header(k.as_str(), v.as_str());
270        }
271
272        if let Some(a) = auth {
273            builder = Self::apply_auth(builder, a);
274        }
275
276        let resp = builder
277            .json(&body)
278            .send()
279            .await
280            .map_err(|e| StygianError::Service(ServiceError::Unavailable(e.to_string())))?;
281
282        let status = resp.status();
283        let text = resp
284            .text()
285            .await
286            .map_err(|e| StygianError::Service(ServiceError::Unavailable(e.to_string())))?;
287
288        if status.as_u16() >= 400 {
289            return Err(StygianError::Service(ServiceError::Unavailable(format!(
290                "HTTP {status}: {text}"
291            ))));
292        }
293
294        serde_json::from_str::<Value>(&text).map_err(|e| {
295            StygianError::Service(ServiceError::InvalidResponse(format!("invalid JSON: {e}")))
296        })
297    }
298
299    /// Validate a parsed GraphQL body (errors array, missing `data` key, throttle).
300    #[allow(clippy::indexing_slicing)]
301    fn validate_body(body: &Value) -> Result<()> {
302        // Throttle check takes priority so callers can retry with backoff.
303        if let Some(retry_after_ms) = Self::detect_throttle(body) {
304            return Err(StygianError::Service(ServiceError::RateLimited {
305                retry_after_ms,
306            }));
307        }
308
309        if let Some(errors) = body["errors"].as_array()
310            && !errors.is_empty()
311        {
312            let msg = errors[0]["message"]
313                .as_str()
314                .unwrap_or("unknown GraphQL error")
315                .to_string();
316            return Err(StygianError::Service(ServiceError::InvalidResponse(msg)));
317        }
318
319        // `data` key is missing — explicitly null with no errors is allowed (partial response)
320        if body.get("data").is_none() {
321            return Err(StygianError::Service(ServiceError::InvalidResponse(
322                "missing 'data' key in GraphQL response".to_string(),
323            )));
324        }
325
326        Ok(())
327    }
328}
329
330// ─────────────────────────────────────────────────────────────────────────────
331// ScrapingService impl
332// ─────────────────────────────────────────────────────────────────────────────
333
334#[async_trait]
335impl ScrapingService for GraphQlService {
    /// Static identifier of this adapter: always `"graphql"`.
    fn name(&self) -> &'static str {
        "graphql"
    }
339
340    /// Execute a GraphQL query.
341    ///
342    /// Reads `ServiceInput.params` for:
343    /// - `query` (required) — the GraphQL query string
344    /// - `variables` — optional JSON object
345    /// - `operation_name` — optional string
346    /// - `auth` — optional `{"kind": "bearer"|"api_key"|"header"|"none", "token": "..."}`
347    /// - `headers` — optional extra headers object
348    /// - `plugin` — optional plugin name to resolve from the registry
349    /// - `pagination` — optional `{"strategy": "cursor", "page_info_path": "...", "edges_path": "...", "page_size": 50}`
350    ///
351    /// # Errors
352    ///
353    /// Returns `Err` for HTTP ≥ 400, invalid JSON, GraphQL `errors[]`, missing
354    /// `data` key, throttle detection, or pagination runaway.
355    #[allow(clippy::too_many_lines, clippy::indexing_slicing)]
356    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
357        let params = &input.params;
358
359        // ── 1. Resolve plugin (if any) ────────────────────────────────────
360        let plugin_name = params["plugin"].as_str();
361        let plugin = if let (Some(name), Some(registry)) = (plugin_name, &self.plugins) {
362            Some(registry.get(name)?)
363        } else {
364            None
365        };
366
367        // ── 2. Resolve URL ────────────────────────────────────────────────
368        let url = if !input.url.is_empty() {
369            input.url.clone()
370        } else if let Some(ref p) = plugin {
371            p.endpoint().to_string()
372        } else {
373            return Err(StygianError::Service(ServiceError::Unavailable(
374                "no URL provided and no plugin endpoint available".to_string(),
375            )));
376        };
377
378        // ── 3. Resolve query ──────────────────────────────────────────────
379        let query = params["query"].as_str().ok_or_else(|| {
380            StygianError::Service(ServiceError::InvalidResponse(
381                "params.query is required".to_string(),
382            ))
383        })?;
384
385        let operation_name = params["operation_name"].as_str();
386        let mut variables = params["variables"].clone();
387        if variables.is_null() {
388            variables = json!({});
389        }
390
391        // ── 4. Resolve auth ───────────────────────────────────────────────
392        let auth: Option<GraphQlAuth> = if !params["auth"].is_null() && params["auth"].is_object() {
393            Self::parse_auth(&params["auth"])
394        } else {
395            plugin.as_ref().and_then(|p| p.default_auth())
396        };
397
398        // ── 5. Build headers (extra + plugin version headers) ─────────────
399        let mut extra_headers: HashMap<String, String> = params["headers"]
400            .as_object()
401            .map(|obj| {
402                obj.iter()
403                    .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
404                    .collect()
405            })
406            .unwrap_or_default();
407
408        // Plugin version headers override ad-hoc ones for the same key
409        if let Some(ref p) = plugin {
410            for (k, v) in p.version_headers() {
411                extra_headers.insert(k, v);
412            }
413        }
414
415        // ── 6. Resolve pagination config ──────────────────────────────────
416        let pag = &params["pagination"];
417        let use_cursor = pag["strategy"].as_str() == Some("cursor");
418        let page_info_path = pag["page_info_path"]
419            .as_str()
420            .unwrap_or("data.pageInfo")
421            .to_string();
422        let edges_path = pag["edges_path"]
423            .as_str()
424            .unwrap_or("data.edges")
425            .to_string();
426        let page_size: u64 = pag["page_size"]
427            .as_u64()
428            .unwrap_or_else(|| plugin.as_ref().map_or(50, |p| p.default_page_size() as u64));
429
430        // ── 7. Execute (with optional cursor pagination) ───────────────────
431        if use_cursor {
432            // Inject the initial `first`/page-size variable and null cursor
433            variables["first"] = json!(page_size);
434            variables["after"] = json!(null);
435
436            let mut all_edges: Vec<Value> = Vec::new();
437            let mut page = 0usize;
438            let mut cost_meta = json!(null);
439
440            loop {
441                if page >= self.config.max_pages {
442                    return Err(StygianError::Service(ServiceError::InvalidResponse(
443                        format!("pagination exceeded max_pages ({})", self.config.max_pages),
444                    )));
445                }
446
447                let body = self
448                    .post_query(
449                        &url,
450                        query,
451                        &variables,
452                        operation_name,
453                        auth.as_ref(),
454                        &extra_headers,
455                    )
456                    .await?;
457
458                Self::validate_body(&body)?;
459
460                // Accumulate edges
461                let edges = Self::json_path(&body, &edges_path);
462                if let Some(arr) = edges.as_array() {
463                    all_edges.extend(arr.iter().cloned());
464                }
465
466                // Check for next page
467                let page_info = Self::json_path(&body, &page_info_path);
468                let has_next = page_info["hasNextPage"].as_bool().unwrap_or(false);
469                let end_cursor = page_info["endCursor"].clone();
470
471                cost_meta = Self::extract_cost_metadata(&body).unwrap_or(json!(null));
472                page += 1;
473
474                if !has_next || end_cursor.is_null() {
475                    break;
476                }
477                variables["after"] = end_cursor;
478            }
479
480            let metadata = json!({ "cost": cost_meta, "pages_fetched": page });
481            Ok(ServiceOutput {
482                data: serde_json::to_string(&all_edges).unwrap_or_default(),
483                metadata,
484            })
485        } else {
486            // Single-request mode
487            let body = self
488                .post_query(
489                    &url,
490                    query,
491                    &variables,
492                    operation_name,
493                    auth.as_ref(),
494                    &extra_headers,
495                )
496                .await?;
497
498            Self::validate_body(&body)?;
499
500            let cost_meta = Self::extract_cost_metadata(&body).unwrap_or(json!(null));
501            let metadata = json!({ "cost": cost_meta });
502
503            Ok(ServiceOutput {
504                data: serde_json::to_string(&body["data"]).unwrap_or_default(),
505                metadata,
506            })
507        }
508    }
509}
510
511// ─────────────────────────────────────────────────────────────────────────────
512// Tests
513// ─────────────────────────────────────────────────────────────────────────────
514
515#[cfg(test)]
516#[allow(
517    clippy::unwrap_used,
518    clippy::indexing_slicing,
519    clippy::needless_pass_by_value,
520    clippy::field_reassign_with_default,
521    clippy::unnecessary_literal_bound
522)]
523mod tests {
524    use super::*;
525    use std::collections::HashMap;
526    use std::io::Write;
527    use std::sync::Arc;
528
529    use serde_json::json;
530    use tokio::io::{AsyncReadExt, AsyncWriteExt};
531    use tokio::net::TcpListener;
532
533    use crate::application::graphql_plugin_registry::GraphQlPluginRegistry;
534    use crate::ports::graphql_plugin::GraphQlTargetPlugin;
535
536    // ── Mock server ──────────────────────────────────────────────────────────
537
538    /// Minimal HTTP/1.1 mock server that serves one fixed JSON response body.
539    ///
540    /// The server listens on a random port, serves one request, then stops.
541    struct MockGraphQlServer;
542
543    impl MockGraphQlServer {
544        /// Spawn a server that returns HTTP `status` with `body` and run `f`.
545        ///
546        /// The closure receives the base URL `"http://127.0.0.1:<port>"`.
547        async fn run_with<F, Fut>(status: u16, body: impl Into<Vec<u8>>, f: F)
548        where
549            F: FnOnce(String) -> Fut,
550            Fut: std::future::Future<Output = ()>,
551        {
552            let body_bytes: Vec<u8> = body.into();
553            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
554            let addr = listener.local_addr().unwrap();
555            let url = format!("http://{addr}");
556
557            let body_clone = body_bytes.clone();
558            tokio::spawn(async move {
559                if let Ok((mut stream, _)) = listener.accept().await {
560                    let mut buf = [0u8; 4096];
561                    let _ = stream.read(&mut buf).await;
562                    // Build a minimal HTTP/1.1 response
563                    let mut response = Vec::new();
564                    write!(
565                        response,
566                        "HTTP/1.1 {status} OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
567                        body_clone.len()
568                    ).unwrap();
569                    response.extend_from_slice(&body_clone);
570                    let _ = stream.write_all(&response).await;
571                }
572            });
573
574            f(url).await;
575        }
576
577        /// Variant that captures the received request headers for assertion.
578        async fn run_capturing_request<F, Fut>(body: impl Into<Vec<u8>>, f: F) -> Vec<u8>
579        where
580            F: FnOnce(String) -> Fut,
581            Fut: std::future::Future<Output = ()>,
582        {
583            let body_bytes: Vec<u8> = body.into();
584            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
585            let addr = listener.local_addr().unwrap();
586            let url = format!("http://{addr}");
587
588            let body_clone = body_bytes.clone();
589            let (tx, mut rx) = tokio::sync::oneshot::channel::<Vec<u8>>();
590            tokio::spawn(async move {
591                if let Ok((mut stream, _)) = listener.accept().await {
592                    let mut buf = vec![0u8; 8192];
593                    let n = stream.read(&mut buf).await.unwrap_or(0);
594                    let request = buf[..n].to_vec();
595                    let _ = tx.send(request);
596
597                    let mut response = Vec::new();
598                    write!(
599                        response,
600                        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
601                        body_clone.len()
602                    ).unwrap();
603                    response.extend_from_slice(&body_clone);
604                    let _ = stream.write_all(&response).await;
605                }
606            });
607
608            f(url).await;
609
610            rx.try_recv().unwrap_or_default()
611        }
612    }
613
614    fn make_service(plugins: Option<Arc<GraphQlPluginRegistry>>) -> GraphQlService {
615        let mut config = GraphQlConfig::default();
616        config.max_pages = 5; // keep tests fast
617        GraphQlService::new(config, plugins)
618    }
619
620    fn simple_query_body(data: Value) -> Vec<u8> {
621        serde_json::to_vec(&json!({ "data": data })).unwrap()
622    }
623
624    // ── Tests ────────────────────────────────────────────────────────────────
625
626    #[tokio::test]
627    async fn execute_simple_query() {
628        let body = simple_query_body(json!({ "users": [{ "id": 1 }] }));
629        MockGraphQlServer::run_with(200, body, |url| async move {
630            let svc = make_service(None);
631            let input = ServiceInput {
632                url,
633                params: json!({ "query": "{ users { id } }" }),
634            };
635            let output = svc.execute(input).await.unwrap();
636            let data: Value = serde_json::from_str(&output.data).unwrap();
637            assert_eq!(data["users"][0]["id"], 1);
638        })
639        .await;
640    }
641
642    #[tokio::test]
643    async fn graphql_errors_in_200_response() {
644        let body =
645            serde_json::to_vec(&json!({ "errors": [{ "message": "not found" }], "data": null }))
646                .unwrap();
647        MockGraphQlServer::run_with(200, body, |url| async move {
648            let svc = make_service(None);
649            let input = ServiceInput {
650                url,
651                params: json!({ "query": "{ missing }" }),
652            };
653            let err = svc.execute(input).await.unwrap_err();
654            assert!(
655                matches!(err, StygianError::Service(ServiceError::InvalidResponse(_))),
656                "expected InvalidResponse, got {err:?}"
657            );
658        })
659        .await;
660    }
661
662    #[tokio::test]
663    async fn http_error_returns_unavailable() {
664        let body = b"Internal Server Error".to_vec();
665        MockGraphQlServer::run_with(500, body, |url| async move {
666            let svc = make_service(None);
667            let input = ServiceInput {
668                url,
669                params: json!({ "query": "{ x }" }),
670            };
671            let err = svc.execute(input).await.unwrap_err();
672            assert!(
673                matches!(err, StygianError::Service(ServiceError::Unavailable(_))),
674                "expected Unavailable, got {err:?}"
675            );
676        })
677        .await;
678    }
679
680    #[tokio::test]
681    async fn missing_data_key() {
682        let body = serde_json::to_vec(&json!({ "extensions": {} })).unwrap();
683        MockGraphQlServer::run_with(200, body, |url| async move {
684            let svc = make_service(None);
685            let input = ServiceInput {
686                url,
687                params: json!({ "query": "{ x }" }),
688            };
689            let err = svc.execute(input).await.unwrap_err();
690            assert!(
691                matches!(err, StygianError::Service(ServiceError::InvalidResponse(_))),
692                "expected InvalidResponse, got {err:?}"
693            );
694        })
695        .await;
696    }
697
698    #[tokio::test]
699    async fn bearer_auth_header_set() {
700        let body = simple_query_body(json!({}));
701        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
702            let svc = make_service(None);
703            let input = ServiceInput {
704                url,
705                params: json!({
706                    "query": "{ x }",
707                    "auth": { "kind": "bearer", "token": "test-token-123" }
708                }),
709            };
710            let _ = svc.execute(input).await;
711        })
712        .await;
713
714        let request_str = String::from_utf8_lossy(&request_bytes);
715        assert!(
716            request_str.contains("authorization: Bearer test-token-123"),
717            "auth header not found in request:\n{request_str}"
718        );
719    }
720
721    #[tokio::test]
722    async fn plugin_version_headers_merged() {
723        struct V1Plugin;
724        impl GraphQlTargetPlugin for V1Plugin {
725            fn name(&self) -> &str {
726                "v1"
727            }
728            fn endpoint(&self) -> &str {
729                "unused"
730            }
731            fn version_headers(&self) -> HashMap<String, String> {
732                [("X-TEST-VERSION".to_string(), "2025-01-01".to_string())].into()
733            }
734        }
735
736        let mut registry = GraphQlPluginRegistry::new();
737        registry.register(Arc::new(V1Plugin));
738
739        let body = simple_query_body(json!({}));
740        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
741            let svc = make_service(Some(Arc::new(registry)));
742            let input = ServiceInput {
743                url,
744                params: json!({
745                    "query": "{ x }",
746                    "plugin": "v1"
747                }),
748            };
749            let _ = svc.execute(input).await;
750        })
751        .await;
752
753        let request_str = String::from_utf8_lossy(&request_bytes);
754        assert!(
755            request_str.contains("x-test-version: 2025-01-01"),
756            "version header not found:\n{request_str}"
757        );
758    }
759
760    #[tokio::test]
761    async fn plugin_default_auth_used_when_params_auth_absent() {
762        use crate::ports::{GraphQlAuth, GraphQlAuthKind};
763
764        struct TokenPlugin;
765        impl GraphQlTargetPlugin for TokenPlugin {
766            fn name(&self) -> &str {
767                "tokenplugin"
768            }
769            fn endpoint(&self) -> &str {
770                "unused"
771            }
772            fn default_auth(&self) -> Option<GraphQlAuth> {
773                Some(GraphQlAuth {
774                    kind: GraphQlAuthKind::Bearer,
775                    token: "plugin-default-token".to_string(),
776                    header_name: None,
777                })
778            }
779        }
780
781        let mut registry = GraphQlPluginRegistry::new();
782        registry.register(Arc::new(TokenPlugin));
783
784        let body = simple_query_body(json!({}));
785        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
786            let svc = make_service(Some(Arc::new(registry)));
787            let input = ServiceInput {
788                url,
789                // No `auth` field — plugin should supply it
790                params: json!({
791                    "query": "{ x }",
792                    "plugin": "tokenplugin"
793                }),
794            };
795            let _ = svc.execute(input).await;
796        })
797        .await;
798
799        let request_str = String::from_utf8_lossy(&request_bytes);
800        assert!(
801            request_str.contains("Bearer plugin-default-token"),
802            "plugin default auth not applied:\n{request_str}"
803        );
804    }
805
806    #[tokio::test]
807    async fn throttle_response_returns_rate_limited() {
808        let body = serde_json::to_vec(&json!({
809            "data": null,
810            "extensions": {
811                "cost": {
812                    "throttleStatus": "THROTTLED",
813                    "maximumAvailable": 10000,
814                    "currentlyAvailable": 0,
815                    "restoreRate": 500
816                }
817            }
818        }))
819        .unwrap();
820
821        MockGraphQlServer::run_with(200, body, |url| async move {
822            let svc = make_service(None);
823            let input = ServiceInput {
824                url,
825                params: json!({ "query": "{ x }" }),
826            };
827            let err = svc.execute(input).await.unwrap_err();
828            assert!(
829                matches!(
830                    err,
831                    StygianError::Service(ServiceError::RateLimited { retry_after_ms })
832                    if retry_after_ms > 0
833                ),
834                "expected RateLimited, got {err:?}"
835            );
836        })
837        .await;
838    }
839
840    #[tokio::test]
841    async fn cost_metadata_surfaced() {
842        let body = serde_json::to_vec(&json!({
843            "data": { "items": [] },
844            "extensions": {
845                "cost": {
846                    "throttleStatus": "PASS",
847                    "maximumAvailable": 10000,
848                    "currentlyAvailable": 9800,
849                    "actualQueryCost": 42,
850                    "restoreRate": 500
851                }
852            }
853        }))
854        .unwrap();
855
856        MockGraphQlServer::run_with(200, body, |url| async move {
857            let svc = make_service(None);
858            let input = ServiceInput {
859                url,
860                params: json!({ "query": "{ items { id } }" }),
861            };
862            let output = svc.execute(input).await.unwrap();
863            let cost = &output.metadata["cost"];
864            assert_eq!(cost["actualQueryCost"], 42);
865            assert_eq!(cost["throttleStatus"], "PASS");
866        })
867        .await;
868    }
869
870    #[tokio::test]
871    async fn cursor_pagination_accumulates_pages() {
872        // Two-page scenario: page 1 has next page, page 2 does not.
873        // We need two independent servers (one per page).
874        let listener1 = TcpListener::bind("127.0.0.1:0").await.unwrap();
875        let addr1 = listener1.local_addr().unwrap();
876        let listener2 = TcpListener::bind("127.0.0.1:0").await.unwrap();
877        let addr2 = listener2.local_addr().unwrap();
878
879        // Both pages go to the same host:port — use a single server that handles
880        // two sequential connections.
881        let page1_body = serde_json::to_vec(&json!({
882            "data": {
883                "items": {
884                    "edges": [{"node": {"id": 1}}, {"node": {"id": 2}}],
885                    "pageInfo": { "hasNextPage": true, "endCursor": "cursor1" }
886                }
887            }
888        }))
889        .unwrap();
890
891        let page2_body = serde_json::to_vec(&json!({
892            "data": {
893                "items": {
894                    "edges": [{"node": {"id": 3}}],
895                    "pageInfo": { "hasNextPage": false, "endCursor": null }
896                }
897            }
898        }))
899        .unwrap();
900
901        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
902        let addr = listener.local_addr().unwrap();
903        let url = format!("http://{addr}");
904
905        let bodies = vec![page1_body, page2_body];
906        tokio::spawn(async move {
907            for response_body in bodies {
908                if let Ok((mut stream, _)) = listener.accept().await {
909                    let mut buf = [0u8; 8192];
910                    let _ = stream.read(&mut buf).await;
911                    let mut resp = Vec::new();
912                    write!(
913                        resp,
914                        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
915                        response_body.len()
916                    ).unwrap();
917                    resp.extend_from_slice(&response_body);
918                    let _ = stream.write_all(&resp).await;
919                }
920            }
921            // suppress unused warnings — listener1/2 and addr1/2 were created to
922            // demonstrate the two-listener approach; the actual test uses a single listener
923            let _ = listener1;
924            let _ = listener2;
925            let _ = addr1;
926            let _ = addr2;
927        });
928
929        let svc = make_service(None);
930        let input = ServiceInput {
931            url,
932            params: json!({
933                "query": "query($first:Int,$after:String){ items(first:$first,after:$after){ edges{node{id}} pageInfo{hasNextPage endCursor} } }",
934                "pagination": {
935                    "strategy": "cursor",
936                    "page_info_path": "data.items.pageInfo",
937                    "edges_path": "data.items.edges",
938                    "page_size": 2
939                }
940            }),
941        };
942
943        let output = svc.execute(input).await.unwrap();
944        let edges: Vec<Value> = serde_json::from_str(&output.data).unwrap();
945        assert_eq!(edges.len(), 3, "expected 3 accumulated edges");
946        assert_eq!(edges[0]["node"]["id"], 1);
947        assert_eq!(edges[2]["node"]["id"], 3);
948    }
949
950    #[tokio::test]
951    async fn pagination_cap_prevents_infinite_loop() {
952        // Every page reports hasNextPage=true — the cap should kick in.
953        let page_body = serde_json::to_vec(&json!({
954            "data": {
955                "rows": {
956                    "edges": [{"node": {"id": 1}}],
957                    "pageInfo": { "hasNextPage": true, "endCursor": "always-more" }
958                }
959            }
960        }))
961        .unwrap();
962
963        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
964        let addr = listener.local_addr().unwrap();
965        let url = format!("http://{addr}");
966
967        let page_body_clone = page_body.clone();
968        tokio::spawn(async move {
969            while let Ok((mut stream, _)) = listener.accept().await {
970                let mut buf = [0u8; 8192];
971                let _ = stream.read(&mut buf).await;
972                let mut resp = Vec::new();
973                write!(
974                    resp,
975                    "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
976                    page_body_clone.len()
977                )
978                .unwrap();
979                resp.extend_from_slice(&page_body_clone);
980                let _ = stream.write_all(&resp).await;
981            }
982        });
983
984        // max_pages = 5 from make_service
985        let svc = make_service(None);
986        let input = ServiceInput {
987            url,
988            params: json!({
989                "query": "{ rows { edges{node{id}} pageInfo{hasNextPage endCursor} } }",
990                "pagination": {
991                    "strategy": "cursor",
992                    "page_info_path": "data.rows.pageInfo",
993                    "edges_path": "data.rows.edges",
994                    "page_size": 1
995                }
996            }),
997        };
998
999        let err = svc.execute(input).await.unwrap_err();
1000        assert!(
1001            matches!(err, StygianError::Service(ServiceError::InvalidResponse(ref msg)) if msg.contains("max_pages")),
1002            "expected pagination cap error, got {err:?}"
1003        );
1004    }
1005}