mockforge_chaos/
observability_api.rs

//! Observability API endpoints for the Admin UI
//!
//! This module provides REST API endpoints for the Admin UI to interact with
//! chaos engineering features, including metrics, alerts, traces, and scenarios.
5
6use axum::{
7    extract::{State, WebSocketUpgrade},
8    response::{IntoResponse, Response},
9    routing::{get, post},
10    Json, Router,
11};
12use serde::{Deserialize, Serialize};
13use std::sync::Arc;
14
15use crate::{
16    alerts::{Alert, AlertManager},
17    analytics::ChaosAnalytics,
18    dashboard::{DashboardManager, DashboardStats, DashboardUpdate},
19    scenario_orchestrator::ScenarioOrchestrator,
20    scenario_recorder::ScenarioRecorder,
21    scenario_replay::ScenarioReplayEngine,
22    scenario_scheduler::ScenarioScheduler,
23    scenarios::ScenarioEngine,
24    trace_collector::TraceCollector,
25};
26use mockforge_recorder::Recorder;
27use parking_lot::RwLock;
28use printpdf::*;
29use std::collections::HashMap;
30
31/// Generate flamegraph SVG from actual trace data
32fn generate_flamegraph_from_trace(
33    trace_id: &str,
34    traces: &[crate::trace_collector::CollectedTrace],
35) -> String {
36    use std::collections::HashMap;
37
38    let width = 1200;
39    let height = 600;
40    let bar_height = 20;
41    let mut y_offset = 60;
42
43    let mut svg = format!(
44        r#"<svg width="{}" height="{}" xmlns="http://www.w3.org/2000/svg">
45        <rect width="100%" height="100%" fill="white"/>
46        <text x="10" y="20" font-family="monospace" font-size="12">Flamegraph for trace: {}</text>
47        <text x="10" y="35" font-family="monospace" font-size="10">Total spans: {}</text>"#,
48        width,
49        height,
50        trace_id,
51        traces.len()
52    );
53
54    // Build span hierarchy
55    let mut span_map: HashMap<String, &crate::trace_collector::CollectedTrace> = HashMap::new();
56    for trace in traces {
57        span_map.insert(trace.span_id.clone(), trace);
58    }
59
60    // Find root spans
61    let mut root_spans = Vec::new();
62    for trace in traces {
63        if trace.parent_span_id.is_none() {
64            root_spans.push(trace);
65        }
66    }
67
68    // Sort root spans by start time
69    root_spans.sort_by_key(|s| s.start_time.clone());
70
71    // Calculate total time range
72    let min_start = traces
73        .iter()
74        .map(|t| {
75            if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&t.start_time) {
76                dt.timestamp_micros() as u64
77            } else {
78                0
79            }
80        })
81        .min()
82        .unwrap_or(0);
83
84    let max_end = traces
85        .iter()
86        .map(|t| {
87            if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&t.start_time) {
88                dt.timestamp_micros() as u64 + t.duration_ms * 1000
89            } else {
90                t.duration_ms * 1000
91            }
92        })
93        .max()
94        .unwrap_or(1000000);
95
96    let total_duration = max_end.saturating_sub(min_start);
97
98    // Render spans level by level
99    let mut current_level = root_spans;
100    let mut level = 0;
101
102    while !current_level.is_empty() && y_offset + bar_height < height {
103        let mut next_level = Vec::new();
104
105        for span in &current_level {
106            // Calculate position and width
107            let start_us = if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&span.start_time) {
108                dt.timestamp_micros() as u64
109            } else {
110                span.start_time.parse().unwrap_or(0)
111            };
112
113            let x = ((start_us.saturating_sub(min_start)) as f64 / total_duration as f64
114                * (width - 40) as f64) as u32
115                + 20;
116            let bar_width = ((span.duration_ms * 1000) as f64 / total_duration as f64
117                * (width - 40) as f64) as u32;
118
119            if bar_width > 0 {
120                let color = format!("#{:x}", (level * 50 + 100) % 256);
121                svg.push_str(&format!(
122                    r#"<rect x="{}" y="{}" width="{}" height="{}" fill="{}" stroke="black" stroke-width="1"/>
123                    <text x="{}" y="{}" font-family="monospace" font-size="10" fill="white">{}</text>"#,
124                    x, y_offset, bar_width, bar_height, color,
125                    x + 2, y_offset + 12, span.name
126                ));
127            }
128
129            // Find children
130            for trace in traces {
131                if trace.parent_span_id.as_ref() == Some(&span.span_id) {
132                    next_level.push(trace);
133                }
134            }
135        }
136
137        // Sort next level by start time
138        next_level.sort_by_key(|s| s.start_time.clone());
139
140        current_level = next_level;
141        y_offset += bar_height + 2;
142        level += 1;
143    }
144
145    svg.push_str("</svg>");
146    svg
147}
148
149/// Calculate the maximum depth of the trace hierarchy
150fn calculate_max_depth(traces: &[crate::trace_collector::CollectedTrace]) -> usize {
151    use std::collections::HashMap;
152
153    let mut span_map: HashMap<String, &crate::trace_collector::CollectedTrace> = HashMap::new();
154    let mut depth_map: HashMap<String, usize> = HashMap::new();
155
156    // Index spans by span_id
157    for trace in traces {
158        span_map.insert(trace.span_id.clone(), trace);
159    }
160
161    // Calculate depth for each span
162    for trace in traces {
163        calculate_span_depth(&trace.span_id, &span_map, &mut depth_map);
164    }
165
166    depth_map.values().cloned().max().unwrap_or(0)
167}
168
169/// Recursively calculate depth for a span
170fn calculate_span_depth(
171    span_id: &str,
172    span_map: &HashMap<String, &crate::trace_collector::CollectedTrace>,
173    depth_map: &mut HashMap<String, usize>,
174) -> usize {
175    if let Some(&depth) = depth_map.get(span_id) {
176        return depth;
177    }
178
179    let span = match span_map.get(span_id) {
180        Some(s) => s,
181        None => return 0,
182    };
183
184    let depth = if let Some(ref parent_id) = span.parent_span_id {
185        calculate_span_depth(parent_id, span_map, depth_map) + 1
186    } else {
187        0
188    };
189
190    depth_map.insert(span_id.to_string(), depth);
191    depth
192}
193
194/// Find the hottest path (longest duration path) in the trace
195fn find_hottest_path(traces: &[crate::trace_collector::CollectedTrace]) -> Vec<String> {
196    use std::collections::HashMap;
197
198    if traces.is_empty() {
199        return Vec::new();
200    }
201
202    let mut span_map: HashMap<String, &crate::trace_collector::CollectedTrace> = HashMap::new();
203
204    // Index spans by span_id
205    for trace in traces {
206        span_map.insert(trace.span_id.clone(), trace);
207    }
208
209    // Find root spans
210    let mut root_spans = Vec::new();
211    for trace in traces {
212        if trace.parent_span_id.is_none() {
213            root_spans.push(trace);
214        }
215    }
216
217    if root_spans.is_empty() {
218        return Vec::new();
219    }
220
221    // For simplicity, return the path from the first root span
222    // In a real implementation, you'd find the path with maximum total duration
223    let mut path = Vec::new();
224    let mut current = root_spans[0];
225
226    loop {
227        path.push(current.name.clone());
228        let mut found_child = false;
229
230        // Find a child span (simplified - just pick the first one)
231        for trace in traces {
232            if trace.parent_span_id.as_ref() == Some(&current.span_id) {
233                current = trace;
234                found_child = true;
235                break;
236            }
237        }
238
239        if !found_child {
240            break;
241        }
242    }
243
244    path
245}
246
/// Generate CSV content for scenario comparison.
///
/// Emits a header row followed by one row per entry of `scenario_names`. The figures
/// are mock data: three scenario names have fixed values and every other name gets
/// the default row. Names containing a comma, quote or line break are quoted.
///
/// When `include_comparison` is set and more than one scenario was requested, a
/// "Comparison Summary" section is appended. It names the scenarios with the best
/// success rate, worst latency and highest error rate among the rows actually
/// emitted; ties go to the scenario listed first.
fn generate_csv_content(scenario_names: &[String], include_comparison: bool) -> String {
    /// Quote a field when it contains a delimiter, a quote or a line break.
    fn csv_field(raw: &str) -> String {
        if raw.contains(|c: char| matches!(c, ',' | '"' | '\n' | '\r')) {
            format!("\"{}\"", raw.replace('"', "\"\""))
        } else {
            raw.to_string()
        }
    }

    struct Row<'a> {
        name: &'a str,
        requests: u32,
        success_rate: f64,
        avg_latency: f64,
        error_rate: f64,
    }

    let rows: Vec<Row<'_>> = scenario_names
        .iter()
        .map(|scenario| {
            // Mock data - in real implementation, would fetch actual metrics
            let (requests, success_rate, avg_latency, error_rate) = match scenario.as_str() {
                "network_degradation" => (1000, 92.5, 250.0, 7.5),
                "service_instability" => (800, 88.0, 180.0, 12.0),
                "cascading_failure" => (1200, 85.0, 320.0, 15.0),
                _ => (1000, 95.0, 150.0, 5.0),
            };
            Row {
                name: scenario.as_str(),
                requests,
                success_rate,
                avg_latency,
                error_rate,
            }
        })
        .collect();

    let mut csv =
        String::from("Scenario,Total Requests,Success Rate,Avg Latency (ms),Error Rate\n");

    for row in &rows {
        csv.push_str(&format!(
            "{},{},{:.1},{:.1},{:.1}\n",
            csv_field(row.name),
            row.requests,
            row.success_rate,
            row.avg_latency,
            row.error_rate
        ));
    }

    if include_comparison && rows.len() > 1 {
        // Strict comparisons keep the earliest scenario on ties.
        let mut best_success = &rows[0];
        let mut worst_latency = &rows[0];
        let mut highest_error = &rows[0];
        for row in &rows[1..] {
            if row.success_rate > best_success.success_rate {
                best_success = row;
            }
            if row.avg_latency > worst_latency.avg_latency {
                worst_latency = row;
            }
            if row.error_rate > highest_error.error_rate {
                highest_error = row;
            }
        }

        csv.push_str("\nComparison Summary\n");
        csv.push_str(&format!("Best Success Rate,{}\n", csv_field(best_success.name)));
        csv.push_str(&format!("Worst Latency,{}\n", csv_field(worst_latency.name)));
        csv.push_str(&format!("Highest Error Rate,{}\n", csv_field(highest_error.name)));
    }

    csv
}
281
282/// Perform basic scenario comparison
283fn perform_scenario_comparison(baseline: &str, comparisons: &[String]) -> ComparisonResult {
284    // Mock comparison logic - in real implementation, would analyze actual metrics
285    let baseline_metrics = get_scenario_metrics(baseline);
286    let mut regressions = 0;
287    let mut improvements = 0;
288
289    for scenario in comparisons {
290        let metrics = get_scenario_metrics(scenario);
291
292        // Compare success rates
293        if metrics.success_rate < baseline_metrics.success_rate {
294            regressions += 1;
295        } else if metrics.success_rate > baseline_metrics.success_rate {
296            improvements += 1;
297        }
298
299        // Compare latencies
300        if metrics.avg_latency > baseline_metrics.avg_latency {
301            regressions += 1;
302        } else if metrics.avg_latency < baseline_metrics.avg_latency {
303            improvements += 1;
304        }
305    }
306
307    let verdict = if regressions > improvements {
308        "worse".to_string()
309    } else if improvements > regressions {
310        "better".to_string()
311    } else {
312        "similar".to_string()
313    };
314
315    ComparisonResult {
316        baseline: baseline.to_string(),
317        comparisons: comparisons.to_vec(),
318        regressions_count: regressions,
319        improvements_count: improvements,
320        verdict,
321    }
322}
323
/// Mock scenario metrics
struct ScenarioMetrics {
    /// Share of successful requests, expressed as a percentage in the mock table.
    success_rate: f64,
    /// Average latency; presumably milliseconds, matching the CSV report column.
    avg_latency: f64,
}

