1use crate::stateful_handler::StatefulResponseHandler;
5use crate::{
6 CustomFixtureLoader, Error, FailureInjector, ProxyHandler, RealityContinuumEngine,
7 RecordReplayHandler, RequestFingerprint, ResponsePriority, ResponseSource, Result,
8 RouteChaosInjector,
9};
10use async_trait::async_trait;
11use axum::http::{HeaderMap, Method, StatusCode, Uri};
12use std::collections::HashMap;
13use std::sync::Arc;
14
15#[async_trait]
17pub trait BehavioralScenarioReplay: Send + Sync {
18 async fn try_replay(
20 &self,
21 method: &Method,
22 uri: &Uri,
23 headers: &HeaderMap,
24 body: Option<&[u8]>,
25 session_id: Option<&str>,
26 ) -> Result<Option<BehavioralReplayResponse>>;
27}
28
/// Response produced by a [`BehavioralScenarioReplay`] implementation.
///
/// Derives `PartialEq`/`Eq` so replayed responses can be compared directly
/// (for example in test assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BehavioralReplayResponse {
    /// HTTP status code to return.
    pub status_code: u16,
    /// Response headers as name/value pairs.
    pub headers: HashMap<String, String>,
    /// Raw response body bytes.
    pub body: Vec<u8>,
    /// Optional delay in milliseconds; `PriorityHttpHandler` sleeps for this
    /// long before returning the response when it is set.
    pub timing_ms: Option<u64>,
    /// Value used for the `content-type` of the response.
    pub content_type: String,
}
43
44pub struct PriorityHttpHandler {
46 custom_fixture_loader: Option<Arc<CustomFixtureLoader>>,
48 record_replay: RecordReplayHandler,
50 behavioral_scenario_replay: Option<Arc<dyn BehavioralScenarioReplay + Send + Sync>>,
52 stateful_handler: Option<Arc<StatefulResponseHandler>>,
54 route_chaos_injector: Option<Arc<RouteChaosInjector>>,
56 failure_injector: Option<FailureInjector>,
58 proxy_handler: Option<ProxyHandler>,
60 mock_generator: Option<Box<dyn MockGenerator + Send + Sync>>,
62 openapi_spec: Option<crate::openapi::spec::OpenApiSpec>,
64 continuum_engine: Option<Arc<RealityContinuumEngine>>,
66}
67
68pub trait MockGenerator {
70 fn generate_mock_response(
72 &self,
73 fingerprint: &RequestFingerprint,
74 headers: &HeaderMap,
75 body: Option<&[u8]>,
76 ) -> Result<Option<MockResponse>>;
77}
78
/// Response produced by a [`MockGenerator`].
///
/// Derives `PartialEq`/`Eq` so mock responses can be compared directly
/// (for example in test assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MockResponse {
    /// HTTP status code to return.
    pub status_code: u16,
    /// Response headers as name/value pairs.
    pub headers: HashMap<String, String>,
    /// Response body as text.
    pub body: String,
    /// Value used for the `content-type` of the response.
    pub content_type: String,
}
91
92impl PriorityHttpHandler {
93 pub fn new(
95 record_replay: RecordReplayHandler,
96 failure_injector: Option<FailureInjector>,
97 proxy_handler: Option<ProxyHandler>,
98 mock_generator: Option<Box<dyn MockGenerator + Send + Sync>>,
99 ) -> Self {
100 Self {
101 custom_fixture_loader: None,
102 record_replay,
103 behavioral_scenario_replay: None,
104 stateful_handler: None,
105 route_chaos_injector: None,
106 failure_injector,
107 proxy_handler,
108 mock_generator,
109 openapi_spec: None,
110 continuum_engine: None,
111 }
112 }
113
114 pub fn new_with_openapi(
116 record_replay: RecordReplayHandler,
117 failure_injector: Option<FailureInjector>,
118 proxy_handler: Option<ProxyHandler>,
119 mock_generator: Option<Box<dyn MockGenerator + Send + Sync>>,
120 openapi_spec: Option<crate::openapi::spec::OpenApiSpec>,
121 ) -> Self {
122 Self {
123 custom_fixture_loader: None,
124 record_replay,
125 behavioral_scenario_replay: None,
126 stateful_handler: None,
127 route_chaos_injector: None,
128 failure_injector,
129 proxy_handler,
130 mock_generator,
131 openapi_spec,
132 continuum_engine: None,
133 }
134 }
135
136 pub fn with_custom_fixture_loader(mut self, loader: Arc<CustomFixtureLoader>) -> Self {
138 self.custom_fixture_loader = Some(loader);
139 self
140 }
141
142 pub fn with_stateful_handler(mut self, handler: Arc<StatefulResponseHandler>) -> Self {
144 self.stateful_handler = Some(handler);
145 self
146 }
147
148 pub fn with_route_chaos_injector(mut self, injector: Arc<RouteChaosInjector>) -> Self {
150 self.route_chaos_injector = Some(injector);
151 self
152 }
153
154 pub fn with_continuum_engine(mut self, engine: Arc<RealityContinuumEngine>) -> Self {
156 self.continuum_engine = Some(engine);
157 self
158 }
159
160 pub fn with_behavioral_scenario_replay(
162 mut self,
163 replay_engine: Arc<dyn BehavioralScenarioReplay + Send + Sync>,
164 ) -> Self {
165 self.behavioral_scenario_replay = Some(replay_engine);
166 self
167 }
168
169 pub async fn process_request(
171 &self,
172 method: &Method,
173 uri: &Uri,
174 headers: &HeaderMap,
175 body: Option<&[u8]>,
176 ) -> Result<PriorityResponse> {
177 let fingerprint = RequestFingerprint::new(method.clone(), uri, headers, body);
178
179 if let Some(ref custom_loader) = self.custom_fixture_loader {
181 if let Some(custom_fixture) = custom_loader.load_fixture(&fingerprint) {
182 if custom_fixture.delay_ms > 0 {
184 tokio::time::sleep(tokio::time::Duration::from_millis(custom_fixture.delay_ms))
185 .await;
186 }
187
188 let response_body = if custom_fixture.response.is_string() {
190 custom_fixture.response.as_str().unwrap().to_string()
191 } else {
192 serde_json::to_string(&custom_fixture.response).map_err(|e| {
193 Error::generic(format!(
194 "Failed to serialize custom fixture response: {}",
195 e
196 ))
197 })?
198 };
199
200 let content_type = custom_fixture
202 .headers
203 .get("content-type")
204 .cloned()
205 .unwrap_or_else(|| "application/json".to_string());
206
207 return Ok(PriorityResponse {
208 source: ResponseSource::new(
209 ResponsePriority::Replay,
210 "custom_fixture".to_string(),
211 )
212 .with_metadata("fixture_path".to_string(), custom_fixture.path.clone()),
213 status_code: custom_fixture.status,
214 headers: custom_fixture.headers.clone(),
215 body: response_body.into_bytes(),
216 content_type,
217 });
218 }
219 }
220
221 if let Some(recorded_request) =
223 self.record_replay.replay_handler().load_fixture(&fingerprint).await?
224 {
225 let content_type = recorded_request
226 .response_headers
227 .get("content-type")
228 .unwrap_or(&"application/json".to_string())
229 .clone();
230
231 return Ok(PriorityResponse {
232 source: ResponseSource::new(ResponsePriority::Replay, "replay".to_string())
233 .with_metadata("fixture_path".to_string(), "recorded".to_string()),
234 status_code: recorded_request.status_code,
235 headers: recorded_request.response_headers,
236 body: recorded_request.response_body.into_bytes(),
237 content_type,
238 });
239 }
240
241 if let Some(ref scenario_replay) = self.behavioral_scenario_replay {
243 let session_id = headers
245 .get("x-session-id")
246 .or_else(|| headers.get("session-id"))
247 .and_then(|v| v.to_str().ok())
248 .map(|s| s.to_string());
249
250 if let Ok(Some(replay_response)) = scenario_replay
251 .try_replay(method, uri, headers, body, session_id.as_deref())
252 .await
253 {
254 if let Some(timing_ms) = replay_response.timing_ms {
256 tokio::time::sleep(tokio::time::Duration::from_millis(timing_ms)).await;
257 }
258 return Ok(PriorityResponse {
259 source: ResponseSource::new(
260 ResponsePriority::Replay,
261 "behavioral_scenario".to_string(),
262 )
263 .with_metadata("replay_type".to_string(), "scenario".to_string()),
264 status_code: replay_response.status_code,
265 headers: replay_response.headers,
266 body: replay_response.body,
267 content_type: replay_response.content_type,
268 });
269 }
270 }
271
272 if let Some(ref stateful_handler) = self.stateful_handler {
274 if let Some(stateful_response) =
275 stateful_handler.process_request(method, uri, headers, body).await?
276 {
277 return Ok(PriorityResponse {
278 source: ResponseSource::new(ResponsePriority::Stateful, "stateful".to_string())
279 .with_metadata("state".to_string(), stateful_response.state)
280 .with_metadata("resource_id".to_string(), stateful_response.resource_id),
281 status_code: stateful_response.status_code,
282 headers: stateful_response.headers,
283 body: stateful_response.body.into_bytes(),
284 content_type: stateful_response.content_type,
285 });
286 }
287 }
288
289 if let Some(ref route_chaos) = self.route_chaos_injector {
291 if let Err(e) = route_chaos.inject_latency(method, uri).await {
293 tracing::warn!("Failed to inject per-route latency: {}", e);
294 }
295
296 if let Some(fault_response) = route_chaos.get_fault_response(method, uri) {
298 let error_response = serde_json::json!({
299 "error": fault_response.error_message,
300 "injected_failure": true,
301 "fault_type": fault_response.fault_type,
302 "timestamp": chrono::Utc::now().to_rfc3339()
303 });
304
305 return Ok(PriorityResponse {
306 source: ResponseSource::new(
307 ResponsePriority::Fail,
308 "route_fault_injection".to_string(),
309 )
310 .with_metadata("fault_type".to_string(), fault_response.fault_type)
311 .with_metadata("error_message".to_string(), fault_response.error_message),
312 status_code: fault_response.status_code,
313 headers: HashMap::new(),
314 body: serde_json::to_string(&error_response)?.into_bytes(),
315 content_type: "application/json".to_string(),
316 });
317 }
318 }
319
320 if let Some(ref failure_injector) = self.failure_injector {
322 let tags = if let Some(ref spec) = self.openapi_spec {
323 fingerprint.openapi_tags(spec).unwrap_or_else(|| fingerprint.tags())
324 } else {
325 fingerprint.tags()
326 };
327 if let Some((status_code, error_message)) = failure_injector.process_request(&tags) {
328 let error_response = serde_json::json!({
329 "error": error_message,
330 "injected_failure": true,
331 "timestamp": chrono::Utc::now().to_rfc3339()
332 });
333
334 return Ok(PriorityResponse {
335 source: ResponseSource::new(
336 ResponsePriority::Fail,
337 "failure_injection".to_string(),
338 )
339 .with_metadata("error_message".to_string(), error_message),
340 status_code,
341 headers: HashMap::new(),
342 body: serde_json::to_string(&error_response)?.into_bytes(),
343 content_type: "application/json".to_string(),
344 });
345 }
346 }
347
348 let should_blend = if let Some(ref continuum_engine) = self.continuum_engine {
350 continuum_engine.is_enabled().await
351 } else {
352 false
353 };
354
355 if let Some(ref proxy_handler) = self.proxy_handler {
357 let migration_mode = if proxy_handler.config.migration_enabled {
359 proxy_handler.config.get_effective_migration_mode(uri.path())
360 } else {
361 None
362 };
363
364 if let Some(crate::proxy::config::MigrationMode::Mock) = migration_mode {
366 } else if proxy_handler.config.should_proxy_with_condition(method, uri, headers, body) {
368 let is_shadow = proxy_handler.config.should_shadow(uri.path());
370
371 if should_blend {
373 let proxy_future = proxy_handler.proxy_request(method, uri, headers, body);
375 let mock_result = if let Some(ref mock_generator) = self.mock_generator {
376 mock_generator.generate_mock_response(&fingerprint, headers, body)
377 } else {
378 Ok(None)
379 };
380
381 let proxy_result = proxy_future.await;
383
384 match (proxy_result, mock_result) {
386 (Ok(proxy_response), Ok(Some(mock_response))) => {
387 if let Some(ref continuum_engine) = self.continuum_engine {
389 let blend_ratio =
390 continuum_engine.get_blend_ratio(uri.path()).await;
391 let blender = continuum_engine.blender();
392
393 let mock_body_str = &mock_response.body;
395 let real_body_bytes =
396 proxy_response.body.clone().unwrap_or_default();
397 let real_body_str = String::from_utf8_lossy(&real_body_bytes);
398
399 let mock_json: serde_json::Value =
400 serde_json::from_str(mock_body_str)
401 .unwrap_or_else(|_| serde_json::json!({}));
402 let real_json: serde_json::Value =
403 serde_json::from_str(&real_body_str)
404 .unwrap_or_else(|_| serde_json::json!({}));
405
406 let blended_json =
408 blender.blend_responses(&mock_json, &real_json, blend_ratio);
409 let blended_body = serde_json::to_string(&blended_json)
410 .unwrap_or_else(|_| real_body_str.to_string());
411
412 let blended_status = blender.blend_status_code(
414 mock_response.status_code,
415 proxy_response.status_code,
416 blend_ratio,
417 );
418
419 let mut proxy_headers = HashMap::new();
421 for (key, value) in proxy_response.headers.iter() {
422 if let Ok(value_str) = value.to_str() {
423 proxy_headers.insert(
424 key.as_str().to_string(),
425 value_str.to_string(),
426 );
427 }
428 }
429 let blended_headers = blender.blend_headers(
430 &mock_response.headers,
431 &proxy_headers,
432 blend_ratio,
433 );
434
435 let content_type = blended_headers
436 .get("content-type")
437 .cloned()
438 .or_else(|| {
439 proxy_response
440 .headers
441 .get("content-type")
442 .and_then(|v| v.to_str().ok())
443 .map(|s| s.to_string())
444 })
445 .unwrap_or_else(|| "application/json".to_string());
446
447 tracing::info!(
448 path = %uri.path(),
449 blend_ratio = blend_ratio,
450 "Reality Continuum: blended mock and real responses"
451 );
452
453 let mut source = ResponseSource::new(
454 ResponsePriority::Proxy,
455 "continuum".to_string(),
456 )
457 .with_metadata("blend_ratio".to_string(), blend_ratio.to_string())
458 .with_metadata(
459 "upstream_url".to_string(),
460 proxy_handler.config.get_upstream_url(uri.path()),
461 );
462
463 if let Some(mode) = migration_mode {
464 source = source.with_metadata(
465 "migration_mode".to_string(),
466 format!("{:?}", mode),
467 );
468 }
469
470 return Ok(PriorityResponse {
471 source,
472 status_code: blended_status,
473 headers: blended_headers,
474 body: blended_body.into_bytes(),
475 content_type,
476 });
477 }
478 }
479 (Ok(proxy_response), Ok(None)) => {
480 tracing::debug!(
482 path = %uri.path(),
483 "Continuum: mock generation failed, using real response"
484 );
485 }
487 (Ok(proxy_response), Err(_)) => {
488 tracing::debug!(
490 path = %uri.path(),
491 "Continuum: mock generation failed, using real response"
492 );
493 }
495 (Err(e), Ok(Some(mock_response))) => {
496 tracing::debug!(
498 path = %uri.path(),
499 error = %e,
500 "Continuum: proxy failed, using mock response"
501 );
502 let mut source = ResponseSource::new(
504 ResponsePriority::Mock,
505 "continuum_fallback".to_string(),
506 )
507 .with_metadata("generated_from".to_string(), "openapi_spec".to_string())
508 .with_metadata(
509 "fallback_reason".to_string(),
510 "proxy_failed".to_string(),
511 );
512
513 if let Some(mode) = migration_mode {
514 source = source.with_metadata(
515 "migration_mode".to_string(),
516 format!("{:?}", mode),
517 );
518 }
519
520 return Ok(PriorityResponse {
521 source,
522 status_code: mock_response.status_code,
523 headers: mock_response.headers,
524 body: mock_response.body.into_bytes(),
525 content_type: mock_response.content_type,
526 });
527 }
528 (Err(e), _) => {
529 tracing::warn!(
531 path = %uri.path(),
532 error = %e,
533 "Continuum: both proxy and mock failed"
534 );
535 if let Some(crate::proxy::config::MigrationMode::Real) = migration_mode
537 {
538 return Err(Error::generic(format!(
539 "Proxy request failed in real mode: {}",
540 e
541 )));
542 }
543 }
545 }
546 }
547
548 match proxy_handler.proxy_request(method, uri, headers, body).await {
550 Ok(proxy_response) => {
551 let mut response_headers = HashMap::new();
552 for (key, value) in proxy_response.headers.iter() {
553 let key_str = key.as_str();
554 if let Ok(value_str) = value.to_str() {
555 response_headers.insert(key_str.to_string(), value_str.to_string());
556 }
557 }
558
559 let content_type = response_headers
560 .get("content-type")
561 .unwrap_or(&"application/json".to_string())
562 .clone();
563
564 if is_shadow {
566 if let Some(ref mock_generator) = self.mock_generator {
567 if let Ok(Some(mock_response)) = mock_generator
568 .generate_mock_response(&fingerprint, headers, body)
569 {
570 tracing::info!(
572 path = %uri.path(),
573 real_status = proxy_response.status_code,
574 mock_status = mock_response.status_code,
575 "Shadow mode: comparing real and mock responses"
576 );
577
578 let real_body_bytes =
580 proxy_response.body.clone().unwrap_or_default();
581 let real_body = String::from_utf8_lossy(&real_body_bytes);
582 let mock_body = &mock_response.body;
583
584 if real_body != *mock_body {
585 tracing::warn!(
586 path = %uri.path(),
587 "Shadow mode: real and mock responses differ"
588 );
589 }
590 }
591 }
592 }
593
594 let mut source = ResponseSource::new(
595 ResponsePriority::Proxy,
596 if is_shadow {
597 "shadow".to_string()
598 } else {
599 "proxy".to_string()
600 },
601 )
602 .with_metadata(
603 "upstream_url".to_string(),
604 proxy_handler.config.get_upstream_url(uri.path()),
605 );
606
607 if let Some(mode) = migration_mode {
608 source = source
609 .with_metadata("migration_mode".to_string(), format!("{:?}", mode));
610 }
611
612 return Ok(PriorityResponse {
613 source,
614 status_code: proxy_response.status_code,
615 headers: response_headers,
616 body: proxy_response.body.unwrap_or_default(),
617 content_type,
618 });
619 }
620 Err(e) => {
621 tracing::warn!("Proxy request failed: {}", e);
622 if let Some(crate::proxy::config::MigrationMode::Real) = migration_mode {
624 return Err(Error::generic(format!(
625 "Proxy request failed in real mode: {}",
626 e
627 )));
628 }
629 }
631 }
632 }
633 }
634
635 if let Some(ref mock_generator) = self.mock_generator {
637 let migration_mode = if let Some(ref proxy_handler) = self.proxy_handler {
639 if proxy_handler.config.migration_enabled {
640 proxy_handler.config.get_effective_migration_mode(uri.path())
641 } else {
642 None
643 }
644 } else {
645 None
646 };
647
648 if let Some(mock_response) =
649 mock_generator.generate_mock_response(&fingerprint, headers, body)?
650 {
651 let mut source = ResponseSource::new(ResponsePriority::Mock, "mock".to_string())
652 .with_metadata("generated_from".to_string(), "openapi_spec".to_string());
653
654 if let Some(mode) = migration_mode {
655 source =
656 source.with_metadata("migration_mode".to_string(), format!("{:?}", mode));
657 }
658
659 return Ok(PriorityResponse {
660 source,
661 status_code: mock_response.status_code,
662 headers: mock_response.headers,
663 body: mock_response.body.into_bytes(),
664 content_type: mock_response.content_type,
665 });
666 }
667 }
668
669 if self.record_replay.record_handler().should_record(method) {
671 let default_response = serde_json::json!({
673 "message": "Request recorded for future replay",
674 "timestamp": chrono::Utc::now().to_rfc3339(),
675 "fingerprint": fingerprint.to_hash()
676 });
677
678 let response_body = serde_json::to_string(&default_response)?;
679 let status_code = 200;
680
681 self.record_replay
683 .record_handler()
684 .record_request(&fingerprint, status_code, headers, &response_body, None)
685 .await?;
686
687 return Ok(PriorityResponse {
688 source: ResponseSource::new(ResponsePriority::Record, "record".to_string())
689 .with_metadata("recorded".to_string(), "true".to_string()),
690 status_code,
691 headers: HashMap::new(),
692 body: response_body.into_bytes(),
693 content_type: "application/json".to_string(),
694 });
695 }
696
697 Err(Error::generic("No handler could process the request".to_string()))
699 }
700}
701
702#[derive(Debug, Clone)]
704pub struct PriorityResponse {
705 pub source: ResponseSource,
707 pub status_code: u16,
709 pub headers: HashMap<String, String>,
711 pub body: Vec<u8>,
713 pub content_type: String,
715}
716
717impl PriorityResponse {
718 pub fn to_axum_response(self) -> axum::response::Response {
720 let mut response = axum::response::Response::new(axum::body::Body::from(self.body));
721 *response.status_mut() = StatusCode::from_u16(self.status_code).unwrap_or(StatusCode::OK);
722
723 for (key, value) in self.headers {
725 if let (Ok(header_name), Ok(header_value)) =
726 (key.parse::<axum::http::HeaderName>(), value.parse::<axum::http::HeaderValue>())
727 {
728 response.headers_mut().insert(header_name, header_value);
729 }
730 }
731
732 if !response.headers().contains_key("content-type") {
734 if let Ok(header_value) = self.content_type.parse::<axum::http::HeaderValue>() {
735 response.headers_mut().insert("content-type", header_value);
736 }
737 }
738
739 response
740 }
741}
742
/// Mock generator that returns the same status and body for every request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SimpleMockGenerator {
    /// Status code used for every generated response.
    pub default_status: u16,
    /// Body used for every generated response.
    pub default_body: String,
}
750
751impl SimpleMockGenerator {
752 pub fn new(default_status: u16, default_body: String) -> Self {
754 Self {
755 default_status,
756 default_body,
757 }
758 }
759}
760
761impl MockGenerator for SimpleMockGenerator {
762 fn generate_mock_response(
763 &self,
764 _fingerprint: &RequestFingerprint,
765 _headers: &HeaderMap,
766 _body: Option<&[u8]>,
767 ) -> Result<Option<MockResponse>> {
768 Ok(Some(MockResponse {
769 status_code: self.default_status,
770 headers: HashMap::new(),
771 body: self.default_body.clone(),
772 content_type: "application/json".to_string(),
773 }))
774 }
775}
776
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// With only a mock generator configured (no failure injector, proxy or
    /// OpenAPI spec) and an empty fixtures directory, the response is
    /// expected to come from the mock stage.
    #[tokio::test]
    async fn test_priority_chain() {
        let scratch = TempDir::new().unwrap();
        let record_replay =
            RecordReplayHandler::new(scratch.path().to_path_buf(), true, true, false);
        let generator = Box::new(SimpleMockGenerator::new(
            200,
            r#"{"message": "mock response"}"#.to_string(),
        ));

        let handler =
            PriorityHttpHandler::new_with_openapi(record_replay, None, None, Some(generator), None);

        let request_method = Method::GET;
        let request_uri = Uri::from_static("/api/test");
        let request_headers = HeaderMap::new();

        let response = handler
            .process_request(&request_method, &request_uri, &request_headers, None)
            .await
            .unwrap();

        assert_eq!(response.status_code, 200);
        assert_eq!(response.source.source_type, "mock");
    }
}