Skip to main content

stygian_graph/adapters/
graphql.rs

1//! GraphQL API adapter — a generic [`ScrapingService`](crate::ports::ScrapingService) for any spec-compliant
2//! GraphQL endpoint.
3//!
4//! Handles the full request/response lifecycle: query execution, variable
5//! injection, GraphQL error-envelope parsing, Jobber-style cost/throttle
6//! metadata, cursor-based pagination, and pluggable auth strategies.
7//!
8//! Target-specific knowledge (endpoint URL, version headers, default auth) is
9//! supplied by a [`GraphQlTargetPlugin`](crate::ports::graphql_plugin::GraphQlTargetPlugin)
10//! resolved from an optional [`GraphQlPluginRegistry`](crate::application::graphql_plugin_registry::GraphQlPluginRegistry).
11
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use async_trait::async_trait;
17use serde_json::{Value, json};
18use tokio::sync::RwLock;
19
20use crate::adapters::graphql_throttle::{PluginBudget, pre_flight_delay, update_budget};
21use crate::application::graphql_plugin_registry::GraphQlPluginRegistry;
22use crate::application::pipeline_parser::expand_template;
23use crate::domain::error::{Result, ServiceError, StygianError};
24use crate::ports::auth::ErasedAuthPort;
25use crate::ports::{GraphQlAuth, GraphQlAuthKind, ScrapingService, ServiceInput, ServiceOutput};
26
27// ─────────────────────────────────────────────────────────────────────────────
28// Configuration
29// ─────────────────────────────────────────────────────────────────────────────
30
/// Tunable settings for [`GraphQlService`].
///
/// # Example
///
/// ```rust
/// use stygian_graph::adapters::graphql::GraphQlConfig;
///
/// let config = GraphQlConfig {
///     timeout_secs: 30,
///     max_pages: 1000,
///     user_agent: "stygian-graph/1.0".to_string(),
/// };
/// ```
#[derive(Debug, Clone)]
pub struct GraphQlConfig {
    /// Per-request timeout, in seconds (default: 30).
    pub timeout_secs: u64,
    /// Upper bound on the number of pages fetched by one cursor-paginated
    /// query (default: 1000).
    pub max_pages: usize,
    /// Value of the `User-Agent` header sent with every request.
    pub user_agent: String,
}

