1use crate::stateful_handler::StatefulResponseHandler;
5use crate::{
6 Error, FailureInjector, ProxyHandler, RealityContinuumEngine, RecordReplayHandler,
7 RequestFingerprint, ResponsePriority, ResponseSource, Result, RouteChaosInjector,
8};
9use axum::http::{HeaderMap, Method, StatusCode, Uri};
10use std::collections::HashMap;
11use std::sync::Arc;
12
13pub struct PriorityHttpHandler {
15 record_replay: RecordReplayHandler,
17 stateful_handler: Option<Arc<StatefulResponseHandler>>,
19 route_chaos_injector: Option<Arc<RouteChaosInjector>>,
21 failure_injector: Option<FailureInjector>,
23 proxy_handler: Option<ProxyHandler>,
25 mock_generator: Option<Box<dyn MockGenerator + Send + Sync>>,
27 openapi_spec: Option<crate::openapi::spec::OpenApiSpec>,
29 continuum_engine: Option<Arc<RealityContinuumEngine>>,
31}
32
33pub trait MockGenerator {
35 fn generate_mock_response(
37 &self,
38 fingerprint: &RequestFingerprint,
39 headers: &HeaderMap,
40 body: Option<&[u8]>,
41 ) -> Result<Option<MockResponse>>;
42}
43
/// A response produced by a [`MockGenerator`].
///
/// Derives `PartialEq`/`Eq` so that generated responses can be compared directly,
/// for example in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MockResponse {
    /// HTTP status code of the mock response.
    pub status_code: u16,
    /// Response headers as name/value pairs.
    pub headers: HashMap<String, String>,
    /// Response body as text.
    pub body: String,
    /// Value reported as the response content type.
    pub content_type: String,
}
56
57impl PriorityHttpHandler {
58 pub fn new(
60 record_replay: RecordReplayHandler,
61 failure_injector: Option<FailureInjector>,
62 proxy_handler: Option<ProxyHandler>,
63 mock_generator: Option<Box<dyn MockGenerator + Send + Sync>>,
64 ) -> Self {
65 Self {
66 record_replay,
67 stateful_handler: None,
68 route_chaos_injector: None,
69 failure_injector,
70 proxy_handler,
71 mock_generator,
72 openapi_spec: None,
73 continuum_engine: None,
74 }
75 }
76
77 pub fn new_with_openapi(
79 record_replay: RecordReplayHandler,
80 failure_injector: Option<FailureInjector>,
81 proxy_handler: Option<ProxyHandler>,
82 mock_generator: Option<Box<dyn MockGenerator + Send + Sync>>,
83 openapi_spec: Option<crate::openapi::spec::OpenApiSpec>,
84 ) -> Self {
85 Self {
86 record_replay,
87 stateful_handler: None,
88 route_chaos_injector: None,
89 failure_injector,
90 proxy_handler,
91 mock_generator,
92 openapi_spec,
93 continuum_engine: None,
94 }
95 }
96
97 pub fn with_stateful_handler(mut self, handler: Arc<StatefulResponseHandler>) -> Self {
99 self.stateful_handler = Some(handler);
100 self
101 }
102
103 pub fn with_route_chaos_injector(mut self, injector: Arc<RouteChaosInjector>) -> Self {
105 self.route_chaos_injector = Some(injector);
106 self
107 }
108
109 pub fn with_continuum_engine(mut self, engine: Arc<RealityContinuumEngine>) -> Self {
111 self.continuum_engine = Some(engine);
112 self
113 }
114
115 pub async fn process_request(
117 &self,
118 method: &Method,
119 uri: &Uri,
120 headers: &HeaderMap,
121 body: Option<&[u8]>,
122 ) -> Result<PriorityResponse> {
123 let fingerprint = RequestFingerprint::new(method.clone(), uri, headers, body);
124
125 if let Some(recorded_request) =
127 self.record_replay.replay_handler().load_fixture(&fingerprint).await?
128 {
129 let content_type = recorded_request
130 .response_headers
131 .get("content-type")
132 .unwrap_or(&"application/json".to_string())
133 .clone();
134
135 return Ok(PriorityResponse {
136 source: ResponseSource::new(ResponsePriority::Replay, "replay".to_string())
137 .with_metadata("fixture_path".to_string(), "recorded".to_string()),
138 status_code: recorded_request.status_code,
139 headers: recorded_request.response_headers,
140 body: recorded_request.response_body.into_bytes(),
141 content_type,
142 });
143 }
144
145 if let Some(ref stateful_handler) = self.stateful_handler {
147 if let Some(stateful_response) =
148 stateful_handler.process_request(method, uri, headers, body).await?
149 {
150 return Ok(PriorityResponse {
151 source: ResponseSource::new(ResponsePriority::Stateful, "stateful".to_string())
152 .with_metadata("state".to_string(), stateful_response.state)
153 .with_metadata("resource_id".to_string(), stateful_response.resource_id),
154 status_code: stateful_response.status_code,
155 headers: stateful_response.headers,
156 body: stateful_response.body.into_bytes(),
157 content_type: stateful_response.content_type,
158 });
159 }
160 }
161
162 if let Some(ref route_chaos) = self.route_chaos_injector {
164 if let Err(e) = route_chaos.inject_latency(method, uri).await {
166 tracing::warn!("Failed to inject per-route latency: {}", e);
167 }
168
169 if let Some(fault_response) = route_chaos.get_fault_response(method, uri) {
171 let error_response = serde_json::json!({
172 "error": fault_response.error_message,
173 "injected_failure": true,
174 "fault_type": fault_response.fault_type,
175 "timestamp": chrono::Utc::now().to_rfc3339()
176 });
177
178 return Ok(PriorityResponse {
179 source: ResponseSource::new(
180 ResponsePriority::Fail,
181 "route_fault_injection".to_string(),
182 )
183 .with_metadata("fault_type".to_string(), fault_response.fault_type)
184 .with_metadata("error_message".to_string(), fault_response.error_message),
185 status_code: fault_response.status_code,
186 headers: HashMap::new(),
187 body: serde_json::to_string(&error_response)?.into_bytes(),
188 content_type: "application/json".to_string(),
189 });
190 }
191 }
192
193 if let Some(ref failure_injector) = self.failure_injector {
195 let tags = if let Some(ref spec) = self.openapi_spec {
196 fingerprint.openapi_tags(spec).unwrap_or_else(|| fingerprint.tags())
197 } else {
198 fingerprint.tags()
199 };
200 if let Some((status_code, error_message)) = failure_injector.process_request(&tags) {
201 let error_response = serde_json::json!({
202 "error": error_message,
203 "injected_failure": true,
204 "timestamp": chrono::Utc::now().to_rfc3339()
205 });
206
207 return Ok(PriorityResponse {
208 source: ResponseSource::new(
209 ResponsePriority::Fail,
210 "failure_injection".to_string(),
211 )
212 .with_metadata("error_message".to_string(), error_message),
213 status_code,
214 headers: HashMap::new(),
215 body: serde_json::to_string(&error_response)?.into_bytes(),
216 content_type: "application/json".to_string(),
217 });
218 }
219 }
220
221 let should_blend = if let Some(ref continuum_engine) = self.continuum_engine {
223 continuum_engine.is_enabled().await
224 } else {
225 false
226 };
227
228 if let Some(ref proxy_handler) = self.proxy_handler {
230 let migration_mode = if proxy_handler.config.migration_enabled {
232 proxy_handler.config.get_effective_migration_mode(uri.path())
233 } else {
234 None
235 };
236
237 if let Some(crate::proxy::config::MigrationMode::Mock) = migration_mode {
239 } else if proxy_handler.config.should_proxy_with_condition(method, uri, headers, body) {
241 let is_shadow = proxy_handler.config.should_shadow(uri.path());
243
244 if should_blend {
246 let proxy_future = proxy_handler.proxy_request(method, uri, headers, body);
248 let mock_result = if let Some(ref mock_generator) = self.mock_generator {
249 mock_generator.generate_mock_response(&fingerprint, headers, body)
250 } else {
251 Ok(None)
252 };
253
254 let proxy_result = proxy_future.await;
256
257 match (proxy_result, mock_result) {
259 (Ok(proxy_response), Ok(Some(mock_response))) => {
260 if let Some(ref continuum_engine) = self.continuum_engine {
262 let blend_ratio =
263 continuum_engine.get_blend_ratio(uri.path()).await;
264 let blender = continuum_engine.blender();
265
266 let mock_body_str = &mock_response.body;
268 let real_body_bytes =
269 proxy_response.body.clone().unwrap_or_default();
270 let real_body_str = String::from_utf8_lossy(&real_body_bytes);
271
272 let mock_json: serde_json::Value =
273 serde_json::from_str(mock_body_str)
274 .unwrap_or_else(|_| serde_json::json!({}));
275 let real_json: serde_json::Value =
276 serde_json::from_str(&real_body_str)
277 .unwrap_or_else(|_| serde_json::json!({}));
278
279 let blended_json =
281 blender.blend_responses(&mock_json, &real_json, blend_ratio);
282 let blended_body = serde_json::to_string(&blended_json)
283 .unwrap_or_else(|_| real_body_str.to_string());
284
285 let blended_status = blender.blend_status_code(
287 mock_response.status_code,
288 proxy_response.status_code,
289 blend_ratio,
290 );
291
292 let mut proxy_headers = HashMap::new();
294 for (key, value) in proxy_response.headers.iter() {
295 if let Ok(value_str) = value.to_str() {
296 proxy_headers.insert(
297 key.as_str().to_string(),
298 value_str.to_string(),
299 );
300 }
301 }
302 let blended_headers = blender.blend_headers(
303 &mock_response.headers,
304 &proxy_headers,
305 blend_ratio,
306 );
307
308 let content_type = blended_headers
309 .get("content-type")
310 .cloned()
311 .or_else(|| {
312 proxy_response
313 .headers
314 .get("content-type")
315 .and_then(|v| v.to_str().ok())
316 .map(|s| s.to_string())
317 })
318 .unwrap_or_else(|| "application/json".to_string());
319
320 tracing::info!(
321 path = %uri.path(),
322 blend_ratio = blend_ratio,
323 "Reality Continuum: blended mock and real responses"
324 );
325
326 let mut source = ResponseSource::new(
327 ResponsePriority::Proxy,
328 "continuum".to_string(),
329 )
330 .with_metadata("blend_ratio".to_string(), blend_ratio.to_string())
331 .with_metadata(
332 "upstream_url".to_string(),
333 proxy_handler.config.get_upstream_url(uri.path()),
334 );
335
336 if let Some(mode) = migration_mode {
337 source = source.with_metadata(
338 "migration_mode".to_string(),
339 format!("{:?}", mode),
340 );
341 }
342
343 return Ok(PriorityResponse {
344 source,
345 status_code: blended_status,
346 headers: blended_headers,
347 body: blended_body.into_bytes(),
348 content_type,
349 });
350 }
351 }
352 (Ok(proxy_response), Ok(None)) => {
353 tracing::debug!(
355 path = %uri.path(),
356 "Continuum: mock generation failed, using real response"
357 );
358 }
360 (Ok(proxy_response), Err(_)) => {
361 tracing::debug!(
363 path = %uri.path(),
364 "Continuum: mock generation failed, using real response"
365 );
366 }
368 (Err(e), Ok(Some(mock_response))) => {
369 tracing::debug!(
371 path = %uri.path(),
372 error = %e,
373 "Continuum: proxy failed, using mock response"
374 );
375 let mut source = ResponseSource::new(
377 ResponsePriority::Mock,
378 "continuum_fallback".to_string(),
379 )
380 .with_metadata("generated_from".to_string(), "openapi_spec".to_string())
381 .with_metadata(
382 "fallback_reason".to_string(),
383 "proxy_failed".to_string(),
384 );
385
386 if let Some(mode) = migration_mode {
387 source = source.with_metadata(
388 "migration_mode".to_string(),
389 format!("{:?}", mode),
390 );
391 }
392
393 return Ok(PriorityResponse {
394 source,
395 status_code: mock_response.status_code,
396 headers: mock_response.headers,
397 body: mock_response.body.into_bytes(),
398 content_type: mock_response.content_type,
399 });
400 }
401 (Err(e), _) => {
402 tracing::warn!(
404 path = %uri.path(),
405 error = %e,
406 "Continuum: both proxy and mock failed"
407 );
408 if let Some(crate::proxy::config::MigrationMode::Real) = migration_mode
410 {
411 return Err(Error::generic(format!(
412 "Proxy request failed in real mode: {}",
413 e
414 )));
415 }
416 }
418 }
419 }
420
421 match proxy_handler.proxy_request(method, uri, headers, body).await {
423 Ok(proxy_response) => {
424 let mut response_headers = HashMap::new();
425 for (key, value) in proxy_response.headers.iter() {
426 let key_str = key.as_str();
427 if let Ok(value_str) = value.to_str() {
428 response_headers.insert(key_str.to_string(), value_str.to_string());
429 }
430 }
431
432 let content_type = response_headers
433 .get("content-type")
434 .unwrap_or(&"application/json".to_string())
435 .clone();
436
437 if is_shadow {
439 if let Some(ref mock_generator) = self.mock_generator {
440 if let Ok(Some(mock_response)) = mock_generator
441 .generate_mock_response(&fingerprint, headers, body)
442 {
443 tracing::info!(
445 path = %uri.path(),
446 real_status = proxy_response.status_code,
447 mock_status = mock_response.status_code,
448 "Shadow mode: comparing real and mock responses"
449 );
450
451 let real_body_bytes =
453 proxy_response.body.clone().unwrap_or_default();
454 let real_body = String::from_utf8_lossy(&real_body_bytes);
455 let mock_body = &mock_response.body;
456
457 if real_body != *mock_body {
458 tracing::warn!(
459 path = %uri.path(),
460 "Shadow mode: real and mock responses differ"
461 );
462 }
463 }
464 }
465 }
466
467 let mut source = ResponseSource::new(
468 ResponsePriority::Proxy,
469 if is_shadow {
470 "shadow".to_string()
471 } else {
472 "proxy".to_string()
473 },
474 )
475 .with_metadata(
476 "upstream_url".to_string(),
477 proxy_handler.config.get_upstream_url(uri.path()),
478 );
479
480 if let Some(mode) = migration_mode {
481 source = source
482 .with_metadata("migration_mode".to_string(), format!("{:?}", mode));
483 }
484
485 return Ok(PriorityResponse {
486 source,
487 status_code: proxy_response.status_code,
488 headers: response_headers,
489 body: proxy_response.body.unwrap_or_default(),
490 content_type,
491 });
492 }
493 Err(e) => {
494 tracing::warn!("Proxy request failed: {}", e);
495 if let Some(crate::proxy::config::MigrationMode::Real) = migration_mode {
497 return Err(Error::generic(format!(
498 "Proxy request failed in real mode: {}",
499 e
500 )));
501 }
502 }
504 }
505 }
506 }
507
508 if let Some(ref mock_generator) = self.mock_generator {
510 let migration_mode = if let Some(ref proxy_handler) = self.proxy_handler {
512 if proxy_handler.config.migration_enabled {
513 proxy_handler.config.get_effective_migration_mode(uri.path())
514 } else {
515 None
516 }
517 } else {
518 None
519 };
520
521 if let Some(mock_response) =
522 mock_generator.generate_mock_response(&fingerprint, headers, body)?
523 {
524 let mut source = ResponseSource::new(ResponsePriority::Mock, "mock".to_string())
525 .with_metadata("generated_from".to_string(), "openapi_spec".to_string());
526
527 if let Some(mode) = migration_mode {
528 source =
529 source.with_metadata("migration_mode".to_string(), format!("{:?}", mode));
530 }
531
532 return Ok(PriorityResponse {
533 source,
534 status_code: mock_response.status_code,
535 headers: mock_response.headers,
536 body: mock_response.body.into_bytes(),
537 content_type: mock_response.content_type,
538 });
539 }
540 }
541
542 if self.record_replay.record_handler().should_record(method) {
544 let default_response = serde_json::json!({
546 "message": "Request recorded for future replay",
547 "timestamp": chrono::Utc::now().to_rfc3339(),
548 "fingerprint": fingerprint.to_hash()
549 });
550
551 let response_body = serde_json::to_string(&default_response)?;
552 let status_code = 200;
553
554 self.record_replay
556 .record_handler()
557 .record_request(&fingerprint, status_code, headers, &response_body, None)
558 .await?;
559
560 return Ok(PriorityResponse {
561 source: ResponseSource::new(ResponsePriority::Record, "record".to_string())
562 .with_metadata("recorded".to_string(), "true".to_string()),
563 status_code,
564 headers: HashMap::new(),
565 body: response_body.into_bytes(),
566 content_type: "application/json".to_string(),
567 });
568 }
569
570 Err(Error::generic("No handler could process the request".to_string()))
572 }
573}
574
575#[derive(Debug, Clone)]
577pub struct PriorityResponse {
578 pub source: ResponseSource,
580 pub status_code: u16,
582 pub headers: HashMap<String, String>,
584 pub body: Vec<u8>,
586 pub content_type: String,
588}
589
590impl PriorityResponse {
591 pub fn to_axum_response(self) -> axum::response::Response {
593 let mut response = axum::response::Response::new(axum::body::Body::from(self.body));
594 *response.status_mut() = StatusCode::from_u16(self.status_code).unwrap_or(StatusCode::OK);
595
596 for (key, value) in self.headers {
598 if let (Ok(header_name), Ok(header_value)) =
599 (key.parse::<axum::http::HeaderName>(), value.parse::<axum::http::HeaderValue>())
600 {
601 response.headers_mut().insert(header_name, header_value);
602 }
603 }
604
605 if !response.headers().contains_key("content-type") {
607 if let Ok(header_value) = self.content_type.parse::<axum::http::HeaderValue>() {
608 response.headers_mut().insert("content-type", header_value);
609 }
610 }
611
612 response
613 }
614}
615
/// Mock generator that carries one fixed status code and body.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so instances can be logged, duplicated
/// and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SimpleMockGenerator {
    /// Status code stored for generated responses.
    pub default_status: u16,
    /// Body stored for generated responses.
    pub default_body: String,
}

