mockforge_http/
management.rs

1/// Management API for MockForge
2///
3/// Provides REST endpoints for controlling mocks, server configuration,
4/// and integration with developer tools (VS Code extension, CI/CD, etc.)
5use axum::{
6    extract::{Path, Query, State},
7    http::StatusCode,
8    response::{
9        sse::{Event, Sse},
10        IntoResponse, Json, Response,
11    },
12    routing::{delete, get, post, put},
13    Router,
14};
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18    BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23use std::convert::Infallible;
24use std::sync::Arc;
25use tokio::sync::{broadcast, RwLock};
26use tracing::*;
27
28/// Message event types for real-time monitoring
29#[derive(Debug, Clone, Serialize, Deserialize)]
30#[serde(tag = "protocol", content = "data")]
31#[serde(rename_all = "lowercase")]
32pub enum MessageEvent {
33    #[cfg(feature = "mqtt")]
34    /// MQTT message event
35    Mqtt(MqttMessageEvent),
36    #[cfg(feature = "kafka")]
37    /// Kafka message event
38    Kafka(KafkaMessageEvent),
39}
40
/// Payload of a single MQTT message reported through the monitoring stream
#[cfg(feature = "mqtt")]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct MqttMessageEvent {
    /// Topic the message was published on
    pub topic: String,
    /// Message payload as text
    pub payload: String,
    /// Quality of Service level (0, 1, or 2)
    pub qos: u8,
    /// `true` when the message carries the retain flag
    pub retain: bool,
    /// Timestamp as an RFC3339 formatted string
    pub timestamp: String,
}
56
/// Payload of a single Kafka record reported through the monitoring stream
#[cfg(feature = "kafka")]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct KafkaMessageEvent {
    /// Topic the record belongs to
    pub topic: String,
    /// Optional record key
    pub key: Option<String>,
    /// Record value as text
    pub value: String,
    /// Partition number of the record
    pub partition: i32,
    /// Offset of the record
    pub offset: i64,
    /// Optional record headers as name/value pairs
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// Timestamp as a string
    pub timestamp: String,
}
68
69/// Mock configuration representation
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct MockConfig {
72    /// Unique identifier for the mock
73    #[serde(skip_serializing_if = "String::is_empty")]
74    pub id: String,
75    /// Human-readable name for the mock
76    pub name: String,
77    /// HTTP method (GET, POST, etc.)
78    pub method: String,
79    /// API path pattern to match
80    pub path: String,
81    /// Response configuration
82    pub response: MockResponse,
83    /// Whether this mock is currently enabled
84    #[serde(default = "default_true")]
85    pub enabled: bool,
86    /// Optional latency to inject in milliseconds
87    #[serde(skip_serializing_if = "Option::is_none")]
88    pub latency_ms: Option<u64>,
89    /// Optional HTTP status code override
90    #[serde(skip_serializing_if = "Option::is_none")]
91    pub status_code: Option<u16>,
92    /// Request matching criteria (headers, query params, body patterns)
93    #[serde(skip_serializing_if = "Option::is_none")]
94    pub request_match: Option<RequestMatchCriteria>,
95    /// Priority for mock ordering (higher priority mocks are matched first)
96    #[serde(skip_serializing_if = "Option::is_none")]
97    pub priority: Option<i32>,
98    /// Scenario name for stateful mocking
99    #[serde(skip_serializing_if = "Option::is_none")]
100    pub scenario: Option<String>,
101    /// Required scenario state for this mock to be active
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub required_scenario_state: Option<String>,
104    /// New scenario state after this mock is matched
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub new_scenario_state: Option<String>,
107}
108
/// Serde default provider: yields `true` for boolean fields omitted from input.
fn default_true() -> bool {
    true
}
112
113/// Mock response configuration
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct MockResponse {
116    /// Response body as JSON
117    pub body: serde_json::Value,
118    /// Optional custom response headers
119    #[serde(skip_serializing_if = "Option::is_none")]
120    pub headers: Option<std::collections::HashMap<String, String>>,
121}
122
123/// Request matching criteria for advanced request matching
124#[derive(Debug, Clone, Serialize, Deserialize, Default)]
125pub struct RequestMatchCriteria {
126    /// Headers that must be present and match (case-insensitive header names)
127    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
128    pub headers: std::collections::HashMap<String, String>,
129    /// Query parameters that must be present and match
130    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
131    pub query_params: std::collections::HashMap<String, String>,
132    /// Request body pattern (supports exact match or regex)
133    #[serde(skip_serializing_if = "Option::is_none")]
134    pub body_pattern: Option<String>,
135    /// JSONPath expression for JSON body matching
136    #[serde(skip_serializing_if = "Option::is_none")]
137    pub json_path: Option<String>,
138    /// XPath expression for XML body matching
139    #[serde(skip_serializing_if = "Option::is_none")]
140    pub xpath: Option<String>,
141    /// Custom matcher expression (e.g., "headers.content-type == \"application/json\"")
142    #[serde(skip_serializing_if = "Option::is_none")]
143    pub custom_matcher: Option<String>,
144}
145
146/// Check if a request matches the given mock configuration
147///
148/// This function implements comprehensive request matching including:
149/// - Method and path matching
150/// - Header matching (with regex support)
151/// - Query parameter matching
152/// - Body pattern matching (exact, regex, JSONPath, XPath)
153/// - Custom matcher expressions
154pub fn mock_matches_request(
155    mock: &MockConfig,
156    method: &str,
157    path: &str,
158    headers: &std::collections::HashMap<String, String>,
159    query_params: &std::collections::HashMap<String, String>,
160    body: Option<&[u8]>,
161) -> bool {
162    use regex::Regex;
163
164    // Check if mock is enabled
165    if !mock.enabled {
166        return false;
167    }
168
169    // Check method (case-insensitive)
170    if mock.method.to_uppercase() != method.to_uppercase() {
171        return false;
172    }
173
174    // Check path pattern (supports wildcards and path parameters)
175    if !path_matches_pattern(&mock.path, path) {
176        return false;
177    }
178
179    // Check request matching criteria if present
180    if let Some(criteria) = &mock.request_match {
181        // Check headers
182        for (key, expected_value) in &criteria.headers {
183            let header_key_lower = key.to_lowercase();
184            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
185
186            if let Some((_, actual_value)) = found {
187                // Try regex match first, then exact match
188                if let Ok(re) = Regex::new(expected_value) {
189                    if !re.is_match(actual_value) {
190                        return false;
191                    }
192                } else if actual_value != expected_value {
193                    return false;
194                }
195            } else {
196                return false; // Header not found
197            }
198        }
199
200        // Check query parameters
201        for (key, expected_value) in &criteria.query_params {
202            if let Some(actual_value) = query_params.get(key) {
203                if actual_value != expected_value {
204                    return false;
205                }
206            } else {
207                return false; // Query param not found
208            }
209        }
210
211        // Check body pattern
212        if let Some(pattern) = &criteria.body_pattern {
213            if let Some(body_bytes) = body {
214                let body_str = String::from_utf8_lossy(body_bytes);
215                // Try regex first, then exact match
216                if let Ok(re) = Regex::new(pattern) {
217                    if !re.is_match(&body_str) {
218                        return false;
219                    }
220                } else if body_str.as_ref() != pattern {
221                    return false;
222                }
223            } else {
224                return false; // Body required but not present
225            }
226        }
227
228        // Check JSONPath (simplified implementation)
229        if let Some(json_path) = &criteria.json_path {
230            if let Some(body_bytes) = body {
231                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
232                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
233                        // Simple JSONPath check
234                        if !json_path_exists(&json_value, json_path) {
235                            return false;
236                        }
237                    }
238                }
239            }
240        }
241
242        // Check XPath (placeholder - requires XML/XPath library for full implementation)
243        if let Some(_xpath) = &criteria.xpath {
244            // XPath matching would require an XML/XPath library
245            // For now, this is a placeholder that warns but doesn't fail
246            tracing::warn!("XPath matching not yet fully implemented");
247        }
248
249        // Check custom matcher
250        if let Some(custom) = &criteria.custom_matcher {
251            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
252                return false;
253            }
254        }
255    }
256
257    true
258}
259
260/// Check if a path matches a pattern (supports wildcards and path parameters)
261fn path_matches_pattern(pattern: &str, path: &str) -> bool {
262    // Exact match
263    if pattern == path {
264        return true;
265    }
266
267    // Wildcard match
268    if pattern == "*" {
269        return true;
270    }
271
272    // Path parameter matching (e.g., /users/{id} matches /users/123)
273    let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
274    let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
275
276    if pattern_parts.len() != path_parts.len() {
277        // Check for wildcard patterns
278        if pattern.contains('*') {
279            return matches_wildcard_pattern(pattern, path);
280        }
281        return false;
282    }
283
284    for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
285        // Check for path parameters {param}
286        if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
287            continue; // Matches any value
288        }
289
290        if pattern_part != path_part {
291            return false;
292        }
293    }
294
295    true
296}
297
298/// Check if path matches a wildcard pattern
299fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
300    use regex::Regex;
301
302    // Convert pattern to regex
303    let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
304
305    if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
306        return re.is_match(path);
307    }
308
309    false
310}
311
312/// Check if a JSONPath exists in a JSON value (simplified implementation)
313fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
314    // Simple implementation - for full JSONPath support, use a library like jsonpath-rs
315    // This handles simple paths like $.field or $.field.subfield
316    if json_path.starts_with("$.") {
317        let path = &json_path[2..];
318        let parts: Vec<&str> = path.split('.').collect();
319
320        let mut current = json;
321        for part in parts {
322            if let Some(obj) = current.as_object() {
323                if let Some(value) = obj.get(part) {
324                    current = value;
325                } else {
326                    return false;
327                }
328            } else {
329                return false;
330            }
331        }
332        true
333    } else {
334        // For complex JSONPath expressions, would need a proper JSONPath library
335        tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
336        false
337    }
338}
339
340/// Evaluate a custom matcher expression
341fn evaluate_custom_matcher(
342    expression: &str,
343    method: &str,
344    path: &str,
345    headers: &std::collections::HashMap<String, String>,
346    query_params: &std::collections::HashMap<String, String>,
347    body: Option<&[u8]>,
348) -> bool {
349    use regex::Regex;
350
351    let expr = expression.trim();
352
353    // Handle equality expressions (field == "value")
354    if expr.contains("==") {
355        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
356        if parts.len() != 2 {
357            return false;
358        }
359
360        let field = parts[0];
361        let expected_value = parts[1].trim_matches('"').trim_matches('\'');
362
363        match field {
364            "method" => method == expected_value,
365            "path" => path == expected_value,
366            _ if field.starts_with("headers.") => {
367                let header_name = &field[8..];
368                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
369            }
370            _ if field.starts_with("query.") => {
371                let param_name = &field[6..];
372                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
373            }
374            _ => false,
375        }
376    }
377    // Handle regex match expressions (field =~ "pattern")
378    else if expr.contains("=~") {
379        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
380        if parts.len() != 2 {
381            return false;
382        }
383
384        let field = parts[0];
385        let pattern = parts[1].trim_matches('"').trim_matches('\'');
386
387        if let Ok(re) = Regex::new(pattern) {
388            match field {
389                "method" => re.is_match(method),
390                "path" => re.is_match(path),
391                _ if field.starts_with("headers.") => {
392                    let header_name = &field[8..];
393                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
394                }
395                _ if field.starts_with("query.") => {
396                    let param_name = &field[6..];
397                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
398                }
399                _ => false,
400            }
401        } else {
402            false
403        }
404    }
405    // Handle contains expressions (field contains "value")
406    else if expr.contains("contains") {
407        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
408        if parts.len() != 2 {
409            return false;
410        }
411
412        let field = parts[0];
413        let search_value = parts[1].trim_matches('"').trim_matches('\'');
414
415        match field {
416            "path" => path.contains(search_value),
417            _ if field.starts_with("headers.") => {
418                let header_name = &field[8..];
419                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
420            }
421            _ if field.starts_with("body") => {
422                if let Some(body_bytes) = body {
423                    let body_str = String::from_utf8_lossy(body_bytes);
424                    body_str.contains(search_value)
425                } else {
426                    false
427                }
428            }
429            _ => false,
430        }
431    } else {
432        // Unknown expression format
433        tracing::warn!("Unknown custom matcher expression format: {}", expr);
434        false
435    }
436}
437
438/// Server statistics
439#[derive(Debug, Clone, Serialize, Deserialize)]
440pub struct ServerStats {
441    /// Server uptime in seconds
442    pub uptime_seconds: u64,
443    /// Total number of requests processed
444    pub total_requests: u64,
445    /// Number of active mock configurations
446    pub active_mocks: usize,
447    /// Number of currently enabled mocks
448    pub enabled_mocks: usize,
449    /// Number of registered API routes
450    pub registered_routes: usize,
451}
452
453/// Server configuration info
454#[derive(Debug, Clone, Serialize, Deserialize)]
455pub struct ServerConfig {
456    /// MockForge version string
457    pub version: String,
458    /// Server port number
459    pub port: u16,
460    /// Whether an OpenAPI spec is loaded
461    pub has_openapi_spec: bool,
462    /// Optional path to the OpenAPI spec file
463    #[serde(skip_serializing_if = "Option::is_none")]
464    pub spec_path: Option<String>,
465}
466
467/// Shared state for the management API
468#[derive(Clone)]
469pub struct ManagementState {
470    /// Collection of mock configurations
471    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
472    /// Optional OpenAPI specification
473    pub spec: Option<Arc<OpenApiSpec>>,
474    /// Optional path to the OpenAPI spec file
475    pub spec_path: Option<String>,
476    /// Server port number
477    pub port: u16,
478    /// Server start time for uptime calculation
479    pub start_time: std::time::Instant,
480    /// Counter for total requests processed
481    pub request_counter: Arc<RwLock<u64>>,
482    /// Optional proxy configuration for migration pipeline
483    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
484    /// Optional SMTP registry for email mocking
485    #[cfg(feature = "smtp")]
486    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
487    /// Optional MQTT broker for message mocking
488    #[cfg(feature = "mqtt")]
489    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
490    /// Optional Kafka broker for event streaming
491    #[cfg(feature = "kafka")]
492    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
493    /// Broadcast channel for message events (MQTT & Kafka)
494    #[cfg(any(feature = "mqtt", feature = "kafka"))]
495    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
496    /// State machine manager for scenario state machines
497    pub state_machine_manager:
498        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
499    /// Optional WebSocket broadcast channel for real-time updates
500    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
501    /// Lifecycle hook registry for extensibility
502    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
503    /// Rule explanations storage (in-memory for now)
504    pub rule_explanations: Arc<
505        RwLock<
506            std::collections::HashMap<
507                String,
508                mockforge_core::intelligent_behavior::RuleExplanation,
509            >,
510        >,
511    >,
512    /// Optional chaos API state for chaos config management
513    #[cfg(feature = "chaos")]
514    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
515    /// Optional server configuration for profile application
516    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
517}
518
519impl ManagementState {
520    /// Create a new management state
521    ///
522    /// # Arguments
523    /// * `spec` - Optional OpenAPI specification
524    /// * `spec_path` - Optional path to the OpenAPI spec file
525    /// * `port` - Server port number
526    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
527        Self {
528            mocks: Arc::new(RwLock::new(Vec::new())),
529            spec,
530            spec_path,
531            port,
532            start_time: std::time::Instant::now(),
533            request_counter: Arc::new(RwLock::new(0)),
534            proxy_config: None,
535            #[cfg(feature = "smtp")]
536            smtp_registry: None,
537            #[cfg(feature = "mqtt")]
538            mqtt_broker: None,
539            #[cfg(feature = "kafka")]
540            kafka_broker: None,
541            #[cfg(any(feature = "mqtt", feature = "kafka"))]
542            message_events: {
543                let (tx, _) = broadcast::channel(1000);
544                Arc::new(tx)
545            },
546            state_machine_manager: Arc::new(RwLock::new(
547                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
548            )),
549            ws_broadcast: None,
550            lifecycle_hooks: None,
551            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
552            #[cfg(feature = "chaos")]
553            chaos_api_state: None,
554            server_config: None,
555        }
556    }
557
558    /// Add lifecycle hook registry to management state
559    pub fn with_lifecycle_hooks(
560        mut self,
561        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
562    ) -> Self {
563        self.lifecycle_hooks = Some(hooks);
564        self
565    }
566
567    /// Add WebSocket broadcast channel to management state
568    pub fn with_ws_broadcast(
569        mut self,
570        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
571    ) -> Self {
572        self.ws_broadcast = Some(ws_broadcast);
573        self
574    }
575
576    /// Add proxy configuration to management state
577    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
578        self.proxy_config = Some(proxy_config);
579        self
580    }
581
582    #[cfg(feature = "smtp")]
583    /// Add SMTP registry to management state
584    pub fn with_smtp_registry(
585        mut self,
586        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
587    ) -> Self {
588        self.smtp_registry = Some(smtp_registry);
589        self
590    }
591
592    #[cfg(feature = "mqtt")]
593    /// Add MQTT broker to management state
594    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
595        self.mqtt_broker = Some(mqtt_broker);
596        self
597    }
598
599    #[cfg(feature = "kafka")]
600    /// Add Kafka broker to management state
601    pub fn with_kafka_broker(
602        mut self,
603        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
604    ) -> Self {
605        self.kafka_broker = Some(kafka_broker);
606        self
607    }
608
609    #[cfg(feature = "chaos")]
610    /// Add chaos API state to management state
611    pub fn with_chaos_api_state(
612        mut self,
613        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
614    ) -> Self {
615        self.chaos_api_state = Some(chaos_api_state);
616        self
617    }
618
619    /// Add server configuration to management state
620    pub fn with_server_config(
621        mut self,
622        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
623    ) -> Self {
624        self.server_config = Some(server_config);
625        self
626    }
627}
628
629/// List all mocks
630async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
631    let mocks = state.mocks.read().await;
632    Json(serde_json::json!({
633        "mocks": *mocks,
634        "total": mocks.len(),
635        "enabled": mocks.iter().filter(|m| m.enabled).count()
636    }))
637}
638
639/// Get a specific mock by ID
640async fn get_mock(
641    State(state): State<ManagementState>,
642    Path(id): Path<String>,
643) -> Result<Json<MockConfig>, StatusCode> {
644    let mocks = state.mocks.read().await;
645    mocks
646        .iter()
647        .find(|m| m.id == id)
648        .cloned()
649        .map(Json)
650        .ok_or(StatusCode::NOT_FOUND)
651}
652
653/// Create a new mock
654async fn create_mock(
655    State(state): State<ManagementState>,
656    Json(mut mock): Json<MockConfig>,
657) -> Result<Json<MockConfig>, StatusCode> {
658    let mut mocks = state.mocks.write().await;
659
660    // Generate ID if not provided
661    if mock.id.is_empty() {
662        mock.id = uuid::Uuid::new_v4().to_string();
663    }
664
665    // Check for duplicate ID
666    if mocks.iter().any(|m| m.id == mock.id) {
667        return Err(StatusCode::CONFLICT);
668    }
669
670    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
671
672    // Invoke lifecycle hooks
673    if let Some(hooks) = &state.lifecycle_hooks {
674        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
675            id: mock.id.clone(),
676            name: mock.name.clone(),
677            config: serde_json::to_value(&mock).unwrap_or_default(),
678        };
679        hooks.invoke_mock_created(&event).await;
680    }
681
682    mocks.push(mock.clone());
683
684    // Broadcast WebSocket event
685    if let Some(tx) = &state.ws_broadcast {
686        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
687    }
688
689    Ok(Json(mock))
690}
691
692/// Update an existing mock
693async fn update_mock(
694    State(state): State<ManagementState>,
695    Path(id): Path<String>,
696    Json(updated_mock): Json<MockConfig>,
697) -> Result<Json<MockConfig>, StatusCode> {
698    let mut mocks = state.mocks.write().await;
699
700    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
701
702    // Get old mock for comparison
703    let old_mock = mocks[position].clone();
704
705    info!("Updating mock: {}", id);
706    mocks[position] = updated_mock.clone();
707
708    // Invoke lifecycle hooks
709    if let Some(hooks) = &state.lifecycle_hooks {
710        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
711            id: updated_mock.id.clone(),
712            name: updated_mock.name.clone(),
713            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
714        };
715        hooks.invoke_mock_updated(&event).await;
716
717        // Check if enabled state changed
718        if old_mock.enabled != updated_mock.enabled {
719            let state_event = if updated_mock.enabled {
720                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
721                    id: updated_mock.id.clone(),
722                }
723            } else {
724                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
725                    id: updated_mock.id.clone(),
726                }
727            };
728            hooks.invoke_mock_state_changed(&state_event).await;
729        }
730    }
731
732    // Broadcast WebSocket event
733    if let Some(tx) = &state.ws_broadcast {
734        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
735    }
736
737    Ok(Json(updated_mock))
738}
739
740/// Delete a mock
741async fn delete_mock(
742    State(state): State<ManagementState>,
743    Path(id): Path<String>,
744) -> Result<StatusCode, StatusCode> {
745    let mut mocks = state.mocks.write().await;
746
747    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
748
749    // Get mock info before deletion for lifecycle hooks
750    let deleted_mock = mocks[position].clone();
751
752    info!("Deleting mock: {}", id);
753    mocks.remove(position);
754
755    // Invoke lifecycle hooks
756    if let Some(hooks) = &state.lifecycle_hooks {
757        let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
758            id: deleted_mock.id.clone(),
759            name: deleted_mock.name.clone(),
760        };
761        hooks.invoke_mock_deleted(&event).await;
762    }
763
764    // Broadcast WebSocket event
765    if let Some(tx) = &state.ws_broadcast {
766        let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
767    }
768
769    Ok(StatusCode::NO_CONTENT)
770}
771
772/// Request to validate configuration
773#[derive(Debug, Deserialize)]
774pub struct ValidateConfigRequest {
775    /// Configuration to validate (as JSON)
776    pub config: serde_json::Value,
777    /// Format of the configuration ("json" or "yaml")
778    #[serde(default = "default_format")]
779    pub format: String,
780}
781
/// Serde default provider for the configuration format: `"json"`.
fn default_format() -> String {
    String::from("json")
}
785
786/// Validate configuration without applying it
787async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
788    use mockforge_core::config::ServerConfig;
789
790    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
791        "yaml" | "yml" => {
792            let yaml_str = match serde_json::to_string(&request.config) {
793                Ok(s) => s,
794                Err(e) => {
795                    return (
796                        StatusCode::BAD_REQUEST,
797                        Json(serde_json::json!({
798                            "valid": false,
799                            "error": format!("Failed to convert to string: {}", e),
800                            "message": "Configuration validation failed"
801                        })),
802                    )
803                        .into_response();
804                }
805            };
806            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
807        }
808        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
809    };
810
811    match config_result {
812        Ok(_) => Json(serde_json::json!({
813            "valid": true,
814            "message": "Configuration is valid"
815        }))
816        .into_response(),
817        Err(e) => (
818            StatusCode::BAD_REQUEST,
819            Json(serde_json::json!({
820                "valid": false,
821                "error": format!("Invalid configuration: {}", e),
822                "message": "Configuration validation failed"
823            })),
824        )
825            .into_response(),
826    }
827}
828
829/// Request for bulk configuration update
830#[derive(Debug, Deserialize)]
831pub struct BulkConfigUpdateRequest {
832    /// Partial configuration updates (only specified fields will be updated)
833    pub updates: serde_json::Value,
834}
835
836/// Bulk update configuration
837///
838/// This endpoint allows updating multiple configuration options at once.
839/// Only the specified fields in the updates object will be modified.
840///
841/// Configuration updates are applied to the server configuration if available
842/// in ManagementState. Changes take effect immediately for supported settings.
843async fn bulk_update_config(
844    State(state): State<ManagementState>,
845    Json(request): Json<BulkConfigUpdateRequest>,
846) -> impl IntoResponse {
847    // Validate the updates structure
848    if !request.updates.is_object() {
849        return (
850            StatusCode::BAD_REQUEST,
851            Json(serde_json::json!({
852                "error": "Invalid request",
853                "message": "Updates must be a JSON object"
854            })),
855        )
856            .into_response();
857    }
858
859    // Try to validate as partial ServerConfig
860    use mockforge_core::config::ServerConfig;
861
862    // Create a minimal valid config and try to merge updates
863    let base_config = ServerConfig::default();
864    let base_json = match serde_json::to_value(&base_config) {
865        Ok(v) => v,
866        Err(e) => {
867            return (
868                StatusCode::INTERNAL_SERVER_ERROR,
869                Json(serde_json::json!({
870                    "error": "Internal error",
871                    "message": format!("Failed to serialize base config: {}", e)
872                })),
873            )
874                .into_response();
875        }
876    };
877
878    // Merge updates into base config (simplified merge)
879    let mut merged = base_json.clone();
880    if let (Some(merged_obj), Some(updates_obj)) =
881        (merged.as_object_mut(), request.updates.as_object())
882    {
883        for (key, value) in updates_obj {
884            merged_obj.insert(key.clone(), value.clone());
885        }
886    }
887
888    // Validate the merged config
889    match serde_json::from_value::<ServerConfig>(merged) {
890        Ok(_) => {
891            // Config is valid
892            // Note: Runtime application of config changes would require:
893            // 1. Storing ServerConfig in ManagementState
894            // 2. Implementing hot-reload mechanism for server configuration
895            // 3. Updating router state and middleware based on new config
896            // For now, this endpoint only validates the configuration structure
897            Json(serde_json::json!({
898                "success": true,
899                "message": "Bulk configuration update validated successfully. Note: Runtime application requires ServerConfig in ManagementState and hot-reload support.",
900                "updates_received": request.updates,
901                "validated": true
902            }))
903            .into_response()
904        }
905        Err(e) => (
906            StatusCode::BAD_REQUEST,
907            Json(serde_json::json!({
908                "error": "Invalid configuration",
909                "message": format!("Configuration validation failed: {}", e),
910                "validated": false
911            })),
912        )
913            .into_response(),
914    }
915}
916
917/// Get server statistics
918async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
919    let mocks = state.mocks.read().await;
920    let request_count = *state.request_counter.read().await;
921
922    Json(ServerStats {
923        uptime_seconds: state.start_time.elapsed().as_secs(),
924        total_requests: request_count,
925        active_mocks: mocks.len(),
926        enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
927        registered_routes: mocks.len(), // This could be enhanced with actual route registry info
928    })
929}
930
931/// Get server configuration
932async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
933    Json(ServerConfig {
934        version: env!("CARGO_PKG_VERSION").to_string(),
935        port: state.port,
936        has_openapi_spec: state.spec.is_some(),
937        spec_path: state.spec_path.clone(),
938    })
939}
940
941/// Health check endpoint
942async fn health_check() -> Json<serde_json::Value> {
943    Json(serde_json::json!({
944        "status": "healthy",
945        "service": "mockforge-management",
946        "timestamp": chrono::Utc::now().to_rfc3339()
947    }))
948}
949
/// Export format for mock configurations
///
/// Variant names are (de)serialized in lowercase (`"json"`, `"yaml"`).
/// `export_mocks` picks a variant from its `format` query parameter.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// JSON format
    Json,
    /// YAML format
    Yaml,
}
959
960/// Export mocks in specified format
961async fn export_mocks(
962    State(state): State<ManagementState>,
963    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
964) -> Result<(StatusCode, String), StatusCode> {
965    let mocks = state.mocks.read().await;
966
967    let format = params
968        .get("format")
969        .map(|f| match f.as_str() {
970            "yaml" | "yml" => ExportFormat::Yaml,
971            _ => ExportFormat::Json,
972        })
973        .unwrap_or(ExportFormat::Json);
974
975    match format {
976        ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
977            .map(|json| (StatusCode::OK, json))
978            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
979        ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
980            .map(|yaml| (StatusCode::OK, yaml))
981            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
982    }
983}
984
985/// Import mocks from JSON/YAML
986async fn import_mocks(
987    State(state): State<ManagementState>,
988    Json(mocks): Json<Vec<MockConfig>>,
989) -> impl IntoResponse {
990    let mut current_mocks = state.mocks.write().await;
991    current_mocks.clear();
992    current_mocks.extend(mocks);
993    Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
994}
995
/// List the emails held by the SMTP registry.
///
/// Responds with `501 Not Implemented` when no SMTP registry is attached to
/// the management state, and `500` when the registry lookup fails.
#[cfg(feature = "smtp")]
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match &state.smtp_registry {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    match registry.get_emails() {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to retrieve emails",
                "message": err.to_string()
            })),
        ),
    }
}
1020
/// Fetch a single SMTP email by its id.
///
/// Responds with `404` when the registry has no email with that id, `500`
/// when the lookup fails, and `501 Not Implemented` when no SMTP registry is
/// attached to the management state.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let registry = match &state.smtp_registry {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    let (status, body) = match registry.get_email_by_id(&id) {
        Ok(Some(email)) => (StatusCode::OK, serde_json::json!(email)),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            serde_json::json!({
                "error": "Email not found",
                "id": id
            }),
        ),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            serde_json::json!({
                "error": "Failed to retrieve email",
                "message": err.to_string()
            }),
        ),
    };

    (status, Json(body))
}
1055
/// Remove every email from the SMTP mailbox.
///
/// Responds with `500` when the registry fails to clear the mailbox and
/// `501 Not Implemented` when no SMTP registry is attached to the state.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match &state.smtp_registry {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    let (status, body) = match registry.clear_mailbox() {
        Ok(()) => (
            StatusCode::OK,
            serde_json::json!({
                "message": "Mailbox cleared successfully"
            }),
        ),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            serde_json::json!({
                "error": "Failed to clear mailbox",
                "message": err.to_string()
            }),
        ),
    };

    (status, Json(body))
}
1085
/// Placeholder for exporting the SMTP mailbox.
///
/// Always responds with `501 Not Implemented`, echoing the requested `format`
/// query parameter (`"json"` when absent).
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let requested_format = params
        .get("format")
        .cloned()
        .unwrap_or_else(|| "json".to_string());

    let body = serde_json::json!({
        "error": "SMTP mailbox management not available via HTTP API",
        "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
        "requested_format": requested_format
    });

    (StatusCode::NOT_IMPLEMENTED, Json(body))
}
1101
/// Search the SMTP mailbox using filters taken from the query string.
///
/// Recognized parameters: `sender`, `recipient`, `subject`, `body`, `since`,
/// `until`, `regex`, and `case_sensitive`. `since`/`until` must be RFC 3339
/// timestamps; values that fail to parse are ignored. The boolean flags are
/// enabled only by the exact string `true`.
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let registry = match &state.smtp_registry {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    // Parse an optional RFC 3339 timestamp parameter and normalize it to UTC.
    let timestamp_param = |key: &str| {
        params
            .get(key)
            .and_then(|raw| chrono::DateTime::parse_from_rfc3339(raw).ok())
            .map(|parsed| parsed.with_timezone(&chrono::Utc))
    };
    // A flag counts as set only when its value is exactly "true".
    let flag_param = |key: &str| params.get(key).map(String::as_str) == Some("true");

    let filters = EmailSearchFilters {
        sender: params.get("sender").cloned(),
        recipient: params.get("recipient").cloned(),
        subject: params.get("subject").cloned(),
        body: params.get("body").cloned(),
        since: timestamp_param("since"),
        until: timestamp_param("until"),
        use_regex: flag_param("regex"),
        case_sensitive: flag_param("case_sensitive"),
    };

    match registry.search_emails(filters) {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to search emails",
                "message": err.to_string()
            })),
        ),
    }
}
1146
/// MQTT broker statistics
///
/// Returned as JSON by the `get_mqtt_stats` handler.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of connected MQTT clients
    pub connected_clients: usize,
    /// Number of active MQTT topics
    pub active_topics: usize,
    /// Number of retained messages
    pub retained_messages: usize,
    /// Total number of subscriptions
    pub total_subscriptions: usize,
}
1160
/// Report MQTT broker statistics.
///
/// Collects the connected-client count, active-topic count, and topic stats
/// from the broker. Responds with `503 Service Unavailable` when no broker is
/// attached to the management state.
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let broker = match &state.mqtt_broker {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let connected_clients = broker.get_connected_clients().await.len();
    let active_topics = broker.get_active_topics().await.len();
    let topic_stats = broker.get_topic_stats().await;

    Json(MqttBrokerStats {
        connected_clients,
        active_topics,
        retained_messages: topic_stats.retained_messages,
        total_subscriptions: topic_stats.total_subscriptions,
    })
    .into_response()
}
1181
/// List the clients currently connected to the MQTT broker.
///
/// Responds with `503 Service Unavailable` when no broker is attached to the
/// management state.
#[cfg(feature = "mqtt")]
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let connected = broker.get_connected_clients().await;
            let body = serde_json::json!({
                "clients": connected
            });
            Json(body).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1194