impl Default for GraphQlConfig {
    /// Stock configuration: 30 s timeout, 1000 pages, `stygian-graph/1.0` agent.
    fn default() -> Self {
        let user_agent = String::from("stygian-graph/1.0");
        Self {
            timeout_secs: 30,
            max_pages: 1000,
            user_agent,
        }
    }
}
63
64// ─────────────────────────────────────────────────────────────────────────────
65// Adapter
66// ─────────────────────────────────────────────────────────────────────────────
67
/// `ScrapingService` adapter for GraphQL APIs.
///
/// Talk to any spec-compliant GraphQL endpoint by constructing a
/// [`GraphQlService`] with a config and an optional plugin registry. Target
/// specifics (endpoint, version headers, auth) are supplied either via
/// `ServiceInput.params` directly or through a registered
/// [`GraphQlTargetPlugin`](crate::ports::graphql_plugin::GraphQlTargetPlugin).
///
/// # Example
///
/// ```no_run
/// use stygian_graph::adapters::graphql::{GraphQlService, GraphQlConfig};
/// use stygian_graph::ports::{ScrapingService, ServiceInput};
/// use serde_json::json;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let service = GraphQlService::new(GraphQlConfig::default(), None);
///     let input = ServiceInput {
///         url: "https://countries.trevorblades.com/".to_string(),
///         params: json!({
///             "query": "{ countries { code name } }"
///         }),
///     };
///     let output = service.execute(input).await?;
///     println!("{}", output.data);
///     Ok(())
/// }
/// ```
pub struct GraphQlService {
    /// HTTP client used for every GraphQL POST.
    client: reqwest::Client,
    /// Adapter settings; `max_pages` bounds the cursor-pagination loop.
    config: GraphQlConfig,
    /// Registry used to resolve `params.plugin`; `None` means raw-params mode.
    plugins: Option<Arc<GraphQlPluginRegistry>>,
    /// Optional runtime auth port — used when no static auth is configured.
    auth_port: Option<Arc<dyn ErasedAuthPort>>,
    /// Per-plugin proactive cost-throttle budgets, keyed by plugin name.
    budgets: Arc<RwLock<HashMap<String, PluginBudget>>>,
}
106
107impl GraphQlService {
108    /// Create a new `GraphQlService`.
109    ///
110    /// `plugins` may be `None` for raw-params mode (no plugin resolution).
111    ///
112    /// # Example
113    ///
114    /// ```no_run
115    /// use stygian_graph::adapters::graphql::{GraphQlService, GraphQlConfig};
116    /// use stygian_graph::ports::ScrapingService;
117    ///
118    /// let service = GraphQlService::new(GraphQlConfig::default(), None);
119    /// assert_eq!(service.name(), "graphql");
120    /// ```
121    pub fn new(config: GraphQlConfig, plugins: Option<Arc<GraphQlPluginRegistry>>) -> Self {
122        let client = reqwest::Client::builder()
123            .timeout(Duration::from_secs(config.timeout_secs))
124            .user_agent(&config.user_agent)
125            .build()
126            .unwrap_or_default();
127        Self {
128            client,
129            config,
130            plugins,
131            auth_port: None,
132            budgets: Arc::new(RwLock::new(HashMap::new())),
133        }
134    }
135
136    /// Attach a runtime auth port.
137    ///
138    /// When set, the port's `erased_resolve_token()` will be called to obtain
139    /// a bearer token whenever `params.auth` is absent and the plugin supplies
140    /// no `default_auth`.
141    ///
142    /// # Example
143    ///
144    /// ```no_run
145    /// use std::sync::Arc;
146    /// use stygian_graph::adapters::graphql::{GraphQlService, GraphQlConfig};
147    /// use stygian_graph::ports::auth::{EnvAuthPort, ErasedAuthPort};
148    ///
149    /// let auth: Arc<dyn ErasedAuthPort> = Arc::new(EnvAuthPort::new("API_TOKEN"));
150    /// let service = GraphQlService::new(GraphQlConfig::default(), None)
151    ///     .with_auth_port(auth);
152    /// ```
153    #[must_use]
154    pub fn with_auth_port(mut self, port: Arc<dyn ErasedAuthPort>) -> Self {
155        self.auth_port = Some(port);
156        self
157    }
158
159    // ── Internal helpers ─────────────────────────────────────────────────────
160
161    /// Apply auth to the request builder.
162    fn apply_auth(builder: reqwest::RequestBuilder, auth: &GraphQlAuth) -> reqwest::RequestBuilder {
163        let token = expand_template(&auth.token);
164        match auth.kind {
165            GraphQlAuthKind::Bearer => builder.header("Authorization", format!("Bearer {token}")),
166            GraphQlAuthKind::ApiKey => builder.header("X-Api-Key", token),
167            GraphQlAuthKind::Header => {
168                let name = auth.header_name.as_deref().unwrap_or("X-Api-Key");
169                builder.header(name, token)
170            }
171            GraphQlAuthKind::None => builder,
172        }
173    }
174
175    /// Parse `GraphQlAuth` from a JSON object like `{"kind":"bearer","token":"..."}`.
176    fn parse_auth(val: &Value) -> Option<GraphQlAuth> {
177        let kind_str = val["kind"].as_str().unwrap_or("none");
178        let kind = match kind_str {
179            "bearer" => GraphQlAuthKind::Bearer,
180            "api_key" => GraphQlAuthKind::ApiKey,
181            "header" => GraphQlAuthKind::Header,
182            _ => GraphQlAuthKind::None,
183        };
184        if kind == GraphQlAuthKind::None {
185            return None;
186        }
187        let token = val["token"].as_str()?.to_string();
188        let header_name = val["header_name"].as_str().map(str::to_string);
189        Some(GraphQlAuth {
190            kind,
191            token,
192            header_name,
193        })
194    }
195
196    /// Check whether the response body indicates throttling.
197    ///
198    /// Returns `Some(retry_after_ms)` on throttle detection via any of:
199    /// 1. `extensions.cost.throttleStatus == "THROTTLED"`
200    /// 2. Any error entry with `extensions.code == "THROTTLED"`
201    /// 3. Any error message containing "throttled" (case-insensitive)
202    #[allow(clippy::indexing_slicing)]
203    fn detect_throttle(body: &Value) -> Option<u64> {
204        // 1. extensions.cost.throttleStatus
205        if body["extensions"]["cost"]["throttleStatus"]
206            .as_str()
207            .is_some_and(|s| s.eq_ignore_ascii_case("THROTTLED"))
208        {
209            return Some(Self::throttle_backoff(body));
210        }
211
212        // 2 & 3. errors array
213        if let Some(errors) = body["errors"].as_array() {
214            for err in errors {
215                if err["extensions"]["code"]
216                    .as_str()
217                    .is_some_and(|c| c.eq_ignore_ascii_case("THROTTLED"))
218                {
219                    return Some(Self::throttle_backoff(body));
220                }
221                if err["message"]
222                    .as_str()
223                    .is_some_and(|m| m.to_ascii_lowercase().contains("throttled"))
224                {
225                    return Some(Self::throttle_backoff(body));
226                }
227            }
228        }
229
230        None
231    }
232
233    /// Calculate retry back-off from `extensions.cost`.
234    ///
235    /// ```text
236    /// deficit = maximumAvailable − currentlyAvailable
237    /// ms      = (deficit / restoreRate * 1000).clamp(500, 2000)
238    /// ```
239    #[allow(
240        clippy::indexing_slicing,
241        clippy::cast_possible_truncation,
242        clippy::cast_sign_loss
243    )]
244    fn throttle_backoff(body: &Value) -> u64 {
245        let cost = &body["extensions"]["cost"];
246        let max_avail = cost["maximumAvailable"].as_f64().unwrap_or(10_000.0);
247        let cur_avail = cost["currentlyAvailable"].as_f64().unwrap_or(0.0);
248        let restore_rate = cost["restoreRate"].as_f64().unwrap_or(500.0);
249        let deficit = (max_avail - cur_avail).max(0.0);
250        let ms = if restore_rate > 0.0 {
251            (deficit / restore_rate * 1000.0) as u64
252        } else {
253            2_000
254        };
255        ms.clamp(500, 2_000)
256    }
257
258    /// Extract the `extensions.cost` object into a metadata-compatible [`Value`].
259    #[allow(clippy::indexing_slicing)]
260    fn extract_cost_metadata(body: &Value) -> Option<Value> {
261        let cost = &body["extensions"]["cost"];
262        if cost.is_null() || cost.is_object() && cost.as_object()?.is_empty() {
263            return None;
264        }
265        Some(cost.clone())
266    }
267
268    /// Navigate a dot-separated JSON path like `"data.clients.pageInfo"`.
269    #[allow(clippy::indexing_slicing)]
270    fn json_path<'v>(root: &'v Value, path: &str) -> &'v Value {
271        let mut cur = root;
272        for key in path.split('.') {
273            cur = &cur[key];
274        }
275        cur
276    }
277
278    /// Execute one GraphQL POST and return the parsed JSON body or an error.
279    #[allow(clippy::indexing_slicing)]
280    async fn post_query(
281        &self,
282        url: &str,
283        query: &str,
284        variables: &Value,
285        operation_name: Option<&str>,
286        auth: Option<&GraphQlAuth>,
287        extra_headers: &HashMap<String, String>,
288    ) -> Result<Value> {
289        let mut body = json!({ "query": query, "variables": variables });
290        if let Some(op) = operation_name {
291            body["operationName"] = json!(op);
292        }
293
294        let mut builder = self
295            .client
296            .post(url)
297            .header("Content-Type", "application/json")
298            .header("Accept", "application/json");
299
300        for (k, v) in extra_headers {
301            builder = builder.header(k.as_str(), v.as_str());
302        }
303
304        if let Some(a) = auth {
305            builder = Self::apply_auth(builder, a);
306        }
307
308        let resp = builder
309            .json(&body)
310            .send()
311            .await
312            .map_err(|e| StygianError::Service(ServiceError::Unavailable(e.to_string())))?;
313
314        let status = resp.status();
315        let text = resp
316            .text()
317            .await
318            .map_err(|e| StygianError::Service(ServiceError::Unavailable(e.to_string())))?;
319
320        if status.as_u16() >= 400 {
321            return Err(StygianError::Service(ServiceError::Unavailable(format!(
322                "HTTP {status}: {text}"
323            ))));
324        }
325
326        serde_json::from_str::<Value>(&text).map_err(|e| {
327            StygianError::Service(ServiceError::InvalidResponse(format!("invalid JSON: {e}")))
328        })
329    }
330
331    /// Validate a parsed GraphQL body (errors array, missing `data` key, throttle).
332    #[allow(clippy::indexing_slicing)]
333    fn validate_body(body: &Value) -> Result<()> {
334        // Throttle check takes priority so callers can retry with backoff.
335        if let Some(retry_after_ms) = Self::detect_throttle(body) {
336            return Err(StygianError::Service(ServiceError::RateLimited {
337                retry_after_ms,
338            }));
339        }
340
341        if let Some(errors) = body["errors"].as_array()
342            && !errors.is_empty()
343        {
344            let msg = errors[0]["message"]
345                .as_str()
346                .unwrap_or("unknown GraphQL error")
347                .to_string();
348            return Err(StygianError::Service(ServiceError::InvalidResponse(msg)));
349        }
350
351        // `data` key is missing — explicitly null with no errors is allowed (partial response)
352        if body.get("data").is_none() {
353            return Err(StygianError::Service(ServiceError::InvalidResponse(
354                "missing 'data' key in GraphQL response".to_string(),
355            )));
356        }
357
358        Ok(())
359    }
360}
361
362// ─────────────────────────────────────────────────────────────────────────────
363// ScrapingService impl
364// ─────────────────────────────────────────────────────────────────────────────
365
366#[async_trait]
367impl ScrapingService for GraphQlService {
    /// Identifier of this adapter; always the literal `"graphql"`.
    fn name(&self) -> &'static str {
        "graphql"
    }