impl SimpleMockGenerator {
    /// Creates a generator holding the given status code and body.
    pub fn new(default_status: u16, default_body: String) -> Self {
        Self {
            default_status,
            default_body,
        }
    }
}
633
634impl MockGenerator for SimpleMockGenerator {
635 fn generate_mock_response(
636 &self,
637 _fingerprint: &RequestFingerprint,
638 _headers: &HeaderMap,
639 _body: Option<&[u8]>,
640 ) -> Result<Option<MockResponse>> {
641 Ok(Some(MockResponse {
642 status_code: self.default_status,
643 headers: HashMap::new(),
644 body: self.default_body.clone(),
645 content_type: "application/json".to_string(),
646 }))
647 }
648}
649
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// With an empty fixtures directory and only a mock generator configured, the
    /// response comes from the mock step of the chain.
    #[tokio::test]
    async fn test_priority_chain() {
        // Keep the temporary directory alive for the whole test.
        let fixtures = TempDir::new().unwrap();
        let record_replay =
            RecordReplayHandler::new(fixtures.path().to_path_buf(), true, true, false);

        let mock_generator =
            Box::new(SimpleMockGenerator::new(200, r#"{"message": "mock response"}"#.to_string()));

        let handler = PriorityHttpHandler::new_with_openapi(
            record_replay,
            None, // failure_injector
            None, // proxy_handler
            Some(mock_generator),
            None, // openapi_spec
        );

        let request_method = Method::GET;
        let request_uri = Uri::from_static("/api/test");
        let request_headers = HeaderMap::new();

        let response = handler
            .process_request(&request_method, &request_uri, &request_headers, None)
            .await
            .unwrap();

        assert_eq!(response.status_code, 200);
        assert_eq!(response.source.source_type, "mock");
    }
}