/// List the topics currently active on the MQTT broker.
///
/// Responds with `503 Service Unavailable` when no broker is attached to the
/// management state.
#[cfg(feature = "mqtt")]
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let active = broker.get_active_topics().await;
            let body = serde_json::json!({
                "topics": active
            });
            Json(body).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1207
/// Disconnect the MQTT client identified by the path parameter.
///
/// Responds with a plain-text confirmation on success, `500` when the broker
/// reports an error, and `503 Service Unavailable` when no broker is attached
/// to the management state.
#[cfg(feature = "mqtt")]
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let broker = match &state.mqtt_broker {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let (status, message) = match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("Failed to disconnect client: {}", err),
        ),
    };

    (status, message).into_response()
}
1227
1228// ========== MQTT Publish Handler ==========
1229
#[cfg(feature = "mqtt")]
/// Request to publish a single MQTT message
///
/// Also used as the element type of `MqttBatchPublishRequest::messages`.
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Topic to publish to
    pub topic: String,
    /// Message payload (string or JSON)
    pub payload: String,
    /// QoS level (0, 1, or 2); falls back to `default_qos()` when omitted
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Whether to retain the message; `false` when omitted
    #[serde(default)]
    pub retain: bool,
}
1245
#[cfg(feature = "mqtt")]
/// Serde default for `MqttPublishRequest::qos`.
fn default_qos() -> u8 {
    const DEFAULT_QOS: u8 = 0;
    DEFAULT_QOS
}
1250
#[cfg(feature = "mqtt")]
/// Publish a message to an MQTT topic (only compiled when mqtt feature is enabled)
///
/// The JSON body must contain string fields `topic` and `payload`. Optional
/// fields are `qos` (non-negative integer, defaults to 0) and `retain`
/// (boolean, defaults to false). On success a `MessageEvent::Mqtt` is also
/// broadcast for real-time monitoring.
///
/// # Responses
/// - `200 OK` when the broker accepted the message.
/// - `400 Bad Request` when `topic`/`payload` are missing or not strings, or
///   when `qos` is greater than 2.
/// - `500 Internal Server Error` when the broker rejects the publish.
/// - `503 Service Unavailable` when no broker is attached to the state.
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Extract fields from JSON manually
    let topic = request.get("topic").and_then(|v| v.as_str());
    let payload = request.get("payload").and_then(|v| v.as_str());
    // Keep the full u64 width until the range check below: narrowing to u8
    // first would wrap values such as 256 to 0 and let them pass validation.
    let raw_qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0);
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    let (topic, payload) = match (topic, payload) {
        (Some(topic), Some(payload)) => (topic.to_string(), payload.to_string()),
        _ => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid request",
                    "message": "Missing required fields: topic and payload"
                })),
            );
        }
    };

    let broker = match &state.mqtt_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "MQTT broker not available",
                    "message": "MQTT broker is not enabled or not available."
                })),
            );
        }
    };

    // Validate QoS
    if raw_qos > 2 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid QoS",
                "message": "QoS must be 0, 1, or 2"
            })),
        );
    }
    // Lossless: raw_qos was just checked to be at most 2.
    let qos = raw_qos as u8;

    // Convert payload to bytes
    let payload_bytes = payload.as_bytes().to_vec();
    let client_id = "mockforge-management-api".to_string();

    let publish_result = broker
        .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
        .await
        .map_err(|e| e.to_string());

    match publish_result {
        Ok(_) => {
            // Emit message event for real-time monitoring. A send error is
            // deliberately ignored so that publishing does not fail because
            // of the monitoring channel.
            let event = MessageEvent::Mqtt(MqttMessageEvent {
                topic: topic.clone(),
                payload,
                qos,
                retain,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            (
                StatusCode::OK,
                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message published to topic '{}'", topic),
                    "topic": topic,
                    "qos": qos,
                    "retain": retain
                })),
            )
        }
        Err(error_msg) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to publish message",
                "message": error_msg
            })),
        ),
    }
}
1338
1339#[cfg(not(feature = "mqtt"))]
1340/// Publish a message to an MQTT topic (stub when mqtt feature is disabled)
1341async fn publish_mqtt_message_handler(
1342    State(_state): State<ManagementState>,
1343    Json(_request): Json<serde_json::Value>,
1344) -> impl IntoResponse {
1345    (
1346        StatusCode::SERVICE_UNAVAILABLE,
1347        Json(serde_json::json!({
1348            "error": "MQTT feature not enabled",
1349            "message": "MQTT support is not compiled into this build"
1350        })),
1351    )
1352}
1353
#[cfg(feature = "mqtt")]
/// Request to publish multiple MQTT messages
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// List of messages to publish
    pub messages: Vec<MqttPublishRequest>,
    /// Delay between messages in milliseconds; falls back to
    /// `default_delay()` when omitted
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1364
#[cfg(feature = "mqtt")]
/// Serde default for `MqttBatchPublishRequest::delay_ms`, in milliseconds.
fn default_delay() -> u64 {
    const DEFAULT_DELAY_MS: u64 = 100;
    DEFAULT_DELAY_MS
}
1369
#[cfg(feature = "mqtt")]
/// Publish multiple messages to MQTT topics (only compiled when mqtt feature is enabled)
///
/// The JSON body must contain a `messages` array. Each entry needs string
/// fields `topic` and `payload`, with optional `qos` (defaults to 0) and
/// `retain` (defaults to false). The optional `delay_ms` (defaults to 100)
/// is the pause after each publish attempt, except after the last entry.
///
/// Messages are handled independently: an invalid or failed entry is recorded
/// in `results` and the batch continues. Successful publishes also broadcast
/// a `MessageEvent::Mqtt`.
///
/// # Responses
/// - `200 OK` with per-message results and succeeded/failed counts.
/// - `400 Bad Request` when `messages` is missing, not an array, or empty.
/// - `503 Service Unavailable` when no broker is attached to the state.
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Extract fields from JSON manually
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    let messages_json = match request.get("messages").and_then(|v| v.as_array()) {
        Some(messages) => messages,
        None => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid request",
                    "message": "Missing required field: messages"
                })),
            );
        }
    };

    let broker = match &state.mqtt_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "MQTT broker not available",
                    "message": "MQTT broker is not enabled or not available."
                })),
            );
        }
    };

    if messages_json.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        );
    }

    let total = messages_json.len();
    let mut results = Vec::with_capacity(total);
    let client_id = "mockforge-management-api".to_string();

    for (index, msg_json) in messages_json.iter().enumerate() {
        let topic = msg_json.get("topic").and_then(|v| v.as_str());
        let payload = msg_json.get("payload").and_then(|v| v.as_str());
        // Keep the full u64 width until the range check below: narrowing to
        // u8 first would wrap values such as 256 to 0 and skip validation.
        let raw_qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0);
        let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

        let (topic, payload) = match (topic, payload) {
            (Some(topic), Some(payload)) => (topic.to_string(), payload.to_string()),
            _ => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }
        };

        // Validate QoS
        if raw_qos > 2 {
            results.push(serde_json::json!({
                "index": index,
                "success": false,
                "error": "Invalid QoS (must be 0, 1, or 2)"
            }));
            continue;
        }
        // Lossless: raw_qos was just checked to be at most 2.
        let qos = raw_qos as u8;

        // Convert payload to bytes
        let payload_bytes = payload.as_bytes().to_vec();

        let publish_result = broker
            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
            .await
            .map_err(|e| e.to_string());

        match publish_result {
            Ok(_) => {
                // Emit message event. A send error is deliberately ignored
                // so that the batch does not fail because of the monitoring
                // channel.
                let event = MessageEvent::Mqtt(MqttMessageEvent {
                    topic: topic.clone(),
                    payload,
                    qos,
                    retain,
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                results.push(serde_json::json!({
                    "index": index,
                    "success": true,
                    "topic": topic,
                    "qos": qos
                }));
            }
            Err(error_msg) => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": error_msg
                }));
            }
        }

        // Add delay between messages (except for the last one).
        // `total` is at least 1 here, so the subtraction cannot underflow.
        if index < total - 1 && delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
        }
    }

    let success_count =
        results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

    (
        StatusCode::OK,
        Json(serde_json::json!({
            "success": true,
            "total": total,
            "succeeded": success_count,
            "failed": total - success_count,
            "results": results
        })),
    )
}
1499
1500#[cfg(not(feature = "mqtt"))]
1501/// Publish multiple messages to MQTT topics (stub when mqtt feature is disabled)
1502async fn publish_mqtt_batch_handler(
1503    State(_state): State<ManagementState>,
1504    Json(_request): Json<serde_json::Value>,
1505) -> impl IntoResponse {
1506    (
1507        StatusCode::SERVICE_UNAVAILABLE,
1508        Json(serde_json::json!({
1509            "error": "MQTT feature not enabled",
1510            "message": "MQTT support is not compiled into this build"
1511        })),
1512    )
1513}
1514
1515// Migration pipeline handlers
1516
/// Request to set migration mode
///
/// JSON body of the route and group "set migration mode" handlers.
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    /// Requested mode name. The handlers lowercase it and accept `mock`,
    /// `shadow`, `real`, or `auto`.
    mode: String,
}
1522
1523/// Get all migration routes
1524async fn get_migration_routes(
1525    State(state): State<ManagementState>,
1526) -> Result<Json<serde_json::Value>, StatusCode> {
1527    let proxy_config = match &state.proxy_config {
1528        Some(config) => config,
1529        None => {
1530            return Ok(Json(serde_json::json!({
1531                "error": "Migration not configured. Proxy config not available."
1532            })));
1533        }
1534    };
1535
1536    let config = proxy_config.read().await;
1537    let routes = config.get_migration_routes();
1538
1539    Ok(Json(serde_json::json!({
1540        "routes": routes
1541    })))
1542}
1543
1544/// Toggle a route's migration mode
1545async fn toggle_route_migration(
1546    State(state): State<ManagementState>,
1547    Path(pattern): Path<String>,
1548) -> Result<Json<serde_json::Value>, StatusCode> {
1549    let proxy_config = match &state.proxy_config {
1550        Some(config) => config,
1551        None => {
1552            return Ok(Json(serde_json::json!({
1553                "error": "Migration not configured. Proxy config not available."
1554            })));
1555        }
1556    };
1557
1558    let mut config = proxy_config.write().await;
1559    let new_mode = match config.toggle_route_migration(&pattern) {
1560        Some(mode) => mode,
1561        None => {
1562            return Ok(Json(serde_json::json!({
1563                "error": format!("Route pattern not found: {}", pattern)
1564            })));
1565        }
1566    };
1567
1568    Ok(Json(serde_json::json!({
1569        "pattern": pattern,
1570        "mode": format!("{:?}", new_mode).to_lowercase()
1571    })))
1572}
1573
1574/// Set a route's migration mode explicitly
1575async fn set_route_migration_mode(
1576    State(state): State<ManagementState>,
1577    Path(pattern): Path<String>,
1578    Json(request): Json<SetMigrationModeRequest>,
1579) -> Result<Json<serde_json::Value>, StatusCode> {
1580    let proxy_config = match &state.proxy_config {
1581        Some(config) => config,
1582        None => {
1583            return Ok(Json(serde_json::json!({
1584                "error": "Migration not configured. Proxy config not available."
1585            })));
1586        }
1587    };
1588
1589    use mockforge_core::proxy::config::MigrationMode;
1590    let mode = match request.mode.to_lowercase().as_str() {
1591        "mock" => MigrationMode::Mock,
1592        "shadow" => MigrationMode::Shadow,
1593        "real" => MigrationMode::Real,
1594        "auto" => MigrationMode::Auto,
1595        _ => {
1596            return Ok(Json(serde_json::json!({
1597                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1598            })));
1599        }
1600    };
1601
1602    let mut config = proxy_config.write().await;
1603    let updated = config.update_rule_migration_mode(&pattern, mode);
1604
1605    if !updated {
1606        return Ok(Json(serde_json::json!({
1607            "error": format!("Route pattern not found: {}", pattern)
1608        })));
1609    }
1610
1611    Ok(Json(serde_json::json!({
1612        "pattern": pattern,
1613        "mode": format!("{:?}", mode).to_lowercase()
1614    })))
1615}
1616
1617/// Toggle a group's migration mode
1618async fn toggle_group_migration(
1619    State(state): State<ManagementState>,
1620    Path(group): Path<String>,
1621) -> Result<Json<serde_json::Value>, StatusCode> {
1622    let proxy_config = match &state.proxy_config {
1623        Some(config) => config,
1624        None => {
1625            return Ok(Json(serde_json::json!({
1626                "error": "Migration not configured. Proxy config not available."
1627            })));
1628        }
1629    };
1630
1631    let mut config = proxy_config.write().await;
1632    let new_mode = config.toggle_group_migration(&group);
1633
1634    Ok(Json(serde_json::json!({
1635        "group": group,
1636        "mode": format!("{:?}", new_mode).to_lowercase()
1637    })))
1638}
1639
1640/// Set a group's migration mode explicitly
1641async fn set_group_migration_mode(
1642    State(state): State<ManagementState>,
1643    Path(group): Path<String>,
1644    Json(request): Json<SetMigrationModeRequest>,
1645) -> Result<Json<serde_json::Value>, StatusCode> {
1646    let proxy_config = match &state.proxy_config {
1647        Some(config) => config,
1648        None => {
1649            return Ok(Json(serde_json::json!({
1650                "error": "Migration not configured. Proxy config not available."
1651            })));
1652        }
1653    };
1654
1655    use mockforge_core::proxy::config::MigrationMode;
1656    let mode = match request.mode.to_lowercase().as_str() {
1657        "mock" => MigrationMode::Mock,
1658        "shadow" => MigrationMode::Shadow,
1659        "real" => MigrationMode::Real,
1660        "auto" => MigrationMode::Auto,
1661        _ => {
1662            return Ok(Json(serde_json::json!({
1663                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1664            })));
1665        }
1666    };
1667
1668    let mut config = proxy_config.write().await;
1669    config.update_group_migration_mode(&group, mode);
1670
1671    Ok(Json(serde_json::json!({
1672        "group": group,
1673        "mode": format!("{:?}", mode).to_lowercase()
1674    })))
1675}
1676
1677/// Get all migration groups
1678async fn get_migration_groups(
1679    State(state): State<ManagementState>,
1680) -> Result<Json<serde_json::Value>, StatusCode> {
1681    let proxy_config = match &state.proxy_config {
1682        Some(config) => config,
1683        None => {
1684            return Ok(Json(serde_json::json!({
1685                "error": "Migration not configured. Proxy config not available."
1686            })));
1687        }
1688    };
1689
1690    let config = proxy_config.read().await;
1691    let groups = config.get_migration_groups();
1692
1693    // Convert to JSON-serializable format
1694    let groups_json: serde_json::Map<String, serde_json::Value> = groups
1695        .into_iter()
1696        .map(|(name, info)| {
1697            (
1698                name,
1699                serde_json::json!({
1700                    "name": info.name,
1701                    "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1702                    "route_count": info.route_count
1703                }),
1704            )
1705        })
1706        .collect();
1707
1708    Ok(Json(serde_json::json!(groups_json)))
1709}
1710
1711/// Get overall migration status
1712async fn get_migration_status(
1713    State(state): State<ManagementState>,
1714) -> Result<Json<serde_json::Value>, StatusCode> {
1715    let proxy_config = match &state.proxy_config {
1716        Some(config) => config,
1717        None => {
1718            return Ok(Json(serde_json::json!({
1719                "error": "Migration not configured. Proxy config not available."
1720            })));
1721        }
1722    };
1723
1724    let config = proxy_config.read().await;
1725    let routes = config.get_migration_routes();
1726    let groups = config.get_migration_groups();
1727
1728    let mut mock_count = 0;
1729    let mut shadow_count = 0;
1730    let mut real_count = 0;
1731    let mut auto_count = 0;
1732
1733    for route in &routes {
1734        match route.migration_mode {
1735            mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1736            mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1737            mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1738            mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1739        }
1740    }
1741
1742    Ok(Json(serde_json::json!({
1743        "total_routes": routes.len(),
1744        "mock_routes": mock_count,
1745        "shadow_routes": shadow_count,
1746        "real_routes": real_count,
1747        "auto_routes": auto_count,
1748        "total_groups": groups.len(),
1749        "migration_enabled": config.migration_enabled
1750    })))
1751}
1752
1753// ========== Proxy Replacement Rules Management ==========
1754
1755/// Request body for creating/updating proxy replacement rules
1756#[derive(Debug, Deserialize, Serialize)]
1757pub struct ProxyRuleRequest {
1758    /// URL pattern to match (supports wildcards like "/api/users/*")
1759    pub pattern: String,
1760    /// Rule type: "request" or "response"
1761    #[serde(rename = "type")]
1762    pub rule_type: String,
1763    /// Optional status code filter for response rules
1764    #[serde(default)]
1765    pub status_codes: Vec<u16>,
1766    /// Body transformations to apply
1767    pub body_transforms: Vec<BodyTransformRequest>,
1768    /// Whether this rule is enabled
1769    #[serde(default = "default_true")]
1770    pub enabled: bool,
1771}
1772
1773/// Request body for individual body transformations
1774#[derive(Debug, Deserialize, Serialize)]
1775pub struct BodyTransformRequest {
1776    /// JSONPath expression to target (e.g., "$.userId", "$.email")
1777    pub path: String,
1778    /// Replacement value (supports template expansion like "{{uuid}}", "{{faker.email}}")
1779    pub replace: String,
1780    /// Operation to perform: "replace", "add", or "remove"
1781    #[serde(default)]
1782    pub operation: String,
1783}
1784
1785/// Response format for proxy rules
1786#[derive(Debug, Serialize)]
1787pub struct ProxyRuleResponse {
1788    /// Rule ID (index in the array)
1789    pub id: usize,
1790    /// URL pattern
1791    pub pattern: String,
1792    /// Rule type
1793    #[serde(rename = "type")]
1794    pub rule_type: String,
1795    /// Status codes (for response rules)
1796    pub status_codes: Vec<u16>,
1797    /// Body transformations
1798    pub body_transforms: Vec<BodyTransformRequest>,
1799    /// Whether enabled
1800    pub enabled: bool,
1801}
1802
1803/// List all proxy replacement rules
1804async fn list_proxy_rules(
1805    State(state): State<ManagementState>,
1806) -> Result<Json<serde_json::Value>, StatusCode> {
1807    let proxy_config = match &state.proxy_config {
1808        Some(config) => config,
1809        None => {
1810            return Ok(Json(serde_json::json!({
1811                "error": "Proxy not configured. Proxy config not available."
1812            })));
1813        }
1814    };
1815
1816    let config = proxy_config.read().await;
1817
1818    let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1819
1820    // Add request replacement rules
1821    for (idx, rule) in config.request_replacements.iter().enumerate() {
1822        rules.push(ProxyRuleResponse {
1823            id: idx,
1824            pattern: rule.pattern.clone(),
1825            rule_type: "request".to_string(),
1826            status_codes: Vec::new(),
1827            body_transforms: rule
1828                .body_transforms
1829                .iter()
1830                .map(|t| BodyTransformRequest {
1831                    path: t.path.clone(),
1832                    replace: t.replace.clone(),
1833                    operation: format!("{:?}", t.operation).to_lowercase(),
1834                })
1835                .collect(),
1836            enabled: rule.enabled,
1837        });
1838    }
1839
1840    // Add response replacement rules
1841    let request_count = config.request_replacements.len();
1842    for (idx, rule) in config.response_replacements.iter().enumerate() {
1843        rules.push(ProxyRuleResponse {
1844            id: request_count + idx,
1845            pattern: rule.pattern.clone(),
1846            rule_type: "response".to_string(),
1847            status_codes: rule.status_codes.clone(),
1848            body_transforms: rule
1849                .body_transforms
1850                .iter()
1851                .map(|t| BodyTransformRequest {
1852                    path: t.path.clone(),
1853                    replace: t.replace.clone(),
1854                    operation: format!("{:?}", t.operation).to_lowercase(),
1855                })
1856                .collect(),
1857            enabled: rule.enabled,
1858        });
1859    }
1860
1861    Ok(Json(serde_json::json!({
1862        "rules": rules
1863    })))
1864}
1865
1866/// Create a new proxy replacement rule
1867async fn create_proxy_rule(
1868    State(state): State<ManagementState>,
1869    Json(request): Json<ProxyRuleRequest>,
1870) -> Result<Json<serde_json::Value>, StatusCode> {
1871    let proxy_config = match &state.proxy_config {
1872        Some(config) => config,
1873        None => {
1874            return Ok(Json(serde_json::json!({
1875                "error": "Proxy not configured. Proxy config not available."
1876            })));
1877        }
1878    };
1879
1880    // Validate request
1881    if request.body_transforms.is_empty() {
1882        return Ok(Json(serde_json::json!({
1883            "error": "At least one body transform is required"
1884        })));
1885    }
1886
1887    let body_transforms: Vec<BodyTransform> = request
1888        .body_transforms
1889        .iter()
1890        .map(|t| {
1891            let op = match t.operation.as_str() {
1892                "replace" => TransformOperation::Replace,
1893                "add" => TransformOperation::Add,
1894                "remove" => TransformOperation::Remove,
1895                _ => TransformOperation::Replace,
1896            };
1897            BodyTransform {
1898                path: t.path.clone(),
1899                replace: t.replace.clone(),
1900                operation: op,
1901            }
1902        })
1903        .collect();
1904
1905    let new_rule = BodyTransformRule {
1906        pattern: request.pattern.clone(),
1907        status_codes: request.status_codes.clone(),
1908        body_transforms,
1909        enabled: request.enabled,
1910    };
1911
1912    let mut config = proxy_config.write().await;
1913
1914    let rule_id = if request.rule_type == "request" {
1915        config.request_replacements.push(new_rule);
1916        config.request_replacements.len() - 1
1917    } else if request.rule_type == "response" {
1918        config.response_replacements.push(new_rule);
1919        config.request_replacements.len() + config.response_replacements.len() - 1
1920    } else {
1921        return Ok(Json(serde_json::json!({
1922            "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
1923        })));
1924    };
1925
1926    Ok(Json(serde_json::json!({
1927        "id": rule_id,
1928        "message": "Rule created successfully"
1929    })))
1930}
1931
1932/// Get a specific proxy replacement rule
1933async fn get_proxy_rule(
1934    State(state): State<ManagementState>,
1935    Path(id): Path<String>,
1936) -> Result<Json<serde_json::Value>, StatusCode> {
1937    let proxy_config = match &state.proxy_config {
1938        Some(config) => config,
1939        None => {
1940            return Ok(Json(serde_json::json!({
1941                "error": "Proxy not configured. Proxy config not available."
1942            })));
1943        }
1944    };
1945
1946    let config = proxy_config.read().await;
1947    let rule_id: usize = match id.parse() {
1948        Ok(id) => id,
1949        Err(_) => {
1950            return Ok(Json(serde_json::json!({
1951                "error": format!("Invalid rule ID: {}", id)
1952            })));
1953        }
1954    };
1955
1956    let request_count = config.request_replacements.len();
1957
1958    if rule_id < request_count {
1959        // Request rule
1960        let rule = &config.request_replacements[rule_id];
1961        Ok(Json(serde_json::json!({
1962            "id": rule_id,
1963            "pattern": rule.pattern,
1964            "type": "request",
1965            "status_codes": [],
1966            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1967                "path": t.path,
1968                "replace": t.replace,
1969                "operation": format!("{:?}", t.operation).to_lowercase()
1970            })).collect::<Vec<_>>(),
1971            "enabled": rule.enabled
1972        })))
1973    } else if rule_id < request_count + config.response_replacements.len() {
1974        // Response rule
1975        let response_idx = rule_id - request_count;
1976        let rule = &config.response_replacements[response_idx];
1977        Ok(Json(serde_json::json!({
1978            "id": rule_id,
1979            "pattern": rule.pattern,
1980            "type": "response",
1981            "status_codes": rule.status_codes,
1982            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1983                "path": t.path,
1984                "replace": t.replace,
1985                "operation": format!("{:?}", t.operation).to_lowercase()
1986            })).collect::<Vec<_>>(),
1987            "enabled": rule.enabled
1988        })))
1989    } else {
1990        Ok(Json(serde_json::json!({
1991            "error": format!("Rule ID {} not found", rule_id)
1992        })))
1993    }
1994}
1995
1996/// Update a proxy replacement rule
1997async fn update_proxy_rule(
1998    State(state): State<ManagementState>,
1999    Path(id): Path<String>,
2000    Json(request): Json<ProxyRuleRequest>,
2001) -> Result<Json<serde_json::Value>, StatusCode> {
2002    let proxy_config = match &state.proxy_config {
2003        Some(config) => config,
2004        None => {
2005            return Ok(Json(serde_json::json!({
2006                "error": "Proxy not configured. Proxy config not available."
2007            })));
2008        }
2009    };
2010
2011    let mut config = proxy_config.write().await;
2012    let rule_id: usize = match id.parse() {
2013        Ok(id) => id,
2014        Err(_) => {
2015            return Ok(Json(serde_json::json!({
2016                "error": format!("Invalid rule ID: {}", id)
2017            })));
2018        }
2019    };
2020
2021    let body_transforms: Vec<BodyTransform> = request
2022        .body_transforms
2023        .iter()
2024        .map(|t| {
2025            let op = match t.operation.as_str() {
2026                "replace" => TransformOperation::Replace,
2027                "add" => TransformOperation::Add,
2028                "remove" => TransformOperation::Remove,
2029                _ => TransformOperation::Replace,
2030            };
2031            BodyTransform {
2032                path: t.path.clone(),
2033                replace: t.replace.clone(),
2034                operation: op,
2035            }
2036        })
2037        .collect();
2038
2039    let updated_rule = BodyTransformRule {
2040        pattern: request.pattern.clone(),
2041        status_codes: request.status_codes.clone(),
2042        body_transforms,
2043        enabled: request.enabled,
2044    };
2045
2046    let request_count = config.request_replacements.len();
2047
2048    if rule_id < request_count {
2049        // Update request rule
2050        config.request_replacements[rule_id] = updated_rule;
2051    } else if rule_id < request_count + config.response_replacements.len() {
2052        // Update response rule
2053        let response_idx = rule_id - request_count;
2054        config.response_replacements[response_idx] = updated_rule;
2055    } else {
2056        return Ok(Json(serde_json::json!({
2057            "error": format!("Rule ID {} not found", rule_id)
2058        })));
2059    }
2060
2061    Ok(Json(serde_json::json!({
2062        "id": rule_id,
2063        "message": "Rule updated successfully"
2064    })))
2065}
2066
2067/// Delete a proxy replacement rule
2068async fn delete_proxy_rule(
2069    State(state): State<ManagementState>,
2070    Path(id): Path<String>,
2071) -> Result<Json<serde_json::Value>, StatusCode> {
2072    let proxy_config = match &state.proxy_config {
2073        Some(config) => config,
2074        None => {
2075            return Ok(Json(serde_json::json!({
2076                "error": "Proxy not configured. Proxy config not available."
2077            })));
2078        }
2079    };
2080
2081    let mut config = proxy_config.write().await;
2082    let rule_id: usize = match id.parse() {
2083        Ok(id) => id,
2084        Err(_) => {
2085            return Ok(Json(serde_json::json!({
2086                "error": format!("Invalid rule ID: {}", id)
2087            })));
2088        }
2089    };
2090
2091    let request_count = config.request_replacements.len();
2092
2093    if rule_id < request_count {
2094        // Delete request rule
2095        config.request_replacements.remove(rule_id);
2096    } else if rule_id < request_count + config.response_replacements.len() {
2097        // Delete response rule
2098        let response_idx = rule_id - request_count;
2099        config.response_replacements.remove(response_idx);
2100    } else {
2101        return Ok(Json(serde_json::json!({
2102            "error": format!("Rule ID {} not found", rule_id)
2103        })));
2104    }
2105
2106    Ok(Json(serde_json::json!({
2107        "id": rule_id,
2108        "message": "Rule deleted successfully"
2109    })))
2110}
2111
2112/// Get recent intercepted requests/responses for inspection
2113/// This is a placeholder - in a full implementation, you'd track intercepted traffic
2114async fn get_proxy_inspect(
2115    State(_state): State<ManagementState>,
2116    Query(params): Query<std::collections::HashMap<String, String>>,
2117) -> Result<Json<serde_json::Value>, StatusCode> {
2118    let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2119
2120    // Note: Request/response inspection would require:
2121    // 1. Storing intercepted requests/responses in ManagementState or a separate store
2122    // 2. Integrating with proxy middleware to capture traffic
2123    // 3. Implementing filtering and pagination for large volumes of traffic
2124    // For now, return an empty response structure indicating the feature is not yet implemented
2125    Ok(Json(serde_json::json!({
2126        "requests": [],
2127        "responses": [],
2128        "limit": limit,
2129        "total": 0,
2130        "message": "Request/response inspection not yet implemented. This endpoint will return intercepted traffic when proxy inspection is fully integrated."
2131    })))
2132}
2133
2134/// Build the management API router
2135pub fn management_router(state: ManagementState) -> Router {
2136    let router = Router::new()
2137        .route("/health", get(health_check))
2138        .route("/stats", get(get_stats))
2139        .route("/config", get(get_config))
2140        .route("/config/validate", post(validate_config))
2141        .route("/config/bulk", post(bulk_update_config))
2142        .route("/mocks", get(list_mocks))
2143        .route("/mocks", post(create_mock))
2144        .route("/mocks/{id}", get(get_mock))
2145        .route("/mocks/{id}", put(update_mock))
2146        .route("/mocks/{id}", delete(delete_mock))
2147        .route("/export", get(export_mocks))
2148        .route("/import", post(import_mocks));
2149
2150    #[cfg(feature = "smtp")]
2151    let router = router
2152        .route("/smtp/mailbox", get(list_smtp_emails))
2153        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
2154        .route("/smtp/mailbox/{id}", get(get_smtp_email))
2155        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
2156        .route("/smtp/mailbox/search", get(search_smtp_emails));
2157
2158    #[cfg(not(feature = "smtp"))]
2159    let router = router;
2160
2161    // MQTT routes
2162    #[cfg(feature = "mqtt")]
2163    let router = router
2164        .route("/mqtt/stats", get(get_mqtt_stats))
2165        .route("/mqtt/clients", get(get_mqtt_clients))
2166        .route("/mqtt/topics", get(get_mqtt_topics))
2167        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
2168        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
2169        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2170        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2171
2172    #[cfg(not(feature = "mqtt"))]
2173    let router = router
2174        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2175        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2176
2177    #[cfg(feature = "kafka")]
2178    let router = router
2179        .route("/kafka/stats", get(get_kafka_stats))
2180        .route("/kafka/topics", get(get_kafka_topics))
2181        .route("/kafka/topics/{topic}", get(get_kafka_topic))
2182        .route("/kafka/groups", get(get_kafka_groups))
2183        .route("/kafka/groups/{group_id}", get(get_kafka_group))
2184        .route("/kafka/produce", post(produce_kafka_message))
2185        .route("/kafka/produce/batch", post(produce_kafka_batch))
2186        .route("/kafka/messages/stream", get(kafka_messages_stream));
2187
2188    #[cfg(not(feature = "kafka"))]
2189    let router = router;
2190
2191    // Migration pipeline routes
2192    let router = router
2193        .route("/migration/routes", get(get_migration_routes))
2194        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
2195        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
2196        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
2197        .route("/migration/groups/{group}", put(set_group_migration_mode))
2198        .route("/migration/groups", get(get_migration_groups))
2199        .route("/migration/status", get(get_migration_status));
2200
2201    // Proxy replacement rules routes
2202    let router = router
2203        .route("/proxy/rules", get(list_proxy_rules))
2204        .route("/proxy/rules", post(create_proxy_rule))
2205        .route("/proxy/rules/{id}", get(get_proxy_rule))
2206        .route("/proxy/rules/{id}", put(update_proxy_rule))
2207        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
2208        .route("/proxy/inspect", get(get_proxy_inspect));
2209
2210    // AI-powered features
2211    let router = router
2212        .route("/ai/generate-spec", post(generate_ai_spec))
2213        .route("/mockai/generate-openapi", post(generate_openapi_from_traffic))
2214        .route("/mockai/learn", post(learn_from_examples))
2215        .route("/mockai/rules/explanations", get(list_rule_explanations))
2216        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
2217        .route("/chaos/config", get(get_chaos_config))
2218        .route("/chaos/config", post(update_chaos_config))
2219        .route("/network/profiles", get(list_network_profiles))
2220        .route("/network/profile/apply", post(apply_network_profile));
2221
2222    // State machine API routes
2223    let router =
2224        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());
2225
2226    router.with_state(state)
2227}
2228
/// Aggregate statistics reported for the Kafka broker.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// How many topics exist
    pub topics: usize,
    /// Partition count summed over all topics
    pub partitions: usize,
    /// How many consumer groups exist
    pub consumer_groups: usize,
    /// Running total of produced messages
    pub messages_produced: u64,
    /// Running total of consumed messages
    pub messages_consumed: u64,
}
2243
/// Summary of a single Kafka topic as returned by the topic listing.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name
    pub name: String,
    /// Number of partitions in the topic
    pub partitions: usize,
    /// Replication factor taken from the topic's config
    pub replication_factor: i32,
}
2251
/// Summary of a single Kafka consumer group as returned by the group listing.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group identifier
    pub group_id: String,
    /// Number of members in the group
    pub members: usize,
    /// Group state label (e.g. "Stable")
    pub state: String,
}
2259
#[cfg(feature = "kafka")]
/// Report aggregate Kafka broker statistics.
///
/// Responds with a [`KafkaBrokerStats`] JSON body, or with
/// `503 Service Unavailable` and an error object when the state has no broker.
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let consumer_groups = broker.consumer_groups.read().await;
    let metrics = broker.metrics.clone();
    let snapshot = metrics.snapshot();

    // Sum the partition counts over every topic.
    let partition_total = topics.values().fold(0usize, |acc, topic| acc + topic.partitions.len());

    let stats = KafkaBrokerStats {
        topics: topics.len(),
        partitions: partition_total,
        consumer_groups: consumer_groups.groups().len(),
        messages_produced: snapshot.messages_produced_total,
        messages_consumed: snapshot.messages_consumed_total,
    };

    Json(stats).into_response()
}
2293
#[cfg(feature = "kafka")]
/// List all Kafka topics with their partition counts and replication factors.
///
/// Responds with `{"topics": [...]}`, or with `503 Service Unavailable` and an
/// error object when the state has no broker.
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;

    let mut topic_list: Vec<KafkaTopicInfo> = Vec::with_capacity(topics.len());
    for (name, topic) in topics.iter() {
        topic_list.push(KafkaTopicInfo {
            name: name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor,
        });
    }

    let body = serde_json::json!({
        "topics": topic_list
    });
    Json(body).into_response()
}
2323
#[cfg(feature = "kafka")]
/// Describe a single Kafka topic, including per-partition message counts.
///
/// Every partition is reported with leader `0` and replicas `[0]`. Responds
/// with `404 Not Found` when the topic does not exist, and with
/// `503 Service Unavailable` when the state has no broker.
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let Some(topic) = topics.get(&topic_name) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Topic not found",
                "topic": topic_name
            })),
        )
            .into_response();
    };

    let partitions_detail = topic
        .partitions
        .iter()
        .enumerate()
        .map(|(idx, partition)| {
            serde_json::json!({
                "id": idx as i32,
                "leader": 0,
                "replicas": vec![0],
                "message_count": partition.messages.len()
            })
        })
        .collect::<Vec<_>>();

    let body = serde_json::json!({
        "name": topic_name,
        "partitions": topic.partitions.len(),
        "replication_factor": topic.config.replication_factor,
        "partitions_detail": partitions_detail
    });
    Json(body).into_response()
}
2365
#[cfg(feature = "kafka")]
/// List all Kafka consumer groups with their member counts.
///
/// Every group is reported with state "Stable". Responds with
/// `{"groups": [...]}`, or with `503 Service Unavailable` and an error object
/// when the state has no broker.
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;

    let mut groups: Vec<KafkaConsumerGroupInfo> = Vec::new();
    for (group_id, group) in consumer_groups.groups().iter() {
        groups.push(KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            // Simplified: the real group state is not tracked here.
            state: "Stable".to_string(),
        });
    }

    let body = serde_json::json!({
        "groups": groups
    });
    Json(body).into_response()
}
2396
#[cfg(feature = "kafka")]
/// Describe a single Kafka consumer group: members, assignments and offsets.
///
/// The group state is always reported as "Stable". Responds with
/// `404 Not Found` when the group does not exist, and with
/// `503 Service Unavailable` when the state has no broker.
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;
    let Some(group) = consumer_groups.groups().get(&group_id) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Consumer group not found",
                "group_id": group_id
            })),
        )
            .into_response();
    };

    // One entry per member, each listing its topic/partition assignments.
    let members_detail = group
        .members
        .iter()
        .map(|(member_id, member)| {
            let assignments = member
                .assignment
                .iter()
                .map(|a| {
                    serde_json::json!({
                        "topic": a.topic,
                        "partitions": a.partitions
                    })
                })
                .collect::<Vec<_>>();

            serde_json::json!({
                "member_id": member_id,
                "client_id": member.client_id,
                "assignments": assignments
            })
        })
        .collect::<Vec<_>>();

    // One entry per (topic, partition) offset recorded for the group.
    let offsets = group
        .offsets
        .iter()
        .map(|((topic, partition), offset)| {
            serde_json::json!({
                "topic": topic,
                "partition": partition,
                "offset": offset
            })
        })
        .collect::<Vec<_>>();

    let body = serde_json::json!({
        "group_id": group_id,
        "members": group.members.len(),
        "state": "Stable",
        "members_detail": members_detail,
        "offsets": offsets
    });
    Json(body).into_response()
}
2445
2446// ========== Kafka Produce Handler ==========
2447
/// Payload for producing a single message to a Kafka topic.
///
/// `key`, `partition` and `headers` are optional and deserialize to `None`
/// when omitted.
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Name of the destination topic
    pub topic: String,
    /// Optional message key
    #[serde(default)]
    pub key: Option<String>,
    /// Message value (a JSON string or a plain string)
    pub value: String,
    /// Optional target partition ID; one is assigned automatically when absent
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional message headers as key-value pairs
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2465
#[cfg(feature = "kafka")]
/// Produce a single message to a Kafka topic on the broker held in the state.
///
/// The topic is created with the default topic config if it does not exist
/// yet. When the request names no partition, one is assigned from the message
/// key. On success a Kafka message event is sent on the state's
/// `message_events` channel and the response reports the topic, partition and
/// offset.
///
/// Responds with `400 Bad Request` for a partition outside the topic's range,
/// `500 Internal Server Error` when the produce call fails, and
/// `503 Service Unavailable` when the state has no broker.
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let mut topics = broker.topics.write().await;

    // Get or create the topic
    let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
        crate::topics::Topic::new(request.topic.clone(), crate::topics::TopicConfig::default())
    });

    // Determine partition: explicit if given, otherwise derived from the key
    let partition_id = match request.partition {
        Some(partition) => partition,
        None => topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes())),
    };

    // Validate partition exists
    if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            })),
        )
            .into_response();
    }

    // Build the broker message from *borrowed* request fields. `key` and
    // `headers` are needed again below for the monitoring event, so they must
    // not be moved out of `request` here.
    let message = crate::partitions::KafkaMessage {
        offset: 0, // Will be set by partition.append
        timestamp: chrono::Utc::now().timestamp_millis(),
        key: request.key.as_ref().map(|k| k.as_bytes().to_vec()),
        value: request.value.as_bytes().to_vec(),
        headers: request
            .headers
            .iter()
            .flatten()
            .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
            .collect(),
    };

    // Produce to partition
    let outcome = topic_entry.produce(partition_id, message).await;

    match outcome {
        Ok(offset) => {
            // Record metrics
            broker.metrics.record_messages_produced(1);

            // Emit message event for real-time monitoring. A send error is
            // deliberately ignored so a failed notification cannot fail the
            // produce call.
            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: request.topic.clone(),
                key: request.key.clone(),
                value: request.value.clone(),
                partition: partition_id,
                offset,
                headers: request.headers.clone(),
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", request.topic),
                "topic": request.topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2563
#[cfg(feature = "kafka")]
/// Request body for producing a batch of messages to Kafka topics.
///
/// Deserialized from the JSON payload handled by `produce_kafka_batch`.
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// List of messages to produce. Each entry names its own topic, so a single
    /// batch may target several topics. The handler rejects an empty list.
    pub messages: Vec<KafkaProduceRequest>,
    /// Delay between messages in milliseconds. A value of 0 disables the pause;
    /// `default_delay()` supplies the value when the field is omitted.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2573
#[cfg(feature = "kafka")]
/// Produce multiple messages to Kafka topics
///
/// Messages are handled sequentially in request order. For each message the
/// target topic is created on demand, the partition is taken from the request
/// or assigned from the key, and the outcome is recorded as one entry in the
/// `results` array (tagged with the message's `index`).
///
/// Responses:
/// - `400 Bad Request` when `messages` is empty.
/// - `503 Service Unavailable` when no Kafka broker is configured.
/// - `200 OK` otherwise. The top-level `success` flag is always `true`;
///   per-message failures are reported through `failed` and `results`.
///
/// When `delay_ms` is non-zero the handler pauses between messages (not after
/// the last one, and not after a message rejected for an invalid partition).
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        if request.messages.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            )
                .into_response();
        }

        let total = request.messages.len();
        let mut results = Vec::with_capacity(total);
        let mut success_count = 0usize;

        for (index, msg_request) in request.messages.iter().enumerate() {
            let mut topics = broker.topics.write().await;

            // Get or create the topic
            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                crate::topics::Topic::new(
                    msg_request.topic.clone(),
                    crate::topics::TopicConfig::default(),
                )
            });

            // Determine partition
            let partition_id = if let Some(partition) = msg_request.partition {
                partition
            } else {
                topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
            };

            // Validate partition exists
            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                }));
                continue;
            }

            // Create the message (offset is a placeholder; the partition assigns the real one)
            let message = crate::partitions::KafkaMessage {
                offset: 0,
                timestamp: chrono::Utc::now().timestamp_millis(),
                key: msg_request.key.as_ref().map(|k| k.as_bytes().to_vec()),
                value: msg_request.value.as_bytes().to_vec(),
                headers: msg_request
                    .headers
                    .clone()
                    .unwrap_or_default()
                    .into_iter()
                    .map(|(k, v)| (k, v.as_bytes().to_vec()))
                    .collect(),
            };

            // Produce to partition
            match topic_entry.produce(partition_id, message).await {
                Ok(offset) => {
                    broker.metrics.record_messages_produced(1);
                    success_count += 1;

                    // Emit message event; a send error only means nobody is subscribed
                    let event = MessageEvent::Kafka(KafkaMessageEvent {
                        topic: msg_request.topic.clone(),
                        key: msg_request.key.clone(),
                        value: msg_request.value.clone(),
                        partition: partition_id,
                        offset,
                        headers: msg_request.headers.clone(),
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": msg_request.topic,
                        "partition": partition_id,
                        "offset": offset
                    }));
                }
                Err(e) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": e.to_string()
                    }));
                }
            }

            // Release the topics write lock before pausing: holding it across the
            // sleep would block every other reader/writer of the broker's topics
            // for the whole inter-message delay.
            drop(topics);

            // Add delay between messages (except for the last one)
            if index + 1 < total && request.delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
            }
        }

        Json(serde_json::json!({
            "success": true,
            "total": total,
            "succeeded": success_count,
            "failed": total - success_count,
            "results": results
        }))
        .into_response()
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2699
2700// ========== Real-time Message Streaming (SSE) ==========
2701
#[cfg(feature = "mqtt")]
/// SSE stream for MQTT messages
///
/// Subscribes to the shared message-event channel and forwards every MQTT
/// event as an SSE event named `mqtt_message` whose data is a JSON object.
/// The optional `topic` query parameter keeps only events whose topic contains
/// the given text. Kafka events and events that fail to serialize are skipped,
/// a lagging receiver is logged and resumes, and the stream ends once the
/// channel is closed. A keep-alive is sent every 15 seconds.
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let receiver = state.message_events.subscribe();
    let wanted_topic = params.get("topic").cloned();

    // The filter travels with the receiver as the unfold state, so it is moved
    // from one step to the next rather than captured by the closure.
    let events = stream::unfold(
        (receiver, wanted_topic),
        |(mut receiver, wanted_topic)| async move {
            loop {
                let message = match receiver.recv().await {
                    Ok(MessageEvent::Mqtt(message)) => message,
                    #[cfg(feature = "kafka")]
                    // Skip Kafka events in MQTT stream
                    Ok(MessageEvent::Kafka(_)) => continue,
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                    Err(broadcast::error::RecvError::Closed) => return None,
                };

                // Apply topic filter if specified
                let rejected = match &wanted_topic {
                    Some(needle) => !message.topic.contains(needle),
                    None => false,
                };
                if rejected {
                    continue;
                }

                let payload = serde_json::json!({
                    "protocol": "mqtt",
                    "topic": message.topic,
                    "payload": message.payload,
                    "qos": message.qos,
                    "retain": message.retain,
                    "timestamp": message.timestamp,
                });

                match serde_json::to_string(&payload) {
                    Ok(data) => {
                        let frame = Event::default().event("mqtt_message").data(data);
                        return Some((Ok(frame), (receiver, wanted_topic)));
                    }
                    // Unserializable event: drop it and wait for the next one
                    Err(_) => continue,
                }
            }
        },
    );

    let keep_alive = axum::response::sse::KeepAlive::new()
        .interval(std::time::Duration::from_secs(15))
        .text("keep-alive-text");

    Sse::new(events).keep_alive(keep_alive)
}
2762
#[cfg(feature = "kafka")]
/// SSE stream for Kafka messages
///
/// Subscribes to the shared message-event channel and forwards every Kafka
/// event as an SSE event named `kafka_message` whose data is a JSON object.
/// The optional `topic` query parameter keeps only events whose topic contains
/// the given text. MQTT events and events that fail to serialize are skipped,
/// a lagging receiver is logged and resumes, and the stream ends once the
/// channel is closed. A keep-alive is sent every 15 seconds.
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let receiver = state.message_events.subscribe();
    let wanted_topic = params.get("topic").cloned();

    // The filter travels with the receiver as the unfold state, so it is moved
    // from one step to the next rather than captured by the closure.
    let events = stream::unfold(
        (receiver, wanted_topic),
        |(mut receiver, wanted_topic)| async move {
            loop {
                let record = match receiver.recv().await {
                    Ok(MessageEvent::Kafka(record)) => record,
                    #[cfg(feature = "mqtt")]
                    // Skip MQTT events in Kafka stream
                    Ok(MessageEvent::Mqtt(_)) => continue,
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                    Err(broadcast::error::RecvError::Closed) => return None,
                };

                // Apply topic filter if specified
                let rejected = match &wanted_topic {
                    Some(needle) => !record.topic.contains(needle),
                    None => false,
                };
                if rejected {
                    continue;
                }

                let payload = serde_json::json!({
                    "protocol": "kafka",
                    "topic": record.topic,
                    "key": record.key,
                    "value": record.value,
                    "partition": record.partition,
                    "offset": record.offset,
                    "headers": record.headers,
                    "timestamp": record.timestamp,
                });

                match serde_json::to_string(&payload) {
                    Ok(data) => {
                        let frame = Event::default().event("kafka_message").data(data);
                        return Some((Ok(frame), (receiver, wanted_topic)));
                    }
                    // Unserializable event: drop it and wait for the next one
                    Err(_) => continue,
                }
            }
        },
    );

    let keep_alive = axum::response::sse::KeepAlive::new()
        .interval(std::time::Duration::from_secs(15))
        .text("keep-alive-text");

    Sse::new(events).keep_alive(keep_alive)
}
2826
2827// ========== AI-Powered Features ==========
2828
/// Request for AI-powered API specification generation
///
/// Deserialized from the JSON body handled by `generate_ai_spec`.
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Natural language description of the API to generate; inserted into the
    /// prompt as the user requirements
    pub query: String,
    /// Type of specification to generate: "openapi", "graphql", or "asyncapi".
    /// Any other value is labelled as OpenAPI 3.0 when the prompt is built.
    pub spec_type: String,
    /// Optional API version (e.g., "3.0.0" for OpenAPI); "3.0.0" is used when omitted
    pub api_version: Option<String>,
}
2839
/// Request for OpenAPI generation from recorded traffic
///
/// Deserialized from the JSON body handled by `generate_openapi_from_traffic`.
/// Every field is optional in the payload.
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to recorder database (optional, defaults to `recordings.db` in the
    /// server's current working directory)
    #[serde(default)]
    pub database_path: Option<String>,
    /// Start time for filtering; parsed as RFC 3339
    /// (ISO 8601 format, e.g., 2025-01-01T00:00:00Z)
    #[serde(default)]
    pub since: Option<String>,
    /// End time for filtering; parsed as RFC 3339 (ISO 8601 format)
    #[serde(default)]
    pub until: Option<String>,
    /// Path pattern filter (supports wildcards, e.g., /api/*)
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence score for including paths (0.0 to 1.0);
    /// `default_min_confidence()` (0.7) applies when omitted
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
2859
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`.
fn default_min_confidence() -> f64 {
    const DEFAULT_MIN_CONFIDENCE: f64 = 0.7;
    DEFAULT_MIN_CONFIDENCE
}
2863
/// Generate API specification from natural language using AI
///
/// Builds a RAG configuration from environment variables, asks the configured
/// LLM for a specification matching `request.query`, and returns the extracted
/// spec text.
///
/// Environment variables:
/// - `MOCKFORGE_RAG_API_KEY` (falls back to `OPENAI_API_KEY`): required.
/// - `MOCKFORGE_RAG_PROVIDER`: `openai` (default), `anthropic`, `ollama`,
///   `openai-compatible`; unknown values fall back to OpenAI.
/// - `MOCKFORGE_RAG_API_ENDPOINT`, `MOCKFORGE_RAG_MODEL`: override the
///   per-provider defaults.
/// - `MOCKFORGE_RAG_MAX_TOKENS` (4096), `MOCKFORGE_RAG_TEMPERATURE` (0.3),
///   `MOCKFORGE_RAG_TIMEOUT` (60), `MOCKFORGE_RAG_CONTEXT_WINDOW` (4000):
///   unset or unparsable values use the default shown.
///
/// Responses:
/// - `503 Service Unavailable` when no API key is configured.
/// - `500 Internal Server Error` when the RAG engine cannot be created or the
///   generation fails.
/// - `200 OK` with `success`, `spec` and the echoed `spec_type` otherwise.
#[cfg(feature = "data-faker")]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::{DocumentStorage, InMemoryStorage},
    };
    use std::sync::Arc;

    /// Reads `name` from the environment and parses it, falling back to
    /// `fallback` when the variable is unset or does not parse.
    fn env_or<T: std::str::FromStr>(name: &str, fallback: T) -> T {
        std::env::var(name)
            .ok()
            .and_then(|raw| raw.parse().ok())
            .unwrap_or(fallback)
    }

    // RAG is only usable with an API key; prefer the MockForge-specific variable
    let api_key = match std::env::var("MOCKFORGE_RAG_API_KEY")
        .or_else(|_| std::env::var("OPENAI_API_KEY"))
    {
        Ok(key) => key,
        Err(_) => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "AI service not configured",
                    "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
                })),
            )
                .into_response();
        }
    };

    // Resolve the provider; unknown names fall back to OpenAI
    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    // Per-provider defaults, used only when the matching env var is not set
    let (default_endpoint, default_model) = match provider {
        LlmProvider::OpenAI => ("https://api.openai.com/v1", "gpt-3.5-turbo"),
        LlmProvider::Anthropic => ("https://api.anthropic.com/v1", "claude-3-sonnet-20240229"),
        LlmProvider::Ollama => ("http://localhost:11434/api", "llama2"),
        LlmProvider::OpenAICompatible => ("http://localhost:8000/v1", "gpt-3.5-turbo"),
    };

    let api_endpoint = std::env::var("MOCKFORGE_RAG_API_ENDPOINT")
        .unwrap_or_else(|_| default_endpoint.to_string());
    let model =
        std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| default_model.to_string());

    // Build RagConfig using default() and override fields
    let mut rag_config = RagConfig::default();
    rag_config.provider = provider;
    rag_config.api_endpoint = api_endpoint;
    rag_config.api_key = Some(api_key);
    rag_config.model = model;
    rag_config.max_tokens = env_or("MOCKFORGE_RAG_MAX_TOKENS", 4096);
    // Lower temperature for more structured output
    rag_config.temperature = env_or("MOCKFORGE_RAG_TEMPERATURE", 0.3);
    rag_config.timeout_secs = env_or("MOCKFORGE_RAG_TIMEOUT", 60);
    rag_config.max_context_length = env_or("MOCKFORGE_RAG_CONTEXT_WINDOW", 4000);

    // Build the prompt for spec generation; unknown spec types are treated as OpenAPI
    let is_graphql = request.spec_type == "graphql";
    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };
    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");
    let output_format = if is_graphql { "GraphQL SDL" } else { "YAML" };

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        output_format
    );

    // The engine takes shared ownership of its storage; an in-memory store is
    // used for this request's engine
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    // Create RAG engine
    let mut rag_engine = match RagEngine::new(rag_config, storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    // Generate using RAG engine
    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Try to extract just the YAML/JSON/SDL content if LLM added explanation
            let spec = if is_graphql {
                extract_graphql_schema(&generated_text)
            } else {
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
3034
3035#[cfg(not(feature = "data-faker"))]
3036async fn generate_ai_spec(
3037    State(_state): State<ManagementState>,
3038    Json(_request): Json<GenerateSpecRequest>,
3039) -> impl IntoResponse {
3040    (
3041        StatusCode::NOT_IMPLEMENTED,
3042        Json(serde_json::json!({
3043            "error": "AI features not enabled",
3044            "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3045        })),
3046    )
3047        .into_response()
3048}
3049
3050/// Generate OpenAPI specification from recorded traffic
3051async fn generate_openapi_from_traffic(
3052    State(_state): State<ManagementState>,
3053    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
3054) -> impl IntoResponse {
3055    use chrono::{DateTime, Utc};
3056    use mockforge_core::intelligent_behavior::{
3057        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
3058        IntelligentBehaviorConfig,
3059    };
3060    use mockforge_recorder::{
3061        database::RecorderDatabase,
3062        openapi_export::{QueryFilters, RecordingsToOpenApi},
3063    };
3064    use std::path::PathBuf;
3065
3066    // Determine database path
3067    let db_path = if let Some(ref path) = request.database_path {
3068        PathBuf::from(path)
3069    } else {
3070        std::env::current_dir()
3071            .unwrap_or_else(|_| PathBuf::from("."))
3072            .join("recordings.db")
3073    };
3074
3075    // Open database
3076    let db = match RecorderDatabase::new(&db_path).await {
3077        Ok(db) => db,
3078        Err(e) => {
3079            return (
3080                StatusCode::BAD_REQUEST,
3081                Json(serde_json::json!({
3082                    "error": "Database error",
3083                    "message": format!("Failed to open recorder database: {}", e)
3084                })),
3085            )
3086                .into_response();
3087        }
3088    };
3089
3090    // Parse time filters
3091    let since_dt = if let Some(ref since_str) = request.since {
3092        match DateTime::parse_from_rfc3339(since_str) {
3093            Ok(dt) => Some(dt.with_timezone(&Utc)),
3094            Err(e) => {
3095                return (
3096                    StatusCode::BAD_REQUEST,
3097                    Json(serde_json::json!({
3098                        "error": "Invalid date format",
3099                        "message": format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3100                    })),
3101                )
3102                    .into_response();
3103            }
3104        }
3105    } else {
3106        None
3107    };
3108
3109    let until_dt = if let Some(ref until_str) = request.until {
3110        match DateTime::parse_from_rfc3339(until_str) {
3111            Ok(dt) => Some(dt.with_timezone(&Utc)),
3112            Err(e) => {
3113                return (
3114                    StatusCode::BAD_REQUEST,
3115                    Json(serde_json::json!({
3116                        "error": "Invalid date format",
3117                        "message": format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3118                    })),
3119                )
3120                    .into_response();
3121            }
3122        }
3123    } else {
3124        None
3125    };
3126
3127    // Build query filters
3128    let query_filters = QueryFilters {
3129        since: since_dt,
3130        until: until_dt,
3131        path_pattern: request.path_pattern.clone(),
3132        min_status_code: None,
3133        max_requests: Some(1000),
3134    };
3135
3136    // Query HTTP exchanges
3137    // Note: We need to convert from mockforge-recorder's HttpExchange to mockforge-core's HttpExchange
3138    // to avoid version mismatch issues. The converter returns the version from mockforge-recorder's
3139    // dependency, so we need to manually convert to the local version.
3140    let exchanges_from_recorder =
3141        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
3142            Ok(exchanges) => exchanges,
3143            Err(e) => {
3144                return (
3145                    StatusCode::INTERNAL_SERVER_ERROR,
3146                    Json(serde_json::json!({
3147                        "error": "Query error",
3148                        "message": format!("Failed to query HTTP exchanges: {}", e)
3149                    })),
3150                )
3151                    .into_response();
3152            }
3153        };
3154
3155    if exchanges_from_recorder.is_empty() {
3156        return (
3157            StatusCode::NOT_FOUND,
3158            Json(serde_json::json!({
3159                "error": "No exchanges found",
3160                "message": "No HTTP exchanges found matching the specified filters"
3161            })),
3162        )
3163            .into_response();
3164    }
3165
3166    // Convert to local HttpExchange type to avoid version mismatch
3167    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
3168    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
3169        .into_iter()
3170        .map(|e| LocalHttpExchange {
3171            method: e.method,
3172            path: e.path,
3173            query_params: e.query_params,
3174            headers: e.headers,
3175            body: e.body,
3176            body_encoding: e.body_encoding,
3177            status_code: e.status_code,
3178            response_headers: e.response_headers,
3179            response_body: e.response_body,
3180            response_body_encoding: e.response_body_encoding,
3181            timestamp: e.timestamp,
3182        })
3183        .collect();
3184
3185    // Create OpenAPI generator config
3186    let behavior_config = IntelligentBehaviorConfig::default();
3187    let gen_config = OpenApiGenerationConfig {
3188        min_confidence: request.min_confidence,
3189        behavior_model: Some(behavior_config.behavior_model),
3190    };
3191
3192    // Generate OpenAPI spec
3193    let generator = OpenApiSpecGenerator::new(gen_config);
3194    let result = match generator.generate_from_exchanges(exchanges).await {
3195        Ok(result) => result,
3196        Err(e) => {
3197            return (
3198                StatusCode::INTERNAL_SERVER_ERROR,
3199                Json(serde_json::json!({
3200                    "error": "Generation error",
3201                    "message": format!("Failed to generate OpenAPI spec: {}", e)
3202                })),
3203            )
3204                .into_response();
3205        }
3206    };
3207
3208    // Prepare response
3209    let spec_json = if let Some(ref raw) = result.spec.raw_document {
3210        raw.clone()
3211    } else {
3212        match serde_json::to_value(&result.spec.spec) {
3213            Ok(json) => json,
3214            Err(e) => {
3215                return (
3216                    StatusCode::INTERNAL_SERVER_ERROR,
3217                    Json(serde_json::json!({
3218                        "error": "Serialization error",
3219                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
3220                    })),
3221                )
3222                    .into_response();
3223            }
3224        }
3225    };
3226
3227    // Build response with metadata
3228    let response = serde_json::json!({
3229        "spec": spec_json,
3230        "metadata": {
3231            "requests_analyzed": result.metadata.requests_analyzed,
3232            "paths_inferred": result.metadata.paths_inferred,
3233            "path_confidence": result.metadata.path_confidence,
3234            "generated_at": result.metadata.generated_at.to_rfc3339(),
3235            "duration_ms": result.metadata.duration_ms,
3236        }
3237    });
3238
3239    Json(response).into_response()
3240}
3241
3242/// List all rule explanations
3243async fn list_rule_explanations(
3244    State(state): State<ManagementState>,
3245    Query(params): Query<std::collections::HashMap<String, String>>,
3246) -> impl IntoResponse {
3247    use mockforge_core::intelligent_behavior::RuleType;
3248
3249    let explanations = state.rule_explanations.read().await;
3250    let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3251
3252    // Filter by rule type if provided
3253    if let Some(rule_type_str) = params.get("rule_type") {
3254        if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3255            explanations_vec.retain(|e| e.rule_type == rule_type);
3256        }
3257    }
3258
3259    // Filter by minimum confidence if provided
3260    if let Some(min_confidence_str) = params.get("min_confidence") {
3261        if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3262            explanations_vec.retain(|e| e.confidence >= min_confidence);
3263        }
3264    }
3265
3266    // Sort by confidence (descending) and then by generated_at (descending)
3267    explanations_vec.sort_by(|a, b| {
3268        b.confidence
3269            .partial_cmp(&a.confidence)
3270            .unwrap_or(std::cmp::Ordering::Equal)
3271            .then_with(|| b.generated_at.cmp(&a.generated_at))
3272    });
3273
3274    Json(serde_json::json!({
3275        "explanations": explanations_vec,
3276        "total": explanations_vec.len(),
3277    }))
3278    .into_response()
3279}
3280
3281/// Get a specific rule explanation by ID
3282async fn get_rule_explanation(
3283    State(state): State<ManagementState>,
3284    Path(rule_id): Path<String>,
3285) -> impl IntoResponse {
3286    let explanations = state.rule_explanations.read().await;
3287
3288    match explanations.get(&rule_id) {
3289        Some(explanation) => Json(serde_json::json!({
3290            "explanation": explanation,
3291        }))
3292        .into_response(),
3293        None => (
3294            StatusCode::NOT_FOUND,
3295            Json(serde_json::json!({
3296                "error": "Rule explanation not found",
3297                "message": format!("No explanation found for rule ID: {}", rule_id)
3298            })),
3299        )
3300            .into_response(),
3301    }
3302}
3303
/// Request for learning from examples
///
/// Deserialized from the JSON body handled by `learn_from_examples`.
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example request/response pairs to learn from; the handler rejects an
    /// empty list with `400 Bad Request`
    pub examples: Vec<ExamplePairRequest>,
    /// Optional configuration override, kept as raw JSON
    /// (NOTE(review): expected schema is not visible here; confirm against the handler)
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3313
/// Example pair request format
///
/// Both halves are free-form JSON objects; `learn_from_examples` reads the keys
/// listed below and substitutes defaults for missing ones.
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request data. Keys read: `method` (string, default "GET"), `path`
    /// (string, default "/"), `body` (any JSON), `query_params` and `headers`
    /// (objects; only string-valued entries are kept).
    pub request: serde_json::Value,
    /// Response data. Keys read: `status_code` or, failing that, `status`
    /// (unsigned integer, default 200) and `body` (any JSON).
    pub response: serde_json::Value,
}
3322
3323/// Learn behavioral rules from example pairs
3324///
3325/// This endpoint accepts example request/response pairs, generates behavioral rules
3326/// with explanations, and stores the explanations for later retrieval.
3327async fn learn_from_examples(
3328    State(state): State<ManagementState>,
3329    Json(request): Json<LearnFromExamplesRequest>,
3330) -> impl IntoResponse {
3331    use mockforge_core::intelligent_behavior::{
3332        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3333        rule_generator::{ExamplePair, RuleGenerator},
3334    };
3335
3336    if request.examples.is_empty() {
3337        return (
3338            StatusCode::BAD_REQUEST,
3339            Json(serde_json::json!({
3340                "error": "No examples provided",
3341                "message": "At least one example pair is required"
3342            })),
3343        )
3344            .into_response();
3345    }
3346
3347    // Convert request examples to ExamplePair format
3348    let example_pairs: Result<Vec<ExamplePair>, String> = request
3349        .examples
3350        .into_iter()
3351        .enumerate()
3352        .map(|(idx, ex)| {
3353            // Parse request JSON to extract method, path, body, etc.
3354            let method = ex
3355                .request
3356                .get("method")
3357                .and_then(|v| v.as_str())
3358                .map(|s| s.to_string())
3359                .unwrap_or_else(|| "GET".to_string());
3360            let path = ex
3361                .request
3362                .get("path")
3363                .and_then(|v| v.as_str())
3364                .map(|s| s.to_string())
3365                .unwrap_or_else(|| "/".to_string());
3366            let request_body = ex.request.get("body").cloned();
3367            let query_params = ex
3368                .request
3369                .get("query_params")
3370                .and_then(|v| v.as_object())
3371                .map(|obj| {
3372                    obj.iter()
3373                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3374                        .collect()
3375                })
3376                .unwrap_or_default();
3377            let headers = ex
3378                .request
3379                .get("headers")
3380                .and_then(|v| v.as_object())
3381                .map(|obj| {
3382                    obj.iter()
3383                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3384                        .collect()
3385                })
3386                .unwrap_or_default();
3387
3388            // Parse response JSON to extract status, body, etc.
3389            let status = ex
3390                .response
3391                .get("status_code")
3392                .or_else(|| ex.response.get("status"))
3393                .and_then(|v| v.as_u64())
3394                .map(|n| n as u16)
3395                .unwrap_or(200);
3396            let response_body = ex.response.get("body").cloned();
3397
3398            Ok(ExamplePair {
3399                method,
3400                path,
3401                request: request_body,
3402                status,
3403                response: response_body,
3404                query_params,
3405                headers,
3406                metadata: {
3407                    let mut meta = std::collections::HashMap::new();
3408                    meta.insert("source".to_string(), "api".to_string());
3409                    meta.insert("example_index".to_string(), idx.to_string());
3410                    meta
3411                },
3412            })
3413        })
3414        .collect();
3415
3416    let example_pairs = match example_pairs {
3417        Ok(pairs) => pairs,
3418        Err(e) => {
3419            return (
3420                StatusCode::BAD_REQUEST,
3421                Json(serde_json::json!({
3422                    "error": "Invalid examples",
3423                    "message": e
3424                })),
3425            )
3426                .into_response();
3427        }
3428    };
3429
3430    // Create behavior config (use provided config or default)
3431    let behavior_config = if let Some(config_json) = request.config {
3432        // Try to deserialize custom config, fall back to default
3433        serde_json::from_value(config_json)
3434            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3435            .behavior_model
3436    } else {
3437        BehaviorModelConfig::default()
3438    };
3439
3440    // Create rule generator
3441    let generator = RuleGenerator::new(behavior_config);
3442
3443    // Generate rules with explanations
3444    let (rules, explanations) =
3445        match generator.generate_rules_with_explanations(example_pairs).await {
3446            Ok(result) => result,
3447            Err(e) => {
3448                return (
3449                    StatusCode::INTERNAL_SERVER_ERROR,
3450                    Json(serde_json::json!({
3451                        "error": "Rule generation failed",
3452                        "message": format!("Failed to generate rules: {}", e)
3453                    })),
3454                )
3455                    .into_response();
3456            }
3457        };
3458
3459    // Store explanations in ManagementState
3460    {
3461        let mut stored_explanations = state.rule_explanations.write().await;
3462        for explanation in &explanations {
3463            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3464        }
3465    }
3466
3467    // Prepare response
3468    let response = serde_json::json!({
3469        "success": true,
3470        "rules_generated": {
3471            "consistency_rules": rules.consistency_rules.len(),
3472            "schemas": rules.schemas.len(),
3473            "state_machines": rules.state_transitions.len(),
3474            "system_prompt": !rules.system_prompt.is_empty(),
3475        },
3476        "explanations": explanations.iter().map(|e| serde_json::json!({
3477            "rule_id": e.rule_id,
3478            "rule_type": e.rule_type,
3479            "confidence": e.confidence,
3480            "reasoning": e.reasoning,
3481        })).collect::<Vec<_>>(),
3482        "total_explanations": explanations.len(),
3483    });
3484
3485    Json(response).into_response()
3486}
3487
/// Returns the trimmed contents of a fenced code block found in `text`.
///
/// Lookup order:
/// 1. The first block opened with a fence tagged with `language`
///    (for example "```yaml") that also has a closing fence.
/// 2. Otherwise the first fenced block of any kind. If its opening fence is
///    followed by a single-word language tag (e.g. `yml`, `json`, `gql`), that
///    tag is dropped so it does not leak into the extracted content.
///
/// Returns `None` when no properly closed fenced block is present.
fn extract_fenced_block<'a>(text: &'a str, language: &str) -> Option<&'a str> {
    const FENCE: &str = "```";

    // A fence info string is a single word such as `yml` or `c++`; anything with
    // whitespace or YAML/GraphQL punctuation is treated as real content.
    fn is_fence_info_string(candidate: &str) -> bool {
        candidate.starts_with(|c: char| c.is_ascii_alphabetic())
            && candidate
                .chars()
                .all(|c| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '+' | '.' | '#'))
    }

    // Preferred: a block explicitly tagged with the expected language.
    let tagged_opener = format!("{}{}", FENCE, language);
    if let Some(start) = text.find(&tagged_opener) {
        let body = &text[start + tagged_opener.len()..];
        if let Some(end) = body.find(FENCE) {
            return Some(body[..end].trim());
        }
    }

    // Fallback: the first fenced block, whatever its tag.
    let start = text.find(FENCE)?;
    let body = &text[start + FENCE.len()..];
    let end = body.find(FENCE)?;
    let block = &body[..end];
    let block = match block.split_once('\n') {
        Some((first_line, rest)) if is_fence_info_string(first_line.trim()) => rest,
        _ => block,
    };
    Some(block.trim())
}