/// Look up the mock metrics for `scenario`.
///
/// Three scenario names have dedicated figures; any other name gets the default pair.
fn get_scenario_metrics(scenario: &str) -> ScenarioMetrics {
    let (success_rate, avg_latency) = match scenario {
        "network_degradation" => (92.5, 250.0),
        "service_instability" => (88.0, 180.0),
        "cascading_failure" => (85.0, 320.0),
        _ => (95.0, 150.0),
    };

    ScenarioMetrics {
        success_rate,
        avg_latency,
    }
}
350
351/// Simple in-memory dashboard layout manager
352#[derive(Clone)]
353pub struct SimpleDashboardLayoutManager {
354    layouts: Arc<RwLock<HashMap<String, DashboardLayoutSummary>>>,
355}
356
357impl SimpleDashboardLayoutManager {
358    pub fn new() -> Self {
359        let mut layouts = HashMap::new();
360        layouts.insert(
361            "chaos-overview".to_string(),
362            DashboardLayoutSummary {
363                id: "chaos-overview".to_string(),
364                name: "Chaos Engineering Overview".to_string(),
365                description: Some("Real-time overview of chaos engineering activities".to_string()),
366                widget_count: 3,
367            },
368        );
369        layouts.insert(
370            "service-perf".to_string(),
371            DashboardLayoutSummary {
372                id: "service-perf".to_string(),
373                name: "Service Performance".to_string(),
374                description: Some("Detailed service performance metrics".to_string()),
375                widget_count: 2,
376            },
377        );
378
379        Self {
380            layouts: Arc::new(RwLock::new(layouts)),
381        }
382    }
383}
384
385impl Default for SimpleDashboardLayoutManager {
386    fn default() -> Self {
387        Self::new()
388    }
389}
390
391impl SimpleDashboardLayoutManager {
392    pub fn list_layouts(&self) -> Vec<DashboardLayoutSummary> {
393        self.layouts.read().values().cloned().collect()
394    }
395
396    pub fn get_layout(&self, id: &str) -> Option<DashboardLayoutSummary> {
397        self.layouts.read().get(id).cloned()
398    }
399
400    pub fn create_layout(&self, layout: DashboardLayoutSummary) {
401        self.layouts.write().insert(layout.id.clone(), layout);
402    }
403
404    pub fn update_layout(&self, id: &str, layout: DashboardLayoutSummary) {
405        self.layouts.write().insert(id.to_string(), layout);
406    }
407
408    pub fn delete_layout(&self, id: &str) {
409        self.layouts.write().remove(id);
410    }
411}
412
413/// Observability API state
414#[derive(Clone)]
415pub struct ObservabilityState {
416    pub analytics: Arc<ChaosAnalytics>,
417    pub alert_manager: Arc<AlertManager>,
418    pub dashboard: Arc<DashboardManager>,
419    pub scenario_engine: Arc<ScenarioEngine>,
420    pub recorder: Arc<ScenarioRecorder>,
421    pub request_recorder: Option<Arc<Recorder>>,
422    pub replay_engine: Arc<ScenarioReplayEngine>,
423    pub scheduler: Arc<ScenarioScheduler>,
424    pub orchestrator: Arc<ScenarioOrchestrator>,
425    pub layout_manager: Arc<SimpleDashboardLayoutManager>,
426    pub trace_collector: Arc<TraceCollector>,
427}
428
429/// Response wrapper
430#[derive(Debug, Serialize)]
431pub struct ApiResponse<T> {
432    pub success: bool,
433    pub data: Option<T>,
434    pub error: Option<String>,
435}
436
437impl<T: Serialize> ApiResponse<T> {
438    pub fn success(data: T) -> Self {
439        Self {
440            success: true,
441            data: Some(data),
442            error: None,
443        }
444    }
445
446    pub fn error(message: String) -> Self {
447        Self {
448            success: false,
449            data: None,
450            error: Some(message),
451        }
452    }
453}
454
455/// Create observability API router
456pub fn create_observability_router(state: ObservabilityState) -> Router {
457    Router::new()
458        // Dashboard stats
459        .route("/api/observability/stats", get(get_stats))
460        .route("/api/observability/alerts", get(get_alerts))
461        .route("/api/observability/ws", get(websocket_handler))
462        // Traces
463        .route("/api/observability/traces", get(get_traces))
464        .route("/api/observability/traces/:trace_id/flamegraph", get(get_flamegraph))
465        // Dashboard layouts
466        .route("/api/dashboard/layouts", get(list_dashboard_layouts))
467        .route("/api/dashboard/layouts", post(create_dashboard_layout))
468        .route("/api/dashboard/layouts/:id", get(get_dashboard_layout))
469        .route("/api/dashboard/layouts/:id", post(update_dashboard_layout))
470        .route("/api/dashboard/layouts/:id", axum::routing::delete(delete_dashboard_layout))
471        .route("/api/dashboard/templates", get(get_dashboard_templates))
472        // Reports and exports
473        .route("/api/reports/pdf", post(generate_pdf_report))
474        .route("/api/reports/csv", post(generate_csv_report))
475        .route("/api/reports/compare", post(compare_scenarios))
476        // Chaos scenarios
477        .route("/api/chaos/scenarios", get(list_scenarios))
478        .route("/api/chaos/scenarios/:name", post(start_scenario))
479        .route("/api/chaos/status", get(get_chaos_status))
480        .route("/api/chaos/disable", post(disable_chaos))
481        .route("/api/chaos/reset", post(reset_chaos))
482        // Recording
483        .route("/api/chaos/recording/start", post(start_recording))
484        .route("/api/chaos/recording/stop", post(stop_recording))
485        .route("/api/chaos/recording/status", get(recording_status))
486        .route("/api/chaos/recording/list", get(list_recordings))
487        .route("/api/chaos/recording/export", post(export_recording))
488        // Replay
489        .route("/api/chaos/replay/start", post(start_replay))
490        .route("/api/chaos/replay/stop", post(stop_replay))
491        .route("/api/chaos/replay/status", get(replay_status))
492        // Recorder search
493        .route("/api/recorder/search", post(search_requests))
494        .with_state(state)
495}
496
497/// Get dashboard statistics
498async fn get_stats(State(state): State<ObservabilityState>) -> Json<DashboardStats> {
499    let stats = state.dashboard.get_stats();
500    Json(stats)
501}
502
503/// Get active alerts
504async fn get_alerts(State(state): State<ObservabilityState>) -> Json<Vec<Alert>> {
505    let alerts = state.alert_manager.get_active_alerts();
506    Json(alerts)
507}
508
509/// WebSocket handler for real-time updates
510async fn websocket_handler(
511    ws: WebSocketUpgrade,
512    State(state): State<ObservabilityState>,
513) -> Response {
514    ws.on_upgrade(|socket| handle_websocket(socket, state))
515}
516
517async fn handle_websocket(mut socket: axum::extract::ws::WebSocket, state: ObservabilityState) {
518    use axum::extract::ws::Message;
519
520    let mut rx = state.dashboard.subscribe();
521
522    // Send initial stats
523    let _stats = state.dashboard.get_stats();
524    let update = DashboardUpdate::Ping {
525        timestamp: chrono::Utc::now(),
526    };
527    if let Ok(json) = serde_json::to_string(&update) {
528        let _ = socket.send(Message::Text(json.into())).await;
529    }
530
531    // Stream updates
532    while let Ok(update) = rx.recv().await {
533        if let Ok(json) = serde_json::to_string(&update) {
534            if socket.send(Message::Text(json.into())).await.is_err() {
535                break;
536            }
537        }
538    }
539}
540
541/// Get traces from OpenTelemetry backend
542#[derive(Serialize)]
543struct TracesResponse {
544    traces: Vec<serde_json::Value>,
545}
546
547async fn get_traces(State(state): State<ObservabilityState>) -> Json<TracesResponse> {
548    match state.trace_collector.collect_traces().await {
549        Ok(collected_traces) => {
550            let traces: Vec<serde_json::Value> = collected_traces
551                .into_iter()
552                .map(|trace| {
553                    serde_json::json!({
554                        "trace_id": trace.trace_id,
555                        "span_id": trace.span_id,
556                        "parent_span_id": trace.parent_span_id,
557                        "name": trace.name,
558                        "start_time": trace.start_time,
559                        "end_time": trace.end_time,
560                        "duration_ms": trace.duration_ms,
561                        "attributes": trace.attributes
562                    })
563                })
564                .collect();
565
566            Json(TracesResponse { traces })
567        }
568        Err(e) => {
569            tracing::warn!("Failed to collect traces: {}", e);
570            // Return empty traces on error rather than failing the request
571            Json(TracesResponse { traces: vec![] })
572        }
573    }
574}
575
576/// List available chaos scenarios
577#[derive(Serialize)]
578struct ScenariosResponse {
579    scenarios: Vec<String>,
580}
581
582async fn list_scenarios(State(_state): State<ObservabilityState>) -> Json<ScenariosResponse> {
583    let scenarios = vec![
584        "network_degradation".to_string(),
585        "service_instability".to_string(),
586        "cascading_failure".to_string(),
587        "peak_traffic".to_string(),
588        "slow_backend".to_string(),
589    ];
590
591    Json(ScenariosResponse { scenarios })
592}
593
594/// Start a chaos scenario
595async fn start_scenario(
596    State(state): State<ObservabilityState>,
597    axum::extract::Path(name): axum::extract::Path<String>,
598) -> Json<ApiResponse<String>> {
599    if let Some(scenario) = state.scenario_engine.get_scenario(&name) {
600        state.scenario_engine.start_scenario(scenario);
601        tracing::info!("Starting chaos scenario: {}", name);
602        Json(ApiResponse::success(format!("Started scenario: {}", name)))
603    } else {
604        Json(ApiResponse::error(format!("Scenario '{}' not found", name)))
605    }
606}
607
608/// Get chaos status
609#[derive(Serialize)]
610struct ChaosStatus {
611    is_enabled: bool,
612    active_scenario: Option<String>,
613    current_config: Option<serde_json::Value>,
614}
615
616async fn get_chaos_status(State(state): State<ObservabilityState>) -> Json<ChaosStatus> {
617    let active_scenarios = state.scenario_engine.get_active_scenarios();
618    let is_enabled = !active_scenarios.is_empty();
619    let active_scenario = active_scenarios.first().map(|s| s.name.clone());
620    let current_config = active_scenarios
621        .first()
622        .map(|s| serde_json::to_value(&s.chaos_config).unwrap_or_default());
623
624    Json(ChaosStatus {
625        is_enabled,
626        active_scenario,
627        current_config,
628    })
629}
630
631/// Disable chaos
632async fn disable_chaos() -> Json<ApiResponse<String>> {
633    tracing::info!("Disabling chaos engineering");
634    Json(ApiResponse::success("Chaos disabled".to_string()))
635}
636
637/// Reset chaos configuration
638async fn reset_chaos() -> Json<ApiResponse<String>> {
639    tracing::info!("Resetting chaos configuration");
640    Json(ApiResponse::success("Chaos reset".to_string()))
641}
642
643/// Start recording
644#[derive(Deserialize)]
645struct StartRecordingRequest {
646    scenario_name: String,
647}
648
649async fn start_recording(
650    State(state): State<ObservabilityState>,
651    Json(req): Json<StartRecordingRequest>,
652) -> Json<ApiResponse<String>> {
653    if let Some(scenario) = state.scenario_engine.get_scenario(&req.scenario_name) {
654        match state.recorder.start_recording(scenario.clone()) {
655            Ok(_) => {
656                tracing::info!("Starting recording: {}", req.scenario_name);
657                Json(ApiResponse::success("Recording started".to_string()))
658            }
659            Err(e) => Json(ApiResponse::error(format!("Failed to start recording: {}", e))),
660        }
661    } else {
662        Json(ApiResponse::error(format!("Scenario '{}' not found", req.scenario_name)))
663    }
664}
665
666/// Stop recording
667async fn stop_recording(State(state): State<ObservabilityState>) -> Json<ApiResponse<String>> {
668    match state.recorder.stop_recording() {
669        Ok(recording) => {
670            tracing::info!("Stopping recording: {} events recorded", recording.events.len());
671            Json(ApiResponse::success("Recording stopped".to_string()))
672        }
673        Err(e) => Json(ApiResponse::error(format!("Failed to stop recording: {}", e))),
674    }
675}
676
677/// Recording status
678#[derive(Serialize)]
679struct RecordingStatus {
680    is_recording: bool,
681    current_scenario: Option<String>,
682    events_count: usize,
683}
684
685async fn recording_status(State(state): State<ObservabilityState>) -> Json<RecordingStatus> {
686    let is_recording = state.recorder.is_recording();
687    let current_scenario = state.recorder.get_current_recording().map(|r| r.scenario.name.clone());
688    let events_count = state.recorder.get_current_recording().map(|r| r.events.len()).unwrap_or(0);
689
690    Json(RecordingStatus {
691        is_recording,
692        current_scenario,
693        events_count,
694    })
695}
696
697/// List recordings
698#[derive(Serialize)]
699struct RecordingsResponse {
700    scenarios: Vec<RecordingInfo>,
701}
702
703#[derive(Serialize)]
704struct RecordingInfo {
705    name: String,
706    started_at: String,
707    ended_at: Option<String>,
708    total_events: usize,
709    duration_ms: u64,
710}
711
712async fn list_recordings(State(state): State<ObservabilityState>) -> Json<RecordingsResponse> {
713    let recordings = state.recorder.get_recordings();
714    let scenarios = recordings
715        .into_iter()
716        .map(|r| RecordingInfo {
717            name: r.scenario.name,
718            started_at: r.recording_started.to_rfc3339(),
719            ended_at: r.recording_ended.map(|t| t.to_rfc3339()),
720            total_events: r.events.len(),
721            duration_ms: r.total_duration_ms,
722        })
723        .collect();
724
725    Json(RecordingsResponse { scenarios })
726}
727
728/// Export recording
729#[derive(Deserialize)]
730struct ExportRequest {
731    scenario_name: String,
732    format: String,
733}
734
735async fn export_recording(
736    State(state): State<ObservabilityState>,
737    Json(req): Json<ExportRequest>,
738) -> Response {
739    tracing::info!("Exporting scenario: {} as {}", req.scenario_name, req.format);
740
741    if let Some(recording) = state.recorder.get_recording_by_name(&req.scenario_name) {
742        let filename = format!("{}.{}", req.scenario_name, req.format);
743        let filepath = format!("/tmp/{}", filename);
744
745        let result = match req.format.as_str() {
746            "json" | "yaml" => {
747                recording.save_to_file(&filepath).map(|_| format!("/exports/{}", filename))
748            }
749            _ => Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "Unsupported format")),
750        };
751
752        match result {
753            Ok(path) => Json(ApiResponse::<String>::success(path)).into_response(),
754            Err(e) => {
755                Json(ApiResponse::<String>::error(format!("Export failed: {}", e))).into_response()
756            }
757        }
758    } else {
759        Json(ApiResponse::<String>::error(format!(
760            "Recording '{}' not found",
761            req.scenario_name
762        )))
763        .into_response()
764    }
765}
766
767/// Start replay
768#[derive(Deserialize)]
769struct StartReplayRequest {
770    scenario_name: String,
771    speed: f64,
772}
773
774async fn start_replay(
775    State(_state): State<ObservabilityState>,
776    Json(req): Json<StartReplayRequest>,
777) -> Json<ApiResponse<String>> {
778    tracing::info!("Starting replay: {} at {}x speed", req.scenario_name, req.speed);
779    Json(ApiResponse::success("Replay started".to_string()))
780}
781
782/// Stop replay
783async fn stop_replay() -> Json<ApiResponse<String>> {
784    tracing::info!("Stopping replay");
785    Json(ApiResponse::success("Replay stopped".to_string()))
786}
787
788/// Replay status
789#[derive(Serialize)]
790struct ReplayStatus {
791    is_playing: bool,
792    scenario_name: Option<String>,
793    progress: f64,
794}
795
796async fn replay_status(State(state): State<ObservabilityState>) -> Json<ReplayStatus> {
797    if let Some(status) = state.replay_engine.get_status() {
798        Json(ReplayStatus {
799            is_playing: true,
800            scenario_name: Some(status.scenario_name),
801            progress: status.progress,
802        })
803    } else {
804        Json(ReplayStatus {
805            is_playing: false,
806            scenario_name: None,
807            progress: 0.0,
808        })
809    }
810}
811
812/// Search recorded requests
813#[derive(Deserialize)]
814struct SearchRequest {
815    limit: Option<usize>,
816    protocol: Option<String>,
817    method: Option<String>,
818    path: Option<String>,
819    status_code: Option<u16>,
820    trace_id: Option<String>,
821    min_duration_ms: Option<f64>,
822    max_duration_ms: Option<f64>,
823    tags: Option<Vec<String>>,
824}
825
826#[derive(Serialize)]
827struct SearchResponse {
828    requests: Vec<RecordedRequest>,
829}
830
831#[derive(Serialize)]
832struct RecordedRequest {
833    id: i64,
834    timestamp: String,
835    protocol: String,
836    method: String,
837    path: String,
838    status_code: u16,
839    duration_ms: f64,
840    client_ip: Option<String>,
841    request_headers: serde_json::Value,
842    request_body: Option<String>,
843    response_headers: serde_json::Value,
844    response_body: Option<String>,
845}
846
847async fn search_requests(
848    State(state): State<ObservabilityState>,
849    Json(req): Json<SearchRequest>,
850) -> Json<SearchResponse> {
851    // Check if recorder is available
852    let Some(recorder) = &state.request_recorder else {
853        // Fall back to mock data if recorder is not available
854        let mock_requests = vec![RecordedRequest {
855            id: 1,
856            timestamp: chrono::Utc::now().to_rfc3339(),
857            protocol: "http".to_string(),
858            method: "GET".to_string(),
859            path: "/api/test".to_string(),
860            status_code: 200,
861            duration_ms: 150.0,
862            client_ip: Some("127.0.0.1".to_string()),
863            request_headers: serde_json::json!({"user-agent": "test"}),
864            request_body: None,
865            response_headers: serde_json::json!({"content-type": "application/json"}),
866            response_body: Some("{\"status\": \"ok\"}".to_string()),
867        }];
868        return Json(SearchResponse {
869            requests: mock_requests,
870        });
871    };
872
873    use mockforge_recorder::query::{execute_query, QueryFilter};
874
875    // Convert SearchRequest to QueryFilter
876    let filter = QueryFilter {
877        protocol: req.protocol.as_ref().and_then(|p| match p.as_str() {
878            "http" => Some(mockforge_recorder::models::Protocol::Http),
879            "grpc" => Some(mockforge_recorder::models::Protocol::Grpc),
880            "websocket" => Some(mockforge_recorder::models::Protocol::WebSocket),
881            "graphql" => Some(mockforge_recorder::models::Protocol::GraphQL),
882            _ => None,
883        }),
884        method: req.method.clone(),
885        path: req.path.clone(),
886        status_code: req.status_code.map(|s| s as i32),
887        trace_id: req.trace_id.clone(),
888        min_duration_ms: req.min_duration_ms.map(|d| d as i64),
889        max_duration_ms: req.max_duration_ms.map(|d| d as i64),
890        tags: req.tags.clone(),
891        limit: req.limit.map(|l| l as i32),
892        offset: None, // Not supported in current API
893    };
894
895    // Execute the query
896    match execute_query(recorder.database(), filter).await {
897        Ok(result) => {
898            // Convert RecordedExchange to RecordedRequest format
899            let requests: Vec<RecordedRequest> = result
900                .exchanges
901                .into_iter()
902                .map(|exchange| RecordedRequest {
903                    id: exchange.request.id.parse().unwrap_or(0),
904                    timestamp: exchange.request.timestamp.to_rfc3339(),
905                    protocol: exchange.request.protocol.as_str().to_string(),
906                    method: exchange.request.method,
907                    path: exchange.request.path,
908                    status_code: exchange.request.status_code.unwrap_or(0) as u16,
909                    duration_ms: exchange.request.duration_ms.unwrap_or(0) as f64,
910                    client_ip: exchange.request.client_ip,
911                    request_headers: serde_json::from_str(&exchange.request.headers)
912                        .unwrap_or(serde_json::json!({})),
913                    request_body: exchange.request.body,
914                    response_headers: exchange
915                        .response
916                        .as_ref()
917                        .and_then(|r| serde_json::from_str(&r.headers).ok())
918                        .unwrap_or(serde_json::json!({})),
919                    response_body: exchange.response.as_ref().and_then(|r| r.body.clone()),
920                })
921                .collect();
922
923            Json(SearchResponse { requests })
924        }
925        Err(err) => {
926            tracing::error!("Failed to search requests: {}", err);
927            // Return empty result on error
928            Json(SearchResponse { requests: vec![] })
929        }
930    }
931}
932
933// ===== Advanced Observability Endpoints =====
934
935/// Get flamegraph for a trace
936async fn get_flamegraph(
937    State(state): State<ObservabilityState>,
938    axum::extract::Path(trace_id): axum::extract::Path<String>,
939) -> Json<ApiResponse<FlamegraphResponse>> {
940    tracing::info!("Generating flamegraph for trace: {}", trace_id);
941
942    // Get actual trace data
943    let collected_traces = match state.trace_collector.get_trace_by_id(&trace_id).await {
944        Ok(traces) => traces,
945        Err(e) => {
946            return Json(ApiResponse::error(format!("Failed to retrieve trace data: {}", e)));
947        }
948    };
949
950    if collected_traces.is_empty() {
951        return Json(ApiResponse::error(format!("No trace found with ID: {}", trace_id)));
952    }
953
954    // Generate flamegraph SVG from trace data
955    let svg_content = generate_flamegraph_from_trace(&trace_id, &collected_traces);
956    let svg_path = format!("/tmp/flamegraph_{}.svg", trace_id);
957
958    // Write SVG to file
959    if let Err(e) = std::fs::write(&svg_path, svg_content) {
960        return Json(ApiResponse::error(format!("Failed to generate flamegraph: {}", e)));
961    }
962
963    // Calculate stats from actual trace data
964    let total_spans = collected_traces.len();
965    let max_depth = calculate_max_depth(&collected_traces);
966    let total_duration_us =
967        collected_traces.iter().map(|t| t.duration_ms * 1000).max().unwrap_or(0);
968    let hottest_path = find_hottest_path(&collected_traces);
969
970    let stats = FlamegraphStatsResponse {
971        total_spans,
972        max_depth,
973        total_duration_us,
974        hottest_path,
975    };
976
977    Json(ApiResponse::success(FlamegraphResponse {
978        trace_id: trace_id.clone(),
979        svg_url: format!("/flamegraphs/{}.svg", trace_id),
980        stats,
981    }))
982}
983
984#[derive(Serialize)]
985struct FlamegraphResponse {
986    trace_id: String,
987    svg_url: String,
988    stats: FlamegraphStatsResponse,
989}
990
991#[derive(Serialize)]
992struct FlamegraphStatsResponse {
993    total_spans: usize,
994    max_depth: usize,
995    total_duration_us: u64,
996    hottest_path: Vec<String>,
997}
998
999/// List dashboard layouts
1000async fn list_dashboard_layouts(
1001    State(state): State<ObservabilityState>,
1002) -> Json<ApiResponse<Vec<DashboardLayoutSummary>>> {
1003    tracing::info!("Listing dashboard layouts");
1004    let layouts = state.layout_manager.list_layouts();
1005    Json(ApiResponse::success(layouts))
1006}
1007
1008#[derive(Serialize, Clone)]
1009pub struct DashboardLayoutSummary {
1010    id: String,
1011    name: String,
1012    description: Option<String>,
1013    widget_count: usize,
1014}
1015
1016/// Create dashboard layout
1017#[derive(Deserialize)]
1018struct CreateDashboardLayoutRequest {
1019    name: String,
1020    description: Option<String>,
1021    layout_data: serde_json::Value,
1022}
1023
1024/// Create dashboard layout
1025async fn create_dashboard_layout(
1026    State(state): State<ObservabilityState>,
1027    Json(req): Json<CreateDashboardLayoutRequest>,
1028) -> Json<ApiResponse<String>> {
1029    tracing::info!("Creating dashboard layout: {}", req.name);
1030    let id = format!("layout-{}", chrono::Utc::now().timestamp());
1031    let widget_count = req.layout_data.as_array().map(|a| a.len()).unwrap_or(0);
1032    let layout = DashboardLayoutSummary {
1033        id: id.clone(),
1034        name: req.name,
1035        description: req.description,
1036        widget_count,
1037    };
1038    state.layout_manager.create_layout(layout);
1039    Json(ApiResponse::success(id))
1040}
1041
1042/// Get dashboard layout
1043async fn get_dashboard_layout(
1044    State(state): State<ObservabilityState>,
1045    axum::extract::Path(id): axum::extract::Path<String>,
1046) -> Json<ApiResponse<serde_json::Value>> {
1047    tracing::info!("Getting dashboard layout: {}", id);
1048    if let Some(layout) = state.layout_manager.get_layout(&id) {
1049        Json(ApiResponse::success(serde_json::json!({
1050            "id": layout.id,
1051            "name": layout.name,
1052            "description": layout.description,
1053            "widget_count": layout.widget_count
1054        })))
1055    } else {
1056        Json(ApiResponse::error(format!("Layout '{}' not found", id)))
1057    }
1058}
1059
1060/// Update dashboard layout
1061async fn update_dashboard_layout(
1062    State(state): State<ObservabilityState>,
1063    axum::extract::Path(id): axum::extract::Path<String>,
1064    Json(req): Json<CreateDashboardLayoutRequest>,
1065) -> Json<ApiResponse<String>> {
1066    tracing::info!("Updating dashboard layout: {}", id);
1067    let widget_count = req.layout_data.as_array().map(|a| a.len()).unwrap_or(0);
1068    let layout = DashboardLayoutSummary {
1069        id: id.clone(),
1070        name: req.name,
1071        description: req.description,
1072        widget_count,
1073    };
1074    state.layout_manager.update_layout(&id, layout);
1075    Json(ApiResponse::success("Updated".to_string()))
1076}
1077
1078/// Delete dashboard layout
1079async fn delete_dashboard_layout(
1080    State(state): State<ObservabilityState>,
1081    axum::extract::Path(id): axum::extract::Path<String>,
1082) -> Json<ApiResponse<String>> {
1083    tracing::info!("Deleting dashboard layout: {}", id);
1084    state.layout_manager.delete_layout(&id);
1085    Json(ApiResponse::success("Deleted".to_string()))
1086}
1087
1088/// Get dashboard templates
1089async fn get_dashboard_templates(
1090    State(_state): State<ObservabilityState>,
1091) -> Json<ApiResponse<Vec<DashboardLayoutSummary>>> {
1092    tracing::info!("Getting dashboard templates");
1093    // For now, return static templates
1094    Json(ApiResponse::success(vec![
1095        DashboardLayoutSummary {
1096            id: "template-chaos-overview".to_string(),
1097            name: "Chaos Engineering Overview".to_string(),
1098            description: Some("Pre-built chaos engineering dashboard".to_string()),
1099            widget_count: 3,
1100        },
1101        DashboardLayoutSummary {
1102            id: "template-resilience".to_string(),
1103            name: "Resilience Testing".to_string(),
1104            description: Some("Monitor resilience patterns".to_string()),
1105            widget_count: 2,
1106        },
1107    ]))
1108}
1109
1110/// Generate PDF report
1111#[derive(Deserialize)]
1112struct GeneratePdfRequest {
1113    scenario_name: String,
1114    include_charts: bool,
1115}
1116
1117/// Generate a simple PDF report for a chaos scenario
1118fn generate_scenario_pdf(
1119    scenario_name: &str,
1120    include_charts: bool,
1121    output_path: &str,
1122) -> Result<(), Box<dyn std::error::Error>> {
1123    let (doc, page1, layer1) = PdfDocument::new(
1124        format!("Chaos Engineering Report - {}", scenario_name),
1125        Mm(210.0), // A4 width
1126        Mm(297.0), // A4 height
1127        "Layer 1",
1128    );
1129
1130    let font = doc.add_builtin_font(BuiltinFont::Helvetica)?;
1131    let font_bold = doc.add_builtin_font(BuiltinFont::HelveticaBold)?;
1132
1133    let current_layer = doc.get_page(page1).get_layer(layer1);
1134
1135    // Title
1136    current_layer.use_text(
1137        "Chaos Engineering Report".to_string(),
1138        20.0,
1139        Mm(20.0),
1140        Mm(270.0),
1141        &font_bold,
1142    );
1143
1144    // Scenario name
1145    current_layer.use_text(
1146        format!("Scenario: {}", scenario_name),
1147        14.0,
1148        Mm(20.0),
1149        Mm(250.0),
1150        &font,
1151    );
1152
1153    // Generated timestamp
1154    let now = chrono::Utc::now();
1155    current_layer.use_text(
1156        format!("Generated: {}", now.format("%Y-%m-%d %H:%M:%S UTC")),
1157        10.0,
1158        Mm(20.0),
1159        Mm(235.0),
1160        &font,
1161    );
1162
1163    // Summary section
1164    current_layer.use_text("Summary", 14.0, Mm(20.0), Mm(210.0), &font_bold);
1165
1166    let mut y = 190.0;
1167    let metrics = [
1168        ("Total Requests", "1000"),
1169        ("Success Rate", "95.2%"),
1170        ("Average Latency", "150ms"),
1171        ("Error Rate", "4.8%"),
1172    ];
1173
1174    for (label, value) in &metrics {
1175        current_layer.use_text(format!("{}: {}", label, value), 10.0, Mm(20.0), Mm(y), &font);
1176        y -= 10.0;
1177    }
1178
1179    // Charts section if requested
1180    if include_charts {
1181        y -= 20.0;
1182        current_layer.use_text("Charts", 14.0, Mm(20.0), Mm(y), &font_bold);
1183        y -= 15.0;
1184        current_layer.use_text(
1185            "[Chart placeholder - would include actual charts in full implementation]",
1186            10.0,
1187            Mm(20.0),
1188            Mm(y),
1189            &font,
1190        );
1191    }
1192
1193    // Save the PDF
1194    use std::io::BufWriter;
1195    doc.save(&mut BufWriter::new(std::fs::File::create(output_path)?))?;
1196
1197    Ok(())
1198}
1199
1200async fn generate_pdf_report(
1201    State(_state): State<ObservabilityState>,
1202    Json(req): Json<GeneratePdfRequest>,
1203) -> Response {
1204    tracing::info!("Generating PDF report for: {}", req.scenario_name);
1205
1206    let pdf_path = format!("/tmp/report_{}.pdf", req.scenario_name);
1207
1208    // Generate PDF using printpdf directly
1209    if let Err(e) = generate_scenario_pdf(&req.scenario_name, req.include_charts, &pdf_path) {
1210        return Json(ApiResponse::<String>::error(format!("Failed to generate PDF: {}", e)))
1211            .into_response();
1212    }
1213
1214    Json(ApiResponse::success(format!("/reports/{}.pdf", req.scenario_name))).into_response()
1215}
1216
1217/// Generate CSV report
1218#[derive(Deserialize)]
1219struct GenerateCsvRequest {
1220    scenario_names: Vec<String>,
1221    include_comparison: bool,
1222}
1223
1224async fn generate_csv_report(
1225    State(_state): State<ObservabilityState>,
1226    Json(req): Json<GenerateCsvRequest>,
1227) -> Response {
1228    tracing::info!("Generating CSV report for: {:?}", req.scenario_names);
1229
1230    let csv_content = generate_csv_content(&req.scenario_names, req.include_comparison);
1231    let csv_path = "/tmp/scenarios_report.csv";
1232
1233    if let Err(e) = std::fs::write(csv_path, csv_content) {
1234        return Json(ApiResponse::<String>::error(format!("Failed to generate CSV: {}", e)))
1235            .into_response();
1236    }
1237
1238    Json(ApiResponse::success("/reports/scenarios.csv".to_string())).into_response()
1239}
1240
1241/// Compare scenarios
1242#[derive(Deserialize)]
1243struct CompareRequest {
1244    baseline_scenario: String,
1245    comparison_scenarios: Vec<String>,
1246}
1247
1248async fn compare_scenarios(
1249    State(_state): State<ObservabilityState>,
1250    Json(req): Json<CompareRequest>,
1251) -> Json<ApiResponse<ComparisonResult>> {
1252    tracing::info!(
1253        "Comparing scenarios - baseline: {}, comparisons: {:?}",
1254        req.baseline_scenario,
1255        req.comparison_scenarios
1256    );
1257
1258    let comparison = perform_scenario_comparison(&req.baseline_scenario, &req.comparison_scenarios);
1259
1260    Json(ApiResponse::success(comparison))
1261}
1262
1263#[derive(Serialize)]
1264struct ComparisonResult {
1265    baseline: String,
1266    comparisons: Vec<String>,
1267    regressions_count: usize,
1268    improvements_count: usize,
1269    verdict: String,
1270}
1271
#[cfg(test)]
mod tests {
    use super::*;

    /// A success response sets the flag, carries the payload and has no error.
    #[test]
    fn test_api_response_success() {
        let ok = ApiResponse::success("test");

        assert!(ok.success);
        assert!(ok.error.is_none());
        assert_eq!(ok.data, Some("test"));
    }

    /// An error response clears the flag, has no payload and carries the message.
    #[test]
    fn test_api_response_error() {
        let failed: ApiResponse<String> = ApiResponse::error("error".to_string());

        assert!(!failed.success);
        assert_eq!(failed.error, Some("error".to_string()));
        assert!(failed.data.is_none());
    }
}