1use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use async_trait::async_trait;
17use serde_json::{Value, json};
18use tokio::sync::RwLock;
19
20use crate::adapters::graphql_throttle::{PluginBudget, pre_flight_delay, update_budget};
21use crate::application::graphql_plugin_registry::GraphQlPluginRegistry;
22use crate::application::pipeline_parser::expand_template;
23use crate::domain::error::{Result, ServiceError, StygianError};
24use crate::ports::auth::ErasedAuthPort;
25use crate::ports::{GraphQlAuth, GraphQlAuthKind, ScrapingService, ServiceInput, ServiceOutput};
26
/// Tunable settings for [`GraphQlService`].
#[derive(Debug, Clone)]
pub struct GraphQlConfig {
    /// Request timeout in seconds, applied to the HTTP client.
    pub timeout_secs: u64,
    /// Maximum number of pages cursor pagination may fetch before giving up.
    pub max_pages: usize,
    /// Value configured as the HTTP client's user agent.
    pub user_agent: String,
}

impl Default for GraphQlConfig {
    /// Stock configuration: 30 second timeout, 1000 pages, `stygian-graph/1.0` agent.
    fn default() -> Self {
        let user_agent = String::from("stygian-graph/1.0");
        Self {
            user_agent,
            max_pages: 1000,
            timeout_secs: 30,
        }
    }
}
63
64pub struct GraphQlService {
98 client: reqwest::Client,
99 config: GraphQlConfig,
100 plugins: Option<Arc<GraphQlPluginRegistry>>,
101 auth_port: Option<Arc<dyn ErasedAuthPort>>,
103 budgets: Arc<RwLock<HashMap<String, PluginBudget>>>,
105}
106
107impl GraphQlService {
108 pub fn new(config: GraphQlConfig, plugins: Option<Arc<GraphQlPluginRegistry>>) -> Self {
122 let client = reqwest::Client::builder()
123 .timeout(Duration::from_secs(config.timeout_secs))
124 .user_agent(&config.user_agent)
125 .build()
126 .unwrap_or_default();
127 Self {
128 client,
129 config,
130 plugins,
131 auth_port: None,
132 budgets: Arc::new(RwLock::new(HashMap::new())),
133 }
134 }
135
136 #[must_use]
154 pub fn with_auth_port(mut self, port: Arc<dyn ErasedAuthPort>) -> Self {
155 self.auth_port = Some(port);
156 self
157 }
158
159 fn apply_auth(builder: reqwest::RequestBuilder, auth: &GraphQlAuth) -> reqwest::RequestBuilder {
163 let token = expand_template(&auth.token);
164 match auth.kind {
165 GraphQlAuthKind::Bearer => builder.header("Authorization", format!("Bearer {token}")),
166 GraphQlAuthKind::ApiKey => builder.header("X-Api-Key", token),
167 GraphQlAuthKind::Header => {
168 let name = auth.header_name.as_deref().unwrap_or("X-Api-Key");
169 builder.header(name, token)
170 }
171 GraphQlAuthKind::None => builder,
172 }
173 }
174
175 fn parse_auth(val: &Value) -> Option<GraphQlAuth> {
177 let kind_str = val["kind"].as_str().unwrap_or("none");
178 let kind = match kind_str {
179 "bearer" => GraphQlAuthKind::Bearer,
180 "api_key" => GraphQlAuthKind::ApiKey,
181 "header" => GraphQlAuthKind::Header,
182 _ => GraphQlAuthKind::None,
183 };
184 if kind == GraphQlAuthKind::None {
185 return None;
186 }
187 let token = val["token"].as_str()?.to_string();
188 let header_name = val["header_name"].as_str().map(str::to_string);
189 Some(GraphQlAuth {
190 kind,
191 token,
192 header_name,
193 })
194 }
195
196 #[allow(clippy::indexing_slicing)]
203 fn detect_throttle(body: &Value) -> Option<u64> {
204 if body["extensions"]["cost"]["throttleStatus"]
206 .as_str()
207 .is_some_and(|s| s.eq_ignore_ascii_case("THROTTLED"))
208 {
209 return Some(Self::throttle_backoff(body));
210 }
211
212 if let Some(errors) = body["errors"].as_array() {
214 for err in errors {
215 if err["extensions"]["code"]
216 .as_str()
217 .is_some_and(|c| c.eq_ignore_ascii_case("THROTTLED"))
218 {
219 return Some(Self::throttle_backoff(body));
220 }
221 if err["message"]
222 .as_str()
223 .is_some_and(|m| m.to_ascii_lowercase().contains("throttled"))
224 {
225 return Some(Self::throttle_backoff(body));
226 }
227 }
228 }
229
230 None
231 }
232
233 #[allow(
240 clippy::indexing_slicing,
241 clippy::cast_possible_truncation,
242 clippy::cast_sign_loss
243 )]
244 fn throttle_backoff(body: &Value) -> u64 {
245 let cost = &body["extensions"]["cost"];
246 let max_avail = cost["maximumAvailable"].as_f64().unwrap_or(10_000.0);
247 let cur_avail = cost["currentlyAvailable"].as_f64().unwrap_or(0.0);
248 let restore_rate = cost["restoreRate"].as_f64().unwrap_or(500.0);
249 let deficit = (max_avail - cur_avail).max(0.0);
250 let ms = if restore_rate > 0.0 {
251 (deficit / restore_rate * 1000.0) as u64
252 } else {
253 2_000
254 };
255 ms.clamp(500, 2_000)
256 }
257
258 #[allow(clippy::indexing_slicing)]
260 fn extract_cost_metadata(body: &Value) -> Option<Value> {
261 let cost = &body["extensions"]["cost"];
262 if cost.is_null() || cost.is_object() && cost.as_object()?.is_empty() {
263 return None;
264 }
265 Some(cost.clone())
266 }
267
268 #[allow(clippy::indexing_slicing)]
270 fn json_path<'v>(root: &'v Value, path: &str) -> &'v Value {
271 let mut cur = root;
272 for key in path.split('.') {
273 cur = &cur[key];
274 }
275 cur
276 }
277
278 #[allow(clippy::indexing_slicing)]
280 async fn post_query(
281 &self,
282 url: &str,
283 query: &str,
284 variables: &Value,
285 operation_name: Option<&str>,
286 auth: Option<&GraphQlAuth>,
287 extra_headers: &HashMap<String, String>,
288 ) -> Result<Value> {
289 let mut body = json!({ "query": query, "variables": variables });
290 if let Some(op) = operation_name {
291 body["operationName"] = json!(op);
292 }
293
294 let mut builder = self
295 .client
296 .post(url)
297 .header("Content-Type", "application/json")
298 .header("Accept", "application/json");
299
300 for (k, v) in extra_headers {
301 builder = builder.header(k.as_str(), v.as_str());
302 }
303
304 if let Some(a) = auth {
305 builder = Self::apply_auth(builder, a);
306 }
307
308 let resp = builder
309 .json(&body)
310 .send()
311 .await
312 .map_err(|e| StygianError::Service(ServiceError::Unavailable(e.to_string())))?;
313
314 let status = resp.status();
315 let text = resp
316 .text()
317 .await
318 .map_err(|e| StygianError::Service(ServiceError::Unavailable(e.to_string())))?;
319
320 if status.as_u16() >= 400 {
321 return Err(StygianError::Service(ServiceError::Unavailable(format!(
322 "HTTP {status}: {text}"
323 ))));
324 }
325
326 serde_json::from_str::<Value>(&text).map_err(|e| {
327 StygianError::Service(ServiceError::InvalidResponse(format!("invalid JSON: {e}")))
328 })
329 }
330
331 #[allow(clippy::indexing_slicing)]
333 fn validate_body(body: &Value) -> Result<()> {
334 if let Some(retry_after_ms) = Self::detect_throttle(body) {
336 return Err(StygianError::Service(ServiceError::RateLimited {
337 retry_after_ms,
338 }));
339 }
340
341 if let Some(errors) = body["errors"].as_array()
342 && !errors.is_empty()
343 {
344 let msg = errors[0]["message"]
345 .as_str()
346 .unwrap_or("unknown GraphQL error")
347 .to_string();
348 return Err(StygianError::Service(ServiceError::InvalidResponse(msg)));
349 }
350
351 if body.get("data").is_none() {
353 return Err(StygianError::Service(ServiceError::InvalidResponse(
354 "missing 'data' key in GraphQL response".to_string(),
355 )));
356 }
357
358 Ok(())
359 }
360}
361
362#[async_trait]
367impl ScrapingService for GraphQlService {
368 fn name(&self) -> &'static str {
369 "graphql"
370 }
371
372 #[allow(clippy::too_many_lines, clippy::indexing_slicing)]
388 async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
389 let params = &input.params;
390
391 let plugin_name = params["plugin"].as_str();
393 let plugin = if let (Some(name), Some(registry)) = (plugin_name, &self.plugins) {
394 Some(registry.get(name)?)
395 } else {
396 None
397 };
398
399 let url = if !input.url.is_empty() {
401 input.url.clone()
402 } else if let Some(ref p) = plugin {
403 p.endpoint().to_string()
404 } else {
405 return Err(StygianError::Service(ServiceError::Unavailable(
406 "no URL provided and no plugin endpoint available".to_string(),
407 )));
408 };
409
410 let query = params["query"].as_str().ok_or_else(|| {
412 StygianError::Service(ServiceError::InvalidResponse(
413 "params.query is required".to_string(),
414 ))
415 })?;
416
417 let operation_name = params["operation_name"].as_str();
418 let mut variables = params["variables"].clone();
419 if variables.is_null() {
420 variables = json!({});
421 }
422
423 let auth: Option<GraphQlAuth> = if !params["auth"].is_null() && params["auth"].is_object() {
425 Self::parse_auth(¶ms["auth"])
426 } else if let Some(ref p) = plugin {
427 p.default_auth()
428 } else if let Some(ref port) = self.auth_port {
429 match port.erased_resolve_token().await {
431 Ok(token) => Some(GraphQlAuth {
432 kind: GraphQlAuthKind::Bearer,
433 token,
434 header_name: None,
435 }),
436 Err(e) => {
437 tracing::warn!("auth port failed: {e}; proceeding without auth");
438 None
439 }
440 }
441 } else {
442 None
443 };
444
445 let maybe_budget: Option<PluginBudget> = if let Some(ref p) = plugin {
447 if let Some(throttle_cfg) = p.cost_throttle_config() {
448 let name = p.name().to_string();
449 {
451 let read = self.budgets.read().await;
452 if let Some(b) = read.get(&name) {
453 let b_clone = b.clone();
454 drop(read);
455 pre_flight_delay(&b_clone).await;
456 Some(b_clone)
457 } else {
458 drop(read);
459 let new_budget = PluginBudget::new(throttle_cfg);
461 self.budgets.write().await.insert(name, new_budget.clone());
462 pre_flight_delay(&new_budget).await;
463 Some(new_budget)
464 }
465 }
466 } else {
467 None
468 }
469 } else {
470 None
471 };
472
473 let mut extra_headers: HashMap<String, String> = params["headers"]
475 .as_object()
476 .map(|obj| {
477 obj.iter()
478 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
479 .collect()
480 })
481 .unwrap_or_default();
482
483 if let Some(ref p) = plugin {
485 for (k, v) in p.version_headers() {
486 extra_headers.insert(k, v);
487 }
488 }
489
490 let pag = ¶ms["pagination"];
492 let use_cursor = pag["strategy"].as_str() == Some("cursor");
493 let page_info_path = pag["page_info_path"]
494 .as_str()
495 .unwrap_or("data.pageInfo")
496 .to_string();
497 let edges_path = pag["edges_path"]
498 .as_str()
499 .unwrap_or("data.edges")
500 .to_string();
501 let page_size: u64 = pag["page_size"]
502 .as_u64()
503 .unwrap_or_else(|| plugin.as_ref().map_or(50, |p| p.default_page_size() as u64));
504
505 if use_cursor {
507 variables["first"] = json!(page_size);
509 variables["after"] = json!(null);
510
511 let mut all_edges: Vec<Value> = Vec::new();
512 let mut page = 0usize;
513 let mut cost_meta = json!(null);
514
515 loop {
516 if page >= self.config.max_pages {
517 return Err(StygianError::Service(ServiceError::InvalidResponse(
518 format!("pagination exceeded max_pages ({})", self.config.max_pages),
519 )));
520 }
521
522 let body = self
523 .post_query(
524 &url,
525 query,
526 &variables,
527 operation_name,
528 auth.as_ref(),
529 &extra_headers,
530 )
531 .await?;
532
533 Self::validate_body(&body)?;
534
535 if let Some(ref b) = maybe_budget {
537 update_budget(b, &body).await;
538 }
539
540 let edges = Self::json_path(&body, &edges_path);
542 if let Some(arr) = edges.as_array() {
543 all_edges.extend(arr.iter().cloned());
544 }
545
546 let page_info = Self::json_path(&body, &page_info_path);
548 let has_next = page_info["hasNextPage"].as_bool().unwrap_or(false);
549 let end_cursor = page_info["endCursor"].clone();
550
551 cost_meta = Self::extract_cost_metadata(&body).unwrap_or(json!(null));
552 page += 1;
553
554 if !has_next || end_cursor.is_null() {
555 break;
556 }
557 variables["after"] = end_cursor;
558 }
559
560 let metadata = json!({ "cost": cost_meta, "pages_fetched": page });
561 Ok(ServiceOutput {
562 data: serde_json::to_string(&all_edges).unwrap_or_default(),
563 metadata,
564 })
565 } else {
566 let body = self
568 .post_query(
569 &url,
570 query,
571 &variables,
572 operation_name,
573 auth.as_ref(),
574 &extra_headers,
575 )
576 .await?;
577
578 Self::validate_body(&body)?;
579
580 if let Some(ref b) = maybe_budget {
582 update_budget(b, &body).await;
583 }
584
585 let cost_meta = Self::extract_cost_metadata(&body).unwrap_or(json!(null));
586 let metadata = json!({ "cost": cost_meta });
587
588 Ok(ServiceOutput {
589 data: serde_json::to_string(&body["data"]).unwrap_or_default(),
590 metadata,
591 })
592 }
593 }
594}
595
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::indexing_slicing,
    clippy::needless_pass_by_value,
    clippy::field_reassign_with_default,
    clippy::unnecessary_literal_bound
)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::io::Write;
    use std::sync::Arc;

    use serde_json::json;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::net::TcpListener;

    use crate::application::graphql_plugin_registry::GraphQlPluginRegistry;
    use crate::ports::graphql_plugin::GraphQlTargetPlugin;

    /// Frames `body` as a minimal HTTP/1.1 JSON response with the given status code.
    fn http_response(status: u16, body: &[u8]) -> Vec<u8> {
        let mut response = Vec::with_capacity(body.len() + 128);
        write!(
            response,
            "HTTP/1.1 {status} OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
            body.len()
        )
        .unwrap();
        response.extend_from_slice(body);
        response
    }

    /// One-shot HTTP server bound to an ephemeral localhost port.
    struct MockGraphQlServer;

    impl MockGraphQlServer {
        /// Serves a single response with `status` and `body`, then runs `f`
        /// with the server's base URL.
        async fn run_with<F, Fut>(status: u16, body: impl Into<Vec<u8>>, f: F)
        where
            F: FnOnce(String) -> Fut,
            Fut: std::future::Future<Output = ()>,
        {
            let body_bytes: Vec<u8> = body.into();
            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let addr = listener.local_addr().unwrap();
            let url = format!("http://{addr}");

            tokio::spawn(async move {
                if let Ok((mut stream, _)) = listener.accept().await {
                    let mut buf = [0u8; 4096];
                    let _ = stream.read(&mut buf).await;
                    let _ = stream
                        .write_all(&http_response(status, &body_bytes))
                        .await;
                }
            });

            f(url).await;
        }

        /// Serves a single 200 response with `body`, runs `f` with the
        /// server's base URL, and returns the raw bytes of the first read
        /// from the client (empty if nothing was captured).
        async fn run_capturing_request<F, Fut>(body: impl Into<Vec<u8>>, f: F) -> Vec<u8>
        where
            F: FnOnce(String) -> Fut,
            Fut: std::future::Future<Output = ()>,
        {
            let body_bytes: Vec<u8> = body.into();
            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let addr = listener.local_addr().unwrap();
            let url = format!("http://{addr}");

            let (tx, mut rx) = tokio::sync::oneshot::channel::<Vec<u8>>();
            tokio::spawn(async move {
                if let Ok((mut stream, _)) = listener.accept().await {
                    let mut buf = vec![0u8; 8192];
                    let n = stream.read(&mut buf).await.unwrap_or(0);
                    let request = buf[..n].to_vec();
                    let _ = tx.send(request);

                    let _ = stream.write_all(&http_response(200, &body_bytes)).await;
                }
            });

            f(url).await;

            rx.try_recv().unwrap_or_default()
        }
    }

    /// Builds a service with `max_pages` lowered to 5 so pagination tests stay short.
    fn make_service(plugins: Option<Arc<GraphQlPluginRegistry>>) -> GraphQlService {
        let config = GraphQlConfig {
            max_pages: 5,
            ..GraphQlConfig::default()
        };
        GraphQlService::new(config, plugins)
    }

    /// Serializes `{"data": <data>}` to bytes.
    fn simple_query_body(data: Value) -> Vec<u8> {
        serde_json::to_vec(&json!({ "data": data })).unwrap()
    }

    #[tokio::test]
    async fn execute_simple_query() {
        let body = simple_query_body(json!({ "users": [{ "id": 1 }] }));
        MockGraphQlServer::run_with(200, body, |url| async move {
            let svc = make_service(None);
            let input = ServiceInput {
                url,
                params: json!({ "query": "{ users { id } }" }),
            };
            let output = svc.execute(input).await.unwrap();
            let data: Value = serde_json::from_str(&output.data).unwrap();
            assert_eq!(data["users"][0]["id"], 1);
        })
        .await;
    }

    #[tokio::test]
    async fn graphql_errors_in_200_response() {
        let body =
            serde_json::to_vec(&json!({ "errors": [{ "message": "not found" }], "data": null }))
                .unwrap();
        MockGraphQlServer::run_with(200, body, |url| async move {
            let svc = make_service(None);
            let input = ServiceInput {
                url,
                params: json!({ "query": "{ missing }" }),
            };
            let err = svc.execute(input).await.unwrap_err();
            assert!(
                matches!(err, StygianError::Service(ServiceError::InvalidResponse(_))),
                "expected InvalidResponse, got {err:?}"
            );
        })
        .await;
    }

    #[tokio::test]
    async fn http_error_returns_unavailable() {
        let body = b"Internal Server Error".to_vec();
        MockGraphQlServer::run_with(500, body, |url| async move {
            let svc = make_service(None);
            let input = ServiceInput {
                url,
                params: json!({ "query": "{ x }" }),
            };
            let err = svc.execute(input).await.unwrap_err();
            assert!(
                matches!(err, StygianError::Service(ServiceError::Unavailable(_))),
                "expected Unavailable, got {err:?}"
            );
        })
        .await;
    }

    #[tokio::test]
    async fn missing_data_key() {
        let body = serde_json::to_vec(&json!({ "extensions": {} })).unwrap();
        MockGraphQlServer::run_with(200, body, |url| async move {
            let svc = make_service(None);
            let input = ServiceInput {
                url,
                params: json!({ "query": "{ x }" }),
            };
            let err = svc.execute(input).await.unwrap_err();
            assert!(
                matches!(err, StygianError::Service(ServiceError::InvalidResponse(_))),
                "expected InvalidResponse, got {err:?}"
            );
        })
        .await;
    }

    #[tokio::test]
    async fn bearer_auth_header_set() {
        let body = simple_query_body(json!({}));
        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
            let svc = make_service(None);
            let input = ServiceInput {
                url,
                params: json!({
                    "query": "{ x }",
                    "auth": { "kind": "bearer", "token": "test-token-123" }
                }),
            };
            let _ = svc.execute(input).await;
        })
        .await;

        let request_str = String::from_utf8_lossy(&request_bytes);
        assert!(
            request_str.contains("authorization: Bearer test-token-123"),
            "auth header not found in request:\n{request_str}"
        );
    }

    #[tokio::test]
    async fn plugin_version_headers_merged() {
        struct V1Plugin;
        impl GraphQlTargetPlugin for V1Plugin {
            fn name(&self) -> &str {
                "v1"
            }
            fn endpoint(&self) -> &str {
                "unused"
            }
            fn version_headers(&self) -> HashMap<String, String> {
                [("X-TEST-VERSION".to_string(), "2025-01-01".to_string())].into()
            }
        }

        let mut registry = GraphQlPluginRegistry::new();
        registry.register(Arc::new(V1Plugin));

        let body = simple_query_body(json!({}));
        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
            let svc = make_service(Some(Arc::new(registry)));
            let input = ServiceInput {
                url,
                params: json!({
                    "query": "{ x }",
                    "plugin": "v1"
                }),
            };
            let _ = svc.execute(input).await;
        })
        .await;

        let request_str = String::from_utf8_lossy(&request_bytes);
        assert!(
            request_str.contains("x-test-version: 2025-01-01"),
            "version header not found:\n{request_str}"
        );
    }

    #[tokio::test]
    async fn plugin_default_auth_used_when_params_auth_absent() {
        use crate::ports::{GraphQlAuth, GraphQlAuthKind};

        struct TokenPlugin;
        impl GraphQlTargetPlugin for TokenPlugin {
            fn name(&self) -> &str {
                "tokenplugin"
            }
            fn endpoint(&self) -> &str {
                "unused"
            }
            fn default_auth(&self) -> Option<GraphQlAuth> {
                Some(GraphQlAuth {
                    kind: GraphQlAuthKind::Bearer,
                    token: "plugin-default-token".to_string(),
                    header_name: None,
                })
            }
        }

        let mut registry = GraphQlPluginRegistry::new();
        registry.register(Arc::new(TokenPlugin));

        let body = simple_query_body(json!({}));
        let request_bytes = MockGraphQlServer::run_capturing_request(body, |url| async move {
            let svc = make_service(Some(Arc::new(registry)));
            let input = ServiceInput {
                url,
                params: json!({
                    "query": "{ x }",
                    "plugin": "tokenplugin"
                }),
            };
            let _ = svc.execute(input).await;
        })
        .await;

        let request_str = String::from_utf8_lossy(&request_bytes);
        assert!(
            request_str.contains("Bearer plugin-default-token"),
            "plugin default auth not applied:\n{request_str}"
        );
    }

    #[tokio::test]
    async fn throttle_response_returns_rate_limited() {
        let body = serde_json::to_vec(&json!({
            "data": null,
            "extensions": {
                "cost": {
                    "throttleStatus": "THROTTLED",
                    "maximumAvailable": 10000,
                    "currentlyAvailable": 0,
                    "restoreRate": 500
                }
            }
        }))
        .unwrap();

        MockGraphQlServer::run_with(200, body, |url| async move {
            let svc = make_service(None);
            let input = ServiceInput {
                url,
                params: json!({ "query": "{ x }" }),
            };
            let err = svc.execute(input).await.unwrap_err();
            assert!(
                matches!(
                    err,
                    StygianError::Service(ServiceError::RateLimited { retry_after_ms })
                    if retry_after_ms > 0
                ),
                "expected RateLimited, got {err:?}"
            );
        })
        .await;
    }

    #[tokio::test]
    async fn cost_metadata_surfaced() {
        let body = serde_json::to_vec(&json!({
            "data": { "items": [] },
            "extensions": {
                "cost": {
                    "throttleStatus": "PASS",
                    "maximumAvailable": 10000,
                    "currentlyAvailable": 9800,
                    "actualQueryCost": 42,
                    "restoreRate": 500
                }
            }
        }))
        .unwrap();

        MockGraphQlServer::run_with(200, body, |url| async move {
            let svc = make_service(None);
            let input = ServiceInput {
                url,
                params: json!({ "query": "{ items { id } }" }),
            };
            let output = svc.execute(input).await.unwrap();
            let cost = &output.metadata["cost"];
            assert_eq!(cost["actualQueryCost"], 42);
            assert_eq!(cost["throttleStatus"], "PASS");
        })
        .await;
    }

    #[tokio::test]
    async fn cursor_pagination_accumulates_pages() {
        let page1_body = serde_json::to_vec(&json!({
            "data": {
                "items": {
                    "edges": [{"node": {"id": 1}}, {"node": {"id": 2}}],
                    "pageInfo": { "hasNextPage": true, "endCursor": "cursor1" }
                }
            }
        }))
        .unwrap();

        let page2_body = serde_json::to_vec(&json!({
            "data": {
                "items": {
                    "edges": [{"node": {"id": 3}}],
                    "pageInfo": { "hasNextPage": false, "endCursor": null }
                }
            }
        }))
        .unwrap();

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let url = format!("http://{addr}");

        // A single listener serves both pages in order, one connection each.
        let bodies = vec![page1_body, page2_body];
        tokio::spawn(async move {
            for response_body in bodies {
                if let Ok((mut stream, _)) = listener.accept().await {
                    let mut buf = [0u8; 8192];
                    let _ = stream.read(&mut buf).await;
                    let _ = stream
                        .write_all(&http_response(200, &response_body))
                        .await;
                }
            }
        });

        let svc = make_service(None);
        let input = ServiceInput {
            url,
            params: json!({
                "query": "query($first:Int,$after:String){ items(first:$first,after:$after){ edges{node{id}} pageInfo{hasNextPage endCursor} } }",
                "pagination": {
                    "strategy": "cursor",
                    "page_info_path": "data.items.pageInfo",
                    "edges_path": "data.items.edges",
                    "page_size": 2
                }
            }),
        };

        let output = svc.execute(input).await.unwrap();
        let edges: Vec<Value> = serde_json::from_str(&output.data).unwrap();
        assert_eq!(edges.len(), 3, "expected 3 accumulated edges");
        assert_eq!(edges[0]["node"]["id"], 1);
        assert_eq!(edges[2]["node"]["id"], 3);
    }

    #[tokio::test]
    async fn pagination_cap_prevents_infinite_loop() {
        // Every page claims there is another one, so only `max_pages` stops the loop.
        let page_body = serde_json::to_vec(&json!({
            "data": {
                "rows": {
                    "edges": [{"node": {"id": 1}}],
                    "pageInfo": { "hasNextPage": true, "endCursor": "always-more" }
                }
            }
        }))
        .unwrap();

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let url = format!("http://{addr}");

        tokio::spawn(async move {
            while let Ok((mut stream, _)) = listener.accept().await {
                let mut buf = [0u8; 8192];
                let _ = stream.read(&mut buf).await;
                let _ = stream.write_all(&http_response(200, &page_body)).await;
            }
        });

        let svc = make_service(None);
        let input = ServiceInput {
            url,
            params: json!({
                "query": "{ rows { edges{node{id}} pageInfo{hasNextPage endCursor} } }",
                "pagination": {
                    "strategy": "cursor",
                    "page_info_path": "data.rows.pageInfo",
                    "edges_path": "data.rows.edges",
                    "page_size": 1
                }
            }),
        };

        let err = svc.execute(input).await.unwrap_err();
        assert!(
            matches!(err, StygianError::Service(ServiceError::InvalidResponse(ref msg)) if msg.contains("max_pages")),
            "expected pagination cap error, got {err:?}"
        );
    }
}