1use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use async_trait::async_trait;
17use serde_json::{Value, json};
18
19use crate::application::graphql_plugin_registry::GraphQlPluginRegistry;
20use crate::application::pipeline_parser::expand_template;
21use crate::domain::error::{StygianError, Result, ServiceError};
22use crate::ports::{GraphQlAuth, GraphQlAuthKind, ScrapingService, ServiceInput, ServiceOutput};
23
/// Tunable settings for [`GraphQlService`].
///
/// Use [`GraphQlConfig::default`] for the stock values and override individual
/// fields as needed. The type is comparable so configurations can be checked
/// for equality in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GraphQlConfig {
    /// Per-request timeout in seconds, applied to the HTTP client built by
    /// `GraphQlService::new`.
    pub timeout_secs: u64,
    /// Maximum number of pages cursor pagination may fetch before the
    /// service gives up with an error.
    pub max_pages: usize,
    /// Value sent as the `User-Agent` of the HTTP client.
    pub user_agent: String,
}
50
51impl Default for GraphQlConfig {
52 fn default() -> Self {
53 Self {
54 timeout_secs: 30,
55 max_pages: 1000,
56 user_agent: "stygian-graph/1.0".to_string(),
57 }
58 }
59}
60
61pub struct GraphQlService {
95 client: reqwest::Client,
96 config: GraphQlConfig,
97 plugins: Option<Arc<GraphQlPluginRegistry>>,
98}
99
100impl GraphQlService {
101 pub fn new(config: GraphQlConfig, plugins: Option<Arc<GraphQlPluginRegistry>>) -> Self {
115 let client = reqwest::Client::builder()
116 .timeout(Duration::from_secs(config.timeout_secs))
117 .user_agent(&config.user_agent)
118 .build()
119 .unwrap_or_default();
120 Self {
121 client,
122 config,
123 plugins,
124 }
125 }
126
127 fn apply_auth(builder: reqwest::RequestBuilder, auth: &GraphQlAuth) -> reqwest::RequestBuilder {
131 let token = expand_template(&auth.token);
132 match auth.kind {
133 GraphQlAuthKind::Bearer => builder.header("Authorization", format!("Bearer {token}")),
134 GraphQlAuthKind::ApiKey => builder.header("X-Api-Key", token),
135 GraphQlAuthKind::Header => {
136 let name = auth.header_name.as_deref().unwrap_or("X-Api-Key");
137 builder.header(name, token)
138 }
139 GraphQlAuthKind::None => builder,
140 }
141 }
142
143 fn parse_auth(val: &Value) -> Option<GraphQlAuth> {
145 let kind_str = val["kind"].as_str().unwrap_or("none");
146 let kind = match kind_str {
147 "bearer" => GraphQlAuthKind::Bearer,
148 "api_key" => GraphQlAuthKind::ApiKey,
149 "header" => GraphQlAuthKind::Header,
150 _ => GraphQlAuthKind::None,
151 };
152 if kind == GraphQlAuthKind::None {
153 return None;
154 }
155 let token = val["token"].as_str()?.to_string();
156 let header_name = val["header_name"].as_str().map(str::to_string);
157 Some(GraphQlAuth {
158 kind,
159 token,
160 header_name,
161 })
162 }
163
164 #[allow(clippy::indexing_slicing)]
171 fn detect_throttle(body: &Value) -> Option<u64> {
172 if body["extensions"]["cost"]["throttleStatus"]
174 .as_str()
175 .is_some_and(|s| s.eq_ignore_ascii_case("THROTTLED"))
176 {
177 return Some(Self::throttle_backoff(body));
178 }
179
180 if let Some(errors) = body["errors"].as_array() {
182 for err in errors {
183 if err["extensions"]["code"]
184 .as_str()
185 .is_some_and(|c| c.eq_ignore_ascii_case("THROTTLED"))
186 {
187 return Some(Self::throttle_backoff(body));
188 }
189 if err["message"]
190 .as_str()
191 .is_some_and(|m| m.to_ascii_lowercase().contains("throttled"))
192 {
193 return Some(Self::throttle_backoff(body));
194 }
195 }
196 }
197
198 None
199 }
200
201 #[allow(
208 clippy::indexing_slicing,
209 clippy::cast_possible_truncation,
210 clippy::cast_sign_loss
211 )]
212 fn throttle_backoff(body: &Value) -> u64 {
213 let cost = &body["extensions"]["cost"];
214 let max_avail = cost["maximumAvailable"].as_f64().unwrap_or(10_000.0);
215 let cur_avail = cost["currentlyAvailable"].as_f64().unwrap_or(0.0);
216 let restore_rate = cost["restoreRate"].as_f64().unwrap_or(500.0);
217 let deficit = (max_avail - cur_avail).max(0.0);
218 let ms = if restore_rate > 0.0 {
219 (deficit / restore_rate * 1000.0) as u64
220 } else {
221 2_000
222 };
223 ms.clamp(500, 2_000)
224 }
225
226 #[allow(clippy::indexing_slicing)]
228 fn extract_cost_metadata(body: &Value) -> Option<Value> {
229 let cost = &body["extensions"]["cost"];
230 if cost.is_null() || cost.is_object() && cost.as_object()?.is_empty() {
231 return None;
232 }
233 Some(cost.clone())
234 }
235
236 #[allow(clippy::indexing_slicing)]
238 fn json_path<'v>(root: &'v Value, path: &str) -> &'v Value {
239 let mut cur = root;
240 for key in path.split('.') {
241 cur = &cur[key];
242 }
243 cur
244 }
245
246 #[allow(clippy::indexing_slicing)]
248 async fn post_query(
249 &self,
250 url: &str,
251 query: &str,
252 variables: &Value,
253 operation_name: Option<&str>,
254 auth: Option<&GraphQlAuth>,
255 extra_headers: &HashMap<String, String>,
256 ) -> Result<Value> {
257 let mut body = json!({ "query": query, "variables": variables });
258 if let Some(op) = operation_name {
259 body["operationName"] = json!(op);
260 }
261
262 let mut builder = self
263 .client
264 .post(url)
265 .header("Content-Type", "application/json")
266 .header("Accept", "application/json");
267
268 for (k, v) in extra_headers {
269 builder = builder.header(k.as_str(), v.as_str());
270 }
271
272 if let Some(a) = auth {
273 builder = Self::apply_auth(builder, a);
274 }
275
276 let resp = builder
277 .json(&body)
278 .send()
279 .await
280 .map_err(|e| StygianError::Service(ServiceError::Unavailable(e.to_string())))?;
281
282 let status = resp.status();
283 let text = resp
284 .text()
285 .await
286 .map_err(|e| StygianError::Service(ServiceError::Unavailable(e.to_string())))?;
287
288 if status.as_u16() >= 400 {
289 return Err(StygianError::Service(ServiceError::Unavailable(format!(
290 "HTTP {status}: {text}"
291 ))));
292 }
293
294 serde_json::from_str::<Value>(&text).map_err(|e| {
295 StygianError::Service(ServiceError::InvalidResponse(format!("invalid JSON: {e}")))
296 })
297 }
298
299 #[allow(clippy::indexing_slicing)]
301 fn validate_body(body: &Value) -> Result<()> {
302 if let Some(retry_after_ms) = Self::detect_throttle(body) {
304 return Err(StygianError::Service(ServiceError::RateLimited {
305 retry_after_ms,
306 }));
307 }
308
309 if let Some(errors) = body["errors"].as_array()
310 && !errors.is_empty()
311 {
312 let msg = errors[0]["message"]
313 .as_str()
314 .unwrap_or("unknown GraphQL error")
315 .to_string();
316 return Err(StygianError::Service(ServiceError::InvalidResponse(msg)));
317 }
318
319 if body.get("data").is_none() {
321 return Err(StygianError::Service(ServiceError::InvalidResponse(
322 "missing 'data' key in GraphQL response".to_string(),
323 )));
324 }
325
326 Ok(())
327 }
328}
329
330#[async_trait]
335impl ScrapingService for GraphQlService {
336 fn name(&self) -> &'static str {
337 "graphql"
338 }
339
340 #[allow(clippy::too_many_lines, clippy::indexing_slicing)]
356 async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
357 let params = &input.params;
358
359 let plugin_name = params["plugin"].as_str();
361 let plugin = if let (Some(name), Some(registry)) = (plugin_name, &self.plugins) {
362 Some(registry.get(name)?)
363 } else {
364 None
365 };
366
367 let url = if !input.url.is_empty() {
369 input.url.clone()
370 } else if let Some(ref p) = plugin {
371 p.endpoint().to_string()
372 } else {
373 return Err(StygianError::Service(ServiceError::Unavailable(
374 "no URL provided and no plugin endpoint available".to_string(),
375 )));
376 };
377
378 let query = params["query"].as_str().ok_or_else(|| {
380 StygianError::Service(ServiceError::InvalidResponse(
381 "params.query is required".to_string(),
382 ))
383 })?;
384
385 let operation_name = params["operation_name"].as_str();
386 let mut variables = params["variables"].clone();
387 if variables.is_null() {
388 variables = json!({});
389 }
390
391 let auth: Option<GraphQlAuth> = if !params["auth"].is_null() && params["auth"].is_object() {
393 Self::parse_auth(¶ms["auth"])
394 } else {
395 plugin.as_ref().and_then(|p| p.default_auth())
396 };
397
398 let mut extra_headers: HashMap<String, String> = params["headers"]
400 .as_object()
401 .map(|obj| {
402 obj.iter()
403 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
404 .collect()
405 })
406 .unwrap_or_default();
407
408 if let Some(ref p) = plugin {
410 for (k, v) in p.version_headers() {
411 extra_headers.insert(k, v);
412 }
413 }
414
415 let pag = ¶ms["pagination"];
417 let use_cursor = pag["strategy"].as_str() == Some("cursor");
418 let page_info_path = pag["page_info_path"]
419 .as_str()
420 .unwrap_or("data.pageInfo")
421 .to_string();
422 let edges_path = pag["edges_path"]
423 .as_str()
424 .unwrap_or("data.edges")
425 .to_string();
426 let page_size: u64 = pag["page_size"]
427 .as_u64()
428 .unwrap_or_else(|| plugin.as_ref().map_or(50, |p| p.default_page_size() as u64));
429
430 if use_cursor {
432 variables["first"] = json!(page_size);
434 variables["after"] = json!(null);
435
436 let mut all_edges: Vec<Value> = Vec::new();
437 let mut page = 0usize;
438 let mut cost_meta = json!(null);
439
440 loop {
441 if page >= self.config.max_pages {
442 return Err(StygianError::Service(ServiceError::InvalidResponse(
443 format!("pagination exceeded max_pages ({})", self.config.max_pages),
444 )));
445 }
446
447 let body = self
448 .post_query(
449 &url,
450 query,
451 &variables,
452 operation_name,
453 auth.as_ref(),
454 &extra_headers,
455 )
456 .await?;
457
458 Self::validate_body(&body)?;
459
460 let edges = Self::json_path(&body, &edges_path);
462 if let Some(arr) = edges.as_array() {
463 all_edges.extend(arr.iter().cloned());
464 }
465
466 let page_info = Self::json_path(&body, &page_info_path);
468 let has_next = page_info["hasNextPage"].as_bool().unwrap_or(false);
469 let end_cursor = page_info["endCursor"].clone();
470
471 cost_meta = Self::extract_cost_metadata(&body).unwrap_or(json!(null));
472 page += 1;
473
474 if !has_next || end_cursor.is_null() {
475 break;
476 }
477 variables["after"] = end_cursor;
478 }
479
480 let metadata = json!({ "cost": cost_meta, "pages_fetched": page });
481 Ok(ServiceOutput {
482 data: serde_json::to_string(&all_edges).unwrap_or_default(),
483 metadata,
484 })
485 } else {
486 let body = self
488 .post_query(
489 &url,
490 query,
491 &variables,
492 operation_name,
493 auth.as_ref(),
494 &extra_headers,
495 )
496 .await?;
497
498 Self::validate_body(&body)?;
499
500 let cost_meta = Self::extract_cost_metadata(&body).unwrap_or(json!(null));
501 let metadata = json!({ "cost": cost_meta });
502
503 Ok(ServiceOutput {
504 data: serde_json::to_string(&body["data"]).unwrap_or_default(),
505 metadata,
506 })
507 }
508 }
509}
510
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::indexing_slicing,
    clippy::needless_pass_by_value,
    clippy::field_reassign_with_default,
    clippy::unnecessary_literal_bound
)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::io::Write;
    use std::sync::Arc;

    use serde_json::json;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::net::TcpListener;

    use crate::application::graphql_plugin_registry::GraphQlPluginRegistry;
    use crate::ports::graphql_plugin::GraphQlTargetPlugin;

    /// Size of the single read the mock server performs per connection.
    const REQUEST_BUF_LEN: usize = 8192;

    /// Binds a listener on an ephemeral local port and returns it together
    /// with the `http://` URL clients should use.
    async fn bind_local() -> (TcpListener, String) {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let url = format!("http://{}", listener.local_addr().unwrap());
        (listener, url)
    }

    /// Builds a minimal HTTP/1.1 response with a JSON content type.
    ///
    /// The reason phrase is always `OK`; only the numeric status matters to
    /// the client under test.
    fn http_response(status: u16, body: &[u8]) -> Vec<u8> {
        let mut response = Vec::with_capacity(body.len() + 128);
        write!(
            response,
            "HTTP/1.1 {status} OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
            body.len()
        )
        .unwrap();
        response.extend_from_slice(body);
        response
    }

    /// Performs one read on `stream` and returns the bytes received.
    async fn read_request(stream: &mut tokio::net::TcpStream) -> Vec<u8> {
        let mut buf = vec![0u8; REQUEST_BUF_LEN];
        let n = stream.read(&mut buf).await.unwrap_or(0);
        buf.truncate(n);
        buf
    }

    /// Writes a canned response to `stream`, ignoring write failures.
    async fn write_response(stream: &mut tokio::net::TcpStream, status: u16, body: &[u8]) {
        let _ = stream.write_all(&http_response(status, body)).await;
    }

    /// One-shot HTTP server used to stand in for a GraphQL endpoint.
    struct MockGraphQlServer;

    impl MockGraphQlServer {
        /// Serves a single response with `status` and `body`, then runs `f`
        /// with the server URL.
        async fn run_with<F, Fut>(status: u16, body: impl Into<Vec<u8>>, f: F)
        where
            F: FnOnce(String) -> Fut,
            Fut: std::future::Future<Output = ()>,
        {
            let body: Vec<u8> = body.into();
            let (listener, url) = bind_local().await;

            tokio::spawn(async move {
                if let Ok((mut stream, _)) = listener.accept().await {
                    let _ = read_request(&mut stream).await;
                    write_response(&mut stream, status, &body).await;
                }
            });

            f(url).await;
        }

        /// Serves a single `200` response with `body`, runs `f` with the
        /// server URL, and returns the raw request bytes the server saw
        /// (empty if no request arrived).
        async fn run_capturing_request<F, Fut>(body: impl Into<Vec<u8>>, f: F) -> Vec<u8>
        where
            F: FnOnce(String) -> Fut,
            Fut: std::future::Future<Output = ()>,
        {
            let body: Vec<u8> = body.into();
            let (listener, url) = bind_local().await;

            let (tx, mut rx) = tokio::sync::oneshot::channel::<Vec<u8>>();
            tokio::spawn(async move {
                if let Ok((mut stream, _)) = listener.accept().await {
                    let request = read_request(&mut stream).await;
                    // Publish the request before replying so it is already
                    // available once the client sees the response.
                    let _ = tx.send(request);
                    write_response(&mut stream, 200, &body).await;
                }
            });

            f(url).await;

            rx.try_recv().unwrap_or_default()
        }
    }

    /// Builds a service with a small page cap so pagination tests stay fast.
    fn make_service(plugins: Option<Arc<GraphQlPluginRegistry>>) -> GraphQlService {
        let config = GraphQlConfig {
            max_pages: 5,
            ..GraphQlConfig::default()
        };
        GraphQlService::new(config, plugins)
    }

    /// Serializes `{ "data": <data> }` as a response body.
    fn simple_query_body(data: Value) -> Vec<u8> {
        serde_json::to_vec(&json!({ "data": data })).unwrap()
    }

    #[tokio::test]
    async fn execute_simple_query() {
        let body = simple_query_body(json!({ "users": [{ "id": 1 }] }));
        MockGraphQlServer::run_with(200, body, |url| async move {
            let svc = make_service(None);
            let input = ServiceInput {
                url,
                params: json!({ "query": "{ users { id } }" }),
            };
            let output = svc.execute(input).await.unwrap();
            let data: Value = serde_json::from_str(&output.data).unwrap();
            assert_eq!(data["users"][0]["id"], 1);
        })
        .await;
    }

    #[tokio::test]
    async fn graphql_errors_in_200_response() {
        let body =
            serde_json::to_vec(&json!({ "errors": [{ "message": "not found" }], "data": null }))
                .unwrap();
        MockGraphQlServer::run_with(200, body, |url| async move {
            let svc = make_service(None);
            let input = ServiceInput {
                url,
                params: json!({ "query": "{ missing }" }),
            };
            let err = svc.execute(input).await.unwrap_err();
            assert!(
                matches!(
                    err,
                    StygianError::Service(ServiceError::InvalidResponse(_))
                ),
                "expected InvalidResponse, got {err:?}"
            );
        })
        .await;
    }

    #[tokio::test]
    async fn http_error_returns_unavailable() {
        let body = b"Internal Server Error".to_vec();
        MockGraphQlServer::run_with(500, body, |url| async move {
            let svc = make_service(None);
            let input = ServiceInput {
                url,
                params: json!({ "query": "{ x }" }),
            };
            let err = svc.execute(input).await.unwrap_err();
            assert!(
                matches!(err, StygianError::Service(ServiceError::Unavailable(_))),
                "expected Unavailable, got {err:?}"
            );
        })
        .await;
    }

    #[tokio::test]
    async fn missing_data_key() {
        let body = serde_json::to_vec(&json!({ "extensions": {} })).unwrap();
        MockGraphQlServer::run_with(200, body, |url| async move {
            let svc = make_service(None);
            let input = ServiceInput {
                url,
                params: json!({ "query": "{ x }" }),
            };
            let err = svc.execute(input).await.unwrap_err();
            assert!(
                matches!(
                    err,
                    StygianError::Service(ServiceError::InvalidResponse(_))
                ),
                "expected InvalidResponse, got {err:?}"
            );
        })
        .await;
    }

    #[tokio::test]
    async fn bearer_auth_header_set() {
        let body = simple_query_body(json!({}));
        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
            let svc = make_service(None);
            let input = ServiceInput {
                url,
                params: json!({
                    "query": "{ x }",
                    "auth": { "kind": "bearer", "token": "test-token-123" }
                }),
            };
            let _ = svc.execute(input).await;
        })
        .await;

        let request_str = String::from_utf8_lossy(&request_bytes);
        assert!(
            request_str.contains("authorization: Bearer test-token-123"),
            "auth header not found in request:\n{request_str}"
        );
    }

    #[tokio::test]
    async fn plugin_version_headers_merged() {
        struct V1Plugin;
        impl GraphQlTargetPlugin for V1Plugin {
            fn name(&self) -> &str {
                "v1"
            }
            fn endpoint(&self) -> &str {
                "unused"
            }
            fn version_headers(&self) -> HashMap<String, String> {
                [("X-TEST-VERSION".to_string(), "2025-01-01".to_string())].into()
            }
        }

        let mut registry = GraphQlPluginRegistry::new();
        registry.register(Arc::new(V1Plugin));

        let body = simple_query_body(json!({}));
        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
            let svc = make_service(Some(Arc::new(registry)));
            let input = ServiceInput {
                url,
                params: json!({
                    "query": "{ x }",
                    "plugin": "v1"
                }),
            };
            let _ = svc.execute(input).await;
        })
        .await;

        let request_str = String::from_utf8_lossy(&request_bytes);
        assert!(
            request_str.contains("x-test-version: 2025-01-01"),
            "version header not found:\n{request_str}"
        );
    }

    #[tokio::test]
    async fn plugin_default_auth_used_when_params_auth_absent() {
        use crate::ports::{GraphQlAuth, GraphQlAuthKind};

        struct TokenPlugin;
        impl GraphQlTargetPlugin for TokenPlugin {
            fn name(&self) -> &str {
                "tokenplugin"
            }
            fn endpoint(&self) -> &str {
                "unused"
            }
            fn default_auth(&self) -> Option<GraphQlAuth> {
                Some(GraphQlAuth {
                    kind: GraphQlAuthKind::Bearer,
                    token: "plugin-default-token".to_string(),
                    header_name: None,
                })
            }
        }

        let mut registry = GraphQlPluginRegistry::new();
        registry.register(Arc::new(TokenPlugin));

        let body = simple_query_body(json!({}));
        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
            let svc = make_service(Some(Arc::new(registry)));
            let input = ServiceInput {
                url,
                params: json!({
                    "query": "{ x }",
                    "plugin": "tokenplugin"
                }),
            };
            let _ = svc.execute(input).await;
        })
        .await;

        let request_str = String::from_utf8_lossy(&request_bytes);
        assert!(
            request_str.contains("Bearer plugin-default-token"),
            "plugin default auth not applied:\n{request_str}"
        );
    }

    #[tokio::test]
    async fn throttle_response_returns_rate_limited() {
        let body = serde_json::to_vec(&json!({
            "data": null,
            "extensions": {
                "cost": {
                    "throttleStatus": "THROTTLED",
                    "maximumAvailable": 10000,
                    "currentlyAvailable": 0,
                    "restoreRate": 500
                }
            }
        }))
        .unwrap();

        MockGraphQlServer::run_with(200, body, |url| async move {
            let svc = make_service(None);
            let input = ServiceInput {
                url,
                params: json!({ "query": "{ x }" }),
            };
            let err = svc.execute(input).await.unwrap_err();
            assert!(
                matches!(
                    err,
                    StygianError::Service(ServiceError::RateLimited { retry_after_ms })
                    if retry_after_ms > 0
                ),
                "expected RateLimited, got {err:?}"
            );
        })
        .await;
    }

    #[tokio::test]
    async fn cost_metadata_surfaced() {
        let body = serde_json::to_vec(&json!({
            "data": { "items": [] },
            "extensions": {
                "cost": {
                    "throttleStatus": "PASS",
                    "maximumAvailable": 10000,
                    "currentlyAvailable": 9800,
                    "actualQueryCost": 42,
                    "restoreRate": 500
                }
            }
        }))
        .unwrap();

        MockGraphQlServer::run_with(200, body, |url| async move {
            let svc = make_service(None);
            let input = ServiceInput {
                url,
                params: json!({ "query": "{ items { id } }" }),
            };
            let output = svc.execute(input).await.unwrap();
            let cost = &output.metadata["cost"];
            assert_eq!(cost["actualQueryCost"], 42);
            assert_eq!(cost["throttleStatus"], "PASS");
        })
        .await;
    }

    #[tokio::test]
    async fn cursor_pagination_accumulates_pages() {
        let page1_body = serde_json::to_vec(&json!({
            "data": {
                "items": {
                    "edges": [{"node": {"id": 1}}, {"node": {"id": 2}}],
                    "pageInfo": { "hasNextPage": true, "endCursor": "cursor1" }
                }
            }
        }))
        .unwrap();

        let page2_body = serde_json::to_vec(&json!({
            "data": {
                "items": {
                    "edges": [{"node": {"id": 3}}],
                    "pageInfo": { "hasNextPage": false, "endCursor": null }
                }
            }
        }))
        .unwrap();

        let (listener, url) = bind_local().await;

        // Each response closes its connection, so the client reconnects and
        // the pages are served in order, one per accepted connection.
        tokio::spawn(async move {
            for response_body in [page1_body, page2_body] {
                if let Ok((mut stream, _)) = listener.accept().await {
                    let _ = read_request(&mut stream).await;
                    write_response(&mut stream, 200, &response_body).await;
                }
            }
        });

        let svc = make_service(None);
        let input = ServiceInput {
            url,
            params: json!({
                "query": "query($first:Int,$after:String){ items(first:$first,after:$after){ edges{node{id}} pageInfo{hasNextPage endCursor} } }",
                "pagination": {
                    "strategy": "cursor",
                    "page_info_path": "data.items.pageInfo",
                    "edges_path": "data.items.edges",
                    "page_size": 2
                }
            }),
        };

        let output = svc.execute(input).await.unwrap();
        let edges: Vec<Value> = serde_json::from_str(&output.data).unwrap();
        assert_eq!(edges.len(), 3, "expected 3 accumulated edges");
        assert_eq!(edges[0]["node"]["id"], 1);
        assert_eq!(edges[2]["node"]["id"], 3);
    }

    #[tokio::test]
    async fn pagination_cap_prevents_infinite_loop() {
        let page_body = serde_json::to_vec(&json!({
            "data": {
                "rows": {
                    "edges": [{"node": {"id": 1}}],
                    "pageInfo": { "hasNextPage": true, "endCursor": "always-more" }
                }
            }
        }))
        .unwrap();

        let (listener, url) = bind_local().await;

        // Keeps answering with "there is another page" until the client
        // stops connecting.
        tokio::spawn(async move {
            while let Ok((mut stream, _)) = listener.accept().await {
                let _ = read_request(&mut stream).await;
                write_response(&mut stream, 200, &page_body).await;
            }
        });

        let svc = make_service(None);
        let input = ServiceInput {
            url,
            params: json!({
                "query": "{ rows { edges{node{id}} pageInfo{hasNextPage endCursor} } }",
                "pagination": {
                    "strategy": "cursor",
                    "page_info_path": "data.rows.pageInfo",
                    "edges_path": "data.rows.edges",
                    "page_size": 1
                }
            }),
        };

        let err = svc.execute(input).await.unwrap_err();
        assert!(
            matches!(err, StygianError::Service(ServiceError::InvalidResponse(ref msg)) if msg.contains("max_pages")),
            "expected pagination cap error, got {err:?}"
        );
    }
}