1use crate::stateful_handler::StatefulResponseHandler;
5use crate::{
6 CustomFixtureLoader, Error, FailureInjector, ProxyHandler, RealityContinuumEngine,
7 RecordReplayHandler, RequestFingerprint, ResponsePriority, ResponseSource, Result,
8 RouteChaosInjector,
9};
10use axum::http::{HeaderMap, Method, StatusCode, Uri};
11use std::collections::HashMap;
12use std::sync::Arc;
13
/// HTTP request handler that resolves a response by trying a fixed chain of
/// sources in priority order (see `process_request`).
pub struct PriorityHttpHandler {
    /// Optional loader for custom fixtures; consulted before every other source.
    custom_fixture_loader: Option<Arc<CustomFixtureLoader>>,
    /// Record/replay handler: its replay side is consulted for recorded fixtures,
    /// its record side is the last-resort stage of the chain.
    record_replay: RecordReplayHandler,
    /// Optional stateful handler, consulted after replay.
    stateful_handler: Option<Arc<StatefulResponseHandler>>,
    /// Optional per-route chaos injector (latency and fault responses).
    route_chaos_injector: Option<Arc<RouteChaosInjector>>,
    /// Optional failure injector, driven by the tags derived from the request fingerprint.
    failure_injector: Option<FailureInjector>,
    /// Optional upstream proxy; its `config` also supplies migration and shadow settings.
    proxy_handler: Option<ProxyHandler>,
    /// Optional mock generator, used as a fallback and for blending / shadow comparison.
    mock_generator: Option<Box<dyn MockGenerator + Send + Sync>>,
    /// Optional OpenAPI spec; when present it is used to derive the tags passed to
    /// the failure injector.
    openapi_spec: Option<crate::openapi::spec::OpenApiSpec>,
    /// Optional continuum engine; when enabled, proxied and mocked responses are blended.
    continuum_engine: Option<Arc<RealityContinuumEngine>>,
}
35
/// Source of mock responses for [`PriorityHttpHandler`].
pub trait MockGenerator {
    /// Produces a mock response for the request described by `fingerprint`,
    /// `headers` and `body`.
    ///
    /// Returning `Ok(None)` means no mock is available; the handler then moves on
    /// to its next source.
    ///
    /// # Errors
    ///
    /// An `Err` is propagated to the caller when the handler uses the generator as
    /// its mock fallback; on the blend and shadow paths the handler does not
    /// propagate it.
    fn generate_mock_response(
        &self,
        fingerprint: &RequestFingerprint,
        headers: &HeaderMap,
        body: Option<&[u8]>,
    ) -> Result<Option<MockResponse>>;
}
46
/// Response produced by a [`MockGenerator`].
#[derive(Debug, Clone)]
pub struct MockResponse {
    /// HTTP status code as a raw number.
    pub status_code: u16,
    /// Response headers as name/value pairs.
    pub headers: HashMap<String, String>,
    /// Response body as text.
    pub body: String,
    /// Content type reported for the body.
    pub content_type: String,
}
59
60impl PriorityHttpHandler {
61 pub fn new(
63 record_replay: RecordReplayHandler,
64 failure_injector: Option<FailureInjector>,
65 proxy_handler: Option<ProxyHandler>,
66 mock_generator: Option<Box<dyn MockGenerator + Send + Sync>>,
67 ) -> Self {
68 Self {
69 custom_fixture_loader: None,
70 record_replay,
71 stateful_handler: None,
72 route_chaos_injector: None,
73 failure_injector,
74 proxy_handler,
75 mock_generator,
76 openapi_spec: None,
77 continuum_engine: None,
78 }
79 }
80
81 pub fn new_with_openapi(
83 record_replay: RecordReplayHandler,
84 failure_injector: Option<FailureInjector>,
85 proxy_handler: Option<ProxyHandler>,
86 mock_generator: Option<Box<dyn MockGenerator + Send + Sync>>,
87 openapi_spec: Option<crate::openapi::spec::OpenApiSpec>,
88 ) -> Self {
89 Self {
90 custom_fixture_loader: None,
91 record_replay,
92 stateful_handler: None,
93 route_chaos_injector: None,
94 failure_injector,
95 proxy_handler,
96 mock_generator,
97 openapi_spec,
98 continuum_engine: None,
99 }
100 }
101
102 pub fn with_custom_fixture_loader(mut self, loader: Arc<CustomFixtureLoader>) -> Self {
104 self.custom_fixture_loader = Some(loader);
105 self
106 }
107
108 pub fn with_stateful_handler(mut self, handler: Arc<StatefulResponseHandler>) -> Self {
110 self.stateful_handler = Some(handler);
111 self
112 }
113
114 pub fn with_route_chaos_injector(mut self, injector: Arc<RouteChaosInjector>) -> Self {
116 self.route_chaos_injector = Some(injector);
117 self
118 }
119
120 pub fn with_continuum_engine(mut self, engine: Arc<RealityContinuumEngine>) -> Self {
122 self.continuum_engine = Some(engine);
123 self
124 }
125
126 pub async fn process_request(
128 &self,
129 method: &Method,
130 uri: &Uri,
131 headers: &HeaderMap,
132 body: Option<&[u8]>,
133 ) -> Result<PriorityResponse> {
134 let fingerprint = RequestFingerprint::new(method.clone(), uri, headers, body);
135
136 if let Some(ref custom_loader) = self.custom_fixture_loader {
138 if let Some(custom_fixture) = custom_loader.load_fixture(&fingerprint) {
139 if custom_fixture.delay_ms > 0 {
141 tokio::time::sleep(tokio::time::Duration::from_millis(
142 custom_fixture.delay_ms,
143 ))
144 .await;
145 }
146
147 let response_body = if custom_fixture.response.is_string() {
149 custom_fixture.response.as_str().unwrap().to_string()
150 } else {
151 serde_json::to_string(&custom_fixture.response).map_err(|e| {
152 Error::generic(format!("Failed to serialize custom fixture response: {}", e))
153 })?
154 };
155
156 let content_type = custom_fixture
158 .headers
159 .get("content-type")
160 .cloned()
161 .unwrap_or_else(|| "application/json".to_string());
162
163 return Ok(PriorityResponse {
164 source: ResponseSource::new(
165 ResponsePriority::Replay,
166 "custom_fixture".to_string(),
167 )
168 .with_metadata("fixture_path".to_string(), custom_fixture.path.clone()),
169 status_code: custom_fixture.status,
170 headers: custom_fixture.headers.clone(),
171 body: response_body.into_bytes(),
172 content_type,
173 });
174 }
175 }
176
177 if let Some(recorded_request) =
179 self.record_replay.replay_handler().load_fixture(&fingerprint).await?
180 {
181 let content_type = recorded_request
182 .response_headers
183 .get("content-type")
184 .unwrap_or(&"application/json".to_string())
185 .clone();
186
187 return Ok(PriorityResponse {
188 source: ResponseSource::new(ResponsePriority::Replay, "replay".to_string())
189 .with_metadata("fixture_path".to_string(), "recorded".to_string()),
190 status_code: recorded_request.status_code,
191 headers: recorded_request.response_headers,
192 body: recorded_request.response_body.into_bytes(),
193 content_type,
194 });
195 }
196
197 if let Some(ref stateful_handler) = self.stateful_handler {
199 if let Some(stateful_response) =
200 stateful_handler.process_request(method, uri, headers, body).await?
201 {
202 return Ok(PriorityResponse {
203 source: ResponseSource::new(ResponsePriority::Stateful, "stateful".to_string())
204 .with_metadata("state".to_string(), stateful_response.state)
205 .with_metadata("resource_id".to_string(), stateful_response.resource_id),
206 status_code: stateful_response.status_code,
207 headers: stateful_response.headers,
208 body: stateful_response.body.into_bytes(),
209 content_type: stateful_response.content_type,
210 });
211 }
212 }
213
214 if let Some(ref route_chaos) = self.route_chaos_injector {
216 if let Err(e) = route_chaos.inject_latency(method, uri).await {
218 tracing::warn!("Failed to inject per-route latency: {}", e);
219 }
220
221 if let Some(fault_response) = route_chaos.get_fault_response(method, uri) {
223 let error_response = serde_json::json!({
224 "error": fault_response.error_message,
225 "injected_failure": true,
226 "fault_type": fault_response.fault_type,
227 "timestamp": chrono::Utc::now().to_rfc3339()
228 });
229
230 return Ok(PriorityResponse {
231 source: ResponseSource::new(
232 ResponsePriority::Fail,
233 "route_fault_injection".to_string(),
234 )
235 .with_metadata("fault_type".to_string(), fault_response.fault_type)
236 .with_metadata("error_message".to_string(), fault_response.error_message),
237 status_code: fault_response.status_code,
238 headers: HashMap::new(),
239 body: serde_json::to_string(&error_response)?.into_bytes(),
240 content_type: "application/json".to_string(),
241 });
242 }
243 }
244
245 if let Some(ref failure_injector) = self.failure_injector {
247 let tags = if let Some(ref spec) = self.openapi_spec {
248 fingerprint.openapi_tags(spec).unwrap_or_else(|| fingerprint.tags())
249 } else {
250 fingerprint.tags()
251 };
252 if let Some((status_code, error_message)) = failure_injector.process_request(&tags) {
253 let error_response = serde_json::json!({
254 "error": error_message,
255 "injected_failure": true,
256 "timestamp": chrono::Utc::now().to_rfc3339()
257 });
258
259 return Ok(PriorityResponse {
260 source: ResponseSource::new(
261 ResponsePriority::Fail,
262 "failure_injection".to_string(),
263 )
264 .with_metadata("error_message".to_string(), error_message),
265 status_code,
266 headers: HashMap::new(),
267 body: serde_json::to_string(&error_response)?.into_bytes(),
268 content_type: "application/json".to_string(),
269 });
270 }
271 }
272
273 let should_blend = if let Some(ref continuum_engine) = self.continuum_engine {
275 continuum_engine.is_enabled().await
276 } else {
277 false
278 };
279
280 if let Some(ref proxy_handler) = self.proxy_handler {
282 let migration_mode = if proxy_handler.config.migration_enabled {
284 proxy_handler.config.get_effective_migration_mode(uri.path())
285 } else {
286 None
287 };
288
289 if let Some(crate::proxy::config::MigrationMode::Mock) = migration_mode {
291 } else if proxy_handler.config.should_proxy_with_condition(method, uri, headers, body) {
293 let is_shadow = proxy_handler.config.should_shadow(uri.path());
295
296 if should_blend {
298 let proxy_future = proxy_handler.proxy_request(method, uri, headers, body);
300 let mock_result = if let Some(ref mock_generator) = self.mock_generator {
301 mock_generator.generate_mock_response(&fingerprint, headers, body)
302 } else {
303 Ok(None)
304 };
305
306 let proxy_result = proxy_future.await;
308
309 match (proxy_result, mock_result) {
311 (Ok(proxy_response), Ok(Some(mock_response))) => {
312 if let Some(ref continuum_engine) = self.continuum_engine {
314 let blend_ratio =
315 continuum_engine.get_blend_ratio(uri.path()).await;
316 let blender = continuum_engine.blender();
317
318 let mock_body_str = &mock_response.body;
320 let real_body_bytes =
321 proxy_response.body.clone().unwrap_or_default();
322 let real_body_str = String::from_utf8_lossy(&real_body_bytes);
323
324 let mock_json: serde_json::Value =
325 serde_json::from_str(mock_body_str)
326 .unwrap_or_else(|_| serde_json::json!({}));
327 let real_json: serde_json::Value =
328 serde_json::from_str(&real_body_str)
329 .unwrap_or_else(|_| serde_json::json!({}));
330
331 let blended_json =
333 blender.blend_responses(&mock_json, &real_json, blend_ratio);
334 let blended_body = serde_json::to_string(&blended_json)
335 .unwrap_or_else(|_| real_body_str.to_string());
336
337 let blended_status = blender.blend_status_code(
339 mock_response.status_code,
340 proxy_response.status_code,
341 blend_ratio,
342 );
343
344 let mut proxy_headers = HashMap::new();
346 for (key, value) in proxy_response.headers.iter() {
347 if let Ok(value_str) = value.to_str() {
348 proxy_headers.insert(
349 key.as_str().to_string(),
350 value_str.to_string(),
351 );
352 }
353 }
354 let blended_headers = blender.blend_headers(
355 &mock_response.headers,
356 &proxy_headers,
357 blend_ratio,
358 );
359
360 let content_type = blended_headers
361 .get("content-type")
362 .cloned()
363 .or_else(|| {
364 proxy_response
365 .headers
366 .get("content-type")
367 .and_then(|v| v.to_str().ok())
368 .map(|s| s.to_string())
369 })
370 .unwrap_or_else(|| "application/json".to_string());
371
372 tracing::info!(
373 path = %uri.path(),
374 blend_ratio = blend_ratio,
375 "Reality Continuum: blended mock and real responses"
376 );
377
378 let mut source = ResponseSource::new(
379 ResponsePriority::Proxy,
380 "continuum".to_string(),
381 )
382 .with_metadata("blend_ratio".to_string(), blend_ratio.to_string())
383 .with_metadata(
384 "upstream_url".to_string(),
385 proxy_handler.config.get_upstream_url(uri.path()),
386 );
387
388 if let Some(mode) = migration_mode {
389 source = source.with_metadata(
390 "migration_mode".to_string(),
391 format!("{:?}", mode),
392 );
393 }
394
395 return Ok(PriorityResponse {
396 source,
397 status_code: blended_status,
398 headers: blended_headers,
399 body: blended_body.into_bytes(),
400 content_type,
401 });
402 }
403 }
404 (Ok(proxy_response), Ok(None)) => {
405 tracing::debug!(
407 path = %uri.path(),
408 "Continuum: mock generation failed, using real response"
409 );
410 }
412 (Ok(proxy_response), Err(_)) => {
413 tracing::debug!(
415 path = %uri.path(),
416 "Continuum: mock generation failed, using real response"
417 );
418 }
420 (Err(e), Ok(Some(mock_response))) => {
421 tracing::debug!(
423 path = %uri.path(),
424 error = %e,
425 "Continuum: proxy failed, using mock response"
426 );
427 let mut source = ResponseSource::new(
429 ResponsePriority::Mock,
430 "continuum_fallback".to_string(),
431 )
432 .with_metadata("generated_from".to_string(), "openapi_spec".to_string())
433 .with_metadata(
434 "fallback_reason".to_string(),
435 "proxy_failed".to_string(),
436 );
437
438 if let Some(mode) = migration_mode {
439 source = source.with_metadata(
440 "migration_mode".to_string(),
441 format!("{:?}", mode),
442 );
443 }
444
445 return Ok(PriorityResponse {
446 source,
447 status_code: mock_response.status_code,
448 headers: mock_response.headers,
449 body: mock_response.body.into_bytes(),
450 content_type: mock_response.content_type,
451 });
452 }
453 (Err(e), _) => {
454 tracing::warn!(
456 path = %uri.path(),
457 error = %e,
458 "Continuum: both proxy and mock failed"
459 );
460 if let Some(crate::proxy::config::MigrationMode::Real) = migration_mode
462 {
463 return Err(Error::generic(format!(
464 "Proxy request failed in real mode: {}",
465 e
466 )));
467 }
468 }
470 }
471 }
472
473 match proxy_handler.proxy_request(method, uri, headers, body).await {
475 Ok(proxy_response) => {
476 let mut response_headers = HashMap::new();
477 for (key, value) in proxy_response.headers.iter() {
478 let key_str = key.as_str();
479 if let Ok(value_str) = value.to_str() {
480 response_headers.insert(key_str.to_string(), value_str.to_string());
481 }
482 }
483
484 let content_type = response_headers
485 .get("content-type")
486 .unwrap_or(&"application/json".to_string())
487 .clone();
488
489 if is_shadow {
491 if let Some(ref mock_generator) = self.mock_generator {
492 if let Ok(Some(mock_response)) = mock_generator
493 .generate_mock_response(&fingerprint, headers, body)
494 {
495 tracing::info!(
497 path = %uri.path(),
498 real_status = proxy_response.status_code,
499 mock_status = mock_response.status_code,
500 "Shadow mode: comparing real and mock responses"
501 );
502
503 let real_body_bytes =
505 proxy_response.body.clone().unwrap_or_default();
506 let real_body = String::from_utf8_lossy(&real_body_bytes);
507 let mock_body = &mock_response.body;
508
509 if real_body != *mock_body {
510 tracing::warn!(
511 path = %uri.path(),
512 "Shadow mode: real and mock responses differ"
513 );
514 }
515 }
516 }
517 }
518
519 let mut source = ResponseSource::new(
520 ResponsePriority::Proxy,
521 if is_shadow {
522 "shadow".to_string()
523 } else {
524 "proxy".to_string()
525 },
526 )
527 .with_metadata(
528 "upstream_url".to_string(),
529 proxy_handler.config.get_upstream_url(uri.path()),
530 );
531
532 if let Some(mode) = migration_mode {
533 source = source
534 .with_metadata("migration_mode".to_string(), format!("{:?}", mode));
535 }
536
537 return Ok(PriorityResponse {
538 source,
539 status_code: proxy_response.status_code,
540 headers: response_headers,
541 body: proxy_response.body.unwrap_or_default(),
542 content_type,
543 });
544 }
545 Err(e) => {
546 tracing::warn!("Proxy request failed: {}", e);
547 if let Some(crate::proxy::config::MigrationMode::Real) = migration_mode {
549 return Err(Error::generic(format!(
550 "Proxy request failed in real mode: {}",
551 e
552 )));
553 }
554 }
556 }
557 }
558 }
559
560 if let Some(ref mock_generator) = self.mock_generator {
562 let migration_mode = if let Some(ref proxy_handler) = self.proxy_handler {
564 if proxy_handler.config.migration_enabled {
565 proxy_handler.config.get_effective_migration_mode(uri.path())
566 } else {
567 None
568 }
569 } else {
570 None
571 };
572
573 if let Some(mock_response) =
574 mock_generator.generate_mock_response(&fingerprint, headers, body)?
575 {
576 let mut source = ResponseSource::new(ResponsePriority::Mock, "mock".to_string())
577 .with_metadata("generated_from".to_string(), "openapi_spec".to_string());
578
579 if let Some(mode) = migration_mode {
580 source =
581 source.with_metadata("migration_mode".to_string(), format!("{:?}", mode));
582 }
583
584 return Ok(PriorityResponse {
585 source,
586 status_code: mock_response.status_code,
587 headers: mock_response.headers,
588 body: mock_response.body.into_bytes(),
589 content_type: mock_response.content_type,
590 });
591 }
592 }
593
594 if self.record_replay.record_handler().should_record(method) {
596 let default_response = serde_json::json!({
598 "message": "Request recorded for future replay",
599 "timestamp": chrono::Utc::now().to_rfc3339(),
600 "fingerprint": fingerprint.to_hash()
601 });
602
603 let response_body = serde_json::to_string(&default_response)?;
604 let status_code = 200;
605
606 self.record_replay
608 .record_handler()
609 .record_request(&fingerprint, status_code, headers, &response_body, None)
610 .await?;
611
612 return Ok(PriorityResponse {
613 source: ResponseSource::new(ResponsePriority::Record, "record".to_string())
614 .with_metadata("recorded".to_string(), "true".to_string()),
615 status_code,
616 headers: HashMap::new(),
617 body: response_body.into_bytes(),
618 content_type: "application/json".to_string(),
619 });
620 }
621
622 Err(Error::generic("No handler could process the request".to_string()))
624 }
625}
626
/// Response produced by [`PriorityHttpHandler`], together with the source that produced it.
#[derive(Debug, Clone)]
pub struct PriorityResponse {
    /// Which stage of the priority chain produced this response, plus its metadata.
    pub source: ResponseSource,
    /// HTTP status code as a raw number.
    pub status_code: u16,
    /// Response headers as name/value pairs.
    pub headers: HashMap<String, String>,
    /// Raw response body.
    pub body: Vec<u8>,
    /// Content type; applied by `to_axum_response` only when `headers` carries no
    /// `content-type` entry.
    pub content_type: String,
}
641
642impl PriorityResponse {
643 pub fn to_axum_response(self) -> axum::response::Response {
645 let mut response = axum::response::Response::new(axum::body::Body::from(self.body));
646 *response.status_mut() = StatusCode::from_u16(self.status_code).unwrap_or(StatusCode::OK);
647
648 for (key, value) in self.headers {
650 if let (Ok(header_name), Ok(header_value)) =
651 (key.parse::<axum::http::HeaderName>(), value.parse::<axum::http::HeaderValue>())
652 {
653 response.headers_mut().insert(header_name, header_value);
654 }
655 }
656
657 if !response.headers().contains_key("content-type") {
659 if let Ok(header_value) = self.content_type.parse::<axum::http::HeaderValue>() {
660 response.headers_mut().insert("content-type", header_value);
661 }
662 }
663
664 response
665 }
666}
667
/// Mock generator that answers every request with the same status and body.
pub struct SimpleMockGenerator {
    /// Status code used for every generated response.
    pub default_status: u16,
    /// Body used for every generated response.
    pub default_body: String,
}