/// Extract a YAML specification (e.g. OpenAPI/AsyncAPI) from text content
///
/// Returns the contents of a fenced code block when one is present (preferring
/// a block tagged `yaml`); otherwise the whole input, trimmed.
fn extract_yaml_spec(text: &str) -> String {
    extract_fenced_block(text, "yaml").unwrap_or_else(|| text.trim()).to_string()
}

/// Extract GraphQL schema from text content
///
/// Returns the contents of a fenced code block when one is present (preferring
/// a block tagged `graphql`); otherwise the whole input, trimmed.
fn extract_graphql_schema(text: &str) -> String {
    extract_fenced_block(text, "graphql").unwrap_or_else(|| text.trim()).to_string()
}
3535
3536// ========== Chaos Engineering Management ==========
3537
3538/// Get current chaos engineering configuration
3539async fn get_chaos_config(State(state): State<ManagementState>) -> impl IntoResponse {
3540    #[cfg(feature = "chaos")]
3541    {
3542        if let Some(chaos_state) = &state.chaos_api_state {
3543            let config = chaos_state.config.read().await;
3544            // Convert ChaosConfig to JSON response format
3545            Json(serde_json::json!({
3546                "enabled": config.enabled,
3547                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
3548                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
3549                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
3550                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
3551            }))
3552            .into_response()
3553        } else {
3554            // Chaos API not available, return default
3555            Json(serde_json::json!({
3556                "enabled": false,
3557                "latency": null,
3558                "fault_injection": null,
3559                "rate_limit": null,
3560                "traffic_shaping": null,
3561            }))
3562            .into_response()
3563        }
3564    }
3565    #[cfg(not(feature = "chaos"))]
3566    {
3567        // Chaos feature not enabled
3568        Json(serde_json::json!({
3569            "enabled": false,
3570            "latency": null,
3571            "fault_injection": null,
3572            "rate_limit": null,
3573            "traffic_shaping": null,
3574        }))
3575        .into_response()
3576    }
3577}
3578
/// Request to update chaos configuration
///
/// Every field is optional; the `update_chaos_config` handler only touches the
/// settings whose field is present in the request.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Whether to enable chaos engineering
    pub enabled: Option<bool>,
    /// Latency configuration (raw JSON, deserialized by the handler into
    /// `mockforge_chaos::config::LatencyConfig`)
    pub latency: Option<serde_json::Value>,
    /// Fault injection configuration (raw JSON, deserialized by the handler into
    /// `mockforge_chaos::config::FaultInjectionConfig`)
    pub fault_injection: Option<serde_json::Value>,
    /// Rate limiting configuration (raw JSON, deserialized by the handler into
    /// `mockforge_chaos::config::RateLimitConfig`)
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic shaping configuration (raw JSON, deserialized by the handler into
    /// `mockforge_chaos::config::TrafficShapingConfig`)
    pub traffic_shaping: Option<serde_json::Value>,
}
3593
3594/// Update chaos engineering configuration
3595async fn update_chaos_config(
3596    State(state): State<ManagementState>,
3597    Json(config_update): Json<ChaosConfigUpdate>,
3598) -> impl IntoResponse {
3599    #[cfg(feature = "chaos")]
3600    {
3601        if let Some(chaos_state) = &state.chaos_api_state {
3602            use mockforge_chaos::config::{
3603                ChaosConfig, FaultInjectionConfig, LatencyConfig, RateLimitConfig,
3604                TrafficShapingConfig,
3605            };
3606
3607            let mut config = chaos_state.config.write().await;
3608
3609            // Update enabled flag if provided
3610            if let Some(enabled) = config_update.enabled {
3611                config.enabled = enabled;
3612            }
3613
3614            // Update latency config if provided
3615            if let Some(latency_json) = config_update.latency {
3616                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3617                    config.latency = Some(latency);
3618                }
3619            }
3620
3621            // Update fault injection config if provided
3622            if let Some(fault_json) = config_update.fault_injection {
3623                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3624                    config.fault_injection = Some(fault);
3625                }
3626            }
3627
3628            // Update rate limit config if provided
3629            if let Some(rate_json) = config_update.rate_limit {
3630                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3631                    config.rate_limit = Some(rate);
3632                }
3633            }
3634
3635            // Update traffic shaping config if provided
3636            if let Some(traffic_json) = config_update.traffic_shaping {
3637                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3638                    config.traffic_shaping = Some(traffic);
3639                }
3640            }
3641
3642            // Reinitialize middleware injectors with new config
3643            // The middleware will pick up the changes on the next request
3644            drop(config);
3645
3646            info!("Chaos configuration updated successfully");
3647            Json(serde_json::json!({
3648                "success": true,
3649                "message": "Chaos configuration updated and applied"
3650            }))
3651            .into_response()
3652        } else {
3653            (
3654                StatusCode::SERVICE_UNAVAILABLE,
3655                Json(serde_json::json!({
3656                    "success": false,
3657                    "error": "Chaos API not available",
3658                    "message": "Chaos engineering is not enabled or configured"
3659                })),
3660            )
3661                .into_response()
3662        }
3663    }
3664    #[cfg(not(feature = "chaos"))]
3665    {
3666        (
3667            StatusCode::NOT_IMPLEMENTED,
3668            Json(serde_json::json!({
3669                "success": false,
3670                "error": "Chaos feature not enabled",
3671                "message": "Chaos engineering feature is not compiled into this build"
3672            })),
3673        )
3674            .into_response()
3675    }
3676}
3677
3678// ========== Network Profile Management ==========
3679
3680/// List available network profiles
3681async fn list_network_profiles() -> impl IntoResponse {
3682    use mockforge_core::network_profiles::NetworkProfileCatalog;
3683
3684    let catalog = NetworkProfileCatalog::default();
3685    let profiles: Vec<serde_json::Value> = catalog
3686        .list_profiles_with_description()
3687        .iter()
3688        .map(|(name, description)| {
3689            serde_json::json!({
3690                "name": name,
3691                "description": description,
3692            })
3693        })
3694        .collect();
3695
3696    Json(serde_json::json!({
3697        "profiles": profiles
3698    }))
3699    .into_response()
3700}
3701
/// Request to apply a network profile
///
/// JSON body accepted by the `apply_network_profile` handler.
#[derive(Debug, Deserialize)]
pub struct ApplyNetworkProfileRequest {
    /// Name of the network profile to apply; looked up in the default
    /// `NetworkProfileCatalog` by the handler.
    pub profile_name: String,
}
3708
3709/// Apply a network profile
3710async fn apply_network_profile(
3711    State(state): State<ManagementState>,
3712    Json(request): Json<ApplyNetworkProfileRequest>,
3713) -> impl IntoResponse {
3714    use mockforge_core::network_profiles::NetworkProfileCatalog;
3715
3716    let catalog = NetworkProfileCatalog::default();
3717    if let Some(profile) = catalog.get(&request.profile_name) {
3718        // Apply profile to server configuration if available
3719        // NetworkProfile contains latency and traffic_shaping configs
3720        if let Some(server_config) = &state.server_config {
3721            let mut config = server_config.write().await;
3722
3723            // Apply network profile's traffic shaping to core config
3724            use mockforge_core::config::NetworkShapingConfig;
3725
3726            // Convert NetworkProfile's TrafficShapingConfig to NetworkShapingConfig
3727            // NetworkProfile uses mockforge_core::traffic_shaping::TrafficShapingConfig
3728            // which has bandwidth and burst_loss fields
3729            let network_shaping = NetworkShapingConfig {
3730                enabled: profile.traffic_shaping.bandwidth.enabled
3731                    || profile.traffic_shaping.burst_loss.enabled,
3732                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
3733                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3734                max_connections: 1000, // Default value
3735            };
3736
3737            // Update chaos config if it exists, or create it
3738            // Chaos config is in observability.chaos, not core.chaos
3739            if let Some(ref mut chaos) = config.observability.chaos {
3740                chaos.traffic_shaping = Some(network_shaping);
3741            } else {
3742                // Create minimal chaos config with traffic shaping
3743                use mockforge_core::config::ChaosEngConfig;
3744                config.observability.chaos = Some(ChaosEngConfig {
3745                    enabled: true,
3746                    latency: None,
3747                    fault_injection: None,
3748                    rate_limit: None,
3749                    traffic_shaping: Some(network_shaping),
3750                    scenario: None,
3751                });
3752            }
3753
3754            info!("Network profile '{}' applied to server configuration", request.profile_name);
3755        } else {
3756            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
3757        }
3758
3759        // Also update chaos API state if available
3760        #[cfg(feature = "chaos")]
3761        {
3762            if let Some(chaos_state) = &state.chaos_api_state {
3763                use mockforge_chaos::config::TrafficShapingConfig;
3764
3765                let mut chaos_config = chaos_state.config.write().await;
3766                // Apply profile's traffic shaping to chaos API state
3767                let chaos_traffic_shaping = TrafficShapingConfig {
3768                    enabled: profile.traffic_shaping.bandwidth.enabled
3769                        || profile.traffic_shaping.burst_loss.enabled,
3770                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
3771                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3772                    max_connections: 0,
3773                    connection_timeout_ms: 30000,
3774                };
3775                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
3776                chaos_config.enabled = true; // Enable chaos when applying a profile
3777                drop(chaos_config);
3778                info!("Network profile '{}' applied to chaos API state", request.profile_name);
3779            }
3780        }
3781
3782        Json(serde_json::json!({
3783            "success": true,
3784            "message": format!("Network profile '{}' applied", request.profile_name),
3785            "profile": {
3786                "name": profile.name,
3787                "description": profile.description,
3788            }
3789        }))
3790        .into_response()
3791    } else {
3792        (
3793            StatusCode::NOT_FOUND,
3794            Json(serde_json::json!({
3795                "error": "Profile not found",
3796                "message": format!("Network profile '{}' not found", request.profile_name)
3797            })),
3798        )
3799            .into_response()
3800    }
3801}
3802
3803/// Build the management API router with UI Builder support
3804pub fn management_router_with_ui_builder(
3805    state: ManagementState,
3806    server_config: mockforge_core::config::ServerConfig,
3807) -> Router {
3808    use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
3809
3810    // Create the base management router
3811    let management = management_router(state);
3812
3813    // Create UI Builder state and router
3814    let ui_builder_state = UIBuilderState::new(server_config);
3815    let ui_builder = create_ui_builder_router(ui_builder_state);
3816
3817    // Nest UI Builder under /ui-builder
3818    management.nest("/ui-builder", ui_builder)
3819}
3820
3821/// Build management router with spec import API
3822pub fn management_router_with_spec_import(state: ManagementState) -> Router {
3823    use crate::spec_import::{spec_import_router, SpecImportState};
3824
3825    // Create base management router
3826    let management = management_router(state);
3827
3828    // Merge with spec import router
3829    Router::new()
3830        .merge(management)
3831        .merge(spec_import_router(SpecImportState::new()))
3832}
3833
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an enabled mock answering with status 200 and the given body; all
    /// optional settings are left unset. Tests override individual fields with
    /// struct-update syntax.
    fn mock_fixture(
        id: &str,
        name: &str,
        method: &str,
        path: &str,
        body: serde_json::Value,
    ) -> MockConfig {
        MockConfig {
            id: id.to_string(),
            name: name.to_string(),
            method: method.to_string(),
            path: path.to_string(),
            response: MockResponse {
                body,
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        }
    }

    /// A mock pushed into the shared state can be found again by its ID.
    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        let mock = mock_fixture(
            "test-1",
            "Test Mock",
            "GET",
            "/test",
            serde_json::json!({"message": "test"}),
        );

        // Create mock (the write guard is released at the end of the scope)
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(mock.clone());
        }

        // Get mock
        let mocks = state.mocks.read().await;
        let found = mocks.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    /// Total and enabled mock counts reflect what was stored.
    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        let enabled_mock = mock_fixture("1", "Mock 1", "GET", "/test1", serde_json::json!({}));
        let disabled_mock = MockConfig {
            enabled: false,
            status_code: Some(201),
            ..mock_fixture("2", "Mock 2", "POST", "/test2", serde_json::json!({}))
        };

        // Add some mocks
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(enabled_mock);
            mocks.push(disabled_mock);
        }

        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }
}