1use std::collections::HashMap;
48use std::sync::{Arc, OnceLock};
49use std::time::Duration;
50
51use async_trait::async_trait;
52use openapiv3::{OpenAPI, Operation, Parameter, ReferenceOr};
53use reqwest::Client;
54use serde_json::{Value, json};
55use tokio::sync::RwLock;
56use tracing::{debug, info};
57
58use crate::adapters::graphql_rate_limit::{
59 RateLimitConfig, RateLimitStrategy, RequestRateLimit, rate_limit_acquire,
60};
61use crate::adapters::rest_api::{RestApiAdapter, RestApiConfig};
62use crate::domain::error::{Result, ServiceError, StygianError};
63use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
64
/// Cache of parsed OpenAPI documents, keyed by the URL they were fetched from.
///
/// The map sits behind an async `RwLock` so lookups can proceed concurrently,
/// and each spec is stored as an `Arc` so a cache hit hands out a cheap
/// reference-counted handle instead of cloning the parsed document.
type SpecCache = Arc<RwLock<HashMap<String, Arc<OpenAPI>>>>;
68
/// Configuration for [`OpenApiAdapter`].
///
/// Holds the settings for the inner [`RestApiAdapter`] that performs the
/// HTTP request once an OpenAPI operation has been resolved.
#[derive(Debug, Clone, Default)]
pub struct OpenApiConfig {
    /// Forwarded to `RestApiAdapter::with_config` when the adapter is built.
    pub rest: RestApiConfig,
}
95
/// Scraping service that executes operations described by an OpenAPI document.
///
/// The spec is downloaded from `ServiceInput::url` (and cached), the requested
/// operation is located, and the resulting request is delegated to an inner
/// [`RestApiAdapter`].
#[derive(Clone)]
pub struct OpenApiAdapter {
    /// Performs the actual REST call once the operation has been resolved.
    inner: RestApiAdapter,
    /// Client used only to download spec documents (built in `with_config`
    /// with rustls and a 30-second timeout).
    spec_client: Client,
    /// Parsed specs keyed by URL; the `Arc` makes clones of the adapter share it.
    spec_cache: SpecCache,
    /// Request rate limiter, initialised lazily from the `rate_limit` params of
    /// the first request that supplies them and then reused (also shared
    /// between clones via the `Arc`).
    rate_limit: Arc<OnceLock<RequestRateLimit>>,
}
122
123impl OpenApiAdapter {
124 #[must_use]
133 pub fn new() -> Self {
134 Self::with_config(OpenApiConfig::default())
135 }
136
137 #[must_use]
158 pub fn with_config(config: OpenApiConfig) -> Self {
159 #[allow(clippy::expect_used)]
161 let spec_client = Client::builder()
162 .timeout(Duration::from_secs(30))
163 .use_rustls_tls()
164 .build()
165 .expect("TLS backend unavailable");
166
167 Self {
168 inner: RestApiAdapter::with_config(config.rest),
169 spec_client,
170 spec_cache: Arc::new(RwLock::new(HashMap::new())),
171 rate_limit: Arc::new(OnceLock::new()),
172 }
173 }
174}
175
176impl Default for OpenApiAdapter {
177 fn default() -> Self {
178 Self::new()
179 }
180}
181
182fn svc_err(msg: impl Into<String>) -> StygianError {
186 StygianError::from(ServiceError::Unavailable(msg.into()))
187}
188
189async fn fetch_spec(client: &Client, url: &str) -> Result<Arc<OpenAPI>> {
193 let body = client
194 .get(url)
195 .header(
196 "Accept",
197 "application/json, application/yaml, text/yaml, */*",
198 )
199 .send()
200 .await
201 .map_err(|e| svc_err(format!("spec fetch failed: {e}")))?
202 .text()
203 .await
204 .map_err(|e| svc_err(format!("spec read failed: {e}")))?;
205
206 let api: OpenAPI = serde_json::from_str(&body)
207 .or_else(|_| serde_yaml::from_str(&body))
208 .map_err(|e| svc_err(format!("spec parse failed: {e}")))?;
209
210 Ok(Arc::new(api))
211}
212
213async fn resolve_spec(cache: &SpecCache, client: &Client, url: &str) -> Result<Arc<OpenAPI>> {
215 {
216 let guard = cache.read().await;
217 if let Some(spec) = guard.get(url) {
218 debug!(url, "OpenAPI spec cache hit");
219 return Ok(Arc::clone(spec));
220 }
221 }
222
223 let spec = fetch_spec(client, url).await?;
225
226 {
227 let mut guard = cache.write().await;
228 guard
230 .entry(url.to_owned())
231 .or_insert_with(|| Arc::clone(&spec));
232 }
233
234 Ok(spec)
235}
236
237fn resolve_operation<'a>(
244 api: &'a OpenAPI,
245 operation_ref: &str,
246) -> Result<(String, String, &'a Operation)> {
247 let method_path: Option<(String, &str)> = operation_ref
249 .split_once(' ')
250 .filter(|(m, _)| {
251 matches!(
252 m.to_uppercase().as_str(),
253 "GET" | "POST" | "PUT" | "PATCH" | "DELETE" | "HEAD" | "OPTIONS" | "TRACE"
254 )
255 })
256 .map(|(m, p)| (m.to_uppercase(), p));
257
258 for (path_str, path_item_ref) in &api.paths.paths {
259 let item = match path_item_ref {
260 ReferenceOr::Item(i) => i,
261 ReferenceOr::Reference { .. } => continue,
262 };
263
264 let ops: [(&str, Option<&Operation>); 8] = [
265 ("GET", item.get.as_ref()),
266 ("POST", item.post.as_ref()),
267 ("PUT", item.put.as_ref()),
268 ("PATCH", item.patch.as_ref()),
269 ("DELETE", item.delete.as_ref()),
270 ("HEAD", item.head.as_ref()),
271 ("OPTIONS", item.options.as_ref()),
272 ("TRACE", item.trace.as_ref()),
273 ];
274
275 for (method, maybe_op) in ops {
276 let Some(op) = maybe_op else { continue };
277
278 let matched = match &method_path {
279 Some((target_method, target_path)) => {
280 method == target_method.as_str() && path_str == target_path
281 }
282 None => op.operation_id.as_deref() == Some(operation_ref),
283 };
284
285 if matched {
286 return Ok((method.to_owned(), path_str.clone(), op));
287 }
288 }
289 }
290
291 Err(svc_err(format!(
292 "operation '{operation_ref}' not found in spec"
293 )))
294}
295
296#[allow(clippy::indexing_slicing)]
300fn resolve_server(api: &OpenAPI, server_override: &Value) -> String {
301 if let Some(url) = server_override.as_str().filter(|s| !s.is_empty()) {
302 return url.trim_end_matches('/').to_owned();
303 }
304 api.servers
305 .first()
306 .map(|s| s.url.trim_end_matches('/').to_owned())
307 .unwrap_or_default()
308}
309
310fn classify_params(op: &Operation) -> (Vec<String>, Vec<String>) {
312 let mut path_params: Vec<String> = Vec::new();
313 let mut query_params: Vec<String> = Vec::new();
314
315 for p_ref in &op.parameters {
316 let p = match p_ref {
317 ReferenceOr::Item(p) => p,
318 ReferenceOr::Reference { .. } => continue,
319 };
320 match p {
321 Parameter::Path { parameter_data, .. } => {
322 path_params.push(parameter_data.name.clone());
323 }
324 Parameter::Query { parameter_data, .. } => {
325 query_params.push(parameter_data.name.clone());
326 }
327 Parameter::Header { .. } | Parameter::Cookie { .. } => {}
330 }
331 }
332
333 (path_params, query_params)
334}
335
336fn build_url(server_url: &str, path_template: &str, args: &HashMap<String, Value>) -> String {
338 let mut url = format!("{server_url}{path_template}");
339 for (key, val) in args {
340 let placeholder = format!("{{{key}}}");
341 if url.contains(placeholder.as_str()) {
342 let replacement = val.as_str().map_or_else(|| val.to_string(), str::to_owned);
343 url = url.replace(placeholder.as_str(), &replacement);
344 }
345 }
346 url
347}
348
349#[allow(clippy::indexing_slicing)]
355fn build_rest_params(
356 method: &str,
357 op: &Operation,
358 args: &HashMap<String, Value>,
359 path_param_names: &[String],
360 query_param_names: &[String],
361 auth_override: &Value,
362) -> Value {
363 let query_obj: serde_json::Map<String, Value> = query_param_names
364 .iter()
365 .filter_map(|name| {
366 args.get(name.as_str()).map(|val| {
367 let s = val.as_str().map_or_else(|| val.to_string(), str::to_owned);
368 (name.clone(), Value::String(s))
369 })
370 })
371 .collect();
372
373 let body_value = if op.request_body.is_some() {
374 let excluded: std::collections::HashSet<&str> = path_param_names
375 .iter()
376 .chain(query_param_names.iter())
377 .map(String::as_str)
378 .collect();
379 let body_args: serde_json::Map<String, Value> = args
380 .iter()
381 .filter(|(k, _)| !excluded.contains(k.as_str()))
382 .map(|(k, v)| (k.clone(), v.clone()))
383 .collect();
384 if body_args.is_empty() {
385 Value::Null
386 } else {
387 Value::Object(body_args)
388 }
389 } else {
390 Value::Null
391 };
392
393 let mut params = json!({
394 "method": method,
395 "query": Value::Object(query_obj),
396 });
397
398 if !body_value.is_null() {
399 params["body"] = body_value;
400 }
401 if !auth_override.is_null() {
402 params["auth"] = auth_override.clone();
403 }
404
405 params
406}
407
408#[allow(clippy::indexing_slicing)]
410fn parse_rate_limit_config(rl: &Value) -> RateLimitConfig {
411 let strategy = match rl["strategy"].as_str().unwrap_or("sliding_window") {
412 "token_bucket" => RateLimitStrategy::TokenBucket,
413 _ => RateLimitStrategy::SlidingWindow,
414 };
415 RateLimitConfig {
416 max_requests: rl["max_requests"]
417 .as_u64()
418 .and_then(|value| u32::try_from(value).ok())
419 .unwrap_or(100),
420 window: Duration::from_secs(rl["window_secs"].as_u64().unwrap_or(60)),
421 max_delay_ms: rl["max_delay_ms"].as_u64().unwrap_or(30_000),
422 strategy,
423 }
424}
425
426#[async_trait]
429impl ScrapingService for OpenApiAdapter {
430 #[allow(clippy::indexing_slicing)]
470 async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
471 let rl_params = &input.params["rate_limit"];
473 if !rl_params.is_null() {
474 let rl = self
475 .rate_limit
476 .get_or_init(|| RequestRateLimit::new(parse_rate_limit_config(rl_params)));
477 rate_limit_acquire(rl).await;
478 }
479
480 info!(url = %input.url, "OpenAPI adapter: execute");
481
482 let api = resolve_spec(&self.spec_cache, &self.spec_client, &input.url).await?;
484
485 let operation_ref = input.params["operation"]
487 .as_str()
488 .ok_or_else(|| svc_err("params.operation is required"))?;
489
490 let (method, path_template, op) = resolve_operation(&api, operation_ref)?;
491
492 let server_url = resolve_server(&api, &input.params["server"]["url"]);
494
495 let (path_param_names, query_param_names) = classify_params(op);
497
498 let args: HashMap<String, Value> = input.params["args"]
500 .as_object()
501 .map(|obj| obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
502 .unwrap_or_default();
503
504 let final_url = build_url(&server_url, &path_template, &args);
506
507 let rest_params = build_rest_params(
509 &method,
510 op,
511 &args,
512 &path_param_names,
513 &query_param_names,
514 &input.params["auth"],
515 );
516
517 debug!(
518 %final_url, %method, path_template, operation_ref,
519 "OpenAPI: delegating to RestApiAdapter"
520 );
521
522 let inner_output = self
524 .inner
525 .execute(ServiceInput {
526 url: final_url.clone(),
527 params: rest_params,
528 })
529 .await?;
530
531 let mut metadata = inner_output.metadata;
533 if let Value::Object(ref mut m) = metadata {
534 m.insert(
535 "openapi_spec_url".to_owned(),
536 Value::String(input.url.clone()),
537 );
538 m.insert(
539 "operation_id".to_owned(),
540 Value::String(operation_ref.to_owned()),
541 );
542 m.insert("method".to_owned(), Value::String(method));
543 m.insert("path_template".to_owned(), Value::String(path_template));
544 m.insert("server_url".to_owned(), Value::String(server_url));
545 m.insert("resolved_url".to_owned(), Value::String(final_url));
546 }
547
548 Ok(ServiceOutput {
549 data: inner_output.data,
550 metadata,
551 })
552 }
553
554 fn name(&self) -> &'static str {
555 "openapi"
556 }
557}
558
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::panic,
    clippy::indexing_slicing,
    clippy::expect_used
)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::time::Duration;

    /// Minimal OpenAPI 3.0 document. It has three paths, one of which uses a
    /// `petId` path parameter, plus an API-key security scheme.
    const MINI_SPEC: &str = r#"{
  "openapi": "3.0.0",
  "info": { "title": "Mini Test API", "version": "1.0" },
  "servers": [{ "url": "https://api.example.com/v1" }],
  "paths": {
    "/pets": {
      "get": {
        "operationId": "listPets",
        "parameters": [
          { "name": "limit", "in": "query", "schema": { "type": "integer" } },
          { "name": "status", "in": "query", "schema": { "type": "string" } }
        ],
        "responses": { "200": { "description": "OK" } }
      }
    },
    "/pets/{petId}": {
      "get": {
        "operationId": "getPet",
        "parameters": [
          { "name": "petId", "in": "path", "required": true, "schema": { "type": "integer" } }
        ],
        "responses": { "200": { "description": "OK" } }
      },
      "delete": {
        "operationId": "deletePet",
        "parameters": [
          { "name": "petId", "in": "path", "required": true, "schema": { "type": "integer" } }
        ],
        "responses": { "204": { "description": "No content" } }
      }
    },
    "/pets/findByStatus": {
      "get": {
        "operationId": "findPetsByStatus",
        "parameters": [
          { "name": "status", "in": "query", "schema": { "type": "string" } }
        ],
        "responses": { "200": { "description": "OK" } }
      }
    }
  },
  "components": {
    "securitySchemes": {
      "apiKeyAuth": { "type": "apiKey", "in": "header", "name": "X-Api-Key" }
    }
  }
}"#;

    fn parse_mini() -> Arc<OpenAPI> {
        Arc::new(serde_json::from_str(MINI_SPEC).expect("MINI_SPEC is valid JSON"))
    }

    #[test]
    fn parse_petstore_spec() {
        let api = parse_mini();
        assert_eq!(api.paths.paths.len(), 3, "spec has 3 paths");
        assert!(api.components.is_some());
    }

    #[test]
    fn resolve_operation_by_id() {
        let api = parse_mini();
        let (method, path, op) = resolve_operation(&api, "listPets").unwrap();
        assert_eq!(method, "GET");
        assert_eq!(path, "/pets");
        assert_eq!(op.operation_id.as_deref(), Some("listPets"));
    }

    #[test]
    fn resolve_operation_by_method_path() {
        let api = parse_mini();
        let (method, path, op) = resolve_operation(&api, "GET /pets/findByStatus").unwrap();
        assert_eq!(method, "GET");
        assert_eq!(path, "/pets/findByStatus");
        assert_eq!(op.operation_id.as_deref(), Some("findPetsByStatus"));
    }

    #[test]
    fn resolve_operation_method_is_case_insensitive() {
        let api = parse_mini();
        let (method, path, op) = resolve_operation(&api, "delete /pets/{petId}").unwrap();
        assert_eq!(method, "DELETE");
        assert_eq!(path, "/pets/{petId}");
        assert_eq!(op.operation_id.as_deref(), Some("deletePet"));
    }

    #[test]
    fn resolve_operation_not_found() {
        let api = parse_mini();
        assert!(resolve_operation(&api, "nonExistentOp").is_err());
    }

    #[test]
    fn bind_path_params() {
        let args: HashMap<String, Value> = HashMap::from([("petId".to_owned(), json!(42))]);
        let url = build_url("https://api.example.com/v1", "/pets/{petId}", &args);
        assert_eq!(url, "https://api.example.com/v1/pets/42");
    }

    #[test]
    fn bind_path_params_string() {
        let args: HashMap<String, Value> = HashMap::from([("petId".to_owned(), json!("fluffy"))]);
        let url = build_url("https://api.example.com/v1", "/pets/{petId}", &args);
        assert_eq!(url, "https://api.example.com/v1/pets/fluffy");
    }

    #[test]
    fn classify_path_params() {
        let api = parse_mini();
        let (_, _, op) = resolve_operation(&api, "getPet").unwrap();
        let (path_names, query_names) = classify_params(op);
        assert_eq!(path_names, vec!["petId".to_owned()]);
        assert!(query_names.is_empty());
    }

    #[test]
    fn bind_query_params() {
        let api = parse_mini();
        let (_, _, op) = resolve_operation(&api, "listPets").unwrap();
        let (path_names, query_names) = classify_params(op);
        assert!(path_names.is_empty());
        assert!(query_names.contains(&"status".to_owned()));
        assert!(query_names.contains(&"limit".to_owned()));

        let args: HashMap<String, Value> = [
            ("status".to_owned(), json!("available")),
            ("limit".to_owned(), json!("10")),
        ]
        .into_iter()
        .collect();

        let params = build_rest_params("GET", op, &args, &path_names, &query_names, &Value::Null);
        assert_eq!(params["query"]["status"], json!("available"));
        assert_eq!(params["query"]["limit"], json!("10"));
    }

    #[test]
    fn rest_params_without_request_body_omit_body_and_forward_auth() {
        let api = parse_mini();
        let (_, _, op) = resolve_operation(&api, "listPets").unwrap();
        let (path_names, query_names) = classify_params(op);
        let args: HashMap<String, Value> = HashMap::from([
            ("limit".to_owned(), json!(5)),
            ("unknown".to_owned(), json!("ignored")),
        ]);
        let auth = json!({ "type": "bearer", "token": "t" });

        let params = build_rest_params("GET", op, &args, &path_names, &query_names, &auth);
        assert_eq!(params["method"], json!("GET"));
        assert_eq!(params["query"], json!({ "limit": "5" }));
        assert!(params.get("body").is_none(), "listPets has no request body");
        assert_eq!(params["auth"], auth);
    }

    #[test]
    fn server_override() {
        let api = parse_mini();
        let url = resolve_server(&api, &json!("https://override.example.com/v2/"));
        assert_eq!(url, "https://override.example.com/v2");

        let default_url = resolve_server(&api, &Value::Null);
        assert_eq!(default_url, "https://api.example.com/v1");
    }

    #[test]
    fn server_override_empty_string_falls_back_to_spec() {
        let api = parse_mini();
        assert_eq!(resolve_server(&api, &json!("")), "https://api.example.com/v1");
    }

    #[tokio::test]
    async fn spec_cache_hit() {
        let cache: SpecCache = Arc::new(RwLock::new(HashMap::new()));

        let api = parse_mini();
        cache
            .write()
            .await
            .insert("http://test/spec.json".to_owned(), Arc::clone(&api));

        #[allow(clippy::expect_used)]
        let dummy_client = Client::builder().use_rustls_tls().build().expect("client");

        let returned = resolve_spec(&cache, &dummy_client, "http://test/spec.json")
            .await
            .unwrap();

        assert!(Arc::ptr_eq(&api, &returned));
    }

    #[tokio::test]
    async fn rate_limit_proactive() {
        use tokio::time::Instant;

        let config = RateLimitConfig {
            max_requests: 3,
            window: Duration::from_secs(10),
            max_delay_ms: 5_000,
            strategy: RateLimitStrategy::SlidingWindow,
        };
        let rl = RequestRateLimit::new(config);

        for _ in 0..3 {
            rate_limit_acquire(&rl).await;
        }

        let start = Instant::now();
        let config_short = RateLimitConfig {
            max_requests: 1,
            window: Duration::from_millis(50),
            max_delay_ms: 200,
            strategy: RateLimitStrategy::SlidingWindow,
        };
        let rl_short = RequestRateLimit::new(config_short);
        rate_limit_acquire(&rl_short).await;
        rate_limit_acquire(&rl_short).await;
        let elapsed = start.elapsed();

        assert!(
            elapsed >= Duration::from_millis(40),
            "expected ≥40 ms delay but got {elapsed:?}"
        );
    }

    #[test]
    fn parse_rate_limit_config_token_bucket() {
        let rl = json!({
            "max_requests": 50,
            "window_secs": 30,
            "strategy": "token_bucket",
        });
        let cfg = parse_rate_limit_config(&rl);
        assert_eq!(cfg.max_requests, 50);
        assert_eq!(cfg.window, Duration::from_secs(30));
        assert_eq!(cfg.strategy, RateLimitStrategy::TokenBucket);
    }

    #[test]
    fn parse_rate_limit_config_defaults() {
        let cfg = parse_rate_limit_config(&json!({}));
        assert_eq!(cfg.max_requests, 100);
        assert_eq!(cfg.window, Duration::from_mins(1));
        assert_eq!(cfg.strategy, RateLimitStrategy::SlidingWindow);
    }

    #[test]
    fn adapter_name() {
        assert_eq!(OpenApiAdapter::new().name(), "openapi");
    }
}