1use axum::{
7 extract::{State, WebSocketUpgrade},
8 response::{IntoResponse, Response},
9 routing::{get, post},
10 Json, Router,
11};
12use serde::{Deserialize, Serialize};
13use std::sync::Arc;
14
15use crate::{
16 alerts::{Alert, AlertManager},
17 analytics::ChaosAnalytics,
18 dashboard::{DashboardManager, DashboardStats, DashboardUpdate},
19 scenario_orchestrator::ScenarioOrchestrator,
20 scenario_recorder::ScenarioRecorder,
21 scenario_replay::ScenarioReplayEngine,
22 scenario_scheduler::ScenarioScheduler,
23 scenarios::ScenarioEngine,
24 trace_collector::TraceCollector,
25};
26use mockforge_recorder::Recorder;
27use parking_lot::RwLock;
28use printpdf::*;
29use std::collections::HashMap;
30
31fn generate_flamegraph_from_trace(
33 trace_id: &str,
34 traces: &[crate::trace_collector::CollectedTrace],
35) -> String {
36 use std::collections::HashMap;
37
38 let width = 1200;
39 let height = 600;
40 let bar_height = 20;
41 let mut y_offset = 60;
42
43 let mut svg = format!(
44 r#"<svg width="{}" height="{}" xmlns="http://www.w3.org/2000/svg">
45 <rect width="100%" height="100%" fill="white"/>
46 <text x="10" y="20" font-family="monospace" font-size="12">Flamegraph for trace: {}</text>
47 <text x="10" y="35" font-family="monospace" font-size="10">Total spans: {}</text>"#,
48 width,
49 height,
50 trace_id,
51 traces.len()
52 );
53
54 let mut span_map: HashMap<String, &crate::trace_collector::CollectedTrace> = HashMap::new();
56 for trace in traces {
57 span_map.insert(trace.span_id.clone(), trace);
58 }
59
60 let mut root_spans = Vec::new();
62 for trace in traces {
63 if trace.parent_span_id.is_none() {
64 root_spans.push(trace);
65 }
66 }
67
68 root_spans.sort_by_key(|s| s.start_time.clone());
70
71 let min_start = traces
73 .iter()
74 .map(|t| {
75 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&t.start_time) {
76 dt.timestamp_micros() as u64
77 } else {
78 0
79 }
80 })
81 .min()
82 .unwrap_or(0);
83
84 let max_end = traces
85 .iter()
86 .map(|t| {
87 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&t.start_time) {
88 dt.timestamp_micros() as u64 + t.duration_ms * 1000
89 } else {
90 t.duration_ms * 1000
91 }
92 })
93 .max()
94 .unwrap_or(1000000);
95
96 let total_duration = max_end.saturating_sub(min_start);
97
98 let mut current_level = root_spans;
100 let mut level = 0;
101
102 while !current_level.is_empty() && y_offset + bar_height < height {
103 let mut next_level = Vec::new();
104
105 for span in ¤t_level {
106 let start_us = if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&span.start_time) {
108 dt.timestamp_micros() as u64
109 } else {
110 span.start_time.parse().unwrap_or(0)
111 };
112
113 let x = ((start_us.saturating_sub(min_start)) as f64 / total_duration as f64
114 * (width - 40) as f64) as u32
115 + 20;
116 let bar_width = ((span.duration_ms * 1000) as f64 / total_duration as f64
117 * (width - 40) as f64) as u32;
118
119 if bar_width > 0 {
120 let color = format!("#{:x}", (level * 50 + 100) % 256);
121 svg.push_str(&format!(
122 r#"<rect x="{}" y="{}" width="{}" height="{}" fill="{}" stroke="black" stroke-width="1"/>
123 <text x="{}" y="{}" font-family="monospace" font-size="10" fill="white">{}</text>"#,
124 x, y_offset, bar_width, bar_height, color,
125 x + 2, y_offset + 12, span.name
126 ));
127 }
128
129 for trace in traces {
131 if trace.parent_span_id.as_ref() == Some(&span.span_id) {
132 next_level.push(trace);
133 }
134 }
135 }
136
137 next_level.sort_by_key(|s| s.start_time.clone());
139
140 current_level = next_level;
141 y_offset += bar_height + 2;
142 level += 1;
143 }
144
145 svg.push_str("</svg>");
146 svg
147}
148
149fn calculate_max_depth(traces: &[crate::trace_collector::CollectedTrace]) -> usize {
151 use std::collections::HashMap;
152
153 let mut span_map: HashMap<String, &crate::trace_collector::CollectedTrace> = HashMap::new();
154 let mut depth_map: HashMap<String, usize> = HashMap::new();
155
156 for trace in traces {
158 span_map.insert(trace.span_id.clone(), trace);
159 }
160
161 for trace in traces {
163 calculate_span_depth(&trace.span_id, &span_map, &mut depth_map);
164 }
165
166 depth_map.values().cloned().max().unwrap_or(0)
167}
168
169fn calculate_span_depth(
171 span_id: &str,
172 span_map: &HashMap<String, &crate::trace_collector::CollectedTrace>,
173 depth_map: &mut HashMap<String, usize>,
174) -> usize {
175 if let Some(&depth) = depth_map.get(span_id) {
176 return depth;
177 }
178
179 let span = match span_map.get(span_id) {
180 Some(s) => s,
181 None => return 0,
182 };
183
184 let depth = if let Some(ref parent_id) = span.parent_span_id {
185 calculate_span_depth(parent_id, span_map, depth_map) + 1
186 } else {
187 0
188 };
189
190 depth_map.insert(span_id.to_string(), depth);
191 depth
192}
193
194fn find_hottest_path(traces: &[crate::trace_collector::CollectedTrace]) -> Vec<String> {
196 use std::collections::HashMap;
197
198 if traces.is_empty() {
199 return Vec::new();
200 }
201
202 let mut span_map: HashMap<String, &crate::trace_collector::CollectedTrace> = HashMap::new();
203
204 for trace in traces {
206 span_map.insert(trace.span_id.clone(), trace);
207 }
208
209 let mut root_spans = Vec::new();
211 for trace in traces {
212 if trace.parent_span_id.is_none() {
213 root_spans.push(trace);
214 }
215 }
216
217 if root_spans.is_empty() {
218 return Vec::new();
219 }
220
221 let mut path = Vec::new();
224 let mut current = root_spans[0];
225
226 loop {
227 path.push(current.name.clone());
228 let mut found_child = false;
229
230 for trace in traces {
232 if trace.parent_span_id.as_ref() == Some(¤t.span_id) {
233 current = trace;
234 found_child = true;
235 break;
236 }
237 }
238
239 if !found_child {
240 break;
241 }
242 }
243
244 path
245}
246
/// Builds a CSV report with one row of (placeholder) metrics per scenario.
///
/// Columns: scenario name, total requests, success rate, average latency in
/// milliseconds and error rate. Scenario names containing a comma, quote or
/// line break are quoted so that they cannot shift columns.
///
/// When `include_comparison` is set and more than one scenario is given, a
/// "Comparison Summary" section follows, naming the scenario with the best
/// success rate, the worst (highest) latency and the highest error rate among
/// the scenarios that were actually passed in. Ties go to the scenario listed
/// first.
///
/// # Arguments
/// * `scenario_names` - scenarios to report on, in output order.
/// * `include_comparison` - whether to append the comparison summary.
///
/// # Returns
/// The CSV document as a string.
fn generate_csv_content(scenario_names: &[String], include_comparison: bool) -> String {
    /// Placeholder metrics for a scenario:
    /// (total requests, success rate %, average latency ms, error rate %).
    fn placeholder_metrics(scenario: &str) -> (u32, f64, f64, f64) {
        match scenario {
            "network_degradation" => (1000, 92.5, 250.0, 7.5),
            "service_instability" => (800, 88.0, 180.0, 12.0),
            "cascading_failure" => (1200, 85.0, 320.0, 15.0),
            _ => (1000, 95.0, 150.0, 5.0),
        }
    }

    /// Quotes a CSV field when it contains a delimiter, quote or line break.
    fn csv_field(raw: &str) -> String {
        if raw.contains(|c: char| matches!(c, ',' | '"' | '\n' | '\r')) {
            format!("\"{}\"", raw.replace('"', "\"\""))
        } else {
            raw.to_string()
        }
    }

    let mut csv =
        String::from("Scenario,Total Requests,Success Rate,Avg Latency (ms),Error Rate\n");

    // Running extremes for the summary; strict `>` keeps the first on ties.
    let mut best_success: Option<(&str, f64)> = None;
    let mut worst_latency: Option<(&str, f64)> = None;
    let mut highest_error: Option<(&str, f64)> = None;

    for scenario in scenario_names {
        let (requests, success_rate, avg_latency, error_rate) = placeholder_metrics(scenario);

        csv.push_str(&format!(
            "{},{},{:.1},{:.1},{:.1}\n",
            csv_field(scenario),
            requests,
            success_rate,
            avg_latency,
            error_rate
        ));

        if best_success.map_or(true, |(_, best)| success_rate > best) {
            best_success = Some((scenario.as_str(), success_rate));
        }
        if worst_latency.map_or(true, |(_, worst)| avg_latency > worst) {
            worst_latency = Some((scenario.as_str(), avg_latency));
        }
        if highest_error.map_or(true, |(_, highest)| error_rate > highest) {
            highest_error = Some((scenario.as_str(), error_rate));
        }
    }

    if include_comparison && scenario_names.len() > 1 {
        csv.push_str("\nComparison Summary\n");
        let summary = [
            ("Best Success Rate", best_success),
            ("Worst Latency", worst_latency),
            ("Highest Error Rate", highest_error),
        ];
        for (label, winner) in summary.iter() {
            if let Some((name, _)) = winner {
                csv.push_str(&format!("{},{}\n", label, csv_field(name)));
            }
        }
    }

    csv
}
281
282fn perform_scenario_comparison(baseline: &str, comparisons: &[String]) -> ComparisonResult {
284 let baseline_metrics = get_scenario_metrics(baseline);
286 let mut regressions = 0;
287 let mut improvements = 0;
288
289 for scenario in comparisons {
290 let metrics = get_scenario_metrics(scenario);
291
292 if metrics.success_rate < baseline_metrics.success_rate {
294 regressions += 1;
295 } else if metrics.success_rate > baseline_metrics.success_rate {
296 improvements += 1;
297 }
298
299 if metrics.avg_latency > baseline_metrics.avg_latency {
301 regressions += 1;
302 } else if metrics.avg_latency < baseline_metrics.avg_latency {
303 improvements += 1;
304 }
305 }
306
307 let verdict = if regressions > improvements {
308 "worse".to_string()
309 } else if improvements > regressions {
310 "better".to_string()
311 } else {
312 "similar".to_string()
313 };
314
315 ComparisonResult {
316 baseline: baseline.to_string(),
317 comparisons: comparisons.to_vec(),
318 regressions_count: regressions,
319 improvements_count: improvements,
320 verdict,
321 }
322}
323
/// The two metrics used when comparing scenarios against a baseline.
struct ScenarioMetrics {
    /// Share of successful requests, in percent.
    success_rate: f64,
    /// Average latency; compared as "lower is better".
    avg_latency: f64,
}
329
330fn get_scenario_metrics(scenario: &str) -> ScenarioMetrics {
331 match scenario {
332 "network_degradation" => ScenarioMetrics {
333 success_rate: 92.5,
334 avg_latency: 250.0,
335 },
336 "service_instability" => ScenarioMetrics {
337 success_rate: 88.0,
338 avg_latency: 180.0,
339 },
340 "cascading_failure" => ScenarioMetrics {
341 success_rate: 85.0,
342 avg_latency: 320.0,
343 },
344 _ => ScenarioMetrics {
345 success_rate: 95.0,
346 avg_latency: 150.0,
347 },
348 }
349}
350
351#[derive(Clone)]
353pub struct SimpleDashboardLayoutManager {
354 layouts: Arc<RwLock<HashMap<String, DashboardLayoutSummary>>>,
355}
356
357impl SimpleDashboardLayoutManager {
358 pub fn new() -> Self {
359 let mut layouts = HashMap::new();
360 layouts.insert(
361 "chaos-overview".to_string(),
362 DashboardLayoutSummary {
363 id: "chaos-overview".to_string(),
364 name: "Chaos Engineering Overview".to_string(),
365 description: Some("Real-time overview of chaos engineering activities".to_string()),
366 widget_count: 3,
367 },
368 );
369 layouts.insert(
370 "service-perf".to_string(),
371 DashboardLayoutSummary {
372 id: "service-perf".to_string(),
373 name: "Service Performance".to_string(),
374 description: Some("Detailed service performance metrics".to_string()),
375 widget_count: 2,
376 },
377 );
378
379 Self {
380 layouts: Arc::new(RwLock::new(layouts)),
381 }
382 }
383}
384
385impl Default for SimpleDashboardLayoutManager {
386 fn default() -> Self {
387 Self::new()
388 }
389}
390
391impl SimpleDashboardLayoutManager {
392 pub fn list_layouts(&self) -> Vec<DashboardLayoutSummary> {
393 self.layouts.read().values().cloned().collect()
394 }
395
396 pub fn get_layout(&self, id: &str) -> Option<DashboardLayoutSummary> {
397 self.layouts.read().get(id).cloned()
398 }
399
400 pub fn create_layout(&self, layout: DashboardLayoutSummary) {
401 self.layouts.write().insert(layout.id.clone(), layout);
402 }
403
404 pub fn update_layout(&self, id: &str, layout: DashboardLayoutSummary) {
405 self.layouts.write().insert(id.to_string(), layout);
406 }
407
408 pub fn delete_layout(&self, id: &str) {
409 self.layouts.write().remove(id);
410 }
411}
412
413#[derive(Clone)]
415pub struct ObservabilityState {
416 pub analytics: Arc<ChaosAnalytics>,
417 pub alert_manager: Arc<AlertManager>,
418 pub dashboard: Arc<DashboardManager>,
419 pub scenario_engine: Arc<ScenarioEngine>,
420 pub recorder: Arc<ScenarioRecorder>,
421 pub request_recorder: Option<Arc<Recorder>>,
422 pub replay_engine: Arc<ScenarioReplayEngine>,
423 pub scheduler: Arc<ScenarioScheduler>,
424 pub orchestrator: Arc<ScenarioOrchestrator>,
425 pub layout_manager: Arc<SimpleDashboardLayoutManager>,
426 pub trace_collector: Arc<TraceCollector>,
427}
428
429#[derive(Debug, Serialize)]
431pub struct ApiResponse<T> {
432 pub success: bool,
433 pub data: Option<T>,
434 pub error: Option<String>,
435}
436
437impl<T: Serialize> ApiResponse<T> {
438 pub fn success(data: T) -> Self {
439 Self {
440 success: true,
441 data: Some(data),
442 error: None,
443 }
444 }
445
446 pub fn error(message: String) -> Self {
447 Self {
448 success: false,
449 data: None,
450 error: Some(message),
451 }
452 }
453}
454
455pub fn create_observability_router(state: ObservabilityState) -> Router {
457 Router::new()
458 .route("/api/observability/stats", get(get_stats))
460 .route("/api/observability/alerts", get(get_alerts))
461 .route("/api/observability/ws", get(websocket_handler))
462 .route("/api/observability/traces", get(get_traces))
464 .route("/api/observability/traces/:trace_id/flamegraph", get(get_flamegraph))
465 .route("/api/dashboard/layouts", get(list_dashboard_layouts))
467 .route("/api/dashboard/layouts", post(create_dashboard_layout))
468 .route("/api/dashboard/layouts/:id", get(get_dashboard_layout))
469 .route("/api/dashboard/layouts/:id", post(update_dashboard_layout))
470 .route("/api/dashboard/layouts/:id", axum::routing::delete(delete_dashboard_layout))
471 .route("/api/dashboard/templates", get(get_dashboard_templates))
472 .route("/api/reports/pdf", post(generate_pdf_report))
474 .route("/api/reports/csv", post(generate_csv_report))
475 .route("/api/reports/compare", post(compare_scenarios))
476 .route("/api/chaos/scenarios", get(list_scenarios))
478 .route("/api/chaos/scenarios/:name", post(start_scenario))
479 .route("/api/chaos/status", get(get_chaos_status))
480 .route("/api/chaos/disable", post(disable_chaos))
481 .route("/api/chaos/reset", post(reset_chaos))
482 .route("/api/chaos/recording/start", post(start_recording))
484 .route("/api/chaos/recording/stop", post(stop_recording))
485 .route("/api/chaos/recording/status", get(recording_status))
486 .route("/api/chaos/recording/list", get(list_recordings))
487 .route("/api/chaos/recording/export", post(export_recording))
488 .route("/api/chaos/replay/start", post(start_replay))
490 .route("/api/chaos/replay/stop", post(stop_replay))
491 .route("/api/chaos/replay/status", get(replay_status))
492 .route("/api/recorder/search", post(search_requests))
494 .with_state(state)
495}
496
497async fn get_stats(State(state): State<ObservabilityState>) -> Json<DashboardStats> {
499 let stats = state.dashboard.get_stats();
500 Json(stats)
501}
502
503async fn get_alerts(State(state): State<ObservabilityState>) -> Json<Vec<Alert>> {
505 let alerts = state.alert_manager.get_active_alerts();
506 Json(alerts)
507}
508
509async fn websocket_handler(
511 ws: WebSocketUpgrade,
512 State(state): State<ObservabilityState>,
513) -> Response {
514 ws.on_upgrade(|socket| handle_websocket(socket, state))
515}
516
517async fn handle_websocket(mut socket: axum::extract::ws::WebSocket, state: ObservabilityState) {
518 use axum::extract::ws::Message;
519
520 let mut rx = state.dashboard.subscribe();
521
522 let _stats = state.dashboard.get_stats();
524 let update = DashboardUpdate::Ping {
525 timestamp: chrono::Utc::now(),
526 };
527 if let Ok(json) = serde_json::to_string(&update) {
528 let _ = socket.send(Message::Text(json.into())).await;
529 }
530
531 while let Ok(update) = rx.recv().await {
533 if let Ok(json) = serde_json::to_string(&update) {
534 if socket.send(Message::Text(json.into())).await.is_err() {
535 break;
536 }
537 }
538 }
539}
540
541#[derive(Serialize)]
543struct TracesResponse {
544 traces: Vec<serde_json::Value>,
545}
546
547async fn get_traces(State(state): State<ObservabilityState>) -> Json<TracesResponse> {
548 match state.trace_collector.collect_traces().await {
549 Ok(collected_traces) => {
550 let traces: Vec<serde_json::Value> = collected_traces
551 .into_iter()
552 .map(|trace| {
553 serde_json::json!({
554 "trace_id": trace.trace_id,
555 "span_id": trace.span_id,
556 "parent_span_id": trace.parent_span_id,
557 "name": trace.name,
558 "start_time": trace.start_time,
559 "end_time": trace.end_time,
560 "duration_ms": trace.duration_ms,
561 "attributes": trace.attributes
562 })
563 })
564 .collect();
565
566 Json(TracesResponse { traces })
567 }
568 Err(e) => {
569 tracing::warn!("Failed to collect traces: {}", e);
570 Json(TracesResponse { traces: vec![] })
572 }
573 }
574}
575
576#[derive(Serialize)]
578struct ScenariosResponse {
579 scenarios: Vec<String>,
580}
581
582async fn list_scenarios(State(_state): State<ObservabilityState>) -> Json<ScenariosResponse> {
583 let scenarios = vec![
584 "network_degradation".to_string(),
585 "service_instability".to_string(),
586 "cascading_failure".to_string(),
587 "peak_traffic".to_string(),
588 "slow_backend".to_string(),
589 ];
590
591 Json(ScenariosResponse { scenarios })
592}
593
594async fn start_scenario(
596 State(state): State<ObservabilityState>,
597 axum::extract::Path(name): axum::extract::Path<String>,
598) -> Json<ApiResponse<String>> {
599 if let Some(scenario) = state.scenario_engine.get_scenario(&name) {
600 state.scenario_engine.start_scenario(scenario);
601 tracing::info!("Starting chaos scenario: {}", name);
602 Json(ApiResponse::success(format!("Started scenario: {}", name)))
603 } else {
604 Json(ApiResponse::error(format!("Scenario '{}' not found", name)))
605 }
606}
607
608#[derive(Serialize)]
610struct ChaosStatus {
611 is_enabled: bool,
612 active_scenario: Option<String>,
613 current_config: Option<serde_json::Value>,
614}
615
616async fn get_chaos_status(State(state): State<ObservabilityState>) -> Json<ChaosStatus> {
617 let active_scenarios = state.scenario_engine.get_active_scenarios();
618 let is_enabled = !active_scenarios.is_empty();
619 let active_scenario = active_scenarios.first().map(|s| s.name.clone());
620 let current_config = active_scenarios
621 .first()
622 .map(|s| serde_json::to_value(&s.chaos_config).unwrap_or_default());
623
624 Json(ChaosStatus {
625 is_enabled,
626 active_scenario,
627 current_config,
628 })
629}
630
631async fn disable_chaos() -> Json<ApiResponse<String>> {
633 tracing::info!("Disabling chaos engineering");
634 Json(ApiResponse::success("Chaos disabled".to_string()))
635}
636
637async fn reset_chaos() -> Json<ApiResponse<String>> {
639 tracing::info!("Resetting chaos configuration");
640 Json(ApiResponse::success("Chaos reset".to_string()))
641}
642
643#[derive(Deserialize)]
645struct StartRecordingRequest {
646 scenario_name: String,
647}
648
649async fn start_recording(
650 State(state): State<ObservabilityState>,
651 Json(req): Json<StartRecordingRequest>,
652) -> Json<ApiResponse<String>> {
653 if let Some(scenario) = state.scenario_engine.get_scenario(&req.scenario_name) {
654 match state.recorder.start_recording(scenario.clone()) {
655 Ok(_) => {
656 tracing::info!("Starting recording: {}", req.scenario_name);
657 Json(ApiResponse::success("Recording started".to_string()))
658 }
659 Err(e) => Json(ApiResponse::error(format!("Failed to start recording: {}", e))),
660 }
661 } else {
662 Json(ApiResponse::error(format!("Scenario '{}' not found", req.scenario_name)))
663 }
664}
665
666async fn stop_recording(State(state): State<ObservabilityState>) -> Json<ApiResponse<String>> {
668 match state.recorder.stop_recording() {
669 Ok(recording) => {
670 tracing::info!("Stopping recording: {} events recorded", recording.events.len());
671 Json(ApiResponse::success("Recording stopped".to_string()))
672 }
673 Err(e) => Json(ApiResponse::error(format!("Failed to stop recording: {}", e))),
674 }
675}
676
677#[derive(Serialize)]
679struct RecordingStatus {
680 is_recording: bool,
681 current_scenario: Option<String>,
682 events_count: usize,
683}
684
685async fn recording_status(State(state): State<ObservabilityState>) -> Json<RecordingStatus> {
686 let is_recording = state.recorder.is_recording();
687 let current_scenario = state.recorder.get_current_recording().map(|r| r.scenario.name.clone());
688 let events_count = state.recorder.get_current_recording().map(|r| r.events.len()).unwrap_or(0);
689
690 Json(RecordingStatus {
691 is_recording,
692 current_scenario,
693 events_count,
694 })
695}
696
697#[derive(Serialize)]
699struct RecordingsResponse {
700 scenarios: Vec<RecordingInfo>,
701}
702
703#[derive(Serialize)]
704struct RecordingInfo {
705 name: String,
706 started_at: String,
707 ended_at: Option<String>,
708 total_events: usize,
709 duration_ms: u64,
710}
711
712async fn list_recordings(State(state): State<ObservabilityState>) -> Json<RecordingsResponse> {
713 let recordings = state.recorder.get_recordings();
714 let scenarios = recordings
715 .into_iter()
716 .map(|r| RecordingInfo {
717 name: r.scenario.name,
718 started_at: r.recording_started.to_rfc3339(),
719 ended_at: r.recording_ended.map(|t| t.to_rfc3339()),
720 total_events: r.events.len(),
721 duration_ms: r.total_duration_ms,
722 })
723 .collect();
724
725 Json(RecordingsResponse { scenarios })
726}
727
728#[derive(Deserialize)]
730struct ExportRequest {
731 scenario_name: String,
732 format: String,
733}
734
735async fn export_recording(
736 State(state): State<ObservabilityState>,
737 Json(req): Json<ExportRequest>,
738) -> Response {
739 tracing::info!("Exporting scenario: {} as {}", req.scenario_name, req.format);
740
741 if let Some(recording) = state.recorder.get_recording_by_name(&req.scenario_name) {
742 let filename = format!("{}.{}", req.scenario_name, req.format);
743 let filepath = format!("/tmp/{}", filename);
744
745 let result = match req.format.as_str() {
746 "json" | "yaml" => {
747 recording.save_to_file(&filepath).map(|_| format!("/exports/{}", filename))
748 }
749 _ => Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "Unsupported format")),
750 };
751
752 match result {
753 Ok(path) => Json(ApiResponse::<String>::success(path)).into_response(),
754 Err(e) => {
755 Json(ApiResponse::<String>::error(format!("Export failed: {}", e))).into_response()
756 }
757 }
758 } else {
759 Json(ApiResponse::<String>::error(format!(
760 "Recording '{}' not found",
761 req.scenario_name
762 )))
763 .into_response()
764 }
765}
766
767#[derive(Deserialize)]
769struct StartReplayRequest {
770 scenario_name: String,
771 speed: f64,
772}
773
774async fn start_replay(
775 State(_state): State<ObservabilityState>,
776 Json(req): Json<StartReplayRequest>,
777) -> Json<ApiResponse<String>> {
778 tracing::info!("Starting replay: {} at {}x speed", req.scenario_name, req.speed);
779 Json(ApiResponse::success("Replay started".to_string()))
780}
781
782async fn stop_replay() -> Json<ApiResponse<String>> {
784 tracing::info!("Stopping replay");
785 Json(ApiResponse::success("Replay stopped".to_string()))
786}
787
788#[derive(Serialize)]
790struct ReplayStatus {
791 is_playing: bool,
792 scenario_name: Option<String>,
793 progress: f64,
794}
795
796async fn replay_status(State(state): State<ObservabilityState>) -> Json<ReplayStatus> {
797 if let Some(status) = state.replay_engine.get_status() {
798 Json(ReplayStatus {
799 is_playing: true,
800 scenario_name: Some(status.scenario_name),
801 progress: status.progress,
802 })
803 } else {
804 Json(ReplayStatus {
805 is_playing: false,
806 scenario_name: None,
807 progress: 0.0,
808 })
809 }
810}
811
812#[derive(Deserialize)]
814struct SearchRequest {
815 limit: Option<usize>,
816 protocol: Option<String>,
817 method: Option<String>,
818 path: Option<String>,
819 status_code: Option<u16>,
820 trace_id: Option<String>,
821 min_duration_ms: Option<f64>,
822 max_duration_ms: Option<f64>,
823 tags: Option<Vec<String>>,
824}
825
826#[derive(Serialize)]
827struct SearchResponse {
828 requests: Vec<RecordedRequest>,
829}
830
831#[derive(Serialize)]
832struct RecordedRequest {
833 id: i64,
834 timestamp: String,
835 protocol: String,
836 method: String,
837 path: String,
838 status_code: u16,
839 duration_ms: f64,
840 client_ip: Option<String>,
841 request_headers: serde_json::Value,
842 request_body: Option<String>,
843 response_headers: serde_json::Value,
844 response_body: Option<String>,
845}
846
847async fn search_requests(
848 State(state): State<ObservabilityState>,
849 Json(req): Json<SearchRequest>,
850) -> Json<SearchResponse> {
851 let Some(recorder) = &state.request_recorder else {
853 let mock_requests = vec![RecordedRequest {
855 id: 1,
856 timestamp: chrono::Utc::now().to_rfc3339(),
857 protocol: "http".to_string(),
858 method: "GET".to_string(),
859 path: "/api/test".to_string(),
860 status_code: 200,
861 duration_ms: 150.0,
862 client_ip: Some("127.0.0.1".to_string()),
863 request_headers: serde_json::json!({"user-agent": "test"}),
864 request_body: None,
865 response_headers: serde_json::json!({"content-type": "application/json"}),
866 response_body: Some("{\"status\": \"ok\"}".to_string()),
867 }];
868 return Json(SearchResponse {
869 requests: mock_requests,
870 });
871 };
872
873 use mockforge_recorder::query::{execute_query, QueryFilter};
874
875 let filter = QueryFilter {
877 protocol: req.protocol.as_ref().and_then(|p| match p.as_str() {
878 "http" => Some(mockforge_recorder::models::Protocol::Http),
879 "grpc" => Some(mockforge_recorder::models::Protocol::Grpc),
880 "websocket" => Some(mockforge_recorder::models::Protocol::WebSocket),
881 "graphql" => Some(mockforge_recorder::models::Protocol::GraphQL),
882 _ => None,
883 }),
884 method: req.method.clone(),
885 path: req.path.clone(),
886 status_code: req.status_code.map(|s| s as i32),
887 trace_id: req.trace_id.clone(),
888 min_duration_ms: req.min_duration_ms.map(|d| d as i64),
889 max_duration_ms: req.max_duration_ms.map(|d| d as i64),
890 tags: req.tags.clone(),
891 limit: req.limit.map(|l| l as i32),
892 offset: None, };
894
895 match execute_query(recorder.database(), filter).await {
897 Ok(result) => {
898 let requests: Vec<RecordedRequest> = result
900 .exchanges
901 .into_iter()
902 .map(|exchange| RecordedRequest {
903 id: exchange.request.id.parse().unwrap_or(0),
904 timestamp: exchange.request.timestamp.to_rfc3339(),
905 protocol: exchange.request.protocol.as_str().to_string(),
906 method: exchange.request.method,
907 path: exchange.request.path,
908 status_code: exchange.request.status_code.unwrap_or(0) as u16,
909 duration_ms: exchange.request.duration_ms.unwrap_or(0) as f64,
910 client_ip: exchange.request.client_ip,
911 request_headers: serde_json::from_str(&exchange.request.headers)
912 .unwrap_or(serde_json::json!({})),
913 request_body: exchange.request.body,
914 response_headers: exchange
915 .response
916 .as_ref()
917 .and_then(|r| serde_json::from_str(&r.headers).ok())
918 .unwrap_or(serde_json::json!({})),
919 response_body: exchange.response.as_ref().and_then(|r| r.body.clone()),
920 })
921 .collect();
922
923 Json(SearchResponse { requests })
924 }
925 Err(err) => {
926 tracing::error!("Failed to search requests: {}", err);
927 Json(SearchResponse { requests: vec![] })
929 }
930 }
931}
932
933async fn get_flamegraph(
937 State(state): State<ObservabilityState>,
938 axum::extract::Path(trace_id): axum::extract::Path<String>,
939) -> Json<ApiResponse<FlamegraphResponse>> {
940 tracing::info!("Generating flamegraph for trace: {}", trace_id);
941
942 let collected_traces = match state.trace_collector.get_trace_by_id(&trace_id).await {
944 Ok(traces) => traces,
945 Err(e) => {
946 return Json(ApiResponse::error(format!("Failed to retrieve trace data: {}", e)));
947 }
948 };
949
950 if collected_traces.is_empty() {
951 return Json(ApiResponse::error(format!("No trace found with ID: {}", trace_id)));
952 }
953
954 let svg_content = generate_flamegraph_from_trace(&trace_id, &collected_traces);
956 let svg_path = format!("/tmp/flamegraph_{}.svg", trace_id);
957
958 if let Err(e) = std::fs::write(&svg_path, svg_content) {
960 return Json(ApiResponse::error(format!("Failed to generate flamegraph: {}", e)));
961 }
962
963 let total_spans = collected_traces.len();
965 let max_depth = calculate_max_depth(&collected_traces);
966 let total_duration_us =
967 collected_traces.iter().map(|t| t.duration_ms * 1000).max().unwrap_or(0);
968 let hottest_path = find_hottest_path(&collected_traces);
969
970 let stats = FlamegraphStatsResponse {
971 total_spans,
972 max_depth,
973 total_duration_us,
974 hottest_path,
975 };
976
977 Json(ApiResponse::success(FlamegraphResponse {
978 trace_id: trace_id.clone(),
979 svg_url: format!("/flamegraphs/{}.svg", trace_id),
980 stats,
981 }))
982}
983
984#[derive(Serialize)]
985struct FlamegraphResponse {
986 trace_id: String,
987 svg_url: String,
988 stats: FlamegraphStatsResponse,
989}
990
991#[derive(Serialize)]
992struct FlamegraphStatsResponse {
993 total_spans: usize,
994 max_depth: usize,
995 total_duration_us: u64,
996 hottest_path: Vec<String>,
997}
998
999async fn list_dashboard_layouts(
1001 State(state): State<ObservabilityState>,
1002) -> Json<ApiResponse<Vec<DashboardLayoutSummary>>> {
1003 tracing::info!("Listing dashboard layouts");
1004 let layouts = state.layout_manager.list_layouts();
1005 Json(ApiResponse::success(layouts))
1006}
1007
1008#[derive(Serialize, Clone)]
1009pub struct DashboardLayoutSummary {
1010 id: String,
1011 name: String,
1012 description: Option<String>,
1013 widget_count: usize,
1014}
1015
1016#[derive(Deserialize)]
1018struct CreateDashboardLayoutRequest {
1019 name: String,
1020 description: Option<String>,
1021 layout_data: serde_json::Value,
1022}
1023
1024async fn create_dashboard_layout(
1026 State(state): State<ObservabilityState>,
1027 Json(req): Json<CreateDashboardLayoutRequest>,
1028) -> Json<ApiResponse<String>> {
1029 tracing::info!("Creating dashboard layout: {}", req.name);
1030 let id = format!("layout-{}", chrono::Utc::now().timestamp());
1031 let widget_count = req.layout_data.as_array().map(|a| a.len()).unwrap_or(0);
1032 let layout = DashboardLayoutSummary {
1033 id: id.clone(),
1034 name: req.name,
1035 description: req.description,
1036 widget_count,
1037 };
1038 state.layout_manager.create_layout(layout);
1039 Json(ApiResponse::success(id))
1040}
1041
1042async fn get_dashboard_layout(
1044 State(state): State<ObservabilityState>,
1045 axum::extract::Path(id): axum::extract::Path<String>,
1046) -> Json<ApiResponse<serde_json::Value>> {
1047 tracing::info!("Getting dashboard layout: {}", id);
1048 if let Some(layout) = state.layout_manager.get_layout(&id) {
1049 Json(ApiResponse::success(serde_json::json!({
1050 "id": layout.id,
1051 "name": layout.name,
1052 "description": layout.description,
1053 "widget_count": layout.widget_count
1054 })))
1055 } else {
1056 Json(ApiResponse::error(format!("Layout '{}' not found", id)))
1057 }
1058}
1059
1060async fn update_dashboard_layout(
1062 State(state): State<ObservabilityState>,
1063 axum::extract::Path(id): axum::extract::Path<String>,
1064 Json(req): Json<CreateDashboardLayoutRequest>,
1065) -> Json<ApiResponse<String>> {
1066 tracing::info!("Updating dashboard layout: {}", id);
1067 let widget_count = req.layout_data.as_array().map(|a| a.len()).unwrap_or(0);
1068 let layout = DashboardLayoutSummary {
1069 id: id.clone(),
1070 name: req.name,
1071 description: req.description,
1072 widget_count,
1073 };
1074 state.layout_manager.update_layout(&id, layout);
1075 Json(ApiResponse::success("Updated".to_string()))
1076}
1077
1078async fn delete_dashboard_layout(
1080 State(state): State<ObservabilityState>,
1081 axum::extract::Path(id): axum::extract::Path<String>,
1082) -> Json<ApiResponse<String>> {
1083 tracing::info!("Deleting dashboard layout: {}", id);
1084 state.layout_manager.delete_layout(&id);
1085 Json(ApiResponse::success("Deleted".to_string()))
1086}
1087
1088async fn get_dashboard_templates(
1090 State(_state): State<ObservabilityState>,
1091) -> Json<ApiResponse<Vec<DashboardLayoutSummary>>> {
1092 tracing::info!("Getting dashboard templates");
1093 Json(ApiResponse::success(vec![
1095 DashboardLayoutSummary {
1096 id: "template-chaos-overview".to_string(),
1097 name: "Chaos Engineering Overview".to_string(),
1098 description: Some("Pre-built chaos engineering dashboard".to_string()),
1099 widget_count: 3,
1100 },
1101 DashboardLayoutSummary {
1102 id: "template-resilience".to_string(),
1103 name: "Resilience Testing".to_string(),
1104 description: Some("Monitor resilience patterns".to_string()),
1105 widget_count: 2,
1106 },
1107 ]))
1108}
1109
1110#[derive(Deserialize)]
1112struct GeneratePdfRequest {
1113 scenario_name: String,
1114 include_charts: bool,
1115}
1116
1117fn generate_scenario_pdf(
1119 scenario_name: &str,
1120 include_charts: bool,
1121 output_path: &str,
1122) -> Result<(), Box<dyn std::error::Error>> {
1123 let (doc, page1, layer1) = PdfDocument::new(
1124 format!("Chaos Engineering Report - {}", scenario_name),
1125 Mm(210.0), Mm(297.0), "Layer 1",
1128 );
1129
1130 let font = doc.add_builtin_font(BuiltinFont::Helvetica)?;
1131 let font_bold = doc.add_builtin_font(BuiltinFont::HelveticaBold)?;
1132
1133 let current_layer = doc.get_page(page1).get_layer(layer1);
1134
1135 current_layer.use_text(
1137 "Chaos Engineering Report".to_string(),
1138 20.0,
1139 Mm(20.0),
1140 Mm(270.0),
1141 &font_bold,
1142 );
1143
1144 current_layer.use_text(
1146 format!("Scenario: {}", scenario_name),
1147 14.0,
1148 Mm(20.0),
1149 Mm(250.0),
1150 &font,
1151 );
1152
1153 let now = chrono::Utc::now();
1155 current_layer.use_text(
1156 format!("Generated: {}", now.format("%Y-%m-%d %H:%M:%S UTC")),
1157 10.0,
1158 Mm(20.0),
1159 Mm(235.0),
1160 &font,
1161 );
1162
1163 current_layer.use_text("Summary", 14.0, Mm(20.0), Mm(210.0), &font_bold);
1165
1166 let mut y = 190.0;
1167 let metrics = [
1168 ("Total Requests", "1000"),
1169 ("Success Rate", "95.2%"),
1170 ("Average Latency", "150ms"),
1171 ("Error Rate", "4.8%"),
1172 ];
1173
1174 for (label, value) in &metrics {
1175 current_layer.use_text(format!("{}: {}", label, value), 10.0, Mm(20.0), Mm(y), &font);
1176 y -= 10.0;
1177 }
1178
1179 if include_charts {
1181 y -= 20.0;
1182 current_layer.use_text("Charts", 14.0, Mm(20.0), Mm(y), &font_bold);
1183 y -= 15.0;
1184 current_layer.use_text(
1185 "[Chart placeholder - would include actual charts in full implementation]",
1186 10.0,
1187 Mm(20.0),
1188 Mm(y),
1189 &font,
1190 );
1191 }
1192
1193 use std::io::BufWriter;
1195 doc.save(&mut BufWriter::new(std::fs::File::create(output_path)?))?;
1196
1197 Ok(())
1198}
1199
1200async fn generate_pdf_report(
1201 State(_state): State<ObservabilityState>,
1202 Json(req): Json<GeneratePdfRequest>,
1203) -> Response {
1204 tracing::info!("Generating PDF report for: {}", req.scenario_name);
1205
1206 let pdf_path = format!("/tmp/report_{}.pdf", req.scenario_name);
1207
1208 if let Err(e) = generate_scenario_pdf(&req.scenario_name, req.include_charts, &pdf_path) {
1210 return Json(ApiResponse::<String>::error(format!("Failed to generate PDF: {}", e)))
1211 .into_response();
1212 }
1213
1214 Json(ApiResponse::success(format!("/reports/{}.pdf", req.scenario_name))).into_response()
1215}
1216
1217#[derive(Deserialize)]
1219struct GenerateCsvRequest {
1220 scenario_names: Vec<String>,
1221 include_comparison: bool,
1222}
1223
1224async fn generate_csv_report(
1225 State(_state): State<ObservabilityState>,
1226 Json(req): Json<GenerateCsvRequest>,
1227) -> Response {
1228 tracing::info!("Generating CSV report for: {:?}", req.scenario_names);
1229
1230 let csv_content = generate_csv_content(&req.scenario_names, req.include_comparison);
1231 let csv_path = "/tmp/scenarios_report.csv";
1232
1233 if let Err(e) = std::fs::write(csv_path, csv_content) {
1234 return Json(ApiResponse::<String>::error(format!("Failed to generate CSV: {}", e)))
1235 .into_response();
1236 }
1237
1238 Json(ApiResponse::success("/reports/scenarios.csv".to_string())).into_response()
1239}
1240
1241#[derive(Deserialize)]
1243struct CompareRequest {
1244 baseline_scenario: String,
1245 comparison_scenarios: Vec<String>,
1246}
1247
1248async fn compare_scenarios(
1249 State(_state): State<ObservabilityState>,
1250 Json(req): Json<CompareRequest>,
1251) -> Json<ApiResponse<ComparisonResult>> {
1252 tracing::info!(
1253 "Comparing scenarios - baseline: {}, comparisons: {:?}",
1254 req.baseline_scenario,
1255 req.comparison_scenarios
1256 );
1257
1258 let comparison = perform_scenario_comparison(&req.baseline_scenario, &req.comparison_scenarios);
1259
1260 Json(ApiResponse::success(comparison))
1261}
1262
1263#[derive(Serialize)]
1264struct ComparisonResult {
1265 baseline: String,
1266 comparisons: Vec<String>,
1267 regressions_count: usize,
1268 improvements_count: usize,
1269 verdict: String,
1270}
1271
#[cfg(test)]
mod tests {
    use super::*;

    /// A successful response carries the payload and no error message.
    #[test]
    fn test_api_response_success() {
        let ok = ApiResponse::success("test");

        assert!(ok.success);
        assert!(ok.error.is_none());
        assert_eq!(ok.data, Some("test"));
    }

    /// A failed response carries the message and no payload.
    #[test]
    fn test_api_response_error() {
        let failed: ApiResponse<String> = ApiResponse::error(String::from("error"));

        assert!(!failed.success);
        assert!(failed.data.is_none());
        assert_eq!(failed.error, Some(String::from("error")));
    }
}
1291}