1use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use async_trait::async_trait;
17use serde_json::{Value, json};
18
19use crate::application::graphql_plugin_registry::GraphQlPluginRegistry;
20use crate::application::pipeline_parser::expand_template;
21use crate::domain::error::{Result, ServiceError, StygianError};
22use crate::ports::{GraphQlAuth, GraphQlAuthKind, ScrapingService, ServiceInput, ServiceOutput};
23
24#[derive(Debug, Clone)]
42pub struct GraphQlConfig {
43 pub timeout_secs: u64,
45 pub max_pages: usize,
47 pub user_agent: String,
49}
50
51impl Default for GraphQlConfig {
52 fn default() -> Self {
53 Self {
54 timeout_secs: 30,
55 max_pages: 1000,
56 user_agent: "stygian-graph/1.0".to_string(),
57 }
58 }
59}
60
61pub struct GraphQlService {
95 client: reqwest::Client,
96 config: GraphQlConfig,
97 plugins: Option<Arc<GraphQlPluginRegistry>>,
98}
99
100impl GraphQlService {
101 pub fn new(config: GraphQlConfig, plugins: Option<Arc<GraphQlPluginRegistry>>) -> Self {
115 let client = reqwest::Client::builder()
116 .timeout(Duration::from_secs(config.timeout_secs))
117 .user_agent(&config.user_agent)
118 .build()
119 .unwrap_or_default();
120 Self {
121 client,
122 config,
123 plugins,
124 }
125 }
126
127 fn apply_auth(builder: reqwest::RequestBuilder, auth: &GraphQlAuth) -> reqwest::RequestBuilder {
131 let token = expand_template(&auth.token);
132 match auth.kind {
133 GraphQlAuthKind::Bearer => builder.header("Authorization", format!("Bearer {token}")),
134 GraphQlAuthKind::ApiKey => builder.header("X-Api-Key", token),
135 GraphQlAuthKind::Header => {
136 let name = auth.header_name.as_deref().unwrap_or("X-Api-Key");
137 builder.header(name, token)
138 }
139 GraphQlAuthKind::None => builder,
140 }
141 }
142
143 fn parse_auth(val: &Value) -> Option<GraphQlAuth> {
145 let kind_str = val["kind"].as_str().unwrap_or("none");
146 let kind = match kind_str {
147 "bearer" => GraphQlAuthKind::Bearer,
148 "api_key" => GraphQlAuthKind::ApiKey,
149 "header" => GraphQlAuthKind::Header,
150 _ => GraphQlAuthKind::None,
151 };
152 if kind == GraphQlAuthKind::None {
153 return None;
154 }
155 let token = val["token"].as_str()?.to_string();
156 let header_name = val["header_name"].as_str().map(str::to_string);
157 Some(GraphQlAuth {
158 kind,
159 token,
160 header_name,
161 })
162 }
163
164 #[allow(clippy::indexing_slicing)]
171 fn detect_throttle(body: &Value) -> Option<u64> {
172 if body["extensions"]["cost"]["throttleStatus"]
174 .as_str()
175 .is_some_and(|s| s.eq_ignore_ascii_case("THROTTLED"))
176 {
177 return Some(Self::throttle_backoff(body));
178 }
179
180 if let Some(errors) = body["errors"].as_array() {
182 for err in errors {
183 if err["extensions"]["code"]
184 .as_str()
185 .is_some_and(|c| c.eq_ignore_ascii_case("THROTTLED"))
186 {
187 return Some(Self::throttle_backoff(body));
188 }
189 if err["message"]
190 .as_str()
191 .is_some_and(|m| m.to_ascii_lowercase().contains("throttled"))
192 {
193 return Some(Self::throttle_backoff(body));
194 }
195 }
196 }
197
198 None
199 }
200
201 #[allow(
208 clippy::indexing_slicing,
209 clippy::cast_possible_truncation,
210 clippy::cast_sign_loss
211 )]
212 fn throttle_backoff(body: &Value) -> u64 {
213 let cost = &body["extensions"]["cost"];
214 let max_avail = cost["maximumAvailable"].as_f64().unwrap_or(10_000.0);
215 let cur_avail = cost["currentlyAvailable"].as_f64().unwrap_or(0.0);
216 let restore_rate = cost["restoreRate"].as_f64().unwrap_or(500.0);
217 let deficit = (max_avail - cur_avail).max(0.0);
218 let ms = if restore_rate > 0.0 {
219 (deficit / restore_rate * 1000.0) as u64
220 } else {
221 2_000
222 };
223 ms.clamp(500, 2_000)
224 }
225
226 #[allow(clippy::indexing_slicing)]
228 fn extract_cost_metadata(body: &Value) -> Option<Value> {
229 let cost = &body["extensions"]["cost"];
230 if cost.is_null() || cost.is_object() && cost.as_object()?.is_empty() {
231 return None;
232 }
233 Some(cost.clone())
234 }
235
236 #[allow(clippy::indexing_slicing)]
238 fn json_path<'v>(root: &'v Value, path: &str) -> &'v Value {
239 let mut cur = root;
240 for key in path.split('.') {
241 cur = &cur[key];
242 }
243 cur
244 }
245
246 #[allow(clippy::indexing_slicing)]
248 async fn post_query(
249 &self,
250 url: &str,
251 query: &str,
252 variables: &Value,
253 operation_name: Option<&str>,
254 auth: Option<&GraphQlAuth>,
255 extra_headers: &HashMap<String, String>,
256 ) -> Result<Value> {
257 let mut body = json!({ "query": query, "variables": variables });
258 if let Some(op) = operation_name {
259 body["operationName"] = json!(op);
260 }
261
262 let mut builder = self
263 .client
264 .post(url)
265 .header("Content-Type", "application/json")
266 .header("Accept", "application/json");
267
268 for (k, v) in extra_headers {
269 builder = builder.header(k.as_str(), v.as_str());
270 }
271
272 if let Some(a) = auth {
273 builder = Self::apply_auth(builder, a);
274 }
275
276 let resp = builder
277 .json(&body)
278 .send()
279 .await
280 .map_err(|e| StygianError::Service(ServiceError::Unavailable(e.to_string())))?;
281
282 let status = resp.status();
283 let text = resp
284 .text()
285 .await
286 .map_err(|e| StygianError::Service(ServiceError::Unavailable(e.to_string())))?;
287
288 if status.as_u16() >= 400 {
289 return Err(StygianError::Service(ServiceError::Unavailable(format!(
290 "HTTP {status}: {text}"
291 ))));
292 }
293
294 serde_json::from_str::<Value>(&text).map_err(|e| {
295 StygianError::Service(ServiceError::InvalidResponse(format!("invalid JSON: {e}")))
296 })
297 }
298
299 #[allow(clippy::indexing_slicing)]
301 fn validate_body(body: &Value) -> Result<()> {
302 if let Some(retry_after_ms) = Self::detect_throttle(body) {
304 return Err(StygianError::Service(ServiceError::RateLimited {
305 retry_after_ms,
306 }));
307 }
308
309 if let Some(errors) = body["errors"].as_array()
310 && !errors.is_empty()
311 {
312 let msg = errors[0]["message"]
313 .as_str()
314 .unwrap_or("unknown GraphQL error")
315 .to_string();
316 return Err(StygianError::Service(ServiceError::InvalidResponse(msg)));
317 }
318
319 if body.get("data").is_none() {
321 return Err(StygianError::Service(ServiceError::InvalidResponse(
322 "missing 'data' key in GraphQL response".to_string(),
323 )));
324 }
325
326 Ok(())
327 }
328}
329
330#[async_trait]
335impl ScrapingService for GraphQlService {
336 fn name(&self) -> &'static str {
337 "graphql"
338 }
339
340 #[allow(clippy::too_many_lines, clippy::indexing_slicing)]
356 async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
357 let params = &input.params;
358
359 let plugin_name = params["plugin"].as_str();
361 let plugin = if let (Some(name), Some(registry)) = (plugin_name, &self.plugins) {
362 Some(registry.get(name)?)
363 } else {
364 None
365 };
366
367 let url = if !input.url.is_empty() {
369 input.url.clone()
370 } else if let Some(ref p) = plugin {
371 p.endpoint().to_string()
372 } else {
373 return Err(StygianError::Service(ServiceError::Unavailable(
374 "no URL provided and no plugin endpoint available".to_string(),
375 )));
376 };
377
378 let query = params["query"].as_str().ok_or_else(|| {
380 StygianError::Service(ServiceError::InvalidResponse(
381 "params.query is required".to_string(),
382 ))
383 })?;
384
385 let operation_name = params["operation_name"].as_str();
386 let mut variables = params["variables"].clone();
387 if variables.is_null() {
388 variables = json!({});
389 }
390
391 let auth: Option<GraphQlAuth> = if !params["auth"].is_null() && params["auth"].is_object() {
393 Self::parse_auth(¶ms["auth"])
394 } else {
395 plugin.as_ref().and_then(|p| p.default_auth())
396 };
397
398 let mut extra_headers: HashMap<String, String> = params["headers"]
400 .as_object()
401 .map(|obj| {
402 obj.iter()
403 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
404 .collect()
405 })
406 .unwrap_or_default();
407
408 if let Some(ref p) = plugin {
410 for (k, v) in p.version_headers() {
411 extra_headers.insert(k, v);
412 }
413 }
414
415 let pag = ¶ms["pagination"];
417 let use_cursor = pag["strategy"].as_str() == Some("cursor");
418 let page_info_path = pag["page_info_path"]
419 .as_str()
420 .unwrap_or("data.pageInfo")
421 .to_string();
422 let edges_path = pag["edges_path"]
423 .as_str()
424 .unwrap_or("data.edges")
425 .to_string();
426 let page_size: u64 = pag["page_size"]
427 .as_u64()
428 .unwrap_or_else(|| plugin.as_ref().map_or(50, |p| p.default_page_size() as u64));
429
430 if use_cursor {
432 variables["first"] = json!(page_size);
434 variables["after"] = json!(null);
435
436 let mut all_edges: Vec<Value> = Vec::new();
437 let mut page = 0usize;
438 let mut cost_meta = json!(null);
439
440 loop {
441 if page >= self.config.max_pages {
442 return Err(StygianError::Service(ServiceError::InvalidResponse(
443 format!("pagination exceeded max_pages ({})", self.config.max_pages),
444 )));
445 }
446
447 let body = self
448 .post_query(
449 &url,
450 query,
451 &variables,
452 operation_name,
453 auth.as_ref(),
454 &extra_headers,
455 )
456 .await?;
457
458 Self::validate_body(&body)?;
459
460 let edges = Self::json_path(&body, &edges_path);
462 if let Some(arr) = edges.as_array() {
463 all_edges.extend(arr.iter().cloned());
464 }
465
466 let page_info = Self::json_path(&body, &page_info_path);
468 let has_next = page_info["hasNextPage"].as_bool().unwrap_or(false);
469 let end_cursor = page_info["endCursor"].clone();
470
471 cost_meta = Self::extract_cost_metadata(&body).unwrap_or(json!(null));
472 page += 1;
473
474 if !has_next || end_cursor.is_null() {
475 break;
476 }
477 variables["after"] = end_cursor;
478 }
479
480 let metadata = json!({ "cost": cost_meta, "pages_fetched": page });
481 Ok(ServiceOutput {
482 data: serde_json::to_string(&all_edges).unwrap_or_default(),
483 metadata,
484 })
485 } else {
486 let body = self
488 .post_query(
489 &url,
490 query,
491 &variables,
492 operation_name,
493 auth.as_ref(),
494 &extra_headers,
495 )
496 .await?;
497
498 Self::validate_body(&body)?;
499
500 let cost_meta = Self::extract_cost_metadata(&body).unwrap_or(json!(null));
501 let metadata = json!({ "cost": cost_meta });
502
503 Ok(ServiceOutput {
504 data: serde_json::to_string(&body["data"]).unwrap_or_default(),
505 metadata,
506 })
507 }
508 }
509}
510
511#[cfg(test)]
516#[allow(
517 clippy::unwrap_used,
518 clippy::indexing_slicing,
519 clippy::needless_pass_by_value,
520 clippy::field_reassign_with_default,
521 clippy::unnecessary_literal_bound
522)]
523mod tests {
524 use super::*;
525 use std::collections::HashMap;
526 use std::io::Write;
527 use std::sync::Arc;
528
529 use serde_json::json;
530 use tokio::io::{AsyncReadExt, AsyncWriteExt};
531 use tokio::net::TcpListener;
532
533 use crate::application::graphql_plugin_registry::GraphQlPluginRegistry;
534 use crate::ports::graphql_plugin::GraphQlTargetPlugin;
535
536 struct MockGraphQlServer;
542
543 impl MockGraphQlServer {
544 async fn run_with<F, Fut>(status: u16, body: impl Into<Vec<u8>>, f: F)
548 where
549 F: FnOnce(String) -> Fut,
550 Fut: std::future::Future<Output = ()>,
551 {
552 let body_bytes: Vec<u8> = body.into();
553 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
554 let addr = listener.local_addr().unwrap();
555 let url = format!("http://{addr}");
556
557 let body_clone = body_bytes.clone();
558 tokio::spawn(async move {
559 if let Ok((mut stream, _)) = listener.accept().await {
560 let mut buf = [0u8; 4096];
561 let _ = stream.read(&mut buf).await;
562 let mut response = Vec::new();
564 write!(
565 response,
566 "HTTP/1.1 {status} OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
567 body_clone.len()
568 ).unwrap();
569 response.extend_from_slice(&body_clone);
570 let _ = stream.write_all(&response).await;
571 }
572 });
573
574 f(url).await;
575 }
576
577 async fn run_capturing_request<F, Fut>(body: impl Into<Vec<u8>>, f: F) -> Vec<u8>
579 where
580 F: FnOnce(String) -> Fut,
581 Fut: std::future::Future<Output = ()>,
582 {
583 let body_bytes: Vec<u8> = body.into();
584 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
585 let addr = listener.local_addr().unwrap();
586 let url = format!("http://{addr}");
587
588 let body_clone = body_bytes.clone();
589 let (tx, mut rx) = tokio::sync::oneshot::channel::<Vec<u8>>();
590 tokio::spawn(async move {
591 if let Ok((mut stream, _)) = listener.accept().await {
592 let mut buf = vec![0u8; 8192];
593 let n = stream.read(&mut buf).await.unwrap_or(0);
594 let request = buf[..n].to_vec();
595 let _ = tx.send(request);
596
597 let mut response = Vec::new();
598 write!(
599 response,
600 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
601 body_clone.len()
602 ).unwrap();
603 response.extend_from_slice(&body_clone);
604 let _ = stream.write_all(&response).await;
605 }
606 });
607
608 f(url).await;
609
610 rx.try_recv().unwrap_or_default()
611 }
612 }
613
614 fn make_service(plugins: Option<Arc<GraphQlPluginRegistry>>) -> GraphQlService {
615 let mut config = GraphQlConfig::default();
616 config.max_pages = 5; GraphQlService::new(config, plugins)
618 }
619
620 fn simple_query_body(data: Value) -> Vec<u8> {
621 serde_json::to_vec(&json!({ "data": data })).unwrap()
622 }
623
624 #[tokio::test]
627 async fn execute_simple_query() {
628 let body = simple_query_body(json!({ "users": [{ "id": 1 }] }));
629 MockGraphQlServer::run_with(200, body, |url| async move {
630 let svc = make_service(None);
631 let input = ServiceInput {
632 url,
633 params: json!({ "query": "{ users { id } }" }),
634 };
635 let output = svc.execute(input).await.unwrap();
636 let data: Value = serde_json::from_str(&output.data).unwrap();
637 assert_eq!(data["users"][0]["id"], 1);
638 })
639 .await;
640 }
641
642 #[tokio::test]
643 async fn graphql_errors_in_200_response() {
644 let body =
645 serde_json::to_vec(&json!({ "errors": [{ "message": "not found" }], "data": null }))
646 .unwrap();
647 MockGraphQlServer::run_with(200, body, |url| async move {
648 let svc = make_service(None);
649 let input = ServiceInput {
650 url,
651 params: json!({ "query": "{ missing }" }),
652 };
653 let err = svc.execute(input).await.unwrap_err();
654 assert!(
655 matches!(err, StygianError::Service(ServiceError::InvalidResponse(_))),
656 "expected InvalidResponse, got {err:?}"
657 );
658 })
659 .await;
660 }
661
662 #[tokio::test]
663 async fn http_error_returns_unavailable() {
664 let body = b"Internal Server Error".to_vec();
665 MockGraphQlServer::run_with(500, body, |url| async move {
666 let svc = make_service(None);
667 let input = ServiceInput {
668 url,
669 params: json!({ "query": "{ x }" }),
670 };
671 let err = svc.execute(input).await.unwrap_err();
672 assert!(
673 matches!(err, StygianError::Service(ServiceError::Unavailable(_))),
674 "expected Unavailable, got {err:?}"
675 );
676 })
677 .await;
678 }
679
680 #[tokio::test]
681 async fn missing_data_key() {
682 let body = serde_json::to_vec(&json!({ "extensions": {} })).unwrap();
683 MockGraphQlServer::run_with(200, body, |url| async move {
684 let svc = make_service(None);
685 let input = ServiceInput {
686 url,
687 params: json!({ "query": "{ x }" }),
688 };
689 let err = svc.execute(input).await.unwrap_err();
690 assert!(
691 matches!(err, StygianError::Service(ServiceError::InvalidResponse(_))),
692 "expected InvalidResponse, got {err:?}"
693 );
694 })
695 .await;
696 }
697
698 #[tokio::test]
699 async fn bearer_auth_header_set() {
700 let body = simple_query_body(json!({}));
701 let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
702 let svc = make_service(None);
703 let input = ServiceInput {
704 url,
705 params: json!({
706 "query": "{ x }",
707 "auth": { "kind": "bearer", "token": "test-token-123" }
708 }),
709 };
710 let _ = svc.execute(input).await;
711 })
712 .await;
713
714 let request_str = String::from_utf8_lossy(&request_bytes);
715 assert!(
716 request_str.contains("authorization: Bearer test-token-123"),
717 "auth header not found in request:\n{request_str}"
718 );
719 }
720
721 #[tokio::test]
722 async fn plugin_version_headers_merged() {
723 struct V1Plugin;
724 impl GraphQlTargetPlugin for V1Plugin {
725 fn name(&self) -> &str {
726 "v1"
727 }
728 fn endpoint(&self) -> &str {
729 "unused"
730 }
731 fn version_headers(&self) -> HashMap<String, String> {
732 [("X-TEST-VERSION".to_string(), "2025-01-01".to_string())].into()
733 }
734 }
735
736 let mut registry = GraphQlPluginRegistry::new();
737 registry.register(Arc::new(V1Plugin));
738
739 let body = simple_query_body(json!({}));
740 let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
741 let svc = make_service(Some(Arc::new(registry)));
742 let input = ServiceInput {
743 url,
744 params: json!({
745 "query": "{ x }",
746 "plugin": "v1"
747 }),
748 };
749 let _ = svc.execute(input).await;
750 })
751 .await;
752
753 let request_str = String::from_utf8_lossy(&request_bytes);
754 assert!(
755 request_str.contains("x-test-version: 2025-01-01"),
756 "version header not found:\n{request_str}"
757 );
758 }
759
760 #[tokio::test]
761 async fn plugin_default_auth_used_when_params_auth_absent() {
762 use crate::ports::{GraphQlAuth, GraphQlAuthKind};
763
764 struct TokenPlugin;
765 impl GraphQlTargetPlugin for TokenPlugin {
766 fn name(&self) -> &str {
767 "tokenplugin"
768 }
769 fn endpoint(&self) -> &str {
770 "unused"
771 }
772 fn default_auth(&self) -> Option<GraphQlAuth> {
773 Some(GraphQlAuth {
774 kind: GraphQlAuthKind::Bearer,
775 token: "plugin-default-token".to_string(),
776 header_name: None,
777 })
778 }
779 }
780
781 let mut registry = GraphQlPluginRegistry::new();
782 registry.register(Arc::new(TokenPlugin));
783
784 let body = simple_query_body(json!({}));
785 let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
786 let svc = make_service(Some(Arc::new(registry)));
787 let input = ServiceInput {
788 url,
789 params: json!({
791 "query": "{ x }",
792 "plugin": "tokenplugin"
793 }),
794 };
795 let _ = svc.execute(input).await;
796 })
797 .await;
798
799 let request_str = String::from_utf8_lossy(&request_bytes);
800 assert!(
801 request_str.contains("Bearer plugin-default-token"),
802 "plugin default auth not applied:\n{request_str}"
803 );
804 }
805
806 #[tokio::test]
807 async fn throttle_response_returns_rate_limited() {
808 let body = serde_json::to_vec(&json!({
809 "data": null,
810 "extensions": {
811 "cost": {
812 "throttleStatus": "THROTTLED",
813 "maximumAvailable": 10000,
814 "currentlyAvailable": 0,
815 "restoreRate": 500
816 }
817 }
818 }))
819 .unwrap();
820
821 MockGraphQlServer::run_with(200, body, |url| async move {
822 let svc = make_service(None);
823 let input = ServiceInput {
824 url,
825 params: json!({ "query": "{ x }" }),
826 };
827 let err = svc.execute(input).await.unwrap_err();
828 assert!(
829 matches!(
830 err,
831 StygianError::Service(ServiceError::RateLimited { retry_after_ms })
832 if retry_after_ms > 0
833 ),
834 "expected RateLimited, got {err:?}"
835 );
836 })
837 .await;
838 }
839
840 #[tokio::test]
841 async fn cost_metadata_surfaced() {
842 let body = serde_json::to_vec(&json!({
843 "data": { "items": [] },
844 "extensions": {
845 "cost": {
846 "throttleStatus": "PASS",
847 "maximumAvailable": 10000,
848 "currentlyAvailable": 9800,
849 "actualQueryCost": 42,
850 "restoreRate": 500
851 }
852 }
853 }))
854 .unwrap();
855
856 MockGraphQlServer::run_with(200, body, |url| async move {
857 let svc = make_service(None);
858 let input = ServiceInput {
859 url,
860 params: json!({ "query": "{ items { id } }" }),
861 };
862 let output = svc.execute(input).await.unwrap();
863 let cost = &output.metadata["cost"];
864 assert_eq!(cost["actualQueryCost"], 42);
865 assert_eq!(cost["throttleStatus"], "PASS");
866 })
867 .await;
868 }
869
870 #[tokio::test]
871 async fn cursor_pagination_accumulates_pages() {
872 let listener1 = TcpListener::bind("127.0.0.1:0").await.unwrap();
875 let addr1 = listener1.local_addr().unwrap();
876 let listener2 = TcpListener::bind("127.0.0.1:0").await.unwrap();
877 let addr2 = listener2.local_addr().unwrap();
878
879 let page1_body = serde_json::to_vec(&json!({
882 "data": {
883 "items": {
884 "edges": [{"node": {"id": 1}}, {"node": {"id": 2}}],
885 "pageInfo": { "hasNextPage": true, "endCursor": "cursor1" }
886 }
887 }
888 }))
889 .unwrap();
890
891 let page2_body = serde_json::to_vec(&json!({
892 "data": {
893 "items": {
894 "edges": [{"node": {"id": 3}}],
895 "pageInfo": { "hasNextPage": false, "endCursor": null }
896 }
897 }
898 }))
899 .unwrap();
900
901 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
902 let addr = listener.local_addr().unwrap();
903 let url = format!("http://{addr}");
904
905 let bodies = vec![page1_body, page2_body];
906 tokio::spawn(async move {
907 for response_body in bodies {
908 if let Ok((mut stream, _)) = listener.accept().await {
909 let mut buf = [0u8; 8192];
910 let _ = stream.read(&mut buf).await;
911 let mut resp = Vec::new();
912 write!(
913 resp,
914 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
915 response_body.len()
916 ).unwrap();
917 resp.extend_from_slice(&response_body);
918 let _ = stream.write_all(&resp).await;
919 }
920 }
921 let _ = listener1;
924 let _ = listener2;
925 let _ = addr1;
926 let _ = addr2;
927 });
928
929 let svc = make_service(None);
930 let input = ServiceInput {
931 url,
932 params: json!({
933 "query": "query($first:Int,$after:String){ items(first:$first,after:$after){ edges{node{id}} pageInfo{hasNextPage endCursor} } }",
934 "pagination": {
935 "strategy": "cursor",
936 "page_info_path": "data.items.pageInfo",
937 "edges_path": "data.items.edges",
938 "page_size": 2
939 }
940 }),
941 };
942
943 let output = svc.execute(input).await.unwrap();
944 let edges: Vec<Value> = serde_json::from_str(&output.data).unwrap();
945 assert_eq!(edges.len(), 3, "expected 3 accumulated edges");
946 assert_eq!(edges[0]["node"]["id"], 1);
947 assert_eq!(edges[2]["node"]["id"], 3);
948 }
949
950 #[tokio::test]
951 async fn pagination_cap_prevents_infinite_loop() {
952 let page_body = serde_json::to_vec(&json!({
954 "data": {
955 "rows": {
956 "edges": [{"node": {"id": 1}}],
957 "pageInfo": { "hasNextPage": true, "endCursor": "always-more" }
958 }
959 }
960 }))
961 .unwrap();
962
963 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
964 let addr = listener.local_addr().unwrap();
965 let url = format!("http://{addr}");
966
967 let page_body_clone = page_body.clone();
968 tokio::spawn(async move {
969 while let Ok((mut stream, _)) = listener.accept().await {
970 let mut buf = [0u8; 8192];
971 let _ = stream.read(&mut buf).await;
972 let mut resp = Vec::new();
973 write!(
974 resp,
975 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
976 page_body_clone.len()
977 )
978 .unwrap();
979 resp.extend_from_slice(&page_body_clone);
980 let _ = stream.write_all(&resp).await;
981 }
982 });
983
984 let svc = make_service(None);
986 let input = ServiceInput {
987 url,
988 params: json!({
989 "query": "{ rows { edges{node{id}} pageInfo{hasNextPage endCursor} } }",
990 "pagination": {
991 "strategy": "cursor",
992 "page_info_path": "data.rows.pageInfo",
993 "edges_path": "data.rows.edges",
994 "page_size": 1
995 }
996 }),
997 };
998
999 let err = svc.execute(input).await.unwrap_err();
1000 assert!(
1001 matches!(err, StygianError::Service(ServiceError::InvalidResponse(ref msg)) if msg.contains("max_pages")),
1002 "expected pagination cap error, got {err:?}"
1003 );
1004 }
1005}