1use std::collections::HashMap;
48use std::sync::{Arc, OnceLock};
49use std::time::Duration;
50
51use async_trait::async_trait;
52use openapiv3::{OpenAPI, Operation, Parameter, ReferenceOr};
53use reqwest::Client;
54use serde_json::{Value, json};
55use tokio::sync::RwLock;
56use tracing::{debug, info};
57
58use crate::adapters::graphql_rate_limit::{
59 RateLimitConfig, RateLimitStrategy, RequestRateLimit, rate_limit_acquire,
60};
61use crate::adapters::rest_api::{RestApiAdapter, RestApiConfig};
62use crate::domain::error::{Result, ServiceError, StygianError};
63use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
64
/// Cache of parsed OpenAPI documents, keyed by the URL they were fetched from.
///
/// Values are `Arc<OpenAPI>` so a caller can keep using a spec after the async
/// `RwLock` guard is released. Nothing in this module removes entries.
/// NOTE(review): there is no eviction/TTL — a changed remote spec is only picked
/// up by a new adapter instance; confirm that is intended.
type SpecCache = Arc<RwLock<HashMap<String, Arc<OpenAPI>>>>;
68
/// Configuration for [`OpenApiAdapter`].
///
/// Holds the settings for the inner [`RestApiAdapter`], which performs the
/// actual HTTP request once an OpenAPI operation has been resolved.
#[derive(Debug, Clone, Default)]
pub struct OpenApiConfig {
    /// Passed verbatim to `RestApiAdapter::with_config`.
    pub rest: RestApiConfig,
}
95
/// [`ScrapingService`] that executes operations described by an OpenAPI document.
///
/// For each request it fetches (or reuses a cached copy of) the spec, resolves
/// the requested operation, binds arguments into URL, query and body, and then
/// delegates the HTTP call to an inner [`RestApiAdapter`].
///
/// The spec cache and the rate limiter sit behind `Arc`, so clones of an adapter
/// share them.
#[derive(Clone)]
pub struct OpenApiAdapter {
    /// Performs the resolved REST request.
    inner: RestApiAdapter,
    /// HTTP client used to download spec documents.
    spec_client: Client,
    /// Parsed specs keyed by spec URL.
    spec_cache: SpecCache,
    /// Limiter built lazily from the first `params.rate_limit` seen by `execute`;
    /// later, different `rate_limit` configs do not replace it.
    rate_limit: Arc<OnceLock<RequestRateLimit>>,
}
122
123impl OpenApiAdapter {
124 pub fn new() -> Self {
133 Self::with_config(OpenApiConfig::default())
134 }
135
136 pub fn with_config(config: OpenApiConfig) -> Self {
157 #[allow(clippy::expect_used)]
159 let spec_client = Client::builder()
160 .timeout(Duration::from_secs(30))
161 .use_rustls_tls()
162 .build()
163 .expect("TLS backend unavailable");
164
165 Self {
166 inner: RestApiAdapter::with_config(config.rest),
167 spec_client,
168 spec_cache: Arc::new(RwLock::new(HashMap::new())),
169 rate_limit: Arc::new(OnceLock::new()),
170 }
171 }
172}
173
174impl Default for OpenApiAdapter {
175 fn default() -> Self {
176 Self::new()
177 }
178}
179
180fn svc_err(msg: impl Into<String>) -> StygianError {
184 StygianError::from(ServiceError::Unavailable(msg.into()))
185}
186
187async fn fetch_spec(client: &Client, url: &str) -> Result<Arc<OpenAPI>> {
191 let body = client
192 .get(url)
193 .header(
194 "Accept",
195 "application/json, application/yaml, text/yaml, */*",
196 )
197 .send()
198 .await
199 .map_err(|e| svc_err(format!("spec fetch failed: {e}")))?
200 .text()
201 .await
202 .map_err(|e| svc_err(format!("spec read failed: {e}")))?;
203
204 let api: OpenAPI = serde_json::from_str(&body)
205 .or_else(|_| serde_yaml::from_str(&body))
206 .map_err(|e| svc_err(format!("spec parse failed: {e}")))?;
207
208 Ok(Arc::new(api))
209}
210
211async fn resolve_spec(cache: &SpecCache, client: &Client, url: &str) -> Result<Arc<OpenAPI>> {
213 {
214 let guard = cache.read().await;
215 if let Some(spec) = guard.get(url) {
216 debug!(url, "OpenAPI spec cache hit");
217 return Ok(Arc::clone(spec));
218 }
219 }
220
221 let spec = fetch_spec(client, url).await?;
223
224 {
225 let mut guard = cache.write().await;
226 guard
228 .entry(url.to_owned())
229 .or_insert_with(|| Arc::clone(&spec));
230 }
231
232 Ok(spec)
233}
234
235fn resolve_operation<'a>(
242 api: &'a OpenAPI,
243 operation_ref: &str,
244) -> Result<(String, String, &'a Operation)> {
245 let method_path: Option<(String, &str)> = operation_ref
247 .split_once(' ')
248 .filter(|(m, _)| {
249 matches!(
250 m.to_uppercase().as_str(),
251 "GET" | "POST" | "PUT" | "PATCH" | "DELETE" | "HEAD" | "OPTIONS" | "TRACE"
252 )
253 })
254 .map(|(m, p)| (m.to_uppercase(), p));
255
256 for (path_str, path_item_ref) in &api.paths.paths {
257 let item = match path_item_ref {
258 ReferenceOr::Item(i) => i,
259 ReferenceOr::Reference { .. } => continue,
260 };
261
262 let ops: [(&str, Option<&Operation>); 8] = [
263 ("GET", item.get.as_ref()),
264 ("POST", item.post.as_ref()),
265 ("PUT", item.put.as_ref()),
266 ("PATCH", item.patch.as_ref()),
267 ("DELETE", item.delete.as_ref()),
268 ("HEAD", item.head.as_ref()),
269 ("OPTIONS", item.options.as_ref()),
270 ("TRACE", item.trace.as_ref()),
271 ];
272
273 for (method, maybe_op) in ops {
274 let Some(op) = maybe_op else { continue };
275
276 let matched = match &method_path {
277 Some((target_method, target_path)) => {
278 method == target_method.as_str() && path_str == target_path
279 }
280 None => op.operation_id.as_deref() == Some(operation_ref),
281 };
282
283 if matched {
284 return Ok((method.to_owned(), path_str.clone(), op));
285 }
286 }
287 }
288
289 Err(svc_err(format!(
290 "operation '{operation_ref}' not found in spec"
291 )))
292}
293
294#[allow(clippy::indexing_slicing)]
298fn resolve_server(api: &OpenAPI, server_override: &Value) -> String {
299 if let Some(url) = server_override.as_str().filter(|s| !s.is_empty()) {
300 return url.trim_end_matches('/').to_owned();
301 }
302 api.servers
303 .first()
304 .map(|s| s.url.trim_end_matches('/').to_owned())
305 .unwrap_or_default()
306}
307
308fn classify_params(op: &Operation) -> (Vec<String>, Vec<String>) {
310 let mut path_params: Vec<String> = Vec::new();
311 let mut query_params: Vec<String> = Vec::new();
312
313 for p_ref in &op.parameters {
314 let p = match p_ref {
315 ReferenceOr::Item(p) => p,
316 ReferenceOr::Reference { .. } => continue,
317 };
318 match p {
319 Parameter::Path { parameter_data, .. } => {
320 path_params.push(parameter_data.name.clone());
321 }
322 Parameter::Query { parameter_data, .. } => {
323 query_params.push(parameter_data.name.clone());
324 }
325 Parameter::Header { .. } | Parameter::Cookie { .. } => {}
328 }
329 }
330
331 (path_params, query_params)
332}
333
334fn build_url(server_url: &str, path_template: &str, args: &HashMap<String, Value>) -> String {
336 let mut url = format!("{server_url}{path_template}");
337 for (key, val) in args {
338 let placeholder = format!("{{{key}}}");
339 if url.contains(placeholder.as_str()) {
340 let replacement = val.as_str().map_or_else(|| val.to_string(), str::to_owned);
341 url = url.replace(placeholder.as_str(), &replacement);
342 }
343 }
344 url
345}
346
347#[allow(clippy::indexing_slicing)]
353fn build_rest_params(
354 method: &str,
355 op: &Operation,
356 args: &HashMap<String, Value>,
357 path_param_names: &[String],
358 query_param_names: &[String],
359 auth_override: &Value,
360) -> Value {
361 let query_obj: serde_json::Map<String, Value> = query_param_names
362 .iter()
363 .filter_map(|name| {
364 args.get(name.as_str()).map(|val| {
365 let s = val.as_str().map_or_else(|| val.to_string(), str::to_owned);
366 (name.clone(), Value::String(s))
367 })
368 })
369 .collect();
370
371 let body_value = if op.request_body.is_some() {
372 let excluded: std::collections::HashSet<&str> = path_param_names
373 .iter()
374 .chain(query_param_names.iter())
375 .map(String::as_str)
376 .collect();
377 let body_args: serde_json::Map<String, Value> = args
378 .iter()
379 .filter(|(k, _)| !excluded.contains(k.as_str()))
380 .map(|(k, v)| (k.clone(), v.clone()))
381 .collect();
382 if body_args.is_empty() {
383 Value::Null
384 } else {
385 Value::Object(body_args)
386 }
387 } else {
388 Value::Null
389 };
390
391 let mut params = json!({
392 "method": method,
393 "query": Value::Object(query_obj),
394 });
395
396 if !body_value.is_null() {
397 params["body"] = body_value;
398 }
399 if !auth_override.is_null() {
400 params["auth"] = auth_override.clone();
401 }
402
403 params
404}
405
406#[allow(clippy::indexing_slicing)]
408fn parse_rate_limit_config(rl: &Value) -> RateLimitConfig {
409 let strategy = match rl["strategy"].as_str().unwrap_or("sliding_window") {
410 "token_bucket" => RateLimitStrategy::TokenBucket,
411 _ => RateLimitStrategy::SlidingWindow,
412 };
413 RateLimitConfig {
414 max_requests: rl["max_requests"]
415 .as_u64()
416 .and_then(|value| u32::try_from(value).ok())
417 .unwrap_or(100),
418 window: Duration::from_secs(rl["window_secs"].as_u64().unwrap_or(60)),
419 max_delay_ms: rl["max_delay_ms"].as_u64().unwrap_or(30_000),
420 strategy,
421 }
422}
423
424#[async_trait]
427impl ScrapingService for OpenApiAdapter {
428 #[allow(clippy::indexing_slicing)]
468 async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
469 let rl_params = &input.params["rate_limit"];
471 if !rl_params.is_null() {
472 let rl = self
473 .rate_limit
474 .get_or_init(|| RequestRateLimit::new(parse_rate_limit_config(rl_params)));
475 rate_limit_acquire(rl).await;
476 }
477
478 info!(url = %input.url, "OpenAPI adapter: execute");
479
480 let api = resolve_spec(&self.spec_cache, &self.spec_client, &input.url).await?;
482
483 let operation_ref = input.params["operation"]
485 .as_str()
486 .ok_or_else(|| svc_err("params.operation is required"))?;
487
488 let (method, path_template, op) = resolve_operation(&api, operation_ref)?;
489
490 let server_url = resolve_server(&api, &input.params["server"]["url"]);
492
493 let (path_param_names, query_param_names) = classify_params(op);
495
496 let args: HashMap<String, Value> = input.params["args"]
498 .as_object()
499 .map(|obj| obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
500 .unwrap_or_default();
501
502 let final_url = build_url(&server_url, &path_template, &args);
504
505 let rest_params = build_rest_params(
507 &method,
508 op,
509 &args,
510 &path_param_names,
511 &query_param_names,
512 &input.params["auth"],
513 );
514
515 debug!(
516 %final_url, %method, path_template, operation_ref,
517 "OpenAPI: delegating to RestApiAdapter"
518 );
519
520 let inner_output = self
522 .inner
523 .execute(ServiceInput {
524 url: final_url.clone(),
525 params: rest_params,
526 })
527 .await?;
528
529 let mut metadata = inner_output.metadata;
531 if let Value::Object(ref mut m) = metadata {
532 m.insert(
533 "openapi_spec_url".to_owned(),
534 Value::String(input.url.clone()),
535 );
536 m.insert(
537 "operation_id".to_owned(),
538 Value::String(operation_ref.to_owned()),
539 );
540 m.insert("method".to_owned(), Value::String(method));
541 m.insert("path_template".to_owned(), Value::String(path_template));
542 m.insert("server_url".to_owned(), Value::String(server_url));
543 m.insert("resolved_url".to_owned(), Value::String(final_url));
544 }
545
546 Ok(ServiceOutput {
547 data: inner_output.data,
548 metadata,
549 })
550 }
551
552 fn name(&self) -> &'static str {
553 "openapi"
554 }
555}
556
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::panic,
    clippy::indexing_slicing,
    clippy::expect_used
)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::time::Duration;

    /// Small OpenAPI 3.0 document with three paths and four operations
    /// (`listPets`, `getPet`, `deletePet`, `findPetsByStatus`) plus one API-key scheme.
    const MINI_SPEC: &str = r#"{
  "openapi": "3.0.0",
  "info": { "title": "Mini Test API", "version": "1.0" },
  "servers": [{ "url": "https://api.example.com/v1" }],
  "paths": {
    "/pets": {
      "get": {
        "operationId": "listPets",
        "parameters": [
          { "name": "limit", "in": "query", "schema": { "type": "integer" } },
          { "name": "status", "in": "query", "schema": { "type": "string" } }
        ],
        "responses": { "200": { "description": "OK" } }
      }
    },
    "/pets/{petId}": {
      "get": {
        "operationId": "getPet",
        "parameters": [
          { "name": "petId", "in": "path", "required": true, "schema": { "type": "integer" } }
        ],
        "responses": { "200": { "description": "OK" } }
      },
      "delete": {
        "operationId": "deletePet",
        "parameters": [
          { "name": "petId", "in": "path", "required": true, "schema": { "type": "integer" } }
        ],
        "responses": { "204": { "description": "No content" } }
      }
    },
    "/pets/findByStatus": {
      "get": {
        "operationId": "findPetsByStatus",
        "parameters": [
          { "name": "status", "in": "query", "schema": { "type": "string" } }
        ],
        "responses": { "200": { "description": "OK" } }
      }
    }
  },
  "components": {
    "securitySchemes": {
      "apiKeyAuth": { "type": "apiKey", "in": "header", "name": "X-Api-Key" }
    }
  }
}"#;

    /// Parse [`MINI_SPEC`] into a shared spec, panicking if the fixture is broken.
    fn parse_mini() -> Arc<OpenAPI> {
        let spec: OpenAPI = serde_json::from_str(MINI_SPEC).expect("MINI_SPEC is valid JSON");
        Arc::new(spec)
    }

    #[test]
    fn parse_petstore_spec() {
        let spec = parse_mini();
        assert_eq!(spec.paths.paths.len(), 3, "spec has 3 paths");
        assert!(spec.components.is_some());
    }

    #[test]
    fn resolve_operation_by_id() {
        let spec = parse_mini();
        let (verb, route, found) = resolve_operation(&spec, "listPets").unwrap();
        assert_eq!(verb, "GET");
        assert_eq!(route, "/pets");
        assert_eq!(found.operation_id.as_deref(), Some("listPets"));
    }

    #[test]
    fn resolve_operation_by_method_path() {
        let spec = parse_mini();
        let (verb, route, found) = resolve_operation(&spec, "GET /pets/findByStatus").unwrap();
        assert_eq!(verb, "GET");
        assert_eq!(route, "/pets/findByStatus");
        assert_eq!(found.operation_id.as_deref(), Some("findPetsByStatus"));
    }

    #[test]
    fn resolve_operation_not_found() {
        let spec = parse_mini();
        let outcome = resolve_operation(&spec, "nonExistentOp");
        assert!(outcome.is_err());
    }

    #[test]
    fn bind_path_params() {
        let args = HashMap::from([("petId".to_owned(), json!(42))]);
        let bound = build_url("https://api.example.com/v1", "/pets/{petId}", &args);
        assert_eq!(bound, "https://api.example.com/v1/pets/42");
    }

    #[test]
    fn bind_path_params_string() {
        let args = HashMap::from([("petId".to_owned(), json!("fluffy"))]);
        let bound = build_url("https://api.example.com/v1", "/pets/{petId}", &args);
        assert_eq!(bound, "https://api.example.com/v1/pets/fluffy");
    }

    #[test]
    fn bind_query_params() {
        let spec = parse_mini();
        let (_, _, op) = resolve_operation(&spec, "listPets").unwrap();

        let (path_names, query_names) = classify_params(op);
        assert!(path_names.is_empty());
        assert!(query_names.contains(&"status".to_owned()));
        assert!(query_names.contains(&"limit".to_owned()));

        let args = HashMap::from([
            ("status".to_owned(), json!("available")),
            ("limit".to_owned(), json!("10")),
        ]);

        let params = build_rest_params("GET", op, &args, &path_names, &query_names, &Value::Null);
        assert_eq!(params["query"]["status"], json!("available"));
        assert_eq!(params["query"]["limit"], json!("10"));
    }

    #[test]
    fn server_override() {
        let spec = parse_mini();

        let overridden = resolve_server(&spec, &json!("https://override.example.com/v2/"));
        assert_eq!(overridden, "https://override.example.com/v2");

        let from_spec = resolve_server(&spec, &Value::Null);
        assert_eq!(from_spec, "https://api.example.com/v1");
    }

    #[tokio::test]
    async fn spec_cache_hit() {
        let cache: SpecCache = Arc::new(RwLock::new(HashMap::new()));
        let spec = parse_mini();
        cache
            .write()
            .await
            .insert("http://test/spec.json".to_owned(), Arc::clone(&spec));

        // Never used for I/O: a cache hit must short-circuit before any fetch.
        let unused_client = Client::builder().use_rustls_tls().build().expect("client");

        let from_cache = resolve_spec(&cache, &unused_client, "http://test/spec.json")
            .await
            .unwrap();

        assert!(Arc::ptr_eq(&spec, &from_cache));
    }

    #[tokio::test]
    async fn rate_limit_proactive() {
        use tokio::time::Instant;

        // Three acquisitions against a budget of three per window.
        let roomy = RequestRateLimit::new(RateLimitConfig {
            max_requests: 3,
            window: Duration::from_secs(10),
            max_delay_ms: 5_000,
            strategy: RateLimitStrategy::SlidingWindow,
        });
        for _ in 0..3 {
            rate_limit_acquire(&roomy).await;
        }

        // One request per 50 ms window: the second acquisition has to wait.
        let start = Instant::now();
        let tight = RequestRateLimit::new(RateLimitConfig {
            max_requests: 1,
            window: Duration::from_millis(50),
            max_delay_ms: 200,
            strategy: RateLimitStrategy::SlidingWindow,
        });
        rate_limit_acquire(&tight).await;
        rate_limit_acquire(&tight).await;
        let elapsed = start.elapsed();

        assert!(
            elapsed >= Duration::from_millis(40),
            "expected ≥40 ms delay but got {elapsed:?}"
        );
    }

    #[test]
    fn parse_rate_limit_config_token_bucket() {
        let raw = json!({
            "max_requests": 50,
            "window_secs": 30,
            "strategy": "token_bucket",
        });
        let parsed = parse_rate_limit_config(&raw);
        assert_eq!(parsed.max_requests, 50);
        assert_eq!(parsed.window, Duration::from_secs(30));
        assert_eq!(parsed.strategy, RateLimitStrategy::TokenBucket);
    }

    #[test]
    fn parse_rate_limit_config_defaults() {
        let parsed = parse_rate_limit_config(&json!({}));
        assert_eq!(parsed.max_requests, 100);
        assert_eq!(parsed.window, Duration::from_secs(60));
        assert_eq!(parsed.strategy, RateLimitStrategy::SlidingWindow);
    }

    #[test]
    fn adapter_name() {
        let adapter = OpenApiAdapter::new();
        assert_eq!(adapter.name(), "openapi");
    }
}