mockforge_http/
management.rs

1/// Management API for MockForge
2///
3/// Provides REST endpoints for controlling mocks, server configuration,
4/// and integration with developer tools (VS Code extension, CI/CD, etc.)
5use axum::{
6    extract::{Path, Query, State},
7    http::StatusCode,
8    response::{
9        sse::{Event, Sse},
10        IntoResponse, Json, Response,
11    },
12    routing::{delete, get, post, put},
13    Router,
14};
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18    BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23use std::convert::Infallible;
24use std::sync::Arc;
25use tokio::sync::{broadcast, RwLock};
26use tracing::*;
27
28/// Message event types for real-time monitoring
29#[derive(Debug, Clone, Serialize, Deserialize)]
30#[serde(tag = "protocol", content = "data")]
31#[serde(rename_all = "lowercase")]
32pub enum MessageEvent {
33    #[cfg(feature = "mqtt")]
34    /// MQTT message event
35    Mqtt(MqttMessageEvent),
36    #[cfg(feature = "kafka")]
37    /// Kafka message event
38    Kafka(KafkaMessageEvent),
39}
40
41#[cfg(feature = "mqtt")]
42/// MQTT message event for real-time monitoring
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct MqttMessageEvent {
45    /// MQTT topic name
46    pub topic: String,
47    /// Message payload content
48    pub payload: String,
49    /// Quality of Service level (0, 1, or 2)
50    pub qos: u8,
51    /// Whether the message is retained
52    pub retain: bool,
53    /// RFC3339 formatted timestamp
54    pub timestamp: String,
55}
56
57#[cfg(feature = "kafka")]
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct KafkaMessageEvent {
60    pub topic: String,
61    pub key: Option<String>,
62    pub value: String,
63    pub partition: i32,
64    pub offset: i64,
65    pub headers: Option<std::collections::HashMap<String, String>>,
66    pub timestamp: String,
67}
68
69/// Mock configuration representation
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct MockConfig {
72    /// Unique identifier for the mock
73    #[serde(skip_serializing_if = "String::is_empty")]
74    pub id: String,
75    /// Human-readable name for the mock
76    pub name: String,
77    /// HTTP method (GET, POST, etc.)
78    pub method: String,
79    /// API path pattern to match
80    pub path: String,
81    /// Response configuration
82    pub response: MockResponse,
83    /// Whether this mock is currently enabled
84    #[serde(default = "default_true")]
85    pub enabled: bool,
86    /// Optional latency to inject in milliseconds
87    #[serde(skip_serializing_if = "Option::is_none")]
88    pub latency_ms: Option<u64>,
89    /// Optional HTTP status code override
90    #[serde(skip_serializing_if = "Option::is_none")]
91    pub status_code: Option<u16>,
92    /// Request matching criteria (headers, query params, body patterns)
93    #[serde(skip_serializing_if = "Option::is_none")]
94    pub request_match: Option<RequestMatchCriteria>,
95    /// Priority for mock ordering (higher priority mocks are matched first)
96    #[serde(skip_serializing_if = "Option::is_none")]
97    pub priority: Option<i32>,
98    /// Scenario name for stateful mocking
99    #[serde(skip_serializing_if = "Option::is_none")]
100    pub scenario: Option<String>,
101    /// Required scenario state for this mock to be active
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub required_scenario_state: Option<String>,
104    /// New scenario state after this mock is matched
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub new_scenario_state: Option<String>,
107}
108
109fn default_true() -> bool {
110    true
111}
112
113/// Mock response configuration
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct MockResponse {
116    /// Response body as JSON
117    pub body: serde_json::Value,
118    /// Optional custom response headers
119    #[serde(skip_serializing_if = "Option::is_none")]
120    pub headers: Option<std::collections::HashMap<String, String>>,
121}
122
123/// Request matching criteria for advanced request matching
124#[derive(Debug, Clone, Serialize, Deserialize, Default)]
125pub struct RequestMatchCriteria {
126    /// Headers that must be present and match (case-insensitive header names)
127    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
128    pub headers: std::collections::HashMap<String, String>,
129    /// Query parameters that must be present and match
130    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
131    pub query_params: std::collections::HashMap<String, String>,
132    /// Request body pattern (supports exact match or regex)
133    #[serde(skip_serializing_if = "Option::is_none")]
134    pub body_pattern: Option<String>,
135    /// JSONPath expression for JSON body matching
136    #[serde(skip_serializing_if = "Option::is_none")]
137    pub json_path: Option<String>,
138    /// XPath expression for XML body matching
139    #[serde(skip_serializing_if = "Option::is_none")]
140    pub xpath: Option<String>,
141    /// Custom matcher expression (e.g., "headers.content-type == \"application/json\"")
142    #[serde(skip_serializing_if = "Option::is_none")]
143    pub custom_matcher: Option<String>,
144}
145
146/// Check if a request matches the given mock configuration
147///
148/// This function implements comprehensive request matching including:
149/// - Method and path matching
150/// - Header matching (with regex support)
151/// - Query parameter matching
152/// - Body pattern matching (exact, regex, JSONPath, XPath)
153/// - Custom matcher expressions
154pub fn mock_matches_request(
155    mock: &MockConfig,
156    method: &str,
157    path: &str,
158    headers: &std::collections::HashMap<String, String>,
159    query_params: &std::collections::HashMap<String, String>,
160    body: Option<&[u8]>,
161) -> bool {
162    use regex::Regex;
163
164    // Check if mock is enabled
165    if !mock.enabled {
166        return false;
167    }
168
169    // Check method (case-insensitive)
170    if mock.method.to_uppercase() != method.to_uppercase() {
171        return false;
172    }
173
174    // Check path pattern (supports wildcards and path parameters)
175    if !path_matches_pattern(&mock.path, path) {
176        return false;
177    }
178
179    // Check request matching criteria if present
180    if let Some(criteria) = &mock.request_match {
181        // Check headers
182        for (key, expected_value) in &criteria.headers {
183            let header_key_lower = key.to_lowercase();
184            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
185
186            if let Some((_, actual_value)) = found {
187                // Try regex match first, then exact match
188                if let Ok(re) = Regex::new(expected_value) {
189                    if !re.is_match(actual_value) {
190                        return false;
191                    }
192                } else if actual_value != expected_value {
193                    return false;
194                }
195            } else {
196                return false; // Header not found
197            }
198        }
199
200        // Check query parameters
201        for (key, expected_value) in &criteria.query_params {
202            if let Some(actual_value) = query_params.get(key) {
203                if actual_value != expected_value {
204                    return false;
205                }
206            } else {
207                return false; // Query param not found
208            }
209        }
210
211        // Check body pattern
212        if let Some(pattern) = &criteria.body_pattern {
213            if let Some(body_bytes) = body {
214                let body_str = String::from_utf8_lossy(body_bytes);
215                // Try regex first, then exact match
216                if let Ok(re) = Regex::new(pattern) {
217                    if !re.is_match(&body_str) {
218                        return false;
219                    }
220                } else if body_str.as_ref() != pattern {
221                    return false;
222                }
223            } else {
224                return false; // Body required but not present
225            }
226        }
227
228        // Check JSONPath (simplified implementation)
229        if let Some(json_path) = &criteria.json_path {
230            if let Some(body_bytes) = body {
231                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
232                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
233                        // Simple JSONPath check
234                        if !json_path_exists(&json_value, json_path) {
235                            return false;
236                        }
237                    }
238                }
239            }
240        }
241
242        // Check XPath (placeholder - requires XML/XPath library for full implementation)
243        if let Some(_xpath) = &criteria.xpath {
244            // XPath matching would require an XML/XPath library
245            // For now, this is a placeholder that warns but doesn't fail
246            tracing::warn!("XPath matching not yet fully implemented");
247        }
248
249        // Check custom matcher
250        if let Some(custom) = &criteria.custom_matcher {
251            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
252                return false;
253            }
254        }
255    }
256
257    true
258}
259
260/// Check if a path matches a pattern (supports wildcards and path parameters)
261fn path_matches_pattern(pattern: &str, path: &str) -> bool {
262    // Exact match
263    if pattern == path {
264        return true;
265    }
266
267    // Wildcard match
268    if pattern == "*" {
269        return true;
270    }
271
272    // Path parameter matching (e.g., /users/{id} matches /users/123)
273    let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
274    let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
275
276    if pattern_parts.len() != path_parts.len() {
277        // Check for wildcard patterns
278        if pattern.contains('*') {
279            return matches_wildcard_pattern(pattern, path);
280        }
281        return false;
282    }
283
284    for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
285        // Check for path parameters {param}
286        if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
287            continue; // Matches any value
288        }
289
290        if pattern_part != path_part {
291            return false;
292        }
293    }
294
295    true
296}
297
298/// Check if path matches a wildcard pattern
299fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
300    use regex::Regex;
301
302    // Convert pattern to regex
303    let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
304
305    if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
306        return re.is_match(path);
307    }
308
309    false
310}
311
312/// Check if a JSONPath exists in a JSON value (simplified implementation)
313fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
314    // Simple implementation - for full JSONPath support, use a library like jsonpath-rs
315    // This handles simple paths like $.field or $.field.subfield
316    if json_path.starts_with("$.") {
317        let path = &json_path[2..];
318        let parts: Vec<&str> = path.split('.').collect();
319
320        let mut current = json;
321        for part in parts {
322            if let Some(obj) = current.as_object() {
323                if let Some(value) = obj.get(part) {
324                    current = value;
325                } else {
326                    return false;
327                }
328            } else {
329                return false;
330            }
331        }
332        true
333    } else {
334        // For complex JSONPath expressions, would need a proper JSONPath library
335        tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
336        false
337    }
338}
339
340/// Evaluate a custom matcher expression
341fn evaluate_custom_matcher(
342    expression: &str,
343    method: &str,
344    path: &str,
345    headers: &std::collections::HashMap<String, String>,
346    query_params: &std::collections::HashMap<String, String>,
347    body: Option<&[u8]>,
348) -> bool {
349    use regex::Regex;
350
351    let expr = expression.trim();
352
353    // Handle equality expressions (field == "value")
354    if expr.contains("==") {
355        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
356        if parts.len() != 2 {
357            return false;
358        }
359
360        let field = parts[0];
361        let expected_value = parts[1].trim_matches('"').trim_matches('\'');
362
363        match field {
364            "method" => method == expected_value,
365            "path" => path == expected_value,
366            _ if field.starts_with("headers.") => {
367                let header_name = &field[8..];
368                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
369            }
370            _ if field.starts_with("query.") => {
371                let param_name = &field[6..];
372                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
373            }
374            _ => false,
375        }
376    }
377    // Handle regex match expressions (field =~ "pattern")
378    else if expr.contains("=~") {
379        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
380        if parts.len() != 2 {
381            return false;
382        }
383
384        let field = parts[0];
385        let pattern = parts[1].trim_matches('"').trim_matches('\'');
386
387        if let Ok(re) = Regex::new(pattern) {
388            match field {
389                "method" => re.is_match(method),
390                "path" => re.is_match(path),
391                _ if field.starts_with("headers.") => {
392                    let header_name = &field[8..];
393                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
394                }
395                _ if field.starts_with("query.") => {
396                    let param_name = &field[6..];
397                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
398                }
399                _ => false,
400            }
401        } else {
402            false
403        }
404    }
405    // Handle contains expressions (field contains "value")
406    else if expr.contains("contains") {
407        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
408        if parts.len() != 2 {
409            return false;
410        }
411
412        let field = parts[0];
413        let search_value = parts[1].trim_matches('"').trim_matches('\'');
414
415        match field {
416            "path" => path.contains(search_value),
417            _ if field.starts_with("headers.") => {
418                let header_name = &field[8..];
419                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
420            }
421            _ if field.starts_with("body") => {
422                if let Some(body_bytes) = body {
423                    let body_str = String::from_utf8_lossy(body_bytes);
424                    body_str.contains(search_value)
425                } else {
426                    false
427                }
428            }
429            _ => false,
430        }
431    } else {
432        // Unknown expression format
433        tracing::warn!("Unknown custom matcher expression format: {}", expr);
434        false
435    }
436}
437
438/// Server statistics
439#[derive(Debug, Clone, Serialize, Deserialize)]
440pub struct ServerStats {
441    /// Server uptime in seconds
442    pub uptime_seconds: u64,
443    /// Total number of requests processed
444    pub total_requests: u64,
445    /// Number of active mock configurations
446    pub active_mocks: usize,
447    /// Number of currently enabled mocks
448    pub enabled_mocks: usize,
449    /// Number of registered API routes
450    pub registered_routes: usize,
451}
452
453/// Server configuration info
454#[derive(Debug, Clone, Serialize, Deserialize)]
455pub struct ServerConfig {
456    /// MockForge version string
457    pub version: String,
458    /// Server port number
459    pub port: u16,
460    /// Whether an OpenAPI spec is loaded
461    pub has_openapi_spec: bool,
462    /// Optional path to the OpenAPI spec file
463    #[serde(skip_serializing_if = "Option::is_none")]
464    pub spec_path: Option<String>,
465}
466
467/// Shared state for the management API
468#[derive(Clone)]
469pub struct ManagementState {
470    /// Collection of mock configurations
471    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
472    /// Optional OpenAPI specification
473    pub spec: Option<Arc<OpenApiSpec>>,
474    /// Optional path to the OpenAPI spec file
475    pub spec_path: Option<String>,
476    /// Server port number
477    pub port: u16,
478    /// Server start time for uptime calculation
479    pub start_time: std::time::Instant,
480    /// Counter for total requests processed
481    pub request_counter: Arc<RwLock<u64>>,
482    /// Optional proxy configuration for migration pipeline
483    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
484    /// Optional SMTP registry for email mocking
485    #[cfg(feature = "smtp")]
486    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
487    /// Optional MQTT broker for message mocking
488    #[cfg(feature = "mqtt")]
489    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
490    /// Optional Kafka broker for event streaming
491    #[cfg(feature = "kafka")]
492    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
493    /// Broadcast channel for message events (MQTT & Kafka)
494    #[cfg(any(feature = "mqtt", feature = "kafka"))]
495    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
496    /// State machine manager for scenario state machines
497    pub state_machine_manager:
498        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
499    /// Optional WebSocket broadcast channel for real-time updates
500    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
501    /// Lifecycle hook registry for extensibility
502    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
503    /// Rule explanations storage (in-memory for now)
504    pub rule_explanations: Arc<
505        RwLock<
506            std::collections::HashMap<
507                String,
508                mockforge_core::intelligent_behavior::RuleExplanation,
509            >,
510        >,
511    >,
512    /// Optional chaos API state for chaos config management
513    #[cfg(feature = "chaos")]
514    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
515    /// Optional server configuration for profile application
516    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
517}
518
519impl ManagementState {
520    /// Create a new management state
521    ///
522    /// # Arguments
523    /// * `spec` - Optional OpenAPI specification
524    /// * `spec_path` - Optional path to the OpenAPI spec file
525    /// * `port` - Server port number
526    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
527        Self {
528            mocks: Arc::new(RwLock::new(Vec::new())),
529            spec,
530            spec_path,
531            port,
532            start_time: std::time::Instant::now(),
533            request_counter: Arc::new(RwLock::new(0)),
534            proxy_config: None,
535            #[cfg(feature = "smtp")]
536            smtp_registry: None,
537            #[cfg(feature = "mqtt")]
538            mqtt_broker: None,
539            #[cfg(feature = "kafka")]
540            kafka_broker: None,
541            #[cfg(any(feature = "mqtt", feature = "kafka"))]
542            message_events: {
543                let (tx, _) = broadcast::channel(1000);
544                Arc::new(tx)
545            },
546            state_machine_manager: Arc::new(RwLock::new(
547                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
548            )),
549            ws_broadcast: None,
550            lifecycle_hooks: None,
551            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
552            #[cfg(feature = "chaos")]
553            chaos_api_state: None,
554            server_config: None,
555        }
556    }
557
558    /// Add lifecycle hook registry to management state
559    pub fn with_lifecycle_hooks(
560        mut self,
561        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
562    ) -> Self {
563        self.lifecycle_hooks = Some(hooks);
564        self
565    }
566
567    /// Add WebSocket broadcast channel to management state
568    pub fn with_ws_broadcast(
569        mut self,
570        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
571    ) -> Self {
572        self.ws_broadcast = Some(ws_broadcast);
573        self
574    }
575
576    /// Add proxy configuration to management state
577    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
578        self.proxy_config = Some(proxy_config);
579        self
580    }
581
582    #[cfg(feature = "smtp")]
583    /// Add SMTP registry to management state
584    pub fn with_smtp_registry(
585        mut self,
586        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
587    ) -> Self {
588        self.smtp_registry = Some(smtp_registry);
589        self
590    }
591
592    #[cfg(feature = "mqtt")]
593    /// Add MQTT broker to management state
594    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
595        self.mqtt_broker = Some(mqtt_broker);
596        self
597    }
598
599    #[cfg(feature = "kafka")]
600    /// Add Kafka broker to management state
601    pub fn with_kafka_broker(
602        mut self,
603        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
604    ) -> Self {
605        self.kafka_broker = Some(kafka_broker);
606        self
607    }
608
609    #[cfg(feature = "chaos")]
610    /// Add chaos API state to management state
611    pub fn with_chaos_api_state(
612        mut self,
613        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
614    ) -> Self {
615        self.chaos_api_state = Some(chaos_api_state);
616        self
617    }
618
619    /// Add server configuration to management state
620    pub fn with_server_config(
621        mut self,
622        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
623    ) -> Self {
624        self.server_config = Some(server_config);
625        self
626    }
627}
628
629/// List all mocks
630async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
631    let mocks = state.mocks.read().await;
632    Json(serde_json::json!({
633        "mocks": *mocks,
634        "total": mocks.len(),
635        "enabled": mocks.iter().filter(|m| m.enabled).count()
636    }))
637}
638
639/// Get a specific mock by ID
640async fn get_mock(
641    State(state): State<ManagementState>,
642    Path(id): Path<String>,
643) -> Result<Json<MockConfig>, StatusCode> {
644    let mocks = state.mocks.read().await;
645    mocks
646        .iter()
647        .find(|m| m.id == id)
648        .cloned()
649        .map(Json)
650        .ok_or(StatusCode::NOT_FOUND)
651}
652
653/// Create a new mock
654async fn create_mock(
655    State(state): State<ManagementState>,
656    Json(mut mock): Json<MockConfig>,
657) -> Result<Json<MockConfig>, StatusCode> {
658    let mut mocks = state.mocks.write().await;
659
660    // Generate ID if not provided
661    if mock.id.is_empty() {
662        mock.id = uuid::Uuid::new_v4().to_string();
663    }
664
665    // Check for duplicate ID
666    if mocks.iter().any(|m| m.id == mock.id) {
667        return Err(StatusCode::CONFLICT);
668    }
669
670    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
671
672    // Invoke lifecycle hooks
673    if let Some(hooks) = &state.lifecycle_hooks {
674        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
675            id: mock.id.clone(),
676            name: mock.name.clone(),
677            config: serde_json::to_value(&mock).unwrap_or_default(),
678        };
679        hooks.invoke_mock_created(&event).await;
680    }
681
682    mocks.push(mock.clone());
683
684    // Broadcast WebSocket event
685    if let Some(tx) = &state.ws_broadcast {
686        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
687    }
688
689    Ok(Json(mock))
690}
691
692/// Update an existing mock
693async fn update_mock(
694    State(state): State<ManagementState>,
695    Path(id): Path<String>,
696    Json(updated_mock): Json<MockConfig>,
697) -> Result<Json<MockConfig>, StatusCode> {
698    let mut mocks = state.mocks.write().await;
699
700    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
701
702    // Get old mock for comparison
703    let old_mock = mocks[position].clone();
704
705    info!("Updating mock: {}", id);
706    mocks[position] = updated_mock.clone();
707
708    // Invoke lifecycle hooks
709    if let Some(hooks) = &state.lifecycle_hooks {
710        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
711            id: updated_mock.id.clone(),
712            name: updated_mock.name.clone(),
713            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
714        };
715        hooks.invoke_mock_updated(&event).await;
716
717        // Check if enabled state changed
718        if old_mock.enabled != updated_mock.enabled {
719            let state_event = if updated_mock.enabled {
720                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
721                    id: updated_mock.id.clone(),
722                }
723            } else {
724                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
725                    id: updated_mock.id.clone(),
726                }
727            };
728            hooks.invoke_mock_state_changed(&state_event).await;
729        }
730    }
731
732    // Broadcast WebSocket event
733    if let Some(tx) = &state.ws_broadcast {
734        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
735    }
736
737    Ok(Json(updated_mock))
738}
739
740/// Delete a mock
741async fn delete_mock(
742    State(state): State<ManagementState>,
743    Path(id): Path<String>,
744) -> Result<StatusCode, StatusCode> {
745    let mut mocks = state.mocks.write().await;
746
747    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
748
749    // Get mock info before deletion for lifecycle hooks
750    let deleted_mock = mocks[position].clone();
751
752    info!("Deleting mock: {}", id);
753    mocks.remove(position);
754
755    // Invoke lifecycle hooks
756    if let Some(hooks) = &state.lifecycle_hooks {
757        let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
758            id: deleted_mock.id.clone(),
759            name: deleted_mock.name.clone(),
760        };
761        hooks.invoke_mock_deleted(&event).await;
762    }
763
764    // Broadcast WebSocket event
765    if let Some(tx) = &state.ws_broadcast {
766        let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
767    }
768
769    Ok(StatusCode::NO_CONTENT)
770}
771
772/// Request to validate configuration
773#[derive(Debug, Deserialize)]
774pub struct ValidateConfigRequest {
775    /// Configuration to validate (as JSON)
776    pub config: serde_json::Value,
777    /// Format of the configuration ("json" or "yaml")
778    #[serde(default = "default_format")]
779    pub format: String,
780}
781
782fn default_format() -> String {
783    "json".to_string()
784}
785
786/// Validate configuration without applying it
787async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
788    use mockforge_core::config::ServerConfig;
789
790    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
791        "yaml" | "yml" => {
792            let yaml_str = match serde_json::to_string(&request.config) {
793                Ok(s) => s,
794                Err(e) => {
795                    return (
796                        StatusCode::BAD_REQUEST,
797                        Json(serde_json::json!({
798                            "valid": false,
799                            "error": format!("Failed to convert to string: {}", e),
800                            "message": "Configuration validation failed"
801                        })),
802                    )
803                        .into_response();
804                }
805            };
806            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
807        }
808        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
809    };
810
811    match config_result {
812        Ok(_) => Json(serde_json::json!({
813            "valid": true,
814            "message": "Configuration is valid"
815        }))
816        .into_response(),
817        Err(e) => (
818            StatusCode::BAD_REQUEST,
819            Json(serde_json::json!({
820                "valid": false,
821                "error": format!("Invalid configuration: {}", e),
822                "message": "Configuration validation failed"
823            })),
824        )
825            .into_response(),
826    }
827}
828
829/// Request for bulk configuration update
830#[derive(Debug, Deserialize)]
831pub struct BulkConfigUpdateRequest {
832    /// Partial configuration updates (only specified fields will be updated)
833    pub updates: serde_json::Value,
834}
835
836/// Bulk update configuration
837///
838/// This endpoint allows updating multiple configuration options at once.
839/// Only the specified fields in the updates object will be modified.
840///
841/// Configuration updates are applied to the server configuration if available
842/// in ManagementState. Changes take effect immediately for supported settings.
843async fn bulk_update_config(
844    State(state): State<ManagementState>,
845    Json(request): Json<BulkConfigUpdateRequest>,
846) -> impl IntoResponse {
847    // Validate the updates structure
848    if !request.updates.is_object() {
849        return (
850            StatusCode::BAD_REQUEST,
851            Json(serde_json::json!({
852                "error": "Invalid request",
853                "message": "Updates must be a JSON object"
854            })),
855        )
856            .into_response();
857    }
858
859    // Try to validate as partial ServerConfig
860    use mockforge_core::config::ServerConfig;
861
862    // Create a minimal valid config and try to merge updates
863    let base_config = ServerConfig::default();
864    let base_json = match serde_json::to_value(&base_config) {
865        Ok(v) => v,
866        Err(e) => {
867            return (
868                StatusCode::INTERNAL_SERVER_ERROR,
869                Json(serde_json::json!({
870                    "error": "Internal error",
871                    "message": format!("Failed to serialize base config: {}", e)
872                })),
873            )
874                .into_response();
875        }
876    };
877
878    // Merge updates into base config (simplified merge)
879    let mut merged = base_json.clone();
880    if let (Some(merged_obj), Some(updates_obj)) =
881        (merged.as_object_mut(), request.updates.as_object())
882    {
883        for (key, value) in updates_obj {
884            merged_obj.insert(key.clone(), value.clone());
885        }
886    }
887
888    // Validate the merged config
889    match serde_json::from_value::<ServerConfig>(merged) {
890        Ok(_) => {
891            // Config is valid
892            // TODO: Apply config to server when ServerConfig is stored in ManagementState
893            Json(serde_json::json!({
894                "success": true,
895                "message": "Bulk configuration update validated successfully. Note: Runtime application requires ServerConfig in ManagementState.",
896                "updates_received": request.updates,
897                "validated": true
898            }))
899            .into_response()
900        }
901        Err(e) => (
902            StatusCode::BAD_REQUEST,
903            Json(serde_json::json!({
904                "error": "Invalid configuration",
905                "message": format!("Configuration validation failed: {}", e),
906                "validated": false
907            })),
908        )
909            .into_response(),
910    }
911}
912
913/// Get server statistics
914async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
915    let mocks = state.mocks.read().await;
916    let request_count = *state.request_counter.read().await;
917
918    Json(ServerStats {
919        uptime_seconds: state.start_time.elapsed().as_secs(),
920        total_requests: request_count,
921        active_mocks: mocks.len(),
922        enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
923        registered_routes: mocks.len(), // This could be enhanced with actual route registry info
924    })
925}
926
927/// Get server configuration
928async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
929    Json(ServerConfig {
930        version: env!("CARGO_PKG_VERSION").to_string(),
931        port: state.port,
932        has_openapi_spec: state.spec.is_some(),
933        spec_path: state.spec_path.clone(),
934    })
935}
936
937/// Health check endpoint
938async fn health_check() -> Json<serde_json::Value> {
939    Json(serde_json::json!({
940        "status": "healthy",
941        "service": "mockforge-management",
942        "timestamp": chrono::Utc::now().to_rfc3339()
943    }))
944}
945
946/// Export format for mock configurations
947#[derive(Debug, Clone, Serialize, Deserialize)]
948#[serde(rename_all = "lowercase")]
949pub enum ExportFormat {
950    /// JSON format
951    Json,
952    /// YAML format
953    Yaml,
954}
955
956/// Export mocks in specified format
957async fn export_mocks(
958    State(state): State<ManagementState>,
959    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
960) -> Result<(StatusCode, String), StatusCode> {
961    let mocks = state.mocks.read().await;
962
963    let format = params
964        .get("format")
965        .map(|f| match f.as_str() {
966            "yaml" | "yml" => ExportFormat::Yaml,
967            _ => ExportFormat::Json,
968        })
969        .unwrap_or(ExportFormat::Json);
970
971    match format {
972        ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
973            .map(|json| (StatusCode::OK, json))
974            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
975        ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
976            .map(|yaml| (StatusCode::OK, yaml))
977            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
978    }
979}
980
981/// Import mocks from JSON/YAML
982async fn import_mocks(
983    State(state): State<ManagementState>,
984    Json(mocks): Json<Vec<MockConfig>>,
985) -> impl IntoResponse {
986    let mut current_mocks = state.mocks.write().await;
987    current_mocks.clear();
988    current_mocks.extend(mocks);
989    Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
990}
991
992#[cfg(feature = "smtp")]
993/// List SMTP emails in mailbox
994async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
995    if let Some(ref smtp_registry) = state.smtp_registry {
996        match smtp_registry.get_emails() {
997            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
998            Err(e) => (
999                StatusCode::INTERNAL_SERVER_ERROR,
1000                Json(serde_json::json!({
1001                    "error": "Failed to retrieve emails",
1002                    "message": e.to_string()
1003                })),
1004            ),
1005        }
1006    } else {
1007        (
1008            StatusCode::NOT_IMPLEMENTED,
1009            Json(serde_json::json!({
1010                "error": "SMTP mailbox management not available",
1011                "message": "SMTP server is not enabled or registry not available."
1012            })),
1013        )
1014    }
1015}
1016
1017/// Get specific SMTP email
1018#[cfg(feature = "smtp")]
1019async fn get_smtp_email(
1020    State(state): State<ManagementState>,
1021    Path(id): Path<String>,
1022) -> impl IntoResponse {
1023    if let Some(ref smtp_registry) = state.smtp_registry {
1024        match smtp_registry.get_email_by_id(&id) {
1025            Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
1026            Ok(None) => (
1027                StatusCode::NOT_FOUND,
1028                Json(serde_json::json!({
1029                    "error": "Email not found",
1030                    "id": id
1031                })),
1032            ),
1033            Err(e) => (
1034                StatusCode::INTERNAL_SERVER_ERROR,
1035                Json(serde_json::json!({
1036                    "error": "Failed to retrieve email",
1037                    "message": e.to_string()
1038                })),
1039            ),
1040        }
1041    } else {
1042        (
1043            StatusCode::NOT_IMPLEMENTED,
1044            Json(serde_json::json!({
1045                "error": "SMTP mailbox management not available",
1046                "message": "SMTP server is not enabled or registry not available."
1047            })),
1048        )
1049    }
1050}
1051
1052/// Clear SMTP mailbox
1053#[cfg(feature = "smtp")]
1054async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
1055    if let Some(ref smtp_registry) = state.smtp_registry {
1056        match smtp_registry.clear_mailbox() {
1057            Ok(()) => (
1058                StatusCode::OK,
1059                Json(serde_json::json!({
1060                    "message": "Mailbox cleared successfully"
1061                })),
1062            ),
1063            Err(e) => (
1064                StatusCode::INTERNAL_SERVER_ERROR,
1065                Json(serde_json::json!({
1066                    "error": "Failed to clear mailbox",
1067                    "message": e.to_string()
1068                })),
1069            ),
1070        }
1071    } else {
1072        (
1073            StatusCode::NOT_IMPLEMENTED,
1074            Json(serde_json::json!({
1075                "error": "SMTP mailbox management not available",
1076                "message": "SMTP server is not enabled or registry not available."
1077            })),
1078        )
1079    }
1080}
1081
1082/// Export SMTP mailbox
1083#[cfg(feature = "smtp")]
1084async fn export_smtp_mailbox(
1085    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
1086) -> impl IntoResponse {
1087    let format = params.get("format").unwrap_or(&"json".to_string()).clone();
1088    (
1089        StatusCode::NOT_IMPLEMENTED,
1090        Json(serde_json::json!({
1091            "error": "SMTP mailbox management not available via HTTP API",
1092            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
1093            "requested_format": format
1094        })),
1095    )
1096}
1097
1098/// Search SMTP emails
1099#[cfg(feature = "smtp")]
1100async fn search_smtp_emails(
1101    State(state): State<ManagementState>,
1102    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
1103) -> impl IntoResponse {
1104    if let Some(ref smtp_registry) = state.smtp_registry {
1105        let filters = EmailSearchFilters {
1106            sender: params.get("sender").cloned(),
1107            recipient: params.get("recipient").cloned(),
1108            subject: params.get("subject").cloned(),
1109            body: params.get("body").cloned(),
1110            since: params
1111                .get("since")
1112                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
1113                .map(|dt| dt.with_timezone(&chrono::Utc)),
1114            until: params
1115                .get("until")
1116                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
1117                .map(|dt| dt.with_timezone(&chrono::Utc)),
1118            use_regex: params.get("regex").map(|s| s == "true").unwrap_or(false),
1119            case_sensitive: params.get("case_sensitive").map(|s| s == "true").unwrap_or(false),
1120        };
1121
1122        match smtp_registry.search_emails(filters) {
1123            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
1124            Err(e) => (
1125                StatusCode::INTERNAL_SERVER_ERROR,
1126                Json(serde_json::json!({
1127                    "error": "Failed to search emails",
1128                    "message": e.to_string()
1129                })),
1130            ),
1131        }
1132    } else {
1133        (
1134            StatusCode::NOT_IMPLEMENTED,
1135            Json(serde_json::json!({
1136                "error": "SMTP mailbox management not available",
1137                "message": "SMTP server is not enabled or registry not available."
1138            })),
1139        )
1140    }
1141}
1142
1143/// MQTT broker statistics
1144#[cfg(feature = "mqtt")]
1145#[derive(Debug, Clone, Serialize, Deserialize)]
1146pub struct MqttBrokerStats {
1147    /// Number of connected MQTT clients
1148    pub connected_clients: usize,
1149    /// Number of active MQTT topics
1150    pub active_topics: usize,
1151    /// Number of retained messages
1152    pub retained_messages: usize,
1153    /// Total number of subscriptions
1154    pub total_subscriptions: usize,
1155}
1156
1157/// MQTT management handlers
1158#[cfg(feature = "mqtt")]
1159async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
1160    if let Some(broker) = &state.mqtt_broker {
1161        let connected_clients = broker.get_connected_clients().await.len();
1162        let active_topics = broker.get_active_topics().await.len();
1163        let stats = broker.get_topic_stats().await;
1164
1165        let broker_stats = MqttBrokerStats {
1166            connected_clients,
1167            active_topics,
1168            retained_messages: stats.retained_messages,
1169            total_subscriptions: stats.total_subscriptions,
1170        };
1171
1172        Json(broker_stats).into_response()
1173    } else {
1174        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1175    }
1176}
1177
1178#[cfg(feature = "mqtt")]
1179async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
1180    if let Some(broker) = &state.mqtt_broker {
1181        let clients = broker.get_connected_clients().await;
1182        Json(serde_json::json!({
1183            "clients": clients
1184        }))
1185        .into_response()
1186    } else {
1187        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1188    }
1189}
1190
1191#[cfg(feature = "mqtt")]
1192async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
1193    if let Some(broker) = &state.mqtt_broker {
1194        let topics = broker.get_active_topics().await;
1195        Json(serde_json::json!({
1196            "topics": topics
1197        }))
1198        .into_response()
1199    } else {
1200        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1201    }
1202}
1203
1204#[cfg(feature = "mqtt")]
1205async fn disconnect_mqtt_client(
1206    State(state): State<ManagementState>,
1207    Path(client_id): Path<String>,
1208) -> impl IntoResponse {
1209    if let Some(broker) = &state.mqtt_broker {
1210        match broker.disconnect_client(&client_id).await {
1211            Ok(_) => {
1212                (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response()
1213            }
1214            Err(e) => {
1215                (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
1216                    .into_response()
1217            }
1218        }
1219    } else {
1220        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1221    }
1222}
1223
1224// ========== MQTT Publish Handler ==========
1225
1226#[cfg(feature = "mqtt")]
1227/// Request to publish a single MQTT message
1228#[derive(Debug, Deserialize)]
1229pub struct MqttPublishRequest {
1230    /// Topic to publish to
1231    pub topic: String,
1232    /// Message payload (string or JSON)
1233    pub payload: String,
1234    /// QoS level (0, 1, or 2)
1235    #[serde(default = "default_qos")]
1236    pub qos: u8,
1237    /// Whether to retain the message
1238    #[serde(default)]
1239    pub retain: bool,
1240}
1241
1242#[cfg(feature = "mqtt")]
1243fn default_qos() -> u8 {
1244    0
1245}
1246
1247#[cfg(feature = "mqtt")]
1248/// Publish a message to an MQTT topic (only compiled when mqtt feature is enabled)
1249async fn publish_mqtt_message_handler(
1250    State(state): State<ManagementState>,
1251    Json(request): Json<serde_json::Value>,
1252) -> impl IntoResponse {
1253    // Extract fields from JSON manually
1254    let topic = request.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
1255    let payload = request.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
1256    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
1257    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);
1258
1259    if topic.is_none() || payload.is_none() {
1260        return (
1261            StatusCode::BAD_REQUEST,
1262            Json(serde_json::json!({
1263                "error": "Invalid request",
1264                "message": "Missing required fields: topic and payload"
1265            })),
1266        );
1267    }
1268
1269    let topic = topic.unwrap();
1270    let payload = payload.unwrap();
1271
1272    if let Some(broker) = &state.mqtt_broker {
1273        // Validate QoS
1274        if qos > 2 {
1275            return (
1276                StatusCode::BAD_REQUEST,
1277                Json(serde_json::json!({
1278                    "error": "Invalid QoS",
1279                    "message": "QoS must be 0, 1, or 2"
1280                })),
1281            );
1282        }
1283
1284        // Convert payload to bytes
1285        let payload_bytes = payload.as_bytes().to_vec();
1286        let client_id = "mockforge-management-api".to_string();
1287
1288        let publish_result = broker
1289            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
1290            .await
1291            .map_err(|e| format!("{}", e));
1292
1293        match publish_result {
1294            Ok(_) => {
1295                // Emit message event for real-time monitoring
1296                let event = MessageEvent::Mqtt(MqttMessageEvent {
1297                    topic: topic.clone(),
1298                    payload: payload.clone(),
1299                    qos,
1300                    retain,
1301                    timestamp: chrono::Utc::now().to_rfc3339(),
1302                });
1303                let _ = state.message_events.send(event);
1304
1305                (
1306                    StatusCode::OK,
1307                    Json(serde_json::json!({
1308                        "success": true,
1309                        "message": format!("Message published to topic '{}'", topic),
1310                        "topic": topic,
1311                        "qos": qos,
1312                        "retain": retain
1313                    })),
1314                )
1315            }
1316            Err(error_msg) => (
1317                StatusCode::INTERNAL_SERVER_ERROR,
1318                Json(serde_json::json!({
1319                    "error": "Failed to publish message",
1320                    "message": error_msg
1321                })),
1322            ),
1323        }
1324    } else {
1325        (
1326            StatusCode::SERVICE_UNAVAILABLE,
1327            Json(serde_json::json!({
1328                "error": "MQTT broker not available",
1329                "message": "MQTT broker is not enabled or not available."
1330            })),
1331        )
1332    }
1333}
1334
1335#[cfg(not(feature = "mqtt"))]
1336/// Publish a message to an MQTT topic (stub when mqtt feature is disabled)
1337async fn publish_mqtt_message_handler(
1338    State(_state): State<ManagementState>,
1339    Json(_request): Json<serde_json::Value>,
1340) -> impl IntoResponse {
1341    (
1342        StatusCode::SERVICE_UNAVAILABLE,
1343        Json(serde_json::json!({
1344            "error": "MQTT feature not enabled",
1345            "message": "MQTT support is not compiled into this build"
1346        })),
1347    )
1348}
1349
1350#[cfg(feature = "mqtt")]
1351/// Request to publish multiple MQTT messages
1352#[derive(Debug, Deserialize)]
1353pub struct MqttBatchPublishRequest {
1354    /// List of messages to publish
1355    pub messages: Vec<MqttPublishRequest>,
1356    /// Delay between messages in milliseconds
1357    #[serde(default = "default_delay")]
1358    pub delay_ms: u64,
1359}
1360
1361#[cfg(feature = "mqtt")]
1362fn default_delay() -> u64 {
1363    100
1364}
1365
1366#[cfg(feature = "mqtt")]
1367/// Publish multiple messages to MQTT topics (only compiled when mqtt feature is enabled)
1368async fn publish_mqtt_batch_handler(
1369    State(state): State<ManagementState>,
1370    Json(request): Json<serde_json::Value>,
1371) -> impl IntoResponse {
1372    // Extract fields from JSON manually
1373    let messages_json = request.get("messages").and_then(|v| v.as_array());
1374    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);
1375
1376    if messages_json.is_none() {
1377        return (
1378            StatusCode::BAD_REQUEST,
1379            Json(serde_json::json!({
1380                "error": "Invalid request",
1381                "message": "Missing required field: messages"
1382            })),
1383        );
1384    }
1385
1386    let messages_json = messages_json.unwrap();
1387
1388    if let Some(broker) = &state.mqtt_broker {
1389        if messages_json.is_empty() {
1390            return (
1391                StatusCode::BAD_REQUEST,
1392                Json(serde_json::json!({
1393                    "error": "Empty batch",
1394                    "message": "At least one message is required"
1395                })),
1396            );
1397        }
1398
1399        let mut results = Vec::new();
1400        let client_id = "mockforge-management-api".to_string();
1401
1402        for (index, msg_json) in messages_json.iter().enumerate() {
1403            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
1404            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
1405            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
1406            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);
1407
1408            if topic.is_none() || payload.is_none() {
1409                results.push(serde_json::json!({
1410                    "index": index,
1411                    "success": false,
1412                    "error": "Missing required fields: topic and payload"
1413                }));
1414                continue;
1415            }
1416
1417            let topic = topic.unwrap();
1418            let payload = payload.unwrap();
1419
1420            // Validate QoS
1421            if qos > 2 {
1422                results.push(serde_json::json!({
1423                    "index": index,
1424                    "success": false,
1425                    "error": "Invalid QoS (must be 0, 1, or 2)"
1426                }));
1427                continue;
1428            }
1429
1430            // Convert payload to bytes
1431            let payload_bytes = payload.as_bytes().to_vec();
1432
1433            let publish_result = broker
1434                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
1435                .await
1436                .map_err(|e| format!("{}", e));
1437
1438            match publish_result {
1439                Ok(_) => {
1440                    // Emit message event
1441                    let event = MessageEvent::Mqtt(MqttMessageEvent {
1442                        topic: topic.clone(),
1443                        payload: payload.clone(),
1444                        qos,
1445                        retain,
1446                        timestamp: chrono::Utc::now().to_rfc3339(),
1447                    });
1448                    let _ = state.message_events.send(event);
1449
1450                    results.push(serde_json::json!({
1451                        "index": index,
1452                        "success": true,
1453                        "topic": topic,
1454                        "qos": qos
1455                    }));
1456                }
1457                Err(error_msg) => {
1458                    results.push(serde_json::json!({
1459                        "index": index,
1460                        "success": false,
1461                        "error": error_msg
1462                    }));
1463                }
1464            }
1465
1466            // Add delay between messages (except for the last one)
1467            if index < messages_json.len() - 1 && delay_ms > 0 {
1468                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1469            }
1470        }
1471
1472        let success_count =
1473            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();
1474
1475        (
1476            StatusCode::OK,
1477            Json(serde_json::json!({
1478                "success": true,
1479                "total": messages_json.len(),
1480                "succeeded": success_count,
1481                "failed": messages_json.len() - success_count,
1482                "results": results
1483            })),
1484        )
1485    } else {
1486        (
1487            StatusCode::SERVICE_UNAVAILABLE,
1488            Json(serde_json::json!({
1489                "error": "MQTT broker not available",
1490                "message": "MQTT broker is not enabled or not available."
1491            })),
1492        )
1493    }
1494}
1495
1496#[cfg(not(feature = "mqtt"))]
1497/// Publish multiple messages to MQTT topics (stub when mqtt feature is disabled)
1498async fn publish_mqtt_batch_handler(
1499    State(_state): State<ManagementState>,
1500    Json(_request): Json<serde_json::Value>,
1501) -> impl IntoResponse {
1502    (
1503        StatusCode::SERVICE_UNAVAILABLE,
1504        Json(serde_json::json!({
1505            "error": "MQTT feature not enabled",
1506            "message": "MQTT support is not compiled into this build"
1507        })),
1508    )
1509}
1510
1511// Migration pipeline handlers
1512
1513/// Request to set migration mode
1514#[derive(Debug, Deserialize)]
1515struct SetMigrationModeRequest {
1516    mode: String,
1517}
1518
1519/// Get all migration routes
1520async fn get_migration_routes(
1521    State(state): State<ManagementState>,
1522) -> Result<Json<serde_json::Value>, StatusCode> {
1523    let proxy_config = match &state.proxy_config {
1524        Some(config) => config,
1525        None => {
1526            return Ok(Json(serde_json::json!({
1527                "error": "Migration not configured. Proxy config not available."
1528            })));
1529        }
1530    };
1531
1532    let config = proxy_config.read().await;
1533    let routes = config.get_migration_routes();
1534
1535    Ok(Json(serde_json::json!({
1536        "routes": routes
1537    })))
1538}
1539
1540/// Toggle a route's migration mode
1541async fn toggle_route_migration(
1542    State(state): State<ManagementState>,
1543    Path(pattern): Path<String>,
1544) -> Result<Json<serde_json::Value>, StatusCode> {
1545    let proxy_config = match &state.proxy_config {
1546        Some(config) => config,
1547        None => {
1548            return Ok(Json(serde_json::json!({
1549                "error": "Migration not configured. Proxy config not available."
1550            })));
1551        }
1552    };
1553
1554    let mut config = proxy_config.write().await;
1555    let new_mode = match config.toggle_route_migration(&pattern) {
1556        Some(mode) => mode,
1557        None => {
1558            return Ok(Json(serde_json::json!({
1559                "error": format!("Route pattern not found: {}", pattern)
1560            })));
1561        }
1562    };
1563
1564    Ok(Json(serde_json::json!({
1565        "pattern": pattern,
1566        "mode": format!("{:?}", new_mode).to_lowercase()
1567    })))
1568}
1569
1570/// Set a route's migration mode explicitly
1571async fn set_route_migration_mode(
1572    State(state): State<ManagementState>,
1573    Path(pattern): Path<String>,
1574    Json(request): Json<SetMigrationModeRequest>,
1575) -> Result<Json<serde_json::Value>, StatusCode> {
1576    let proxy_config = match &state.proxy_config {
1577        Some(config) => config,
1578        None => {
1579            return Ok(Json(serde_json::json!({
1580                "error": "Migration not configured. Proxy config not available."
1581            })));
1582        }
1583    };
1584
1585    use mockforge_core::proxy::config::MigrationMode;
1586    let mode = match request.mode.to_lowercase().as_str() {
1587        "mock" => MigrationMode::Mock,
1588        "shadow" => MigrationMode::Shadow,
1589        "real" => MigrationMode::Real,
1590        "auto" => MigrationMode::Auto,
1591        _ => {
1592            return Ok(Json(serde_json::json!({
1593                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1594            })));
1595        }
1596    };
1597
1598    let mut config = proxy_config.write().await;
1599    let updated = config.update_rule_migration_mode(&pattern, mode);
1600
1601    if !updated {
1602        return Ok(Json(serde_json::json!({
1603            "error": format!("Route pattern not found: {}", pattern)
1604        })));
1605    }
1606
1607    Ok(Json(serde_json::json!({
1608        "pattern": pattern,
1609        "mode": format!("{:?}", mode).to_lowercase()
1610    })))
1611}
1612
1613/// Toggle a group's migration mode
1614async fn toggle_group_migration(
1615    State(state): State<ManagementState>,
1616    Path(group): Path<String>,
1617) -> Result<Json<serde_json::Value>, StatusCode> {
1618    let proxy_config = match &state.proxy_config {
1619        Some(config) => config,
1620        None => {
1621            return Ok(Json(serde_json::json!({
1622                "error": "Migration not configured. Proxy config not available."
1623            })));
1624        }
1625    };
1626
1627    let mut config = proxy_config.write().await;
1628    let new_mode = config.toggle_group_migration(&group);
1629
1630    Ok(Json(serde_json::json!({
1631        "group": group,
1632        "mode": format!("{:?}", new_mode).to_lowercase()
1633    })))
1634}
1635
1636/// Set a group's migration mode explicitly
1637async fn set_group_migration_mode(
1638    State(state): State<ManagementState>,
1639    Path(group): Path<String>,
1640    Json(request): Json<SetMigrationModeRequest>,
1641) -> Result<Json<serde_json::Value>, StatusCode> {
1642    let proxy_config = match &state.proxy_config {
1643        Some(config) => config,
1644        None => {
1645            return Ok(Json(serde_json::json!({
1646                "error": "Migration not configured. Proxy config not available."
1647            })));
1648        }
1649    };
1650
1651    use mockforge_core::proxy::config::MigrationMode;
1652    let mode = match request.mode.to_lowercase().as_str() {
1653        "mock" => MigrationMode::Mock,
1654        "shadow" => MigrationMode::Shadow,
1655        "real" => MigrationMode::Real,
1656        "auto" => MigrationMode::Auto,
1657        _ => {
1658            return Ok(Json(serde_json::json!({
1659                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1660            })));
1661        }
1662    };
1663
1664    let mut config = proxy_config.write().await;
1665    config.update_group_migration_mode(&group, mode);
1666
1667    Ok(Json(serde_json::json!({
1668        "group": group,
1669        "mode": format!("{:?}", mode).to_lowercase()
1670    })))
1671}
1672
1673/// Get all migration groups
1674async fn get_migration_groups(
1675    State(state): State<ManagementState>,
1676) -> Result<Json<serde_json::Value>, StatusCode> {
1677    let proxy_config = match &state.proxy_config {
1678        Some(config) => config,
1679        None => {
1680            return Ok(Json(serde_json::json!({
1681                "error": "Migration not configured. Proxy config not available."
1682            })));
1683        }
1684    };
1685
1686    let config = proxy_config.read().await;
1687    let groups = config.get_migration_groups();
1688
1689    // Convert to JSON-serializable format
1690    let groups_json: serde_json::Map<String, serde_json::Value> = groups
1691        .into_iter()
1692        .map(|(name, info)| {
1693            (
1694                name,
1695                serde_json::json!({
1696                    "name": info.name,
1697                    "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1698                    "route_count": info.route_count
1699                }),
1700            )
1701        })
1702        .collect();
1703
1704    Ok(Json(serde_json::json!(groups_json)))
1705}
1706
1707/// Get overall migration status
1708async fn get_migration_status(
1709    State(state): State<ManagementState>,
1710) -> Result<Json<serde_json::Value>, StatusCode> {
1711    let proxy_config = match &state.proxy_config {
1712        Some(config) => config,
1713        None => {
1714            return Ok(Json(serde_json::json!({
1715                "error": "Migration not configured. Proxy config not available."
1716            })));
1717        }
1718    };
1719
1720    let config = proxy_config.read().await;
1721    let routes = config.get_migration_routes();
1722    let groups = config.get_migration_groups();
1723
1724    let mut mock_count = 0;
1725    let mut shadow_count = 0;
1726    let mut real_count = 0;
1727    let mut auto_count = 0;
1728
1729    for route in &routes {
1730        match route.migration_mode {
1731            mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1732            mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1733            mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1734            mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1735        }
1736    }
1737
1738    Ok(Json(serde_json::json!({
1739        "total_routes": routes.len(),
1740        "mock_routes": mock_count,
1741        "shadow_routes": shadow_count,
1742        "real_routes": real_count,
1743        "auto_routes": auto_count,
1744        "total_groups": groups.len(),
1745        "migration_enabled": config.migration_enabled
1746    })))
1747}
1748
1749// ========== Proxy Replacement Rules Management ==========
1750
1751/// Request body for creating/updating proxy replacement rules
1752#[derive(Debug, Deserialize, Serialize)]
1753pub struct ProxyRuleRequest {
1754    /// URL pattern to match (supports wildcards like "/api/users/*")
1755    pub pattern: String,
1756    /// Rule type: "request" or "response"
1757    #[serde(rename = "type")]
1758    pub rule_type: String,
1759    /// Optional status code filter for response rules
1760    #[serde(default)]
1761    pub status_codes: Vec<u16>,
1762    /// Body transformations to apply
1763    pub body_transforms: Vec<BodyTransformRequest>,
1764    /// Whether this rule is enabled
1765    #[serde(default = "default_true")]
1766    pub enabled: bool,
1767}
1768
1769/// Request body for individual body transformations
1770#[derive(Debug, Deserialize, Serialize)]
1771pub struct BodyTransformRequest {
1772    /// JSONPath expression to target (e.g., "$.userId", "$.email")
1773    pub path: String,
1774    /// Replacement value (supports template expansion like "{{uuid}}", "{{faker.email}}")
1775    pub replace: String,
1776    /// Operation to perform: "replace", "add", or "remove"
1777    #[serde(default)]
1778    pub operation: String,
1779}
1780
1781/// Response format for proxy rules
1782#[derive(Debug, Serialize)]
1783pub struct ProxyRuleResponse {
1784    /// Rule ID (index in the array)
1785    pub id: usize,
1786    /// URL pattern
1787    pub pattern: String,
1788    /// Rule type
1789    #[serde(rename = "type")]
1790    pub rule_type: String,
1791    /// Status codes (for response rules)
1792    pub status_codes: Vec<u16>,
1793    /// Body transformations
1794    pub body_transforms: Vec<BodyTransformRequest>,
1795    /// Whether enabled
1796    pub enabled: bool,
1797}
1798
1799/// List all proxy replacement rules
1800async fn list_proxy_rules(
1801    State(state): State<ManagementState>,
1802) -> Result<Json<serde_json::Value>, StatusCode> {
1803    let proxy_config = match &state.proxy_config {
1804        Some(config) => config,
1805        None => {
1806            return Ok(Json(serde_json::json!({
1807                "error": "Proxy not configured. Proxy config not available."
1808            })));
1809        }
1810    };
1811
1812    let config = proxy_config.read().await;
1813
1814    let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1815
1816    // Add request replacement rules
1817    for (idx, rule) in config.request_replacements.iter().enumerate() {
1818        rules.push(ProxyRuleResponse {
1819            id: idx,
1820            pattern: rule.pattern.clone(),
1821            rule_type: "request".to_string(),
1822            status_codes: Vec::new(),
1823            body_transforms: rule
1824                .body_transforms
1825                .iter()
1826                .map(|t| BodyTransformRequest {
1827                    path: t.path.clone(),
1828                    replace: t.replace.clone(),
1829                    operation: format!("{:?}", t.operation).to_lowercase(),
1830                })
1831                .collect(),
1832            enabled: rule.enabled,
1833        });
1834    }
1835
1836    // Add response replacement rules
1837    let request_count = config.request_replacements.len();
1838    for (idx, rule) in config.response_replacements.iter().enumerate() {
1839        rules.push(ProxyRuleResponse {
1840            id: request_count + idx,
1841            pattern: rule.pattern.clone(),
1842            rule_type: "response".to_string(),
1843            status_codes: rule.status_codes.clone(),
1844            body_transforms: rule
1845                .body_transforms
1846                .iter()
1847                .map(|t| BodyTransformRequest {
1848                    path: t.path.clone(),
1849                    replace: t.replace.clone(),
1850                    operation: format!("{:?}", t.operation).to_lowercase(),
1851                })
1852                .collect(),
1853            enabled: rule.enabled,
1854        });
1855    }
1856
1857    Ok(Json(serde_json::json!({
1858        "rules": rules
1859    })))
1860}
1861
1862/// Create a new proxy replacement rule
1863async fn create_proxy_rule(
1864    State(state): State<ManagementState>,
1865    Json(request): Json<ProxyRuleRequest>,
1866) -> Result<Json<serde_json::Value>, StatusCode> {
1867    let proxy_config = match &state.proxy_config {
1868        Some(config) => config,
1869        None => {
1870            return Ok(Json(serde_json::json!({
1871                "error": "Proxy not configured. Proxy config not available."
1872            })));
1873        }
1874    };
1875
1876    // Validate request
1877    if request.body_transforms.is_empty() {
1878        return Ok(Json(serde_json::json!({
1879            "error": "At least one body transform is required"
1880        })));
1881    }
1882
1883    let body_transforms: Vec<BodyTransform> = request
1884        .body_transforms
1885        .iter()
1886        .map(|t| {
1887            let op = match t.operation.as_str() {
1888                "replace" => TransformOperation::Replace,
1889                "add" => TransformOperation::Add,
1890                "remove" => TransformOperation::Remove,
1891                _ => TransformOperation::Replace,
1892            };
1893            BodyTransform {
1894                path: t.path.clone(),
1895                replace: t.replace.clone(),
1896                operation: op,
1897            }
1898        })
1899        .collect();
1900
1901    let new_rule = BodyTransformRule {
1902        pattern: request.pattern.clone(),
1903        status_codes: request.status_codes.clone(),
1904        body_transforms,
1905        enabled: request.enabled,
1906    };
1907
1908    let mut config = proxy_config.write().await;
1909
1910    let rule_id = if request.rule_type == "request" {
1911        config.request_replacements.push(new_rule);
1912        config.request_replacements.len() - 1
1913    } else if request.rule_type == "response" {
1914        config.response_replacements.push(new_rule);
1915        config.request_replacements.len() + config.response_replacements.len() - 1
1916    } else {
1917        return Ok(Json(serde_json::json!({
1918            "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
1919        })));
1920    };
1921
1922    Ok(Json(serde_json::json!({
1923        "id": rule_id,
1924        "message": "Rule created successfully"
1925    })))
1926}
1927
1928/// Get a specific proxy replacement rule
1929async fn get_proxy_rule(
1930    State(state): State<ManagementState>,
1931    Path(id): Path<String>,
1932) -> Result<Json<serde_json::Value>, StatusCode> {
1933    let proxy_config = match &state.proxy_config {
1934        Some(config) => config,
1935        None => {
1936            return Ok(Json(serde_json::json!({
1937                "error": "Proxy not configured. Proxy config not available."
1938            })));
1939        }
1940    };
1941
1942    let config = proxy_config.read().await;
1943    let rule_id: usize = match id.parse() {
1944        Ok(id) => id,
1945        Err(_) => {
1946            return Ok(Json(serde_json::json!({
1947                "error": format!("Invalid rule ID: {}", id)
1948            })));
1949        }
1950    };
1951
1952    let request_count = config.request_replacements.len();
1953
1954    if rule_id < request_count {
1955        // Request rule
1956        let rule = &config.request_replacements[rule_id];
1957        Ok(Json(serde_json::json!({
1958            "id": rule_id,
1959            "pattern": rule.pattern,
1960            "type": "request",
1961            "status_codes": [],
1962            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1963                "path": t.path,
1964                "replace": t.replace,
1965                "operation": format!("{:?}", t.operation).to_lowercase()
1966            })).collect::<Vec<_>>(),
1967            "enabled": rule.enabled
1968        })))
1969    } else if rule_id < request_count + config.response_replacements.len() {
1970        // Response rule
1971        let response_idx = rule_id - request_count;
1972        let rule = &config.response_replacements[response_idx];
1973        Ok(Json(serde_json::json!({
1974            "id": rule_id,
1975            "pattern": rule.pattern,
1976            "type": "response",
1977            "status_codes": rule.status_codes,
1978            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1979                "path": t.path,
1980                "replace": t.replace,
1981                "operation": format!("{:?}", t.operation).to_lowercase()
1982            })).collect::<Vec<_>>(),
1983            "enabled": rule.enabled
1984        })))
1985    } else {
1986        Ok(Json(serde_json::json!({
1987            "error": format!("Rule ID {} not found", rule_id)
1988        })))
1989    }
1990}
1991
1992/// Update a proxy replacement rule
1993async fn update_proxy_rule(
1994    State(state): State<ManagementState>,
1995    Path(id): Path<String>,
1996    Json(request): Json<ProxyRuleRequest>,
1997) -> Result<Json<serde_json::Value>, StatusCode> {
1998    let proxy_config = match &state.proxy_config {
1999        Some(config) => config,
2000        None => {
2001            return Ok(Json(serde_json::json!({
2002                "error": "Proxy not configured. Proxy config not available."
2003            })));
2004        }
2005    };
2006
2007    let mut config = proxy_config.write().await;
2008    let rule_id: usize = match id.parse() {
2009        Ok(id) => id,
2010        Err(_) => {
2011            return Ok(Json(serde_json::json!({
2012                "error": format!("Invalid rule ID: {}", id)
2013            })));
2014        }
2015    };
2016
2017    let body_transforms: Vec<BodyTransform> = request
2018        .body_transforms
2019        .iter()
2020        .map(|t| {
2021            let op = match t.operation.as_str() {
2022                "replace" => TransformOperation::Replace,
2023                "add" => TransformOperation::Add,
2024                "remove" => TransformOperation::Remove,
2025                _ => TransformOperation::Replace,
2026            };
2027            BodyTransform {
2028                path: t.path.clone(),
2029                replace: t.replace.clone(),
2030                operation: op,
2031            }
2032        })
2033        .collect();
2034
2035    let updated_rule = BodyTransformRule {
2036        pattern: request.pattern.clone(),
2037        status_codes: request.status_codes.clone(),
2038        body_transforms,
2039        enabled: request.enabled,
2040    };
2041
2042    let request_count = config.request_replacements.len();
2043
2044    if rule_id < request_count {
2045        // Update request rule
2046        config.request_replacements[rule_id] = updated_rule;
2047    } else if rule_id < request_count + config.response_replacements.len() {
2048        // Update response rule
2049        let response_idx = rule_id - request_count;
2050        config.response_replacements[response_idx] = updated_rule;
2051    } else {
2052        return Ok(Json(serde_json::json!({
2053            "error": format!("Rule ID {} not found", rule_id)
2054        })));
2055    }
2056
2057    Ok(Json(serde_json::json!({
2058        "id": rule_id,
2059        "message": "Rule updated successfully"
2060    })))
2061}
2062
2063/// Delete a proxy replacement rule
2064async fn delete_proxy_rule(
2065    State(state): State<ManagementState>,
2066    Path(id): Path<String>,
2067) -> Result<Json<serde_json::Value>, StatusCode> {
2068    let proxy_config = match &state.proxy_config {
2069        Some(config) => config,
2070        None => {
2071            return Ok(Json(serde_json::json!({
2072                "error": "Proxy not configured. Proxy config not available."
2073            })));
2074        }
2075    };
2076
2077    let mut config = proxy_config.write().await;
2078    let rule_id: usize = match id.parse() {
2079        Ok(id) => id,
2080        Err(_) => {
2081            return Ok(Json(serde_json::json!({
2082                "error": format!("Invalid rule ID: {}", id)
2083            })));
2084        }
2085    };
2086
2087    let request_count = config.request_replacements.len();
2088
2089    if rule_id < request_count {
2090        // Delete request rule
2091        config.request_replacements.remove(rule_id);
2092    } else if rule_id < request_count + config.response_replacements.len() {
2093        // Delete response rule
2094        let response_idx = rule_id - request_count;
2095        config.response_replacements.remove(response_idx);
2096    } else {
2097        return Ok(Json(serde_json::json!({
2098            "error": format!("Rule ID {} not found", rule_id)
2099        })));
2100    }
2101
2102    Ok(Json(serde_json::json!({
2103        "id": rule_id,
2104        "message": "Rule deleted successfully"
2105    })))
2106}
2107
2108/// Get recent intercepted requests/responses for inspection
2109/// This is a placeholder - in a full implementation, you'd track intercepted traffic
2110async fn get_proxy_inspect(
2111    State(_state): State<ManagementState>,
2112    Query(params): Query<std::collections::HashMap<String, String>>,
2113) -> Result<Json<serde_json::Value>, StatusCode> {
2114    let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2115
2116    // TODO: In a full implementation, this would return actual intercepted requests/responses
2117    // For now, return a placeholder response
2118    Ok(Json(serde_json::json!({
2119        "requests": [],
2120        "responses": [],
2121        "limit": limit,
2122        "message": "Request/response inspection not yet implemented. This endpoint will return intercepted traffic in a future version."
2123    })))
2124}
2125
2126/// Build the management API router
2127pub fn management_router(state: ManagementState) -> Router {
2128    let router = Router::new()
2129        .route("/health", get(health_check))
2130        .route("/stats", get(get_stats))
2131        .route("/config", get(get_config))
2132        .route("/config/validate", post(validate_config))
2133        .route("/config/bulk", post(bulk_update_config))
2134        .route("/mocks", get(list_mocks))
2135        .route("/mocks", post(create_mock))
2136        .route("/mocks/{id}", get(get_mock))
2137        .route("/mocks/{id}", put(update_mock))
2138        .route("/mocks/{id}", delete(delete_mock))
2139        .route("/export", get(export_mocks))
2140        .route("/import", post(import_mocks));
2141
2142    #[cfg(feature = "smtp")]
2143    let router = router
2144        .route("/smtp/mailbox", get(list_smtp_emails))
2145        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
2146        .route("/smtp/mailbox/{id}", get(get_smtp_email))
2147        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
2148        .route("/smtp/mailbox/search", get(search_smtp_emails));
2149
2150    #[cfg(not(feature = "smtp"))]
2151    let router = router;
2152
2153    // MQTT routes
2154    #[cfg(feature = "mqtt")]
2155    let router = router
2156        .route("/mqtt/stats", get(get_mqtt_stats))
2157        .route("/mqtt/clients", get(get_mqtt_clients))
2158        .route("/mqtt/topics", get(get_mqtt_topics))
2159        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
2160        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
2161        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2162        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2163
2164    #[cfg(not(feature = "mqtt"))]
2165    let router = router
2166        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2167        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2168
2169    #[cfg(feature = "kafka")]
2170    let router = router
2171        .route("/kafka/stats", get(get_kafka_stats))
2172        .route("/kafka/topics", get(get_kafka_topics))
2173        .route("/kafka/topics/{topic}", get(get_kafka_topic))
2174        .route("/kafka/groups", get(get_kafka_groups))
2175        .route("/kafka/groups/{group_id}", get(get_kafka_group))
2176        .route("/kafka/produce", post(produce_kafka_message))
2177        .route("/kafka/produce/batch", post(produce_kafka_batch))
2178        .route("/kafka/messages/stream", get(kafka_messages_stream));
2179
2180    #[cfg(not(feature = "kafka"))]
2181    let router = router;
2182
2183    // Migration pipeline routes
2184    let router = router
2185        .route("/migration/routes", get(get_migration_routes))
2186        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
2187        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
2188        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
2189        .route("/migration/groups/{group}", put(set_group_migration_mode))
2190        .route("/migration/groups", get(get_migration_groups))
2191        .route("/migration/status", get(get_migration_status));
2192
2193    // Proxy replacement rules routes
2194    let router = router
2195        .route("/proxy/rules", get(list_proxy_rules))
2196        .route("/proxy/rules", post(create_proxy_rule))
2197        .route("/proxy/rules/{id}", get(get_proxy_rule))
2198        .route("/proxy/rules/{id}", put(update_proxy_rule))
2199        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
2200        .route("/proxy/inspect", get(get_proxy_inspect));
2201
2202    // AI-powered features
2203    let router = router
2204        .route("/ai/generate-spec", post(generate_ai_spec))
2205        .route("/mockai/generate-openapi", post(generate_openapi_from_traffic))
2206        .route("/mockai/learn", post(learn_from_examples))
2207        .route("/mockai/rules/explanations", get(list_rule_explanations))
2208        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
2209        .route("/chaos/config", get(get_chaos_config))
2210        .route("/chaos/config", post(update_chaos_config))
2211        .route("/network/profiles", get(list_network_profiles))
2212        .route("/network/profile/apply", post(apply_network_profile));
2213
2214    // State machine API routes
2215    let router =
2216        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());
2217
2218    router.with_state(state)
2219}
2220
2221#[cfg(feature = "kafka")]
2222#[derive(Debug, Clone, Serialize, Deserialize)]
2223pub struct KafkaBrokerStats {
2224    /// Number of topics
2225    pub topics: usize,
2226    /// Total number of partitions
2227    pub partitions: usize,
2228    /// Number of consumer groups
2229    pub consumer_groups: usize,
2230    /// Total messages produced
2231    pub messages_produced: u64,
2232    /// Total messages consumed
2233    pub messages_consumed: u64,
2234}
2235
2236#[cfg(feature = "kafka")]
2237#[derive(Debug, Clone, Serialize, Deserialize)]
2238pub struct KafkaTopicInfo {
2239    pub name: String,
2240    pub partitions: usize,
2241    pub replication_factor: i32,
2242}
2243
2244#[cfg(feature = "kafka")]
2245#[derive(Debug, Clone, Serialize, Deserialize)]
2246pub struct KafkaConsumerGroupInfo {
2247    pub group_id: String,
2248    pub members: usize,
2249    pub state: String,
2250}
2251
2252#[cfg(feature = "kafka")]
2253/// Get Kafka broker statistics
2254async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
2255    if let Some(broker) = &state.kafka_broker {
2256        let topics = broker.topics.read().await;
2257        let consumer_groups = broker.consumer_groups.read().await;
2258        let metrics = broker.metrics.clone();
2259
2260        let total_partitions: usize = topics.values().map(|t| t.partitions.len()).sum();
2261        let snapshot = metrics.snapshot();
2262        let messages_produced = snapshot.messages_produced_total;
2263        let messages_consumed = snapshot.messages_consumed_total;
2264
2265        let stats = KafkaBrokerStats {
2266            topics: topics.len(),
2267            partitions: total_partitions,
2268            consumer_groups: consumer_groups.groups().len(),
2269            messages_produced,
2270            messages_consumed,
2271        };
2272
2273        Json(stats).into_response()
2274    } else {
2275        (
2276            StatusCode::SERVICE_UNAVAILABLE,
2277            Json(serde_json::json!({
2278                "error": "Kafka broker not available",
2279                "message": "Kafka broker is not enabled or not available."
2280            })),
2281        )
2282            .into_response()
2283    }
2284}
2285
2286#[cfg(feature = "kafka")]
2287/// List Kafka topics
2288async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
2289    if let Some(broker) = &state.kafka_broker {
2290        let topics = broker.topics.read().await;
2291        let topic_list: Vec<KafkaTopicInfo> = topics
2292            .iter()
2293            .map(|(name, topic)| KafkaTopicInfo {
2294                name: name.clone(),
2295                partitions: topic.partitions.len(),
2296                replication_factor: topic.config.replication_factor,
2297            })
2298            .collect();
2299
2300        Json(serde_json::json!({
2301            "topics": topic_list
2302        }))
2303        .into_response()
2304    } else {
2305        (
2306            StatusCode::SERVICE_UNAVAILABLE,
2307            Json(serde_json::json!({
2308                "error": "Kafka broker not available",
2309                "message": "Kafka broker is not enabled or not available."
2310            })),
2311        )
2312            .into_response()
2313    }
2314}
2315
2316#[cfg(feature = "kafka")]
2317/// Get Kafka topic details
2318async fn get_kafka_topic(
2319    State(state): State<ManagementState>,
2320    Path(topic_name): Path<String>,
2321) -> impl IntoResponse {
2322    if let Some(broker) = &state.kafka_broker {
2323        let topics = broker.topics.read().await;
2324        if let Some(topic) = topics.get(&topic_name) {
2325            Json(serde_json::json!({
2326                "name": topic_name,
2327                "partitions": topic.partitions.len(),
2328                "replication_factor": topic.config.replication_factor,
2329                "partitions_detail": topic.partitions.iter().enumerate().map(|(idx, partition)| serde_json::json!({
2330                    "id": idx as i32,
2331                    "leader": 0,
2332                    "replicas": vec![0],
2333                    "message_count": partition.messages.len()
2334                })).collect::<Vec<_>>()
2335            })).into_response()
2336        } else {
2337            (
2338                StatusCode::NOT_FOUND,
2339                Json(serde_json::json!({
2340                    "error": "Topic not found",
2341                    "topic": topic_name
2342                })),
2343            )
2344                .into_response()
2345        }
2346    } else {
2347        (
2348            StatusCode::SERVICE_UNAVAILABLE,
2349            Json(serde_json::json!({
2350                "error": "Kafka broker not available",
2351                "message": "Kafka broker is not enabled or not available."
2352            })),
2353        )
2354            .into_response()
2355    }
2356}
2357
2358#[cfg(feature = "kafka")]
2359/// List Kafka consumer groups
2360async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
2361    if let Some(broker) = &state.kafka_broker {
2362        let consumer_groups = broker.consumer_groups.read().await;
2363        let groups: Vec<KafkaConsumerGroupInfo> = consumer_groups
2364            .groups()
2365            .iter()
2366            .map(|(group_id, group)| KafkaConsumerGroupInfo {
2367                group_id: group_id.clone(),
2368                members: group.members.len(),
2369                state: "Stable".to_string(), // Simplified - could be more detailed
2370            })
2371            .collect();
2372
2373        Json(serde_json::json!({
2374            "groups": groups
2375        }))
2376        .into_response()
2377    } else {
2378        (
2379            StatusCode::SERVICE_UNAVAILABLE,
2380            Json(serde_json::json!({
2381                "error": "Kafka broker not available",
2382                "message": "Kafka broker is not enabled or not available."
2383            })),
2384        )
2385            .into_response()
2386    }
2387}
2388
2389#[cfg(feature = "kafka")]
2390/// Get Kafka consumer group details
2391async fn get_kafka_group(
2392    State(state): State<ManagementState>,
2393    Path(group_id): Path<String>,
2394) -> impl IntoResponse {
2395    if let Some(broker) = &state.kafka_broker {
2396        let consumer_groups = broker.consumer_groups.read().await;
2397        if let Some(group) = consumer_groups.groups().get(&group_id) {
2398            Json(serde_json::json!({
2399                "group_id": group_id,
2400                "members": group.members.len(),
2401                "state": "Stable",
2402                "members_detail": group.members.iter().map(|(member_id, member)| serde_json::json!({
2403                    "member_id": member_id,
2404                    "client_id": member.client_id,
2405                    "assignments": member.assignment.iter().map(|a| serde_json::json!({
2406                        "topic": a.topic,
2407                        "partitions": a.partitions
2408                    })).collect::<Vec<_>>()
2409                })).collect::<Vec<_>>(),
2410                "offsets": group.offsets.iter().map(|((topic, partition), offset)| serde_json::json!({
2411                    "topic": topic,
2412                    "partition": partition,
2413                    "offset": offset
2414                })).collect::<Vec<_>>()
2415            })).into_response()
2416        } else {
2417            (
2418                StatusCode::NOT_FOUND,
2419                Json(serde_json::json!({
2420                    "error": "Consumer group not found",
2421                    "group_id": group_id
2422                })),
2423            )
2424                .into_response()
2425        }
2426    } else {
2427        (
2428            StatusCode::SERVICE_UNAVAILABLE,
2429            Json(serde_json::json!({
2430                "error": "Kafka broker not available",
2431                "message": "Kafka broker is not enabled or not available."
2432            })),
2433        )
2434            .into_response()
2435    }
2436}
2437
2438// ========== Kafka Produce Handler ==========
2439
2440#[cfg(feature = "kafka")]
2441#[derive(Debug, Deserialize)]
2442pub struct KafkaProduceRequest {
2443    /// Topic to produce to
2444    pub topic: String,
2445    /// Message key (optional)
2446    #[serde(default)]
2447    pub key: Option<String>,
2448    /// Message value (JSON string or plain string)
2449    pub value: String,
2450    /// Partition ID (optional, auto-assigned if not provided)
2451    #[serde(default)]
2452    pub partition: Option<i32>,
2453    /// Message headers (optional, key-value pairs)
2454    #[serde(default)]
2455    pub headers: Option<std::collections::HashMap<String, String>>,
2456}
2457
2458#[cfg(feature = "kafka")]
2459/// Produce a message to a Kafka topic
2460async fn produce_kafka_message(
2461    State(state): State<ManagementState>,
2462    Json(request): Json<KafkaProduceRequest>,
2463) -> impl IntoResponse {
2464    if let Some(broker) = &state.kafka_broker {
2465        let mut topics = broker.topics.write().await;
2466
2467        // Get or create the topic
2468        let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
2469            crate::topics::Topic::new(request.topic.clone(), crate::topics::TopicConfig::default())
2470        });
2471
2472        // Determine partition
2473        let partition_id = if let Some(partition) = request.partition {
2474            partition
2475        } else {
2476            topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes()))
2477        };
2478
2479        // Validate partition exists
2480        if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
2481            return (
2482                StatusCode::BAD_REQUEST,
2483                Json(serde_json::json!({
2484                    "error": "Invalid partition",
2485                    "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
2486                })),
2487            )
2488                .into_response();
2489        }
2490
2491        // Create the message
2492        let message = crate::partitions::KafkaMessage {
2493            offset: 0, // Will be set by partition.append
2494            timestamp: chrono::Utc::now().timestamp_millis(),
2495            key: request.key.map(|k| k.as_bytes().to_vec()),
2496            value: request.value.as_bytes().to_vec(),
2497            headers: request
2498                .headers
2499                .unwrap_or_default()
2500                .into_iter()
2501                .map(|(k, v)| (k, v.as_bytes().to_vec()))
2502                .collect(),
2503        };
2504
2505        // Produce to partition
2506        match topic_entry.produce(partition_id, message).await {
2507            Ok(offset) => {
2508                // Record metrics
2509                broker.metrics.record_messages_produced(1);
2510
2511                // Emit message event for real-time monitoring
2512                #[cfg(feature = "kafka")]
2513                {
2514                    let event = MessageEvent::Kafka(KafkaMessageEvent {
2515                        topic: request.topic.clone(),
2516                        key: request.key.clone(),
2517                        value: request.value.clone(),
2518                        partition: partition_id,
2519                        offset,
2520                        headers: request.headers.clone(),
2521                        timestamp: chrono::Utc::now().to_rfc3339(),
2522                    });
2523                    let _ = state.message_events.send(event);
2524                }
2525
2526                Json(serde_json::json!({
2527                    "success": true,
2528                    "message": format!("Message produced to topic '{}'", request.topic),
2529                    "topic": request.topic,
2530                    "partition": partition_id,
2531                    "offset": offset
2532                }))
2533                .into_response()
2534            }
2535            Err(e) => (
2536                StatusCode::INTERNAL_SERVER_ERROR,
2537                Json(serde_json::json!({
2538                    "error": "Failed to produce message",
2539                    "message": e.to_string()
2540                })),
2541            )
2542                .into_response(),
2543        }
2544    } else {
2545        (
2546            StatusCode::SERVICE_UNAVAILABLE,
2547            Json(serde_json::json!({
2548                "error": "Kafka broker not available",
2549                "message": "Kafka broker is not enabled or not available."
2550            })),
2551        )
2552            .into_response()
2553    }
2554}
2555
2556#[cfg(feature = "kafka")]
2557#[derive(Debug, Deserialize)]
2558pub struct KafkaBatchProduceRequest {
2559    /// List of messages to produce
2560    pub messages: Vec<KafkaProduceRequest>,
2561    /// Delay between messages in milliseconds
2562    #[serde(default = "default_delay")]
2563    pub delay_ms: u64,
2564}
2565
2566#[cfg(feature = "kafka")]
2567/// Produce multiple messages to Kafka topics
2568async fn produce_kafka_batch(
2569    State(state): State<ManagementState>,
2570    Json(request): Json<KafkaBatchProduceRequest>,
2571) -> impl IntoResponse {
2572    if let Some(broker) = &state.kafka_broker {
2573        if request.messages.is_empty() {
2574            return (
2575                StatusCode::BAD_REQUEST,
2576                Json(serde_json::json!({
2577                    "error": "Empty batch",
2578                    "message": "At least one message is required"
2579                })),
2580            )
2581                .into_response();
2582        }
2583
2584        let mut results = Vec::new();
2585
2586        for (index, msg_request) in request.messages.iter().enumerate() {
2587            let mut topics = broker.topics.write().await;
2588
2589            // Get or create the topic
2590            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
2591                crate::topics::Topic::new(
2592                    msg_request.topic.clone(),
2593                    crate::topics::TopicConfig::default(),
2594                )
2595            });
2596
2597            // Determine partition
2598            let partition_id = if let Some(partition) = msg_request.partition {
2599                partition
2600            } else {
2601                topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
2602            };
2603
2604            // Validate partition exists
2605            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
2606                results.push(serde_json::json!({
2607                    "index": index,
2608                    "success": false,
2609                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
2610                }));
2611                continue;
2612            }
2613
2614            // Create the message
2615            let message = crate::partitions::KafkaMessage {
2616                offset: 0,
2617                timestamp: chrono::Utc::now().timestamp_millis(),
2618                key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
2619                value: msg_request.value.as_bytes().to_vec(),
2620                headers: msg_request
2621                    .headers
2622                    .clone()
2623                    .unwrap_or_default()
2624                    .into_iter()
2625                    .map(|(k, v)| (k, v.as_bytes().to_vec()))
2626                    .collect(),
2627            };
2628
2629            // Produce to partition
2630            match topic_entry.produce(partition_id, message).await {
2631                Ok(offset) => {
2632                    broker.metrics.record_messages_produced(1);
2633
2634                    // Emit message event
2635                    let event = MessageEvent::Kafka(KafkaMessageEvent {
2636                        topic: msg_request.topic.clone(),
2637                        key: msg_request.key.clone(),
2638                        value: msg_request.value.clone(),
2639                        partition: partition_id,
2640                        offset,
2641                        headers: msg_request.headers.clone(),
2642                        timestamp: chrono::Utc::now().to_rfc3339(),
2643                    });
2644                    let _ = state.message_events.send(event);
2645
2646                    results.push(serde_json::json!({
2647                        "index": index,
2648                        "success": true,
2649                        "topic": msg_request.topic,
2650                        "partition": partition_id,
2651                        "offset": offset
2652                    }));
2653                }
2654                Err(e) => {
2655                    results.push(serde_json::json!({
2656                        "index": index,
2657                        "success": false,
2658                        "error": e.to_string()
2659                    }));
2660                }
2661            }
2662
2663            // Add delay between messages (except for the last one)
2664            if index < request.messages.len() - 1 && request.delay_ms > 0 {
2665                tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
2666            }
2667        }
2668
2669        let success_count =
2670            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();
2671
2672        Json(serde_json::json!({
2673            "success": true,
2674            "total": request.messages.len(),
2675            "succeeded": success_count,
2676            "failed": request.messages.len() - success_count,
2677            "results": results
2678        }))
2679        .into_response()
2680    } else {
2681        (
2682            StatusCode::SERVICE_UNAVAILABLE,
2683            Json(serde_json::json!({
2684                "error": "Kafka broker not available",
2685                "message": "Kafka broker is not enabled or not available."
2686            })),
2687        )
2688            .into_response()
2689    }
2690}
2691
2692// ========== Real-time Message Streaming (SSE) ==========
2693
2694#[cfg(feature = "mqtt")]
2695/// SSE stream for MQTT messages
2696async fn mqtt_messages_stream(
2697    State(state): State<ManagementState>,
2698    Query(params): Query<std::collections::HashMap<String, String>>,
2699) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
2700    let mut rx = state.message_events.subscribe();
2701    let topic_filter = params.get("topic").cloned();
2702
2703    let stream = stream::unfold(rx, move |mut rx| {
2704        let topic_filter = topic_filter.clone();
2705
2706        async move {
2707            loop {
2708                match rx.recv().await {
2709                    Ok(MessageEvent::Mqtt(event)) => {
2710                        // Apply topic filter if specified
2711                        if let Some(filter) = &topic_filter {
2712                            if !event.topic.contains(filter) {
2713                                continue;
2714                            }
2715                        }
2716
2717                        let event_json = serde_json::json!({
2718                            "protocol": "mqtt",
2719                            "topic": event.topic,
2720                            "payload": event.payload,
2721                            "qos": event.qos,
2722                            "retain": event.retain,
2723                            "timestamp": event.timestamp,
2724                        });
2725
2726                        if let Ok(event_data) = serde_json::to_string(&event_json) {
2727                            let sse_event = Event::default().event("mqtt_message").data(event_data);
2728                            return Some((Ok(sse_event), rx));
2729                        }
2730                    }
2731                    #[cfg(feature = "kafka")]
2732                    Ok(MessageEvent::Kafka(_)) => {
2733                        // Skip Kafka events in MQTT stream
2734                        continue;
2735                    }
2736                    Err(broadcast::error::RecvError::Closed) => {
2737                        return None;
2738                    }
2739                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
2740                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
2741                        continue;
2742                    }
2743                }
2744            }
2745        }
2746    });
2747
2748    Sse::new(stream).keep_alive(
2749        axum::response::sse::KeepAlive::new()
2750            .interval(std::time::Duration::from_secs(15))
2751            .text("keep-alive-text"),
2752    )
2753}
2754
2755#[cfg(feature = "kafka")]
2756/// SSE stream for Kafka messages
2757async fn kafka_messages_stream(
2758    State(state): State<ManagementState>,
2759    Query(params): Query<std::collections::HashMap<String, String>>,
2760) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
2761    let mut rx = state.message_events.subscribe();
2762    let topic_filter = params.get("topic").cloned();
2763
2764    let stream = stream::unfold(rx, move |mut rx| {
2765        let topic_filter = topic_filter.clone();
2766
2767        async move {
2768            loop {
2769                match rx.recv().await {
2770                    #[cfg(feature = "mqtt")]
2771                    Ok(MessageEvent::Mqtt(_)) => {
2772                        // Skip MQTT events in Kafka stream
2773                        continue;
2774                    }
2775                    Ok(MessageEvent::Kafka(event)) => {
2776                        // Apply topic filter if specified
2777                        if let Some(filter) = &topic_filter {
2778                            if !event.topic.contains(filter) {
2779                                continue;
2780                            }
2781                        }
2782
2783                        let event_json = serde_json::json!({
2784                            "protocol": "kafka",
2785                            "topic": event.topic,
2786                            "key": event.key,
2787                            "value": event.value,
2788                            "partition": event.partition,
2789                            "offset": event.offset,
2790                            "headers": event.headers,
2791                            "timestamp": event.timestamp,
2792                        });
2793
2794                        if let Ok(event_data) = serde_json::to_string(&event_json) {
2795                            let sse_event =
2796                                Event::default().event("kafka_message").data(event_data);
2797                            return Some((Ok(sse_event), rx));
2798                        }
2799                    }
2800                    Err(broadcast::error::RecvError::Closed) => {
2801                        return None;
2802                    }
2803                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
2804                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
2805                        continue;
2806                    }
2807                }
2808            }
2809        }
2810    });
2811
2812    Sse::new(stream).keep_alive(
2813        axum::response::sse::KeepAlive::new()
2814            .interval(std::time::Duration::from_secs(15))
2815            .text("keep-alive-text"),
2816    )
2817}
2818
2819// ========== AI-Powered Features ==========
2820
2821/// Request for AI-powered API specification generation
2822#[derive(Debug, Deserialize)]
2823pub struct GenerateSpecRequest {
2824    /// Natural language description of the API to generate
2825    pub query: String,
2826    /// Type of specification to generate: "openapi", "graphql", or "asyncapi"
2827    pub spec_type: String,
2828    /// Optional API version (e.g., "3.0.0" for OpenAPI)
2829    pub api_version: Option<String>,
2830}
2831
2832/// Request for OpenAPI generation from recorded traffic
2833#[derive(Debug, Deserialize)]
2834pub struct GenerateOpenApiFromTrafficRequest {
2835    /// Path to recorder database (optional, defaults to ./recordings.db)
2836    #[serde(default)]
2837    pub database_path: Option<String>,
2838    /// Start time for filtering (ISO 8601 format, e.g., 2025-01-01T00:00:00Z)
2839    #[serde(default)]
2840    pub since: Option<String>,
2841    /// End time for filtering (ISO 8601 format)
2842    #[serde(default)]
2843    pub until: Option<String>,
2844    /// Path pattern filter (supports wildcards, e.g., /api/*)
2845    #[serde(default)]
2846    pub path_pattern: Option<String>,
2847    /// Minimum confidence score for including paths (0.0 to 1.0)
2848    #[serde(default = "default_min_confidence")]
2849    pub min_confidence: f64,
2850}
2851
2852fn default_min_confidence() -> f64 {
2853    0.7
2854}
2855
2856/// Generate API specification from natural language using AI
2857#[cfg(feature = "data-faker")]
2858async fn generate_ai_spec(
2859    State(_state): State<ManagementState>,
2860    Json(request): Json<GenerateSpecRequest>,
2861) -> impl IntoResponse {
2862    use mockforge_data::rag::{
2863        config::{EmbeddingProvider, LlmProvider, RagConfig},
2864        engine::RagEngine,
2865        storage::{DocumentStorage, StorageFactory},
2866    };
2867    use std::sync::Arc;
2868
2869    // Build RAG config from environment variables
2870    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
2871        .ok()
2872        .or_else(|| std::env::var("OPENAI_API_KEY").ok());
2873
2874    // Check if RAG is configured - require API key
2875    if api_key.is_none() {
2876        return (
2877            StatusCode::SERVICE_UNAVAILABLE,
2878            Json(serde_json::json!({
2879                "error": "AI service not configured",
2880                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
2881            })),
2882        )
2883            .into_response();
2884    }
2885
2886    // Build RAG configuration
2887    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
2888        .unwrap_or_else(|_| "openai".to_string())
2889        .to_lowercase();
2890
2891    let provider = match provider_str.as_str() {
2892        "openai" => LlmProvider::OpenAI,
2893        "anthropic" => LlmProvider::Anthropic,
2894        "ollama" => LlmProvider::Ollama,
2895        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
2896        _ => LlmProvider::OpenAI,
2897    };
2898
2899    let api_endpoint =
2900        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
2901            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
2902            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
2903            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
2904            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
2905        });
2906
2907    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
2908        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
2909        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
2910        LlmProvider::Ollama => "llama2".to_string(),
2911        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
2912    });
2913
2914    // Build RagConfig using default() and override fields
2915    let mut rag_config = RagConfig::default();
2916    rag_config.provider = provider;
2917    rag_config.api_endpoint = api_endpoint;
2918    rag_config.api_key = api_key;
2919    rag_config.model = model;
2920    rag_config.max_tokens = std::env::var("MOCKFORGE_RAG_MAX_TOKENS")
2921        .unwrap_or_else(|_| "4096".to_string())
2922        .parse()
2923        .unwrap_or(4096);
2924    rag_config.temperature = std::env::var("MOCKFORGE_RAG_TEMPERATURE")
2925        .unwrap_or_else(|_| "0.3".to_string())
2926        .parse()
2927        .unwrap_or(0.3); // Lower temperature for more structured output
2928    rag_config.timeout_secs = std::env::var("MOCKFORGE_RAG_TIMEOUT")
2929        .unwrap_or_else(|_| "60".to_string())
2930        .parse()
2931        .unwrap_or(60);
2932    rag_config.max_context_length = std::env::var("MOCKFORGE_RAG_CONTEXT_WINDOW")
2933        .unwrap_or_else(|_| "4000".to_string())
2934        .parse()
2935        .unwrap_or(4000);
2936
2937    // Build the prompt for spec generation
2938    let spec_type_label = match request.spec_type.as_str() {
2939        "openapi" => "OpenAPI 3.0",
2940        "graphql" => "GraphQL",
2941        "asyncapi" => "AsyncAPI",
2942        _ => "OpenAPI 3.0",
2943    };
2944
2945    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");
2946
2947    let prompt = format!(
2948        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.
2949
2950User Requirements:
2951{}
2952
2953Instructions:
29541. Generate a complete, valid {} specification
29552. Include all paths, operations, request/response schemas, and components
29563. Use realistic field names and data types
29574. Include proper descriptions and examples
29585. Follow {} best practices
29596. Return ONLY the specification, no additional explanation
29607. For OpenAPI, use version {}
2961
2962Return the specification in {} format."#,
2963        spec_type_label,
2964        request.query,
2965        spec_type_label,
2966        spec_type_label,
2967        api_version,
2968        if request.spec_type == "graphql" {
2969            "GraphQL SDL"
2970        } else {
2971            "YAML"
2972        }
2973    );
2974
2975    // Create in-memory storage for RAG engine
2976    // Note: StorageFactory::create_memory() returns Box<dyn DocumentStorage>
2977    // We need to use unsafe transmute or create a wrapper, but for now we'll use
2978    // a simpler approach: create InMemoryStorage directly
2979    use mockforge_data::rag::storage::InMemoryStorage;
2980    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());
2981
2982    // Create RAG engine
2983    let mut rag_engine = match RagEngine::new(rag_config.clone(), storage) {
2984        Ok(engine) => engine,
2985        Err(e) => {
2986            return (
2987                StatusCode::INTERNAL_SERVER_ERROR,
2988                Json(serde_json::json!({
2989                    "error": "Failed to initialize RAG engine",
2990                    "message": e.to_string()
2991                })),
2992            )
2993                .into_response();
2994        }
2995    };
2996
2997    // Generate using RAG engine
2998    match rag_engine.generate(&prompt, None).await {
2999        Ok(generated_text) => {
3000            // Try to extract just the YAML/JSON/SDL content if LLM added explanation
3001            let spec = if request.spec_type == "graphql" {
3002                // For GraphQL, extract SDL
3003                extract_graphql_schema(&generated_text)
3004            } else {
3005                // For OpenAPI/AsyncAPI, extract YAML
3006                extract_yaml_spec(&generated_text)
3007            };
3008
3009            Json(serde_json::json!({
3010                "success": true,
3011                "spec": spec,
3012                "spec_type": request.spec_type,
3013            }))
3014            .into_response()
3015        }
3016        Err(e) => (
3017            StatusCode::INTERNAL_SERVER_ERROR,
3018            Json(serde_json::json!({
3019                "error": "AI generation failed",
3020                "message": e.to_string()
3021            })),
3022        )
3023            .into_response(),
3024    }
3025}
3026
3027#[cfg(not(feature = "data-faker"))]
3028async fn generate_ai_spec(
3029    State(_state): State<ManagementState>,
3030    Json(_request): Json<GenerateSpecRequest>,
3031) -> impl IntoResponse {
3032    (
3033        StatusCode::NOT_IMPLEMENTED,
3034        Json(serde_json::json!({
3035            "error": "AI features not enabled",
3036            "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3037        })),
3038    )
3039        .into_response()
3040}
3041
3042/// Generate OpenAPI specification from recorded traffic
3043async fn generate_openapi_from_traffic(
3044    State(_state): State<ManagementState>,
3045    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
3046) -> impl IntoResponse {
3047    use chrono::{DateTime, Utc};
3048    use mockforge_core::intelligent_behavior::{
3049        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
3050        IntelligentBehaviorConfig,
3051    };
3052    use mockforge_recorder::{
3053        database::RecorderDatabase,
3054        openapi_export::{QueryFilters, RecordingsToOpenApi},
3055    };
3056    use std::path::PathBuf;
3057
3058    // Determine database path
3059    let db_path = if let Some(ref path) = request.database_path {
3060        PathBuf::from(path)
3061    } else {
3062        std::env::current_dir()
3063            .unwrap_or_else(|_| PathBuf::from("."))
3064            .join("recordings.db")
3065    };
3066
3067    // Open database
3068    let db = match RecorderDatabase::new(&db_path).await {
3069        Ok(db) => db,
3070        Err(e) => {
3071            return (
3072                StatusCode::BAD_REQUEST,
3073                Json(serde_json::json!({
3074                    "error": "Database error",
3075                    "message": format!("Failed to open recorder database: {}", e)
3076                })),
3077            )
3078                .into_response();
3079        }
3080    };
3081
3082    // Parse time filters
3083    let since_dt = if let Some(ref since_str) = request.since {
3084        match DateTime::parse_from_rfc3339(since_str) {
3085            Ok(dt) => Some(dt.with_timezone(&Utc)),
3086            Err(e) => {
3087                return (
3088                    StatusCode::BAD_REQUEST,
3089                    Json(serde_json::json!({
3090                        "error": "Invalid date format",
3091                        "message": format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3092                    })),
3093                )
3094                    .into_response();
3095            }
3096        }
3097    } else {
3098        None
3099    };
3100
3101    let until_dt = if let Some(ref until_str) = request.until {
3102        match DateTime::parse_from_rfc3339(until_str) {
3103            Ok(dt) => Some(dt.with_timezone(&Utc)),
3104            Err(e) => {
3105                return (
3106                    StatusCode::BAD_REQUEST,
3107                    Json(serde_json::json!({
3108                        "error": "Invalid date format",
3109                        "message": format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3110                    })),
3111                )
3112                    .into_response();
3113            }
3114        }
3115    } else {
3116        None
3117    };
3118
3119    // Build query filters
3120    let query_filters = QueryFilters {
3121        since: since_dt,
3122        until: until_dt,
3123        path_pattern: request.path_pattern.clone(),
3124        min_status_code: None,
3125        max_requests: Some(1000),
3126    };
3127
3128    // Query HTTP exchanges
3129    let exchanges = match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await
3130    {
3131        Ok(exchanges) => exchanges,
3132        Err(e) => {
3133            return (
3134                StatusCode::INTERNAL_SERVER_ERROR,
3135                Json(serde_json::json!({
3136                    "error": "Query error",
3137                    "message": format!("Failed to query HTTP exchanges: {}", e)
3138                })),
3139            )
3140                .into_response();
3141        }
3142    };
3143
3144    if exchanges.is_empty() {
3145        return (
3146            StatusCode::NOT_FOUND,
3147            Json(serde_json::json!({
3148                "error": "No exchanges found",
3149                "message": "No HTTP exchanges found matching the specified filters"
3150            })),
3151        )
3152            .into_response();
3153    }
3154
3155    // Create OpenAPI generator config
3156    let behavior_config = IntelligentBehaviorConfig::default();
3157    let gen_config = OpenApiGenerationConfig {
3158        min_confidence: request.min_confidence,
3159        behavior_model: Some(behavior_config.behavior_model),
3160    };
3161
3162    // Generate OpenAPI spec
3163    let generator = OpenApiSpecGenerator::new(gen_config);
3164    let result = match generator.generate_from_exchanges(exchanges).await {
3165        Ok(result) => result,
3166        Err(e) => {
3167            return (
3168                StatusCode::INTERNAL_SERVER_ERROR,
3169                Json(serde_json::json!({
3170                    "error": "Generation error",
3171                    "message": format!("Failed to generate OpenAPI spec: {}", e)
3172                })),
3173            )
3174                .into_response();
3175        }
3176    };
3177
3178    // Prepare response
3179    let spec_json = if let Some(ref raw) = result.spec.raw_document {
3180        raw.clone()
3181    } else {
3182        match serde_json::to_value(&result.spec.spec) {
3183            Ok(json) => json,
3184            Err(e) => {
3185                return (
3186                    StatusCode::INTERNAL_SERVER_ERROR,
3187                    Json(serde_json::json!({
3188                        "error": "Serialization error",
3189                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
3190                    })),
3191                )
3192                    .into_response();
3193            }
3194        }
3195    };
3196
3197    // Build response with metadata
3198    let response = serde_json::json!({
3199        "spec": spec_json,
3200        "metadata": {
3201            "requests_analyzed": result.metadata.requests_analyzed,
3202            "paths_inferred": result.metadata.paths_inferred,
3203            "path_confidence": result.metadata.path_confidence,
3204            "generated_at": result.metadata.generated_at.to_rfc3339(),
3205            "duration_ms": result.metadata.duration_ms,
3206        }
3207    });
3208
3209    Json(response).into_response()
3210}
3211
3212/// List all rule explanations
3213async fn list_rule_explanations(
3214    State(state): State<ManagementState>,
3215    Query(params): Query<std::collections::HashMap<String, String>>,
3216) -> impl IntoResponse {
3217    use mockforge_core::intelligent_behavior::RuleType;
3218
3219    let explanations = state.rule_explanations.read().await;
3220    let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3221
3222    // Filter by rule type if provided
3223    if let Some(rule_type_str) = params.get("rule_type") {
3224        if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3225            explanations_vec.retain(|e| e.rule_type == rule_type);
3226        }
3227    }
3228
3229    // Filter by minimum confidence if provided
3230    if let Some(min_confidence_str) = params.get("min_confidence") {
3231        if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3232            explanations_vec.retain(|e| e.confidence >= min_confidence);
3233        }
3234    }
3235
3236    // Sort by confidence (descending) and then by generated_at (descending)
3237    explanations_vec.sort_by(|a, b| {
3238        b.confidence
3239            .partial_cmp(&a.confidence)
3240            .unwrap_or(std::cmp::Ordering::Equal)
3241            .then_with(|| b.generated_at.cmp(&a.generated_at))
3242    });
3243
3244    Json(serde_json::json!({
3245        "explanations": explanations_vec,
3246        "total": explanations_vec.len(),
3247    }))
3248    .into_response()
3249}
3250
3251/// Get a specific rule explanation by ID
3252async fn get_rule_explanation(
3253    State(state): State<ManagementState>,
3254    Path(rule_id): Path<String>,
3255) -> impl IntoResponse {
3256    let explanations = state.rule_explanations.read().await;
3257
3258    match explanations.get(&rule_id) {
3259        Some(explanation) => Json(serde_json::json!({
3260            "explanation": explanation,
3261        }))
3262        .into_response(),
3263        None => (
3264            StatusCode::NOT_FOUND,
3265            Json(serde_json::json!({
3266                "error": "Rule explanation not found",
3267                "message": format!("No explanation found for rule ID: {}", rule_id)
3268            })),
3269        )
3270            .into_response(),
3271    }
3272}
3273
3274/// Request for learning from examples
3275#[derive(Debug, Deserialize)]
3276pub struct LearnFromExamplesRequest {
3277    /// Example request/response pairs to learn from
3278    pub examples: Vec<ExamplePairRequest>,
3279    /// Optional configuration override
3280    #[serde(default)]
3281    pub config: Option<serde_json::Value>,
3282}
3283
3284/// Example pair request format
3285#[derive(Debug, Deserialize)]
3286pub struct ExamplePairRequest {
3287    /// Request data (method, path, body, etc.)
3288    pub request: serde_json::Value,
3289    /// Response data (status_code, body, etc.)
3290    pub response: serde_json::Value,
3291}
3292
3293/// Learn behavioral rules from example pairs
3294///
3295/// This endpoint accepts example request/response pairs, generates behavioral rules
3296/// with explanations, and stores the explanations for later retrieval.
3297async fn learn_from_examples(
3298    State(state): State<ManagementState>,
3299    Json(request): Json<LearnFromExamplesRequest>,
3300) -> impl IntoResponse {
3301    use mockforge_core::intelligent_behavior::{
3302        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3303        rule_generator::{ExamplePair, RuleGenerator},
3304    };
3305
3306    if request.examples.is_empty() {
3307        return (
3308            StatusCode::BAD_REQUEST,
3309            Json(serde_json::json!({
3310                "error": "No examples provided",
3311                "message": "At least one example pair is required"
3312            })),
3313        )
3314            .into_response();
3315    }
3316
3317    // Convert request examples to ExamplePair format
3318    let example_pairs: Result<Vec<ExamplePair>, String> = request
3319        .examples
3320        .into_iter()
3321        .enumerate()
3322        .map(|(idx, ex)| {
3323            // Parse request JSON to extract method, path, body, etc.
3324            let method = ex
3325                .request
3326                .get("method")
3327                .and_then(|v| v.as_str())
3328                .map(|s| s.to_string())
3329                .unwrap_or_else(|| "GET".to_string());
3330            let path = ex
3331                .request
3332                .get("path")
3333                .and_then(|v| v.as_str())
3334                .map(|s| s.to_string())
3335                .unwrap_or_else(|| "/".to_string());
3336            let request_body = ex.request.get("body").cloned();
3337            let query_params = ex
3338                .request
3339                .get("query_params")
3340                .and_then(|v| v.as_object())
3341                .map(|obj| {
3342                    obj.iter()
3343                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3344                        .collect()
3345                })
3346                .unwrap_or_default();
3347            let headers = ex
3348                .request
3349                .get("headers")
3350                .and_then(|v| v.as_object())
3351                .map(|obj| {
3352                    obj.iter()
3353                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3354                        .collect()
3355                })
3356                .unwrap_or_default();
3357
3358            // Parse response JSON to extract status, body, etc.
3359            let status = ex
3360                .response
3361                .get("status_code")
3362                .or_else(|| ex.response.get("status"))
3363                .and_then(|v| v.as_u64())
3364                .map(|n| n as u16)
3365                .unwrap_or(200);
3366            let response_body = ex.response.get("body").cloned();
3367
3368            Ok(ExamplePair {
3369                method,
3370                path,
3371                request: request_body,
3372                status,
3373                response: response_body,
3374                query_params,
3375                headers,
3376                metadata: {
3377                    let mut meta = std::collections::HashMap::new();
3378                    meta.insert("source".to_string(), "api".to_string());
3379                    meta.insert("example_index".to_string(), idx.to_string());
3380                    meta
3381                },
3382            })
3383        })
3384        .collect();
3385
3386    let example_pairs = match example_pairs {
3387        Ok(pairs) => pairs,
3388        Err(e) => {
3389            return (
3390                StatusCode::BAD_REQUEST,
3391                Json(serde_json::json!({
3392                    "error": "Invalid examples",
3393                    "message": e
3394                })),
3395            )
3396                .into_response();
3397        }
3398    };
3399
3400    // Create behavior config (use provided config or default)
3401    let behavior_config = if let Some(config_json) = request.config {
3402        // Try to deserialize custom config, fall back to default
3403        serde_json::from_value(config_json)
3404            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3405            .behavior_model
3406    } else {
3407        BehaviorModelConfig::default()
3408    };
3409
3410    // Create rule generator
3411    let generator = RuleGenerator::new(behavior_config);
3412
3413    // Generate rules with explanations
3414    let (rules, explanations) =
3415        match generator.generate_rules_with_explanations(example_pairs).await {
3416            Ok(result) => result,
3417            Err(e) => {
3418                return (
3419                    StatusCode::INTERNAL_SERVER_ERROR,
3420                    Json(serde_json::json!({
3421                        "error": "Rule generation failed",
3422                        "message": format!("Failed to generate rules: {}", e)
3423                    })),
3424                )
3425                    .into_response();
3426            }
3427        };
3428
3429    // Store explanations in ManagementState
3430    {
3431        let mut stored_explanations = state.rule_explanations.write().await;
3432        for explanation in &explanations {
3433            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3434        }
3435    }
3436
3437    // Prepare response
3438    let response = serde_json::json!({
3439        "success": true,
3440        "rules_generated": {
3441            "consistency_rules": rules.consistency_rules.len(),
3442            "schemas": rules.schemas.len(),
3443            "state_machines": rules.state_transitions.len(),
3444            "system_prompt": !rules.system_prompt.is_empty(),
3445        },
3446        "explanations": explanations.iter().map(|e| serde_json::json!({
3447            "rule_id": e.rule_id,
3448            "rule_type": e.rule_type,
3449            "confidence": e.confidence,
3450            "reasoning": e.reasoning,
3451        })).collect::<Vec<_>>(),
3452        "total_explanations": explanations.len(),
3453    });
3454
3455    Json(response).into_response()
3456}
3457
3458fn extract_yaml_spec(text: &str) -> String {
3459    // Try to find YAML code blocks
3460    if let Some(start) = text.find("```yaml") {
3461        let yaml_start = text[start + 7..].trim_start();
3462        if let Some(end) = yaml_start.find("```") {
3463            return yaml_start[..end].trim().to_string();
3464        }
3465    }
3466    if let Some(start) = text.find("```") {
3467        let content_start = text[start + 3..].trim_start();
3468        if let Some(end) = content_start.find("```") {
3469            return content_start[..end].trim().to_string();
3470        }
3471    }
3472
3473    // Check if it starts with openapi: or asyncapi:
3474    if text.trim_start().starts_with("openapi:") || text.trim_start().starts_with("asyncapi:") {
3475        return text.trim().to_string();
3476    }
3477
3478    // Return as-is if no code blocks found
3479    text.trim().to_string()
3480}
3481
3482fn extract_graphql_schema(text: &str) -> String {
3483    // Try to find GraphQL code blocks
3484    if let Some(start) = text.find("```graphql") {
3485        let schema_start = text[start + 10..].trim_start();
3486        if let Some(end) = schema_start.find("```") {
3487            return schema_start[..end].trim().to_string();
3488        }
3489    }
3490    if let Some(start) = text.find("```") {
3491        let content_start = text[start + 3..].trim_start();
3492        if let Some(end) = content_start.find("```") {
3493            return content_start[..end].trim().to_string();
3494        }
3495    }
3496
3497    // Check if it looks like GraphQL SDL (starts with type, schema, etc.)
3498    if text.trim_start().starts_with("type ") || text.trim_start().starts_with("schema ") {
3499        return text.trim().to_string();
3500    }
3501
3502    text.trim().to_string()
3503}
3504
3505// ========== Chaos Engineering Management ==========
3506
3507/// Get current chaos engineering configuration
3508async fn get_chaos_config(State(state): State<ManagementState>) -> impl IntoResponse {
3509    #[cfg(feature = "chaos")]
3510    {
3511        if let Some(chaos_state) = &state.chaos_api_state {
3512            let config = chaos_state.config.read().await;
3513            // Convert ChaosConfig to JSON response format
3514            Json(serde_json::json!({
3515                "enabled": config.enabled,
3516                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
3517                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
3518                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
3519                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
3520            }))
3521            .into_response()
3522        } else {
3523            // Chaos API not available, return default
3524            Json(serde_json::json!({
3525                "enabled": false,
3526                "latency": null,
3527                "fault_injection": null,
3528                "rate_limit": null,
3529                "traffic_shaping": null,
3530            }))
3531            .into_response()
3532        }
3533    }
3534    #[cfg(not(feature = "chaos"))]
3535    {
3536        // Chaos feature not enabled
3537        Json(serde_json::json!({
3538            "enabled": false,
3539            "latency": null,
3540            "fault_injection": null,
3541            "rate_limit": null,
3542            "traffic_shaping": null,
3543        }))
3544        .into_response()
3545    }
3546}
3547
3548/// Request to update chaos configuration
3549#[derive(Debug, Deserialize)]
3550pub struct ChaosConfigUpdate {
3551    /// Whether to enable chaos engineering
3552    pub enabled: Option<bool>,
3553    /// Latency configuration
3554    pub latency: Option<serde_json::Value>,
3555    /// Fault injection configuration
3556    pub fault_injection: Option<serde_json::Value>,
3557    /// Rate limiting configuration
3558    pub rate_limit: Option<serde_json::Value>,
3559    /// Traffic shaping configuration
3560    pub traffic_shaping: Option<serde_json::Value>,
3561}
3562
3563/// Update chaos engineering configuration
3564async fn update_chaos_config(
3565    State(state): State<ManagementState>,
3566    Json(config_update): Json<ChaosConfigUpdate>,
3567) -> impl IntoResponse {
3568    #[cfg(feature = "chaos")]
3569    {
3570        if let Some(chaos_state) = &state.chaos_api_state {
3571            use mockforge_chaos::config::{ChaosConfig, FaultInjectionConfig, LatencyConfig, RateLimitConfig, TrafficShapingConfig};
3572
3573            let mut config = chaos_state.config.write().await;
3574
3575            // Update enabled flag if provided
3576            if let Some(enabled) = config_update.enabled {
3577                config.enabled = enabled;
3578            }
3579
3580            // Update latency config if provided
3581            if let Some(latency_json) = config_update.latency {
3582                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3583                    config.latency = Some(latency);
3584                }
3585            }
3586
3587            // Update fault injection config if provided
3588            if let Some(fault_json) = config_update.fault_injection {
3589                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3590                    config.fault_injection = Some(fault);
3591                }
3592            }
3593
3594            // Update rate limit config if provided
3595            if let Some(rate_json) = config_update.rate_limit {
3596                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3597                    config.rate_limit = Some(rate);
3598                }
3599            }
3600
3601            // Update traffic shaping config if provided
3602            if let Some(traffic_json) = config_update.traffic_shaping {
3603                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3604                    config.traffic_shaping = Some(traffic);
3605                }
3606            }
3607
3608            // Reinitialize middleware injectors with new config
3609            // The middleware will pick up the changes on the next request
3610            drop(config);
3611
3612            info!("Chaos configuration updated successfully");
3613            Json(serde_json::json!({
3614                "success": true,
3615                "message": "Chaos configuration updated and applied"
3616            }))
3617            .into_response()
3618        } else {
3619            (
3620                StatusCode::SERVICE_UNAVAILABLE,
3621                Json(serde_json::json!({
3622                    "success": false,
3623                    "error": "Chaos API not available",
3624                    "message": "Chaos engineering is not enabled or configured"
3625                })),
3626            )
3627                .into_response()
3628        }
3629    }
3630    #[cfg(not(feature = "chaos"))]
3631    {
3632        (
3633            StatusCode::NOT_IMPLEMENTED,
3634            Json(serde_json::json!({
3635                "success": false,
3636                "error": "Chaos feature not enabled",
3637                "message": "Chaos engineering feature is not compiled into this build"
3638            })),
3639        )
3640            .into_response()
3641    }
3642}
3643
3644// ========== Network Profile Management ==========
3645
3646/// List available network profiles
3647async fn list_network_profiles() -> impl IntoResponse {
3648    use mockforge_core::network_profiles::NetworkProfileCatalog;
3649
3650    let catalog = NetworkProfileCatalog::default();
3651    let profiles: Vec<serde_json::Value> = catalog
3652        .list_profiles_with_description()
3653        .iter()
3654        .map(|(name, description)| {
3655            serde_json::json!({
3656                "name": name,
3657                "description": description,
3658            })
3659        })
3660        .collect();
3661
3662    Json(serde_json::json!({
3663        "profiles": profiles
3664    }))
3665    .into_response()
3666}
3667
3668#[derive(Debug, Deserialize)]
3669/// Request to apply a network profile
3670pub struct ApplyNetworkProfileRequest {
3671    /// Name of the network profile to apply
3672    pub profile_name: String,
3673}
3674
3675/// Apply a network profile
3676async fn apply_network_profile(
3677    State(state): State<ManagementState>,
3678    Json(request): Json<ApplyNetworkProfileRequest>,
3679) -> impl IntoResponse {
3680    use mockforge_core::network_profiles::NetworkProfileCatalog;
3681
3682    let catalog = NetworkProfileCatalog::default();
3683    if let Some(profile) = catalog.get(&request.profile_name) {
3684        // Apply profile to server configuration if available
3685        // NetworkProfile contains latency and traffic_shaping configs
3686        if let Some(server_config) = &state.server_config {
3687            let mut config = server_config.write().await;
3688
3689            // Apply network profile's traffic shaping to core config
3690            use mockforge_core::config::NetworkShapingConfig;
3691
3692            // Convert NetworkProfile's TrafficShapingConfig to NetworkShapingConfig
3693            // NetworkProfile uses mockforge_core::traffic_shaping::TrafficShapingConfig
3694            // which has bandwidth and burst_loss fields
3695            let network_shaping = NetworkShapingConfig {
3696                enabled: profile.traffic_shaping.bandwidth.enabled || profile.traffic_shaping.burst_loss.enabled,
3697                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
3698                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3699                max_connections: 1000, // Default value
3700            };
3701
3702            // Update chaos config if it exists, or create it
3703            // Chaos config is in observability.chaos, not core.chaos
3704            if let Some(ref mut chaos) = config.observability.chaos {
3705                chaos.traffic_shaping = Some(network_shaping);
3706            } else {
3707                // Create minimal chaos config with traffic shaping
3708                use mockforge_core::config::ChaosEngConfig;
3709                config.observability.chaos = Some(ChaosEngConfig {
3710                    enabled: true,
3711                    latency: None,
3712                    fault_injection: None,
3713                    rate_limit: None,
3714                    traffic_shaping: Some(network_shaping),
3715                    scenario: None,
3716                });
3717            }
3718
3719            info!("Network profile '{}' applied to server configuration", request.profile_name);
3720        } else {
3721            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
3722        }
3723
3724        // Also update chaos API state if available
3725        #[cfg(feature = "chaos")]
3726        {
3727            if let Some(chaos_state) = &state.chaos_api_state {
3728                use mockforge_chaos::config::TrafficShapingConfig;
3729
3730                let mut chaos_config = chaos_state.config.write().await;
3731                // Apply profile's traffic shaping to chaos API state
3732                let chaos_traffic_shaping = TrafficShapingConfig {
3733                    enabled: profile.traffic_shaping.bandwidth.enabled || profile.traffic_shaping.burst_loss.enabled,
3734                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
3735                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3736                    max_connections: 0,
3737                    connection_timeout_ms: 30000,
3738                };
3739                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
3740                chaos_config.enabled = true; // Enable chaos when applying a profile
3741                drop(chaos_config);
3742                info!("Network profile '{}' applied to chaos API state", request.profile_name);
3743            }
3744        }
3745
3746        Json(serde_json::json!({
3747            "success": true,
3748            "message": format!("Network profile '{}' applied", request.profile_name),
3749            "profile": {
3750                "name": profile.name,
3751                "description": profile.description,
3752            }
3753        }))
3754        .into_response()
3755    } else {
3756        (
3757            StatusCode::NOT_FOUND,
3758            Json(serde_json::json!({
3759                "error": "Profile not found",
3760                "message": format!("Network profile '{}' not found", request.profile_name)
3761            })),
3762        )
3763            .into_response()
3764    }
3765}
3766
3767/// Build the management API router with UI Builder support
3768pub fn management_router_with_ui_builder(
3769    state: ManagementState,
3770    server_config: mockforge_core::config::ServerConfig,
3771) -> Router {
3772    use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
3773
3774    // Create the base management router
3775    let management = management_router(state);
3776
3777    // Create UI Builder state and router
3778    let ui_builder_state = UIBuilderState::new(server_config);
3779    let ui_builder = create_ui_builder_router(ui_builder_state);
3780
3781    // Nest UI Builder under /ui-builder
3782    management.nest("/ui-builder", ui_builder)
3783}
3784
3785/// Build management router with spec import API
3786pub fn management_router_with_spec_import(state: ManagementState) -> Router {
3787    use crate::spec_import::{spec_import_router, SpecImportState};
3788
3789    // Create base management router
3790    let management = management_router(state);
3791
3792    // Merge with spec import router
3793    Router::new()
3794        .merge(management)
3795        .merge(spec_import_router(SpecImportState::new()))
3796}
3797
3798#[cfg(test)]
3799mod tests {
3800    use super::*;
3801
3802    #[tokio::test]
3803    async fn test_create_and_get_mock() {
3804        let state = ManagementState::new(None, None, 3000);
3805
3806        let mock = MockConfig {
3807            id: "test-1".to_string(),
3808            name: "Test Mock".to_string(),
3809            method: "GET".to_string(),
3810            path: "/test".to_string(),
3811            response: MockResponse {
3812                body: serde_json::json!({"message": "test"}),
3813                headers: None,
3814            },
3815            enabled: true,
3816            latency_ms: None,
3817            status_code: Some(200),
3818            request_match: None,
3819            priority: None,
3820            scenario: None,
3821            required_scenario_state: None,
3822            new_scenario_state: None,
3823        };
3824
3825        // Create mock
3826        {
3827            let mut mocks = state.mocks.write().await;
3828            mocks.push(mock.clone());
3829        }
3830
3831        // Get mock
3832        let mocks = state.mocks.read().await;
3833        let found = mocks.iter().find(|m| m.id == "test-1");
3834        assert!(found.is_some());
3835        assert_eq!(found.unwrap().name, "Test Mock");
3836    }
3837
3838    #[tokio::test]
3839    async fn test_server_stats() {
3840        let state = ManagementState::new(None, None, 3000);
3841
3842        // Add some mocks
3843        {
3844            let mut mocks = state.mocks.write().await;
3845            mocks.push(MockConfig {
3846                id: "1".to_string(),
3847                name: "Mock 1".to_string(),
3848                method: "GET".to_string(),
3849                path: "/test1".to_string(),
3850                response: MockResponse {
3851                    body: serde_json::json!({}),
3852                    headers: None,
3853                },
3854                enabled: true,
3855                latency_ms: None,
3856                status_code: Some(200),
3857                request_match: None,
3858                priority: None,
3859                scenario: None,
3860                required_scenario_state: None,
3861                new_scenario_state: None,
3862            });
3863            mocks.push(MockConfig {
3864                id: "2".to_string(),
3865                name: "Mock 2".to_string(),
3866                method: "POST".to_string(),
3867                path: "/test2".to_string(),
3868                response: MockResponse {
3869                    body: serde_json::json!({}),
3870                    headers: None,
3871                },
3872                enabled: false,
3873                latency_ms: None,
3874                status_code: Some(201),
3875                request_match: None,
3876                priority: None,
3877                scenario: None,
3878                required_scenario_state: None,
3879                new_scenario_state: None,
3880            });
3881        }
3882
3883        let mocks = state.mocks.read().await;
3884        assert_eq!(mocks.len(), 2);
3885        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
3886    }
3887}