371
372    /// Execute a GraphQL query.
373    ///
374    /// Reads `ServiceInput.params` for:
375    /// - `query` (required) — the GraphQL query string
376    /// - `variables` — optional JSON object
377    /// - `operation_name` — optional string
378    /// - `auth` — optional `{"kind": "bearer"|"api_key"|"header"|"none", "token": "..."}`
379    /// - `headers` — optional extra headers object
380    /// - `plugin` — optional plugin name to resolve from the registry
381    /// - `pagination` — optional `{"strategy": "cursor", "page_info_path": "...", "edges_path": "...", "page_size": 50}`
382    ///
383    /// # Errors
384    ///
385    /// Returns `Err` for HTTP ≥ 400, invalid JSON, GraphQL `errors[]`, missing
386    /// `data` key, throttle detection, or pagination runaway.
387    #[allow(clippy::too_many_lines, clippy::indexing_slicing)]
388    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
389        let params = &input.params;
390
391        // ── 1. Resolve plugin (if any) ────────────────────────────────────
392        let plugin_name = params["plugin"].as_str();
393        let plugin = if let (Some(name), Some(registry)) = (plugin_name, &self.plugins) {
394            Some(registry.get(name)?)
395        } else {
396            None
397        };
398
399        // ── 2. Resolve URL ────────────────────────────────────────────────
400        let url = if !input.url.is_empty() {
401            input.url.clone()
402        } else if let Some(ref p) = plugin {
403            p.endpoint().to_string()
404        } else {
405            return Err(StygianError::Service(ServiceError::Unavailable(
406                "no URL provided and no plugin endpoint available".to_string(),
407            )));
408        };
409
410        // ── 3. Resolve query ──────────────────────────────────────────────
411        let query = params["query"].as_str().ok_or_else(|| {
412            StygianError::Service(ServiceError::InvalidResponse(
413                "params.query is required".to_string(),
414            ))
415        })?;
416
417        let operation_name = params["operation_name"].as_str();
418        let mut variables = params["variables"].clone();
419        if variables.is_null() {
420            variables = json!({});
421        }
422
423        // ── 4. Resolve auth ───────────────────────────────────────────────
424        let auth: Option<GraphQlAuth> = if !params["auth"].is_null() && params["auth"].is_object() {
425            Self::parse_auth(&params["auth"])
426        } else if let Some(ref p) = plugin {
427            p.default_auth()
428        } else if let Some(ref port) = self.auth_port {
429            // Fall back to the runtime auth port if no static auth is present
430            match port.erased_resolve_token().await {
431                Ok(token) => Some(GraphQlAuth {
432                    kind: GraphQlAuthKind::Bearer,
433                    token,
434                    header_name: None,
435                }),
436                Err(e) => {
437                    tracing::warn!("auth port failed: {e}; proceeding without auth");
438                    None
439                }
440            }
441        } else {
442            None
443        };
444
445        // ── 4b. Lazy-init and acquire per-plugin budget ───────────────────
446        let maybe_budget: Option<PluginBudget> = if let Some(ref p) = plugin {
447            if let Some(throttle_cfg) = p.cost_throttle_config() {
448                let name = p.name().to_string();
449                // Fast path: budget already exists
450                {
451                    let read = self.budgets.read().await;
452                    if let Some(b) = read.get(&name) {
453                        let b_clone = b.clone();
454                        drop(read);
455                        pre_flight_delay(&b_clone).await;
456                        Some(b_clone)
457                    } else {
458                        drop(read);
459                        // Slow path: initialise
460                        let new_budget = PluginBudget::new(throttle_cfg);
461                        self.budgets.write().await.insert(name, new_budget.clone());
462                        pre_flight_delay(&new_budget).await;
463                        Some(new_budget)
464                    }
465                }
466            } else {
467                None
468            }
469        } else {
470            None
471        };
472
473        // ── 5. Build headers (extra + plugin version headers) ─────────────
474        let mut extra_headers: HashMap<String, String> = params["headers"]
475            .as_object()
476            .map(|obj| {
477                obj.iter()
478                    .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
479                    .collect()
480            })
481            .unwrap_or_default();
482
483        // Plugin version headers override ad-hoc ones for the same key
484        if let Some(ref p) = plugin {
485            for (k, v) in p.version_headers() {
486                extra_headers.insert(k, v);
487            }
488        }
489
490        // ── 6. Resolve pagination config ──────────────────────────────────
491        let pag = &params["pagination"];
492        let use_cursor = pag["strategy"].as_str() == Some("cursor");
493        let page_info_path = pag["page_info_path"]
494            .as_str()
495            .unwrap_or("data.pageInfo")
496            .to_string();
497        let edges_path = pag["edges_path"]
498            .as_str()
499            .unwrap_or("data.edges")
500            .to_string();
501        let page_size: u64 = pag["page_size"]
502            .as_u64()
503            .unwrap_or_else(|| plugin.as_ref().map_or(50, |p| p.default_page_size() as u64));
504
505        // ── 7. Execute (with optional cursor pagination) ───────────────────
506        if use_cursor {
507            // Inject the initial `first`/page-size variable and null cursor
508            variables["first"] = json!(page_size);
509            variables["after"] = json!(null);
510
511            let mut all_edges: Vec<Value> = Vec::new();
512            let mut page = 0usize;
513            let mut cost_meta = json!(null);
514
515            loop {
516                if page >= self.config.max_pages {
517                    return Err(StygianError::Service(ServiceError::InvalidResponse(
518                        format!("pagination exceeded max_pages ({})", self.config.max_pages),
519                    )));
520                }
521
522                let body = self
523                    .post_query(
524                        &url,
525                        query,
526                        &variables,
527                        operation_name,
528                        auth.as_ref(),
529                        &extra_headers,
530                    )
531                    .await?;
532
533                Self::validate_body(&body)?;
534
535                // Update proactive budget from response
536                if let Some(ref b) = maybe_budget {
537                    update_budget(b, &body).await;
538                }
539
540                // Accumulate edges
541                let edges = Self::json_path(&body, &edges_path);
542                if let Some(arr) = edges.as_array() {
543                    all_edges.extend(arr.iter().cloned());
544                }
545
546                // Check for next page
547                let page_info = Self::json_path(&body, &page_info_path);
548                let has_next = page_info["hasNextPage"].as_bool().unwrap_or(false);
549                let end_cursor = page_info["endCursor"].clone();
550
551                cost_meta = Self::extract_cost_metadata(&body).unwrap_or(json!(null));
552                page += 1;
553
554                if !has_next || end_cursor.is_null() {
555                    break;
556                }
557                variables["after"] = end_cursor;
558            }
559
560            let metadata = json!({ "cost": cost_meta, "pages_fetched": page });
561            Ok(ServiceOutput {
562                data: serde_json::to_string(&all_edges).unwrap_or_default(),
563                metadata,
564            })
565        } else {
566            // Single-request mode
567            let body = self
568                .post_query(
569                    &url,
570                    query,
571                    &variables,
572                    operation_name,
573                    auth.as_ref(),
574                    &extra_headers,
575                )
576                .await?;
577
578            Self::validate_body(&body)?;
579
580            // Update proactive budget from response
581            if let Some(ref b) = maybe_budget {
582                update_budget(b, &body).await;
583            }
584
585            let cost_meta = Self::extract_cost_metadata(&body).unwrap_or(json!(null));
586            let metadata = json!({ "cost": cost_meta });
587
588            Ok(ServiceOutput {
589                data: serde_json::to_string(&body["data"]).unwrap_or_default(),
590                metadata,
591            })
592        }
593    }
594}
595
596// ─────────────────────────────────────────────────────────────────────────────
597// Tests
598// ─────────────────────────────────────────────────────────────────────────────
599
600#[cfg(test)]
601#[allow(
602    clippy::unwrap_used,
603    clippy::indexing_slicing,
604    clippy::needless_pass_by_value,
605    clippy::field_reassign_with_default,
606    clippy::unnecessary_literal_bound
607)]
608mod tests {
609    use super::*;
610    use std::collections::HashMap;
611    use std::io::Write;
612    use std::sync::Arc;
613
614    use serde_json::json;
615    use tokio::io::{AsyncReadExt, AsyncWriteExt};
616    use tokio::net::TcpListener;
617
618    use crate::application::graphql_plugin_registry::GraphQlPluginRegistry;
619    use crate::ports::graphql_plugin::GraphQlTargetPlugin;
620
621    // ── Mock server ──────────────────────────────────────────────────────────
622
623    /// Minimal HTTP/1.1 mock server that serves one fixed JSON response body.
624    ///
625    /// The server listens on a random port, serves one request, then stops.
626    struct MockGraphQlServer;
627
628    impl MockGraphQlServer {
629        /// Spawn a server that returns HTTP `status` with `body` and run `f`.
630        ///
631        /// The closure receives the base URL `"http://127.0.0.1:<port>"`.
632        async fn run_with<F, Fut>(status: u16, body: impl Into<Vec<u8>>, f: F)
633        where
634            F: FnOnce(String) -> Fut,
635            Fut: std::future::Future<Output = ()>,
636        {
637            let body_bytes: Vec<u8> = body.into();
638            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
639            let addr = listener.local_addr().unwrap();
640            let url = format!("http://{addr}");
641
642            let body_clone = body_bytes.clone();
643            tokio::spawn(async move {
644                if let Ok((mut stream, _)) = listener.accept().await {
645                    let mut buf = [0u8; 4096];
646                    let _ = stream.read(&mut buf).await;
647                    // Build a minimal HTTP/1.1 response
648                    let mut response = Vec::new();
649                    write!(
650                        response,
651                        "HTTP/1.1 {status} OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
652                        body_clone.len()
653                    ).unwrap();
654                    response.extend_from_slice(&body_clone);
655                    let _ = stream.write_all(&response).await;
656                }
657            });
658
659            f(url).await;
660        }
661
662        /// Variant that captures the received request headers for assertion.
663        async fn run_capturing_request<F, Fut>(body: impl Into<Vec<u8>>, f: F) -> Vec<u8>
664        where
665            F: FnOnce(String) -> Fut,
666            Fut: std::future::Future<Output = ()>,
667        {
668            let body_bytes: Vec<u8> = body.into();
669            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
670            let addr = listener.local_addr().unwrap();
671            let url = format!("http://{addr}");
672
673            let body_clone = body_bytes.clone();
674            let (tx, mut rx) = tokio::sync::oneshot::channel::<Vec<u8>>();
675            tokio::spawn(async move {
676                if let Ok((mut stream, _)) = listener.accept().await {
677                    let mut buf = vec![0u8; 8192];
678                    let n = stream.read(&mut buf).await.unwrap_or(0);
679                    let request = buf[..n].to_vec();
680                    let _ = tx.send(request);
681
682                    let mut response = Vec::new();
683                    write!(
684                        response,
685                        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
686                        body_clone.len()
687                    ).unwrap();
688                    response.extend_from_slice(&body_clone);
689                    let _ = stream.write_all(&response).await;
690                }
691            });
692
693            f(url).await;
694
695            rx.try_recv().unwrap_or_default()
696        }
697    }
698
699    fn make_service(plugins: Option<Arc<GraphQlPluginRegistry>>) -> GraphQlService {
700        let mut config = GraphQlConfig::default();
701        config.max_pages = 5; // keep tests fast
702        GraphQlService::new(config, plugins)
703    }
704
705    fn simple_query_body(data: Value) -> Vec<u8> {
706        serde_json::to_vec(&json!({ "data": data })).unwrap()
707    }
708
709    // ── Tests ────────────────────────────────────────────────────────────────
710
711    #[tokio::test]
712    async fn execute_simple_query() {
713        let body = simple_query_body(json!({ "users": [{ "id": 1 }] }));
714        MockGraphQlServer::run_with(200, body, |url| async move {
715            let svc = make_service(None);
716            let input = ServiceInput {
717                url,
718                params: json!({ "query": "{ users { id } }" }),
719            };
720            let output = svc.execute(input).await.unwrap();
721            let data: Value = serde_json::from_str(&output.data).unwrap();
722            assert_eq!(data["users"][0]["id"], 1);
723        })
724        .await;
725    }
726
727    #[tokio::test]
728    async fn graphql_errors_in_200_response() {
729        let body =
730            serde_json::to_vec(&json!({ "errors": [{ "message": "not found" }], "data": null }))
731                .unwrap();
732        MockGraphQlServer::run_with(200, body, |url| async move {
733            let svc = make_service(None);
734            let input = ServiceInput {
735                url,
736                params: json!({ "query": "{ missing }" }),
737            };
738            let err = svc.execute(input).await.unwrap_err();
739            assert!(
740                matches!(err, StygianError::Service(ServiceError::InvalidResponse(_))),
741                "expected InvalidResponse, got {err:?}"
742            );
743        })
744        .await;
745    }
746
747    #[tokio::test]
748    async fn http_error_returns_unavailable() {
749        let body = b"Internal Server Error".to_vec();
750        MockGraphQlServer::run_with(500, body, |url| async move {
751            let svc = make_service(None);
752            let input = ServiceInput {
753                url,
754                params: json!({ "query": "{ x }" }),
755            };
756            let err = svc.execute(input).await.unwrap_err();
757            assert!(
758                matches!(err, StygianError::Service(ServiceError::Unavailable(_))),
759                "expected Unavailable, got {err:?}"
760            );
761        })
762        .await;
763    }
764
765    #[tokio::test]
766    async fn missing_data_key() {
767        let body = serde_json::to_vec(&json!({ "extensions": {} })).unwrap();
768        MockGraphQlServer::run_with(200, body, |url| async move {
769            let svc = make_service(None);
770            let input = ServiceInput {
771                url,
772                params: json!({ "query": "{ x }" }),
773            };
774            let err = svc.execute(input).await.unwrap_err();
775            assert!(
776                matches!(err, StygianError::Service(ServiceError::InvalidResponse(_))),
777                "expected InvalidResponse, got {err:?}"
778            );
779        })
780        .await;
781    }
782
783    #[tokio::test]
784    async fn bearer_auth_header_set() {
785        let body = simple_query_body(json!({}));
786        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
787            let svc = make_service(None);
788            let input = ServiceInput {
789                url,
790                params: json!({
791                    "query": "{ x }",
792                    "auth": { "kind": "bearer", "token": "test-token-123" }
793                }),
794            };
795            let _ = svc.execute(input).await;
796        })
797        .await;
798
799        let request_str = String::from_utf8_lossy(&request_bytes);
800        assert!(
801            request_str.contains("authorization: Bearer test-token-123"),
802            "auth header not found in request:\n{request_str}"
803        );
804    }
805
806    #[tokio::test]
807    async fn plugin_version_headers_merged() {
808        struct V1Plugin;
809        impl GraphQlTargetPlugin for V1Plugin {
810            fn name(&self) -> &str {
811                "v1"
812            }
813            fn endpoint(&self) -> &str {
814                "unused"
815            }
816            fn version_headers(&self) -> HashMap<String, String> {
817                [("X-TEST-VERSION".to_string(), "2025-01-01".to_string())].into()
818            }
819        }
820
821        let mut registry = GraphQlPluginRegistry::new();
822        registry.register(Arc::new(V1Plugin));
823
824        let body = simple_query_body(json!({}));
825        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
826            let svc = make_service(Some(Arc::new(registry)));
827            let input = ServiceInput {
828                url,
829                params: json!({
830                    "query": "{ x }",
831                    "plugin": "v1"
832                }),
833            };
834            let _ = svc.execute(input).await;
835        })
836        .await;
837
838        let request_str = String::from_utf8_lossy(&request_bytes);
839        assert!(
840            request_str.contains("x-test-version: 2025-01-01"),
841            "version header not found:\n{request_str}"
842        );
843    }
844
845    #[tokio::test]
846    async fn plugin_default_auth_used_when_params_auth_absent() {
847        use crate::ports::{GraphQlAuth, GraphQlAuthKind};
848
849        struct TokenPlugin;
850        impl GraphQlTargetPlugin for TokenPlugin {
851            fn name(&self) -> &str {
852                "tokenplugin"
853            }
854            fn endpoint(&self) -> &str {
855                "unused"
856            }
857            fn default_auth(&self) -> Option<GraphQlAuth> {
858                Some(GraphQlAuth {
859                    kind: GraphQlAuthKind::Bearer,
860                    token: "plugin-default-token".to_string(),
861                    header_name: None,
862                })
863            }
864        }
865
866        let mut registry = GraphQlPluginRegistry::new();
867        registry.register(Arc::new(TokenPlugin));
868
869        let body = simple_query_body(json!({}));
870        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
871            let svc = make_service(Some(Arc::new(registry)));
872            let input = ServiceInput {
873                url,
874                // No `auth` field — plugin should supply it
875                params: json!({
876                    "query": "{ x }",
877                    "plugin": "tokenplugin"
878                }),
879            };
880            let _ = svc.execute(input).await;
881        })
882        .await;
883
884        let request_str = String::from_utf8_lossy(&request_bytes);
885        assert!(
886            request_str.contains("Bearer plugin-default-token"),
887            "plugin default auth not applied:\n{request_str}"
888        );
889    }
890
891    #[tokio::test]
892    async fn throttle_response_returns_rate_limited() {
893        let body = serde_json::to_vec(&json!({
894            "data": null,
895            "extensions": {
896                "cost": {
897                    "throttleStatus": "THROTTLED",
898                    "maximumAvailable": 10000,
899                    "currentlyAvailable": 0,
900                    "restoreRate": 500
901                }
902            }
903        }))
904        .unwrap();
905
906        MockGraphQlServer::run_with(200, body, |url| async move {
907            let svc = make_service(None);
908            let input = ServiceInput {
909                url,
910                params: json!({ "query": "{ x }" }),
911            };
912            let err = svc.execute(input).await.unwrap_err();
913            assert!(
914                matches!(
915                    err,
916                    StygianError::Service(ServiceError::RateLimited { retry_after_ms })
917                    if retry_after_ms > 0
918                ),
919                "expected RateLimited, got {err:?}"
920            );
921        })
922        .await;
923    }
924
925    #[tokio::test]
926    async fn cost_metadata_surfaced() {
927        let body = serde_json::to_vec(&json!({
928            "data": { "items": [] },
929            "extensions": {
930                "cost": {
931                    "throttleStatus": "PASS",
932                    "maximumAvailable": 10000,
933                    "currentlyAvailable": 9800,
934                    "actualQueryCost": 42,
935                    "restoreRate": 500
936                }
937            }
938        }))
939        .unwrap();
940
941        MockGraphQlServer::run_with(200, body, |url| async move {
942            let svc = make_service(None);
943            let input = ServiceInput {
944                url,
945                params: json!({ "query": "{ items { id } }" }),
946            };
947            let output = svc.execute(input).await.unwrap();
948            let cost = &output.metadata["cost"];
949            assert_eq!(cost["actualQueryCost"], 42);
950            assert_eq!(cost["throttleStatus"], "PASS");
951        })
952        .await;
953    }
954
955    #[tokio::test]
956    async fn cursor_pagination_accumulates_pages() {
957        // Two-page scenario: page 1 has next page, page 2 does not.
958        // We need two independent servers (one per page).
959        let listener1 = TcpListener::bind("127.0.0.1:0").await.unwrap();
960        let addr1 = listener1.local_addr().unwrap();
961        let listener2 = TcpListener::bind("127.0.0.1:0").await.unwrap();
962        let addr2 = listener2.local_addr().unwrap();
963
964        // Both pages go to the same host:port — use a single server that handles
965        // two sequential connections.
966        let page1_body = serde_json::to_vec(&json!({
967            "data": {
968                "items": {
969                    "edges": [{"node": {"id": 1}}, {"node": {"id": 2}}],
970                    "pageInfo": { "hasNextPage": true, "endCursor": "cursor1" }
971                }
972            }
973        }))
974        .unwrap();
975
976        let page2_body = serde_json::to_vec(&json!({
977            "data": {
978                "items": {
979                    "edges": [{"node": {"id": 3}}],
980                    "pageInfo": { "hasNextPage": false, "endCursor": null }
981                }
982            }
983        }))
984        .unwrap();
985
986        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
987        let addr = listener.local_addr().unwrap();
988        let url = format!("http://{addr}");
989
990        let bodies = vec![page1_body, page2_body];
991        tokio::spawn(async move {
992            for response_body in bodies {
993                if let Ok((mut stream, _)) = listener.accept().await {
994                    let mut buf = [0u8; 8192];
995                    let _ = stream.read(&mut buf).await;
996                    let mut resp = Vec::new();
997                    write!(
998                        resp,
999                        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
1000                        response_body.len()
1001                    ).unwrap();
1002                    resp.extend_from_slice(&response_body);
1003                    let _ = stream.write_all(&resp).await;
1004                }
1005            }
1006            // suppress unused warnings — listener1/2 and addr1/2 were created to
1007            // demonstrate the two-listener approach; the actual test uses a single listener
1008            let _ = listener1;
1009            let _ = listener2;
1010            let _ = addr1;
1011            let _ = addr2;
1012        });
1013
1014        let svc = make_service(None);
1015        let input = ServiceInput {
1016            url,
1017            params: json!({
1018                "query": "query($first:Int,$after:String){ items(first:$first,after:$after){ edges{node{id}} pageInfo{hasNextPage endCursor} } }",
1019                "pagination": {
1020                    "strategy": "cursor",
1021                    "page_info_path": "data.items.pageInfo",
1022                    "edges_path": "data.items.edges",
1023                    "page_size": 2
1024                }
1025            }),
1026        };
1027
1028        let output = svc.execute(input).await.unwrap();
1029        let edges: Vec<Value> = serde_json::from_str(&output.data).unwrap();
1030        assert_eq!(edges.len(), 3, "expected 3 accumulated edges");
1031        assert_eq!(edges[0]["node"]["id"], 1);
1032        assert_eq!(edges[2]["node"]["id"], 3);
1033    }
1034
1035    #[tokio::test]
1036    async fn pagination_cap_prevents_infinite_loop() {
1037        // Every page reports hasNextPage=true — the cap should kick in.
1038        let page_body = serde_json::to_vec(&json!({
1039            "data": {
1040                "rows": {
1041                    "edges": [{"node": {"id": 1}}],
1042                    "pageInfo": { "hasNextPage": true, "endCursor": "always-more" }
1043                }
1044            }
1045        }))
1046        .unwrap();
1047
1048        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1049        let addr = listener.local_addr().unwrap();
1050        let url = format!("http://{addr}");
1051
1052        let page_body_clone = page_body.clone();
1053        tokio::spawn(async move {
1054            while let Ok((mut stream, _)) = listener.accept().await {
1055                let mut buf = [0u8; 8192];
1056                let _ = stream.read(&mut buf).await;
1057                let mut resp = Vec::new();
1058                write!(
1059                    resp,
1060                    "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
1061                    page_body_clone.len()
1062                )
1063                .unwrap();
1064                resp.extend_from_slice(&page_body_clone);
1065                let _ = stream.write_all(&resp).await;
1066            }
1067        });
1068
1069        // max_pages = 5 from make_service
1070        let svc = make_service(None);
1071        let input = ServiceInput {
1072            url,
1073            params: json!({
1074                "query": "{ rows { edges{node{id}} pageInfo{hasNextPage endCursor} } }",
1075                "pagination": {
1076                    "strategy": "cursor",
1077                    "page_info_path": "data.rows.pageInfo",
1078                    "edges_path": "data.rows.edges",
1079                    "page_size": 1
1080                }
1081            }),
1082        };
1083
1084        let err = svc.execute(input).await.unwrap_err();
1085        assert!(
1086            matches!(err, StygianError::Service(ServiceError::InvalidResponse(ref msg)) if msg.contains("max_pages")),
1087            "expected pagination cap error, got {err:?}"
1088        );
1089    }
1090}