impl SimpleMockGenerator {
    /// Creates a generator that always responds with `default_status` and `default_body`.
    pub fn new(default_status: u16, default_body: String) -> Self {
        Self { default_status, default_body }
    }
}
685
686impl MockGenerator for SimpleMockGenerator {
687 fn generate_mock_response(
688 &self,
689 _fingerprint: &RequestFingerprint,
690 _headers: &HeaderMap,
691 _body: Option<&[u8]>,
692 ) -> Result<Option<MockResponse>> {
693 Ok(Some(MockResponse {
694 status_code: self.default_status,
695 headers: HashMap::new(),
696 body: self.default_body.clone(),
697 content_type: "application/json".to_string(),
698 }))
699 }
700}
701
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// With no failure injector and no proxy configured, and an empty fixtures
    /// directory, a GET request is expected to be answered by the mock generator.
    #[tokio::test]
    async fn test_priority_chain() {
        let scratch = TempDir::new().unwrap();
        let record_replay =
            RecordReplayHandler::new(scratch.path().to_path_buf(), true, true, false);

        let generator: Box<dyn MockGenerator + Send + Sync> =
            Box::new(SimpleMockGenerator::new(200, r#"{"message": "mock response"}"#.to_string()));

        let handler = PriorityHttpHandler::new_with_openapi(
            record_replay,
            None, // failure injector
            None, // proxy handler
            Some(generator),
            None, // OpenAPI spec
        );

        let request_method = Method::GET;
        let request_uri = Uri::from_static("/api/test");
        let request_headers = HeaderMap::new();

        let response = handler
            .process_request(&request_method, &request_uri, &request_headers, None)
            .await
            .unwrap();

        assert_eq!(response.status_code, 200);
        assert_eq!(response.source.source_type, "mock");
    }
}