mockforge_http/
management.rs

1/// Management API for MockForge
2///
3/// Provides REST endpoints for controlling mocks, server configuration,
4/// and integration with developer tools (VS Code extension, CI/CD, etc.)
5use axum::{
6    extract::{Path, Query, State},
7    http::StatusCode,
8    response::{
9        sse::{Event, Sse},
10        IntoResponse, Json,
11    },
12    routing::{delete, get, post, put},
13    Router,
14};
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18    BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23use std::convert::Infallible;
24use std::sync::Arc;
25use tokio::sync::{broadcast, RwLock};
26use tracing::*;
27
28/// Message event types for real-time monitoring
29#[derive(Debug, Clone, Serialize, Deserialize)]
30#[serde(tag = "protocol", content = "data")]
31#[serde(rename_all = "lowercase")]
32pub enum MessageEvent {
33    #[cfg(feature = "mqtt")]
34    /// MQTT message event
35    Mqtt(MqttMessageEvent),
36    #[cfg(feature = "kafka")]
37    /// Kafka message event
38    Kafka(KafkaMessageEvent),
39}
40
41#[cfg(feature = "mqtt")]
42/// MQTT message event for real-time monitoring
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct MqttMessageEvent {
45    /// MQTT topic name
46    pub topic: String,
47    /// Message payload content
48    pub payload: String,
49    /// Quality of Service level (0, 1, or 2)
50    pub qos: u8,
51    /// Whether the message is retained
52    pub retain: bool,
53    /// RFC3339 formatted timestamp
54    pub timestamp: String,
55}
56
57#[cfg(feature = "kafka")]
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct KafkaMessageEvent {
60    pub topic: String,
61    pub key: Option<String>,
62    pub value: String,
63    pub partition: i32,
64    pub offset: i64,
65    pub headers: Option<std::collections::HashMap<String, String>>,
66    pub timestamp: String,
67}
68
69/// Mock configuration representation
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct MockConfig {
72    /// Unique identifier for the mock
73    #[serde(skip_serializing_if = "String::is_empty")]
74    pub id: String,
75    /// Human-readable name for the mock
76    pub name: String,
77    /// HTTP method (GET, POST, etc.)
78    pub method: String,
79    /// API path pattern to match
80    pub path: String,
81    /// Response configuration
82    pub response: MockResponse,
83    /// Whether this mock is currently enabled
84    #[serde(default = "default_true")]
85    pub enabled: bool,
86    /// Optional latency to inject in milliseconds
87    #[serde(skip_serializing_if = "Option::is_none")]
88    pub latency_ms: Option<u64>,
89    /// Optional HTTP status code override
90    #[serde(skip_serializing_if = "Option::is_none")]
91    pub status_code: Option<u16>,
92    /// Request matching criteria (headers, query params, body patterns)
93    #[serde(skip_serializing_if = "Option::is_none")]
94    pub request_match: Option<RequestMatchCriteria>,
95    /// Priority for mock ordering (higher priority mocks are matched first)
96    #[serde(skip_serializing_if = "Option::is_none")]
97    pub priority: Option<i32>,
98    /// Scenario name for stateful mocking
99    #[serde(skip_serializing_if = "Option::is_none")]
100    pub scenario: Option<String>,
101    /// Required scenario state for this mock to be active
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub required_scenario_state: Option<String>,
104    /// New scenario state after this mock is matched
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub new_scenario_state: Option<String>,
107}
108
109fn default_true() -> bool {
110    true
111}
112
113/// Mock response configuration
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct MockResponse {
116    /// Response body as JSON
117    pub body: serde_json::Value,
118    /// Optional custom response headers
119    #[serde(skip_serializing_if = "Option::is_none")]
120    pub headers: Option<std::collections::HashMap<String, String>>,
121}
122
123/// Request matching criteria for advanced request matching
124#[derive(Debug, Clone, Serialize, Deserialize, Default)]
125pub struct RequestMatchCriteria {
126    /// Headers that must be present and match (case-insensitive header names)
127    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
128    pub headers: std::collections::HashMap<String, String>,
129    /// Query parameters that must be present and match
130    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
131    pub query_params: std::collections::HashMap<String, String>,
132    /// Request body pattern (supports exact match or regex)
133    #[serde(skip_serializing_if = "Option::is_none")]
134    pub body_pattern: Option<String>,
135    /// JSONPath expression for JSON body matching
136    #[serde(skip_serializing_if = "Option::is_none")]
137    pub json_path: Option<String>,
138    /// XPath expression for XML body matching
139    #[serde(skip_serializing_if = "Option::is_none")]
140    pub xpath: Option<String>,
141    /// Custom matcher expression (e.g., "headers.content-type == \"application/json\"")
142    #[serde(skip_serializing_if = "Option::is_none")]
143    pub custom_matcher: Option<String>,
144}
145
146/// Check if a request matches the given mock configuration
147///
148/// This function implements comprehensive request matching including:
149/// - Method and path matching
150/// - Header matching (with regex support)
151/// - Query parameter matching
152/// - Body pattern matching (exact, regex, JSONPath, XPath)
153/// - Custom matcher expressions
154pub fn mock_matches_request(
155    mock: &MockConfig,
156    method: &str,
157    path: &str,
158    headers: &std::collections::HashMap<String, String>,
159    query_params: &std::collections::HashMap<String, String>,
160    body: Option<&[u8]>,
161) -> bool {
162    use regex::Regex;
163
164    // Check if mock is enabled
165    if !mock.enabled {
166        return false;
167    }
168
169    // Check method (case-insensitive)
170    if mock.method.to_uppercase() != method.to_uppercase() {
171        return false;
172    }
173
174    // Check path pattern (supports wildcards and path parameters)
175    if !path_matches_pattern(&mock.path, path) {
176        return false;
177    }
178
179    // Check request matching criteria if present
180    if let Some(criteria) = &mock.request_match {
181        // Check headers
182        for (key, expected_value) in &criteria.headers {
183            let header_key_lower = key.to_lowercase();
184            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
185
186            if let Some((_, actual_value)) = found {
187                // Try regex match first, then exact match
188                if let Ok(re) = Regex::new(expected_value) {
189                    if !re.is_match(actual_value) {
190                        return false;
191                    }
192                } else if actual_value != expected_value {
193                    return false;
194                }
195            } else {
196                return false; // Header not found
197            }
198        }
199
200        // Check query parameters
201        for (key, expected_value) in &criteria.query_params {
202            if let Some(actual_value) = query_params.get(key) {
203                if actual_value != expected_value {
204                    return false;
205                }
206            } else {
207                return false; // Query param not found
208            }
209        }
210
211        // Check body pattern
212        if let Some(pattern) = &criteria.body_pattern {
213            if let Some(body_bytes) = body {
214                let body_str = String::from_utf8_lossy(body_bytes);
215                // Try regex first, then exact match
216                if let Ok(re) = Regex::new(pattern) {
217                    if !re.is_match(&body_str) {
218                        return false;
219                    }
220                } else if body_str.as_ref() != pattern {
221                    return false;
222                }
223            } else {
224                return false; // Body required but not present
225            }
226        }
227
228        // Check JSONPath (simplified implementation)
229        if let Some(json_path) = &criteria.json_path {
230            if let Some(body_bytes) = body {
231                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
232                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
233                        // Simple JSONPath check
234                        if !json_path_exists(&json_value, json_path) {
235                            return false;
236                        }
237                    }
238                }
239            }
240        }
241
242        // Check XPath (placeholder - requires XML/XPath library for full implementation)
243        if let Some(_xpath) = &criteria.xpath {
244            // XPath matching would require an XML/XPath library
245            // For now, this is a placeholder that warns but doesn't fail
246            tracing::warn!("XPath matching not yet fully implemented");
247        }
248
249        // Check custom matcher
250        if let Some(custom) = &criteria.custom_matcher {
251            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
252                return false;
253            }
254        }
255    }
256
257    true
258}
259
260/// Check if a path matches a pattern (supports wildcards and path parameters)
261fn path_matches_pattern(pattern: &str, path: &str) -> bool {
262    // Exact match
263    if pattern == path {
264        return true;
265    }
266
267    // Wildcard match
268    if pattern == "*" {
269        return true;
270    }
271
272    // Path parameter matching (e.g., /users/{id} matches /users/123)
273    let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
274    let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
275
276    if pattern_parts.len() != path_parts.len() {
277        // Check for wildcard patterns
278        if pattern.contains('*') {
279            return matches_wildcard_pattern(pattern, path);
280        }
281        return false;
282    }
283
284    for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
285        // Check for path parameters {param}
286        if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
287            continue; // Matches any value
288        }
289
290        if pattern_part != path_part {
291            return false;
292        }
293    }
294
295    true
296}
297
298/// Check if path matches a wildcard pattern
299fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
300    use regex::Regex;
301
302    // Convert pattern to regex
303    let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
304
305    if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
306        return re.is_match(path);
307    }
308
309    false
310}
311
312/// Check if a JSONPath exists in a JSON value (simplified implementation)
313fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
314    // Simple implementation - for full JSONPath support, use a library like jsonpath-rs
315    // This handles simple paths like $.field or $.field.subfield
316    if let Some(path) = json_path.strip_prefix("$.") {
317        let parts: Vec<&str> = path.split('.').collect();
318
319        let mut current = json;
320        for part in parts {
321            if let Some(obj) = current.as_object() {
322                if let Some(value) = obj.get(part) {
323                    current = value;
324                } else {
325                    return false;
326                }
327            } else {
328                return false;
329            }
330        }
331        true
332    } else {
333        // For complex JSONPath expressions, would need a proper JSONPath library
334        tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
335        false
336    }
337}
338
339/// Evaluate a custom matcher expression
340fn evaluate_custom_matcher(
341    expression: &str,
342    method: &str,
343    path: &str,
344    headers: &std::collections::HashMap<String, String>,
345    query_params: &std::collections::HashMap<String, String>,
346    body: Option<&[u8]>,
347) -> bool {
348    use regex::Regex;
349
350    let expr = expression.trim();
351
352    // Handle equality expressions (field == "value")
353    if expr.contains("==") {
354        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
355        if parts.len() != 2 {
356            return false;
357        }
358
359        let field = parts[0];
360        let expected_value = parts[1].trim_matches('"').trim_matches('\'');
361
362        match field {
363            "method" => method == expected_value,
364            "path" => path == expected_value,
365            _ if field.starts_with("headers.") => {
366                let header_name = &field[8..];
367                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
368            }
369            _ if field.starts_with("query.") => {
370                let param_name = &field[6..];
371                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
372            }
373            _ => false,
374        }
375    }
376    // Handle regex match expressions (field =~ "pattern")
377    else if expr.contains("=~") {
378        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
379        if parts.len() != 2 {
380            return false;
381        }
382
383        let field = parts[0];
384        let pattern = parts[1].trim_matches('"').trim_matches('\'');
385
386        if let Ok(re) = Regex::new(pattern) {
387            match field {
388                "method" => re.is_match(method),
389                "path" => re.is_match(path),
390                _ if field.starts_with("headers.") => {
391                    let header_name = &field[8..];
392                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
393                }
394                _ if field.starts_with("query.") => {
395                    let param_name = &field[6..];
396                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
397                }
398                _ => false,
399            }
400        } else {
401            false
402        }
403    }
404    // Handle contains expressions (field contains "value")
405    else if expr.contains("contains") {
406        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
407        if parts.len() != 2 {
408            return false;
409        }
410
411        let field = parts[0];
412        let search_value = parts[1].trim_matches('"').trim_matches('\'');
413
414        match field {
415            "path" => path.contains(search_value),
416            _ if field.starts_with("headers.") => {
417                let header_name = &field[8..];
418                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
419            }
420            _ if field.starts_with("body") => {
421                if let Some(body_bytes) = body {
422                    let body_str = String::from_utf8_lossy(body_bytes);
423                    body_str.contains(search_value)
424                } else {
425                    false
426                }
427            }
428            _ => false,
429        }
430    } else {
431        // Unknown expression format
432        tracing::warn!("Unknown custom matcher expression format: {}", expr);
433        false
434    }
435}
436
437/// Server statistics
438#[derive(Debug, Clone, Serialize, Deserialize)]
439pub struct ServerStats {
440    /// Server uptime in seconds
441    pub uptime_seconds: u64,
442    /// Total number of requests processed
443    pub total_requests: u64,
444    /// Number of active mock configurations
445    pub active_mocks: usize,
446    /// Number of currently enabled mocks
447    pub enabled_mocks: usize,
448    /// Number of registered API routes
449    pub registered_routes: usize,
450}
451
452/// Server configuration info
453#[derive(Debug, Clone, Serialize, Deserialize)]
454pub struct ServerConfig {
455    /// MockForge version string
456    pub version: String,
457    /// Server port number
458    pub port: u16,
459    /// Whether an OpenAPI spec is loaded
460    pub has_openapi_spec: bool,
461    /// Optional path to the OpenAPI spec file
462    #[serde(skip_serializing_if = "Option::is_none")]
463    pub spec_path: Option<String>,
464}
465
466/// Shared state for the management API
467#[derive(Clone)]
468pub struct ManagementState {
469    /// Collection of mock configurations
470    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
471    /// Optional OpenAPI specification
472    pub spec: Option<Arc<OpenApiSpec>>,
473    /// Optional path to the OpenAPI spec file
474    pub spec_path: Option<String>,
475    /// Server port number
476    pub port: u16,
477    /// Server start time for uptime calculation
478    pub start_time: std::time::Instant,
479    /// Counter for total requests processed
480    pub request_counter: Arc<RwLock<u64>>,
481    /// Optional proxy configuration for migration pipeline
482    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
483    /// Optional SMTP registry for email mocking
484    #[cfg(feature = "smtp")]
485    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
486    /// Optional MQTT broker for message mocking
487    #[cfg(feature = "mqtt")]
488    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
489    /// Optional Kafka broker for event streaming
490    #[cfg(feature = "kafka")]
491    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
492    /// Broadcast channel for message events (MQTT & Kafka)
493    #[cfg(any(feature = "mqtt", feature = "kafka"))]
494    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
495    /// State machine manager for scenario state machines
496    pub state_machine_manager:
497        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
498    /// Optional WebSocket broadcast channel for real-time updates
499    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
500    /// Lifecycle hook registry for extensibility
501    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
502    /// Rule explanations storage (in-memory for now)
503    pub rule_explanations: Arc<
504        RwLock<
505            std::collections::HashMap<
506                String,
507                mockforge_core::intelligent_behavior::RuleExplanation,
508            >,
509        >,
510    >,
511    /// Optional chaos API state for chaos config management
512    #[cfg(feature = "chaos")]
513    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
514    /// Optional server configuration for profile application
515    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
516}
517
518impl ManagementState {
519    /// Create a new management state
520    ///
521    /// # Arguments
522    /// * `spec` - Optional OpenAPI specification
523    /// * `spec_path` - Optional path to the OpenAPI spec file
524    /// * `port` - Server port number
525    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
526        Self {
527            mocks: Arc::new(RwLock::new(Vec::new())),
528            spec,
529            spec_path,
530            port,
531            start_time: std::time::Instant::now(),
532            request_counter: Arc::new(RwLock::new(0)),
533            proxy_config: None,
534            #[cfg(feature = "smtp")]
535            smtp_registry: None,
536            #[cfg(feature = "mqtt")]
537            mqtt_broker: None,
538            #[cfg(feature = "kafka")]
539            kafka_broker: None,
540            #[cfg(any(feature = "mqtt", feature = "kafka"))]
541            message_events: {
542                let (tx, _) = broadcast::channel(1000);
543                Arc::new(tx)
544            },
545            state_machine_manager: Arc::new(RwLock::new(
546                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
547            )),
548            ws_broadcast: None,
549            lifecycle_hooks: None,
550            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
551            #[cfg(feature = "chaos")]
552            chaos_api_state: None,
553            server_config: None,
554        }
555    }
556
557    /// Add lifecycle hook registry to management state
558    pub fn with_lifecycle_hooks(
559        mut self,
560        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
561    ) -> Self {
562        self.lifecycle_hooks = Some(hooks);
563        self
564    }
565
566    /// Add WebSocket broadcast channel to management state
567    pub fn with_ws_broadcast(
568        mut self,
569        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
570    ) -> Self {
571        self.ws_broadcast = Some(ws_broadcast);
572        self
573    }
574
575    /// Add proxy configuration to management state
576    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
577        self.proxy_config = Some(proxy_config);
578        self
579    }
580
581    #[cfg(feature = "smtp")]
582    /// Add SMTP registry to management state
583    pub fn with_smtp_registry(
584        mut self,
585        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
586    ) -> Self {
587        self.smtp_registry = Some(smtp_registry);
588        self
589    }
590
591    #[cfg(feature = "mqtt")]
592    /// Add MQTT broker to management state
593    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
594        self.mqtt_broker = Some(mqtt_broker);
595        self
596    }
597
598    #[cfg(feature = "kafka")]
599    /// Add Kafka broker to management state
600    pub fn with_kafka_broker(
601        mut self,
602        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
603    ) -> Self {
604        self.kafka_broker = Some(kafka_broker);
605        self
606    }
607
608    #[cfg(feature = "chaos")]
609    /// Add chaos API state to management state
610    pub fn with_chaos_api_state(
611        mut self,
612        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
613    ) -> Self {
614        self.chaos_api_state = Some(chaos_api_state);
615        self
616    }
617
618    /// Add server configuration to management state
619    pub fn with_server_config(
620        mut self,
621        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
622    ) -> Self {
623        self.server_config = Some(server_config);
624        self
625    }
626}
627
628/// List all mocks
629async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
630    let mocks = state.mocks.read().await;
631    Json(serde_json::json!({
632        "mocks": *mocks,
633        "total": mocks.len(),
634        "enabled": mocks.iter().filter(|m| m.enabled).count()
635    }))
636}
637
638/// Get a specific mock by ID
639async fn get_mock(
640    State(state): State<ManagementState>,
641    Path(id): Path<String>,
642) -> Result<Json<MockConfig>, StatusCode> {
643    let mocks = state.mocks.read().await;
644    mocks
645        .iter()
646        .find(|m| m.id == id)
647        .cloned()
648        .map(Json)
649        .ok_or(StatusCode::NOT_FOUND)
650}
651
652/// Create a new mock
653async fn create_mock(
654    State(state): State<ManagementState>,
655    Json(mut mock): Json<MockConfig>,
656) -> Result<Json<MockConfig>, StatusCode> {
657    let mut mocks = state.mocks.write().await;
658
659    // Generate ID if not provided
660    if mock.id.is_empty() {
661        mock.id = uuid::Uuid::new_v4().to_string();
662    }
663
664    // Check for duplicate ID
665    if mocks.iter().any(|m| m.id == mock.id) {
666        return Err(StatusCode::CONFLICT);
667    }
668
669    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
670
671    // Invoke lifecycle hooks
672    if let Some(hooks) = &state.lifecycle_hooks {
673        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
674            id: mock.id.clone(),
675            name: mock.name.clone(),
676            config: serde_json::to_value(&mock).unwrap_or_default(),
677        };
678        hooks.invoke_mock_created(&event).await;
679    }
680
681    mocks.push(mock.clone());
682
683    // Broadcast WebSocket event
684    if let Some(tx) = &state.ws_broadcast {
685        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
686    }
687
688    Ok(Json(mock))
689}
690
691/// Update an existing mock
692async fn update_mock(
693    State(state): State<ManagementState>,
694    Path(id): Path<String>,
695    Json(updated_mock): Json<MockConfig>,
696) -> Result<Json<MockConfig>, StatusCode> {
697    let mut mocks = state.mocks.write().await;
698
699    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
700
701    // Get old mock for comparison
702    let old_mock = mocks[position].clone();
703
704    info!("Updating mock: {}", id);
705    mocks[position] = updated_mock.clone();
706
707    // Invoke lifecycle hooks
708    if let Some(hooks) = &state.lifecycle_hooks {
709        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
710            id: updated_mock.id.clone(),
711            name: updated_mock.name.clone(),
712            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
713        };
714        hooks.invoke_mock_updated(&event).await;
715
716        // Check if enabled state changed
717        if old_mock.enabled != updated_mock.enabled {
718            let state_event = if updated_mock.enabled {
719                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
720                    id: updated_mock.id.clone(),
721                }
722            } else {
723                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
724                    id: updated_mock.id.clone(),
725                }
726            };
727            hooks.invoke_mock_state_changed(&state_event).await;
728        }
729    }
730
731    // Broadcast WebSocket event
732    if let Some(tx) = &state.ws_broadcast {
733        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
734    }
735
736    Ok(Json(updated_mock))
737}
738
739/// Delete a mock
740async fn delete_mock(
741    State(state): State<ManagementState>,
742    Path(id): Path<String>,
743) -> Result<StatusCode, StatusCode> {
744    let mut mocks = state.mocks.write().await;
745
746    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
747
748    // Get mock info before deletion for lifecycle hooks
749    let deleted_mock = mocks[position].clone();
750
751    info!("Deleting mock: {}", id);
752    mocks.remove(position);
753
754    // Invoke lifecycle hooks
755    if let Some(hooks) = &state.lifecycle_hooks {
756        let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
757            id: deleted_mock.id.clone(),
758            name: deleted_mock.name.clone(),
759        };
760        hooks.invoke_mock_deleted(&event).await;
761    }
762
763    // Broadcast WebSocket event
764    if let Some(tx) = &state.ws_broadcast {
765        let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
766    }
767
768    Ok(StatusCode::NO_CONTENT)
769}
770
771/// Request to validate configuration
772#[derive(Debug, Deserialize)]
773pub struct ValidateConfigRequest {
774    /// Configuration to validate (as JSON)
775    pub config: serde_json::Value,
776    /// Format of the configuration ("json" or "yaml")
777    #[serde(default = "default_format")]
778    pub format: String,
779}
780
781fn default_format() -> String {
782    "json".to_string()
783}
784
785/// Validate configuration without applying it
786async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
787    use mockforge_core::config::ServerConfig;
788
789    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
790        "yaml" | "yml" => {
791            let yaml_str = match serde_json::to_string(&request.config) {
792                Ok(s) => s,
793                Err(e) => {
794                    return (
795                        StatusCode::BAD_REQUEST,
796                        Json(serde_json::json!({
797                            "valid": false,
798                            "error": format!("Failed to convert to string: {}", e),
799                            "message": "Configuration validation failed"
800                        })),
801                    )
802                        .into_response();
803                }
804            };
805            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
806        }
807        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
808    };
809
810    match config_result {
811        Ok(_) => Json(serde_json::json!({
812            "valid": true,
813            "message": "Configuration is valid"
814        }))
815        .into_response(),
816        Err(e) => (
817            StatusCode::BAD_REQUEST,
818            Json(serde_json::json!({
819                "valid": false,
820                "error": format!("Invalid configuration: {}", e),
821                "message": "Configuration validation failed"
822            })),
823        )
824            .into_response(),
825    }
826}
827
828/// Request for bulk configuration update
829#[derive(Debug, Deserialize)]
830pub struct BulkConfigUpdateRequest {
831    /// Partial configuration updates (only specified fields will be updated)
832    pub updates: serde_json::Value,
833}
834
835/// Bulk update configuration
836///
837/// This endpoint allows updating multiple configuration options at once.
838/// Only the specified fields in the updates object will be modified.
839///
840/// Configuration updates are applied to the server configuration if available
841/// in ManagementState. Changes take effect immediately for supported settings.
842async fn bulk_update_config(
843    State(state): State<ManagementState>,
844    Json(request): Json<BulkConfigUpdateRequest>,
845) -> impl IntoResponse {
846    // Validate the updates structure
847    if !request.updates.is_object() {
848        return (
849            StatusCode::BAD_REQUEST,
850            Json(serde_json::json!({
851                "error": "Invalid request",
852                "message": "Updates must be a JSON object"
853            })),
854        )
855            .into_response();
856    }
857
858    // Try to validate as partial ServerConfig
859    use mockforge_core::config::ServerConfig;
860
861    // Create a minimal valid config and try to merge updates
862    let base_config = ServerConfig::default();
863    let base_json = match serde_json::to_value(&base_config) {
864        Ok(v) => v,
865        Err(e) => {
866            return (
867                StatusCode::INTERNAL_SERVER_ERROR,
868                Json(serde_json::json!({
869                    "error": "Internal error",
870                    "message": format!("Failed to serialize base config: {}", e)
871                })),
872            )
873                .into_response();
874        }
875    };
876
877    // Merge updates into base config (simplified merge)
878    let mut merged = base_json.clone();
879    if let (Some(merged_obj), Some(updates_obj)) =
880        (merged.as_object_mut(), request.updates.as_object())
881    {
882        for (key, value) in updates_obj {
883            merged_obj.insert(key.clone(), value.clone());
884        }
885    }
886
887    // Validate the merged config
888    match serde_json::from_value::<ServerConfig>(merged) {
889        Ok(_) => {
890            // Config is valid
891            // Note: Runtime application of config changes would require:
892            // 1. Storing ServerConfig in ManagementState
893            // 2. Implementing hot-reload mechanism for server configuration
894            // 3. Updating router state and middleware based on new config
895            // For now, this endpoint only validates the configuration structure
896            Json(serde_json::json!({
897                "success": true,
898                "message": "Bulk configuration update validated successfully. Note: Runtime application requires ServerConfig in ManagementState and hot-reload support.",
899                "updates_received": request.updates,
900                "validated": true
901            }))
902            .into_response()
903        }
904        Err(e) => (
905            StatusCode::BAD_REQUEST,
906            Json(serde_json::json!({
907                "error": "Invalid configuration",
908                "message": format!("Configuration validation failed: {}", e),
909                "validated": false
910            })),
911        )
912            .into_response(),
913    }
914}
915
916/// Get server statistics
917async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
918    let mocks = state.mocks.read().await;
919    let request_count = *state.request_counter.read().await;
920
921    Json(ServerStats {
922        uptime_seconds: state.start_time.elapsed().as_secs(),
923        total_requests: request_count,
924        active_mocks: mocks.len(),
925        enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
926        registered_routes: mocks.len(), // This could be enhanced with actual route registry info
927    })
928}
929
930/// Get server configuration
931async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
932    Json(ServerConfig {
933        version: env!("CARGO_PKG_VERSION").to_string(),
934        port: state.port,
935        has_openapi_spec: state.spec.is_some(),
936        spec_path: state.spec_path.clone(),
937    })
938}
939
940/// Health check endpoint
941async fn health_check() -> Json<serde_json::Value> {
942    Json(serde_json::json!({
943        "status": "healthy",
944        "service": "mockforge-management",
945        "timestamp": chrono::Utc::now().to_rfc3339()
946    }))
947}
948
949/// Export format for mock configurations
950#[derive(Debug, Clone, Serialize, Deserialize)]
951#[serde(rename_all = "lowercase")]
952pub enum ExportFormat {
953    /// JSON format
954    Json,
955    /// YAML format
956    Yaml,
957}
958
959/// Export mocks in specified format
960async fn export_mocks(
961    State(state): State<ManagementState>,
962    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
963) -> Result<(StatusCode, String), StatusCode> {
964    let mocks = state.mocks.read().await;
965
966    let format = params
967        .get("format")
968        .map(|f| match f.as_str() {
969            "yaml" | "yml" => ExportFormat::Yaml,
970            _ => ExportFormat::Json,
971        })
972        .unwrap_or(ExportFormat::Json);
973
974    match format {
975        ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
976            .map(|json| (StatusCode::OK, json))
977            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
978        ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
979            .map(|yaml| (StatusCode::OK, yaml))
980            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
981    }
982}
983
984/// Import mocks from JSON/YAML
985async fn import_mocks(
986    State(state): State<ManagementState>,
987    Json(mocks): Json<Vec<MockConfig>>,
988) -> impl IntoResponse {
989    let mut current_mocks = state.mocks.write().await;
990    current_mocks.clear();
991    current_mocks.extend(mocks);
992    Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
993}
994
995#[cfg(feature = "smtp")]
996/// List SMTP emails in mailbox
997async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
998    if let Some(ref smtp_registry) = state.smtp_registry {
999        match smtp_registry.get_emails() {
1000            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
1001            Err(e) => (
1002                StatusCode::INTERNAL_SERVER_ERROR,
1003                Json(serde_json::json!({
1004                    "error": "Failed to retrieve emails",
1005                    "message": e.to_string()
1006                })),
1007            ),
1008        }
1009    } else {
1010        (
1011            StatusCode::NOT_IMPLEMENTED,
1012            Json(serde_json::json!({
1013                "error": "SMTP mailbox management not available",
1014                "message": "SMTP server is not enabled or registry not available."
1015            })),
1016        )
1017    }
1018}
1019
1020/// Get specific SMTP email
1021#[cfg(feature = "smtp")]
1022async fn get_smtp_email(
1023    State(state): State<ManagementState>,
1024    Path(id): Path<String>,
1025) -> impl IntoResponse {
1026    if let Some(ref smtp_registry) = state.smtp_registry {
1027        match smtp_registry.get_email_by_id(&id) {
1028            Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
1029            Ok(None) => (
1030                StatusCode::NOT_FOUND,
1031                Json(serde_json::json!({
1032                    "error": "Email not found",
1033                    "id": id
1034                })),
1035            ),
1036            Err(e) => (
1037                StatusCode::INTERNAL_SERVER_ERROR,
1038                Json(serde_json::json!({
1039                    "error": "Failed to retrieve email",
1040                    "message": e.to_string()
1041                })),
1042            ),
1043        }
1044    } else {
1045        (
1046            StatusCode::NOT_IMPLEMENTED,
1047            Json(serde_json::json!({
1048                "error": "SMTP mailbox management not available",
1049                "message": "SMTP server is not enabled or registry not available."
1050            })),
1051        )
1052    }
1053}
1054
1055/// Clear SMTP mailbox
1056#[cfg(feature = "smtp")]
1057async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
1058    if let Some(ref smtp_registry) = state.smtp_registry {
1059        match smtp_registry.clear_mailbox() {
1060            Ok(()) => (
1061                StatusCode::OK,
1062                Json(serde_json::json!({
1063                    "message": "Mailbox cleared successfully"
1064                })),
1065            ),
1066            Err(e) => (
1067                StatusCode::INTERNAL_SERVER_ERROR,
1068                Json(serde_json::json!({
1069                    "error": "Failed to clear mailbox",
1070                    "message": e.to_string()
1071                })),
1072            ),
1073        }
1074    } else {
1075        (
1076            StatusCode::NOT_IMPLEMENTED,
1077            Json(serde_json::json!({
1078                "error": "SMTP mailbox management not available",
1079                "message": "SMTP server is not enabled or registry not available."
1080            })),
1081        )
1082    }
1083}
1084
1085/// Export SMTP mailbox
1086#[cfg(feature = "smtp")]
1087async fn export_smtp_mailbox(
1088    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
1089) -> impl IntoResponse {
1090    let format = params.get("format").unwrap_or(&"json".to_string()).clone();
1091    (
1092        StatusCode::NOT_IMPLEMENTED,
1093        Json(serde_json::json!({
1094            "error": "SMTP mailbox management not available via HTTP API",
1095            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
1096            "requested_format": format
1097        })),
1098    )
1099}
1100
1101/// Search SMTP emails
1102#[cfg(feature = "smtp")]
1103async fn search_smtp_emails(
1104    State(state): State<ManagementState>,
1105    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
1106) -> impl IntoResponse {
1107    if let Some(ref smtp_registry) = state.smtp_registry {
1108        let filters = EmailSearchFilters {
1109            sender: params.get("sender").cloned(),
1110            recipient: params.get("recipient").cloned(),
1111            subject: params.get("subject").cloned(),
1112            body: params.get("body").cloned(),
1113            since: params
1114                .get("since")
1115                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
1116                .map(|dt| dt.with_timezone(&chrono::Utc)),
1117            until: params
1118                .get("until")
1119                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
1120                .map(|dt| dt.with_timezone(&chrono::Utc)),
1121            use_regex: params.get("regex").map(|s| s == "true").unwrap_or(false),
1122            case_sensitive: params.get("case_sensitive").map(|s| s == "true").unwrap_or(false),
1123        };
1124
1125        match smtp_registry.search_emails(filters) {
1126            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
1127            Err(e) => (
1128                StatusCode::INTERNAL_SERVER_ERROR,
1129                Json(serde_json::json!({
1130                    "error": "Failed to search emails",
1131                    "message": e.to_string()
1132                })),
1133            ),
1134        }
1135    } else {
1136        (
1137            StatusCode::NOT_IMPLEMENTED,
1138            Json(serde_json::json!({
1139                "error": "SMTP mailbox management not available",
1140                "message": "SMTP server is not enabled or registry not available."
1141            })),
1142        )
1143    }
1144}
1145
1146/// MQTT broker statistics
1147#[cfg(feature = "mqtt")]
1148#[derive(Debug, Clone, Serialize, Deserialize)]
1149pub struct MqttBrokerStats {
1150    /// Number of connected MQTT clients
1151    pub connected_clients: usize,
1152    /// Number of active MQTT topics
1153    pub active_topics: usize,
1154    /// Number of retained messages
1155    pub retained_messages: usize,
1156    /// Total number of subscriptions
1157    pub total_subscriptions: usize,
1158}
1159
1160/// MQTT management handlers
1161#[cfg(feature = "mqtt")]
1162async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
1163    if let Some(broker) = &state.mqtt_broker {
1164        let connected_clients = broker.get_connected_clients().await.len();
1165        let active_topics = broker.get_active_topics().await.len();
1166        let stats = broker.get_topic_stats().await;
1167
1168        let broker_stats = MqttBrokerStats {
1169            connected_clients,
1170            active_topics,
1171            retained_messages: stats.retained_messages,
1172            total_subscriptions: stats.total_subscriptions,
1173        };
1174
1175        Json(broker_stats).into_response()
1176    } else {
1177        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1178    }
1179}
1180
1181#[cfg(feature = "mqtt")]
1182async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
1183    if let Some(broker) = &state.mqtt_broker {
1184        let clients = broker.get_connected_clients().await;
1185        Json(serde_json::json!({
1186            "clients": clients
1187        }))
1188        .into_response()
1189    } else {
1190        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1191    }
1192}
1193
1194#[cfg(feature = "mqtt")]
1195async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
1196    if let Some(broker) = &state.mqtt_broker {
1197        let topics = broker.get_active_topics().await;
1198        Json(serde_json::json!({
1199            "topics": topics
1200        }))
1201        .into_response()
1202    } else {
1203        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1204    }
1205}
1206
1207#[cfg(feature = "mqtt")]
1208async fn disconnect_mqtt_client(
1209    State(state): State<ManagementState>,
1210    Path(client_id): Path<String>,
1211) -> impl IntoResponse {
1212    if let Some(broker) = &state.mqtt_broker {
1213        match broker.disconnect_client(&client_id).await {
1214            Ok(_) => {
1215                (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response()
1216            }
1217            Err(e) => {
1218                (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
1219                    .into_response()
1220            }
1221        }
1222    } else {
1223        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1224    }
1225}
1226
1227// ========== MQTT Publish Handler ==========
1228
1229#[cfg(feature = "mqtt")]
1230/// Request to publish a single MQTT message
1231#[derive(Debug, Deserialize)]
1232pub struct MqttPublishRequest {
1233    /// Topic to publish to
1234    pub topic: String,
1235    /// Message payload (string or JSON)
1236    pub payload: String,
1237    /// QoS level (0, 1, or 2)
1238    #[serde(default = "default_qos")]
1239    pub qos: u8,
1240    /// Whether to retain the message
1241    #[serde(default)]
1242    pub retain: bool,
1243}
1244
1245#[cfg(feature = "mqtt")]
1246fn default_qos() -> u8 {
1247    0
1248}
1249
1250#[cfg(feature = "mqtt")]
1251/// Publish a message to an MQTT topic (only compiled when mqtt feature is enabled)
1252async fn publish_mqtt_message_handler(
1253    State(state): State<ManagementState>,
1254    Json(request): Json<serde_json::Value>,
1255) -> impl IntoResponse {
1256    // Extract fields from JSON manually
1257    let topic = request.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
1258    let payload = request.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
1259    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
1260    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);
1261
1262    if topic.is_none() || payload.is_none() {
1263        return (
1264            StatusCode::BAD_REQUEST,
1265            Json(serde_json::json!({
1266                "error": "Invalid request",
1267                "message": "Missing required fields: topic and payload"
1268            })),
1269        );
1270    }
1271
1272    let topic = topic.unwrap();
1273    let payload = payload.unwrap();
1274
1275    if let Some(broker) = &state.mqtt_broker {
1276        // Validate QoS
1277        if qos > 2 {
1278            return (
1279                StatusCode::BAD_REQUEST,
1280                Json(serde_json::json!({
1281                    "error": "Invalid QoS",
1282                    "message": "QoS must be 0, 1, or 2"
1283                })),
1284            );
1285        }
1286
1287        // Convert payload to bytes
1288        let payload_bytes = payload.as_bytes().to_vec();
1289        let client_id = "mockforge-management-api".to_string();
1290
1291        let publish_result = broker
1292            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
1293            .await
1294            .map_err(|e| format!("{}", e));
1295
1296        match publish_result {
1297            Ok(_) => {
1298                // Emit message event for real-time monitoring
1299                let event = MessageEvent::Mqtt(MqttMessageEvent {
1300                    topic: topic.clone(),
1301                    payload: payload.clone(),
1302                    qos,
1303                    retain,
1304                    timestamp: chrono::Utc::now().to_rfc3339(),
1305                });
1306                let _ = state.message_events.send(event);
1307
1308                (
1309                    StatusCode::OK,
1310                    Json(serde_json::json!({
1311                        "success": true,
1312                        "message": format!("Message published to topic '{}'", topic),
1313                        "topic": topic,
1314                        "qos": qos,
1315                        "retain": retain
1316                    })),
1317                )
1318            }
1319            Err(error_msg) => (
1320                StatusCode::INTERNAL_SERVER_ERROR,
1321                Json(serde_json::json!({
1322                    "error": "Failed to publish message",
1323                    "message": error_msg
1324                })),
1325            ),
1326        }
1327    } else {
1328        (
1329            StatusCode::SERVICE_UNAVAILABLE,
1330            Json(serde_json::json!({
1331                "error": "MQTT broker not available",
1332                "message": "MQTT broker is not enabled or not available."
1333            })),
1334        )
1335    }
1336}
1337
1338#[cfg(not(feature = "mqtt"))]
1339/// Publish a message to an MQTT topic (stub when mqtt feature is disabled)
1340async fn publish_mqtt_message_handler(
1341    State(_state): State<ManagementState>,
1342    Json(_request): Json<serde_json::Value>,
1343) -> impl IntoResponse {
1344    (
1345        StatusCode::SERVICE_UNAVAILABLE,
1346        Json(serde_json::json!({
1347            "error": "MQTT feature not enabled",
1348            "message": "MQTT support is not compiled into this build"
1349        })),
1350    )
1351}
1352
1353#[cfg(feature = "mqtt")]
1354/// Request to publish multiple MQTT messages
1355#[derive(Debug, Deserialize)]
1356pub struct MqttBatchPublishRequest {
1357    /// List of messages to publish
1358    pub messages: Vec<MqttPublishRequest>,
1359    /// Delay between messages in milliseconds
1360    #[serde(default = "default_delay")]
1361    pub delay_ms: u64,
1362}
1363
1364#[cfg(feature = "mqtt")]
1365fn default_delay() -> u64 {
1366    100
1367}
1368
1369#[cfg(feature = "mqtt")]
1370/// Publish multiple messages to MQTT topics (only compiled when mqtt feature is enabled)
1371async fn publish_mqtt_batch_handler(
1372    State(state): State<ManagementState>,
1373    Json(request): Json<serde_json::Value>,
1374) -> impl IntoResponse {
1375    // Extract fields from JSON manually
1376    let messages_json = request.get("messages").and_then(|v| v.as_array());
1377    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);
1378
1379    if messages_json.is_none() {
1380        return (
1381            StatusCode::BAD_REQUEST,
1382            Json(serde_json::json!({
1383                "error": "Invalid request",
1384                "message": "Missing required field: messages"
1385            })),
1386        );
1387    }
1388
1389    let messages_json = messages_json.unwrap();
1390
1391    if let Some(broker) = &state.mqtt_broker {
1392        if messages_json.is_empty() {
1393            return (
1394                StatusCode::BAD_REQUEST,
1395                Json(serde_json::json!({
1396                    "error": "Empty batch",
1397                    "message": "At least one message is required"
1398                })),
1399            );
1400        }
1401
1402        let mut results = Vec::new();
1403        let client_id = "mockforge-management-api".to_string();
1404
1405        for (index, msg_json) in messages_json.iter().enumerate() {
1406            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
1407            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
1408            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
1409            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);
1410
1411            if topic.is_none() || payload.is_none() {
1412                results.push(serde_json::json!({
1413                    "index": index,
1414                    "success": false,
1415                    "error": "Missing required fields: topic and payload"
1416                }));
1417                continue;
1418            }
1419
1420            let topic = topic.unwrap();
1421            let payload = payload.unwrap();
1422
1423            // Validate QoS
1424            if qos > 2 {
1425                results.push(serde_json::json!({
1426                    "index": index,
1427                    "success": false,
1428                    "error": "Invalid QoS (must be 0, 1, or 2)"
1429                }));
1430                continue;
1431            }
1432
1433            // Convert payload to bytes
1434            let payload_bytes = payload.as_bytes().to_vec();
1435
1436            let publish_result = broker
1437                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
1438                .await
1439                .map_err(|e| format!("{}", e));
1440
1441            match publish_result {
1442                Ok(_) => {
1443                    // Emit message event
1444                    let event = MessageEvent::Mqtt(MqttMessageEvent {
1445                        topic: topic.clone(),
1446                        payload: payload.clone(),
1447                        qos,
1448                        retain,
1449                        timestamp: chrono::Utc::now().to_rfc3339(),
1450                    });
1451                    let _ = state.message_events.send(event);
1452
1453                    results.push(serde_json::json!({
1454                        "index": index,
1455                        "success": true,
1456                        "topic": topic,
1457                        "qos": qos
1458                    }));
1459                }
1460                Err(error_msg) => {
1461                    results.push(serde_json::json!({
1462                        "index": index,
1463                        "success": false,
1464                        "error": error_msg
1465                    }));
1466                }
1467            }
1468
1469            // Add delay between messages (except for the last one)
1470            if index < messages_json.len() - 1 && delay_ms > 0 {
1471                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1472            }
1473        }
1474
1475        let success_count =
1476            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();
1477
1478        (
1479            StatusCode::OK,
1480            Json(serde_json::json!({
1481                "success": true,
1482                "total": messages_json.len(),
1483                "succeeded": success_count,
1484                "failed": messages_json.len() - success_count,
1485                "results": results
1486            })),
1487        )
1488    } else {
1489        (
1490            StatusCode::SERVICE_UNAVAILABLE,
1491            Json(serde_json::json!({
1492                "error": "MQTT broker not available",
1493                "message": "MQTT broker is not enabled or not available."
1494            })),
1495        )
1496    }
1497}
1498
1499#[cfg(not(feature = "mqtt"))]
1500/// Publish multiple messages to MQTT topics (stub when mqtt feature is disabled)
1501async fn publish_mqtt_batch_handler(
1502    State(_state): State<ManagementState>,
1503    Json(_request): Json<serde_json::Value>,
1504) -> impl IntoResponse {
1505    (
1506        StatusCode::SERVICE_UNAVAILABLE,
1507        Json(serde_json::json!({
1508            "error": "MQTT feature not enabled",
1509            "message": "MQTT support is not compiled into this build"
1510        })),
1511    )
1512}
1513
1514// Migration pipeline handlers
1515
1516/// Request to set migration mode
1517#[derive(Debug, Deserialize)]
1518struct SetMigrationModeRequest {
1519    mode: String,
1520}
1521
1522/// Get all migration routes
1523async fn get_migration_routes(
1524    State(state): State<ManagementState>,
1525) -> Result<Json<serde_json::Value>, StatusCode> {
1526    let proxy_config = match &state.proxy_config {
1527        Some(config) => config,
1528        None => {
1529            return Ok(Json(serde_json::json!({
1530                "error": "Migration not configured. Proxy config not available."
1531            })));
1532        }
1533    };
1534
1535    let config = proxy_config.read().await;
1536    let routes = config.get_migration_routes();
1537
1538    Ok(Json(serde_json::json!({
1539        "routes": routes
1540    })))
1541}
1542
1543/// Toggle a route's migration mode
1544async fn toggle_route_migration(
1545    State(state): State<ManagementState>,
1546    Path(pattern): Path<String>,
1547) -> Result<Json<serde_json::Value>, StatusCode> {
1548    let proxy_config = match &state.proxy_config {
1549        Some(config) => config,
1550        None => {
1551            return Ok(Json(serde_json::json!({
1552                "error": "Migration not configured. Proxy config not available."
1553            })));
1554        }
1555    };
1556
1557    let mut config = proxy_config.write().await;
1558    let new_mode = match config.toggle_route_migration(&pattern) {
1559        Some(mode) => mode,
1560        None => {
1561            return Ok(Json(serde_json::json!({
1562                "error": format!("Route pattern not found: {}", pattern)
1563            })));
1564        }
1565    };
1566
1567    Ok(Json(serde_json::json!({
1568        "pattern": pattern,
1569        "mode": format!("{:?}", new_mode).to_lowercase()
1570    })))
1571}
1572
1573/// Set a route's migration mode explicitly
1574async fn set_route_migration_mode(
1575    State(state): State<ManagementState>,
1576    Path(pattern): Path<String>,
1577    Json(request): Json<SetMigrationModeRequest>,
1578) -> Result<Json<serde_json::Value>, StatusCode> {
1579    let proxy_config = match &state.proxy_config {
1580        Some(config) => config,
1581        None => {
1582            return Ok(Json(serde_json::json!({
1583                "error": "Migration not configured. Proxy config not available."
1584            })));
1585        }
1586    };
1587
1588    use mockforge_core::proxy::config::MigrationMode;
1589    let mode = match request.mode.to_lowercase().as_str() {
1590        "mock" => MigrationMode::Mock,
1591        "shadow" => MigrationMode::Shadow,
1592        "real" => MigrationMode::Real,
1593        "auto" => MigrationMode::Auto,
1594        _ => {
1595            return Ok(Json(serde_json::json!({
1596                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1597            })));
1598        }
1599    };
1600
1601    let mut config = proxy_config.write().await;
1602    let updated = config.update_rule_migration_mode(&pattern, mode);
1603
1604    if !updated {
1605        return Ok(Json(serde_json::json!({
1606            "error": format!("Route pattern not found: {}", pattern)
1607        })));
1608    }
1609
1610    Ok(Json(serde_json::json!({
1611        "pattern": pattern,
1612        "mode": format!("{:?}", mode).to_lowercase()
1613    })))
1614}
1615
1616/// Toggle a group's migration mode
1617async fn toggle_group_migration(
1618    State(state): State<ManagementState>,
1619    Path(group): Path<String>,
1620) -> Result<Json<serde_json::Value>, StatusCode> {
1621    let proxy_config = match &state.proxy_config {
1622        Some(config) => config,
1623        None => {
1624            return Ok(Json(serde_json::json!({
1625                "error": "Migration not configured. Proxy config not available."
1626            })));
1627        }
1628    };
1629
1630    let mut config = proxy_config.write().await;
1631    let new_mode = config.toggle_group_migration(&group);
1632
1633    Ok(Json(serde_json::json!({
1634        "group": group,
1635        "mode": format!("{:?}", new_mode).to_lowercase()
1636    })))
1637}
1638
1639/// Set a group's migration mode explicitly
1640async fn set_group_migration_mode(
1641    State(state): State<ManagementState>,
1642    Path(group): Path<String>,
1643    Json(request): Json<SetMigrationModeRequest>,
1644) -> Result<Json<serde_json::Value>, StatusCode> {
1645    let proxy_config = match &state.proxy_config {
1646        Some(config) => config,
1647        None => {
1648            return Ok(Json(serde_json::json!({
1649                "error": "Migration not configured. Proxy config not available."
1650            })));
1651        }
1652    };
1653
1654    use mockforge_core::proxy::config::MigrationMode;
1655    let mode = match request.mode.to_lowercase().as_str() {
1656        "mock" => MigrationMode::Mock,
1657        "shadow" => MigrationMode::Shadow,
1658        "real" => MigrationMode::Real,
1659        "auto" => MigrationMode::Auto,
1660        _ => {
1661            return Ok(Json(serde_json::json!({
1662                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1663            })));
1664        }
1665    };
1666
1667    let mut config = proxy_config.write().await;
1668    config.update_group_migration_mode(&group, mode);
1669
1670    Ok(Json(serde_json::json!({
1671        "group": group,
1672        "mode": format!("{:?}", mode).to_lowercase()
1673    })))
1674}
1675
1676/// Get all migration groups
1677async fn get_migration_groups(
1678    State(state): State<ManagementState>,
1679) -> Result<Json<serde_json::Value>, StatusCode> {
1680    let proxy_config = match &state.proxy_config {
1681        Some(config) => config,
1682        None => {
1683            return Ok(Json(serde_json::json!({
1684                "error": "Migration not configured. Proxy config not available."
1685            })));
1686        }
1687    };
1688
1689    let config = proxy_config.read().await;
1690    let groups = config.get_migration_groups();
1691
1692    // Convert to JSON-serializable format
1693    let groups_json: serde_json::Map<String, serde_json::Value> = groups
1694        .into_iter()
1695        .map(|(name, info)| {
1696            (
1697                name,
1698                serde_json::json!({
1699                    "name": info.name,
1700                    "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1701                    "route_count": info.route_count
1702                }),
1703            )
1704        })
1705        .collect();
1706
1707    Ok(Json(serde_json::json!(groups_json)))
1708}
1709
1710/// Get overall migration status
1711async fn get_migration_status(
1712    State(state): State<ManagementState>,
1713) -> Result<Json<serde_json::Value>, StatusCode> {
1714    let proxy_config = match &state.proxy_config {
1715        Some(config) => config,
1716        None => {
1717            return Ok(Json(serde_json::json!({
1718                "error": "Migration not configured. Proxy config not available."
1719            })));
1720        }
1721    };
1722
1723    let config = proxy_config.read().await;
1724    let routes = config.get_migration_routes();
1725    let groups = config.get_migration_groups();
1726
1727    let mut mock_count = 0;
1728    let mut shadow_count = 0;
1729    let mut real_count = 0;
1730    let mut auto_count = 0;
1731
1732    for route in &routes {
1733        match route.migration_mode {
1734            mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1735            mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1736            mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1737            mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1738        }
1739    }
1740
1741    Ok(Json(serde_json::json!({
1742        "total_routes": routes.len(),
1743        "mock_routes": mock_count,
1744        "shadow_routes": shadow_count,
1745        "real_routes": real_count,
1746        "auto_routes": auto_count,
1747        "total_groups": groups.len(),
1748        "migration_enabled": config.migration_enabled
1749    })))
1750}
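
// An illustrative GET /migration/status response (counts are examples only):
//
//   {
//     "total_routes": 12, "mock_routes": 5, "shadow_routes": 3, "real_routes": 2,
//     "auto_routes": 2, "total_groups": 3, "migration_enabled": true
//   }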
1751
1752// ========== Proxy Replacement Rules Management ==========
1753
1754/// Request body for creating/updating proxy replacement rules
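///
/// A minimal construction sketch (values are illustrative):
///
/// ```rust,ignore
/// ProxyRuleRequest {
///     pattern: "/api/users/*".to_string(),
///     rule_type: "response".to_string(),
///     status_codes: vec![200],
///     body_transforms: vec![BodyTransformRequest {
///         path: "$.email".to_string(),
///         replace: "{{faker.email}}".to_string(),
///         operation: "replace".to_string(),
///     }],
///     enabled: true,
/// }
/// ```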
1755#[derive(Debug, Deserialize, Serialize)]
1756pub struct ProxyRuleRequest {
1757    /// URL pattern to match (supports wildcards like "/api/users/*")
1758    pub pattern: String,
1759    /// Rule type: "request" or "response"
1760    #[serde(rename = "type")]
1761    pub rule_type: String,
1762    /// Optional status code filter for response rules
1763    #[serde(default)]
1764    pub status_codes: Vec<u16>,
1765    /// Body transformations to apply
1766    pub body_transforms: Vec<BodyTransformRequest>,
1767    /// Whether this rule is enabled
1768    #[serde(default = "default_true")]
1769    pub enabled: bool,
1770}
1771
1772/// Request body for individual body transformations
1773#[derive(Debug, Deserialize, Serialize)]
1774pub struct BodyTransformRequest {
1775    /// JSONPath expression to target (e.g., "$.userId", "$.email")
1776    pub path: String,
1777    /// Replacement value (supports template expansion like "{{uuid}}", "{{faker.email}}")
1778    pub replace: String,
1779    /// Operation to perform: "replace", "add", or "remove"
1780    #[serde(default)]
1781    pub operation: String,
1782}
1783
1784/// Response format for proxy rules
1785#[derive(Debug, Serialize)]
1786pub struct ProxyRuleResponse {
1787    /// Rule ID (index in the array)
1788    pub id: usize,
1789    /// URL pattern
1790    pub pattern: String,
1791    /// Rule type
1792    #[serde(rename = "type")]
1793    pub rule_type: String,
1794    /// Status codes (for response rules)
1795    pub status_codes: Vec<u16>,
1796    /// Body transformations
1797    pub body_transforms: Vec<BodyTransformRequest>,
1798    /// Whether enabled
1799    pub enabled: bool,
1800}
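
// Rule IDs form a single combined index: request replacement rules come first
// (IDs 0..request_count), followed by response replacement rules
// (IDs request_count..request_count + response_count). For example, with 2
// request rules and 3 response rules, IDs 0-1 address request rules and IDs
// 2-4 address response rules. IDs shift when rules are created or deleted.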
1801
1802/// List all proxy replacement rules
1803async fn list_proxy_rules(
1804    State(state): State<ManagementState>,
1805) -> Result<Json<serde_json::Value>, StatusCode> {
1806    let proxy_config = match &state.proxy_config {
1807        Some(config) => config,
1808        None => {
1809            return Ok(Json(serde_json::json!({
1810                "error": "Proxy not configured. Proxy config not available."
1811            })));
1812        }
1813    };
1814
1815    let config = proxy_config.read().await;
1816
1817    let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1818
1819    // Add request replacement rules
1820    for (idx, rule) in config.request_replacements.iter().enumerate() {
1821        rules.push(ProxyRuleResponse {
1822            id: idx,
1823            pattern: rule.pattern.clone(),
1824            rule_type: "request".to_string(),
1825            status_codes: Vec::new(),
1826            body_transforms: rule
1827                .body_transforms
1828                .iter()
1829                .map(|t| BodyTransformRequest {
1830                    path: t.path.clone(),
1831                    replace: t.replace.clone(),
1832                    operation: format!("{:?}", t.operation).to_lowercase(),
1833                })
1834                .collect(),
1835            enabled: rule.enabled,
1836        });
1837    }
1838
1839    // Add response replacement rules
1840    let request_count = config.request_replacements.len();
1841    for (idx, rule) in config.response_replacements.iter().enumerate() {
1842        rules.push(ProxyRuleResponse {
1843            id: request_count + idx,
1844            pattern: rule.pattern.clone(),
1845            rule_type: "response".to_string(),
1846            status_codes: rule.status_codes.clone(),
1847            body_transforms: rule
1848                .body_transforms
1849                .iter()
1850                .map(|t| BodyTransformRequest {
1851                    path: t.path.clone(),
1852                    replace: t.replace.clone(),
1853                    operation: format!("{:?}", t.operation).to_lowercase(),
1854                })
1855                .collect(),
1856            enabled: rule.enabled,
1857        });
1858    }
1859
1860    Ok(Json(serde_json::json!({
1861        "rules": rules
1862    })))
1863}
1864
1865/// Create a new proxy replacement rule
1866async fn create_proxy_rule(
1867    State(state): State<ManagementState>,
1868    Json(request): Json<ProxyRuleRequest>,
1869) -> Result<Json<serde_json::Value>, StatusCode> {
1870    let proxy_config = match &state.proxy_config {
1871        Some(config) => config,
1872        None => {
1873            return Ok(Json(serde_json::json!({
1874                "error": "Proxy not configured. Proxy config not available."
1875            })));
1876        }
1877    };
1878
1879    // Validate request
1880    if request.body_transforms.is_empty() {
1881        return Ok(Json(serde_json::json!({
1882            "error": "At least one body transform is required"
1883        })));
1884    }
1885
1886    let body_transforms: Vec<BodyTransform> = request
1887        .body_transforms
1888        .iter()
1889        .map(|t| {
1890            let op = match t.operation.as_str() {
1891                "replace" => TransformOperation::Replace,
1892                "add" => TransformOperation::Add,
1893                "remove" => TransformOperation::Remove,
1894                _ => TransformOperation::Replace,
1895            };
1896            BodyTransform {
1897                path: t.path.clone(),
1898                replace: t.replace.clone(),
1899                operation: op,
1900            }
1901        })
1902        .collect();
1903
1904    let new_rule = BodyTransformRule {
1905        pattern: request.pattern.clone(),
1906        status_codes: request.status_codes.clone(),
1907        body_transforms,
1908        enabled: request.enabled,
1909    };
1910
1911    let mut config = proxy_config.write().await;
1912
1913    let rule_id = if request.rule_type == "request" {
1914        config.request_replacements.push(new_rule);
1915        config.request_replacements.len() - 1
1916    } else if request.rule_type == "response" {
1917        config.response_replacements.push(new_rule);
1918        config.request_replacements.len() + config.response_replacements.len() - 1
1919    } else {
1920        return Ok(Json(serde_json::json!({
1921            "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
1922        })));
1923    };
1924
1925    Ok(Json(serde_json::json!({
1926        "id": rule_id,
1927        "message": "Rule created successfully"
1928    })))
1929}
1930
1931/// Get a specific proxy replacement rule
1932async fn get_proxy_rule(
1933    State(state): State<ManagementState>,
1934    Path(id): Path<String>,
1935) -> Result<Json<serde_json::Value>, StatusCode> {
1936    let proxy_config = match &state.proxy_config {
1937        Some(config) => config,
1938        None => {
1939            return Ok(Json(serde_json::json!({
1940                "error": "Proxy not configured. Proxy config not available."
1941            })));
1942        }
1943    };
1944
1945    let config = proxy_config.read().await;
1946    let rule_id: usize = match id.parse() {
1947        Ok(id) => id,
1948        Err(_) => {
1949            return Ok(Json(serde_json::json!({
1950                "error": format!("Invalid rule ID: {}", id)
1951            })));
1952        }
1953    };
1954
1955    let request_count = config.request_replacements.len();
1956
1957    if rule_id < request_count {
1958        // Request rule
1959        let rule = &config.request_replacements[rule_id];
1960        Ok(Json(serde_json::json!({
1961            "id": rule_id,
1962            "pattern": rule.pattern,
1963            "type": "request",
1964            "status_codes": [],
1965            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1966                "path": t.path,
1967                "replace": t.replace,
1968                "operation": format!("{:?}", t.operation).to_lowercase()
1969            })).collect::<Vec<_>>(),
1970            "enabled": rule.enabled
1971        })))
1972    } else if rule_id < request_count + config.response_replacements.len() {
1973        // Response rule
1974        let response_idx = rule_id - request_count;
1975        let rule = &config.response_replacements[response_idx];
1976        Ok(Json(serde_json::json!({
1977            "id": rule_id,
1978            "pattern": rule.pattern,
1979            "type": "response",
1980            "status_codes": rule.status_codes,
1981            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1982                "path": t.path,
1983                "replace": t.replace,
1984                "operation": format!("{:?}", t.operation).to_lowercase()
1985            })).collect::<Vec<_>>(),
1986            "enabled": rule.enabled
1987        })))
1988    } else {
1989        Ok(Json(serde_json::json!({
1990            "error": format!("Rule ID {} not found", rule_id)
1991        })))
1992    }
1993}
1994
1995/// Update a proxy replacement rule
1996async fn update_proxy_rule(
1997    State(state): State<ManagementState>,
1998    Path(id): Path<String>,
1999    Json(request): Json<ProxyRuleRequest>,
2000) -> Result<Json<serde_json::Value>, StatusCode> {
2001    let proxy_config = match &state.proxy_config {
2002        Some(config) => config,
2003        None => {
2004            return Ok(Json(serde_json::json!({
2005                "error": "Proxy not configured. Proxy config not available."
2006            })));
2007        }
2008    };
2009
2010    let mut config = proxy_config.write().await;
2011    let rule_id: usize = match id.parse() {
2012        Ok(id) => id,
2013        Err(_) => {
2014            return Ok(Json(serde_json::json!({
2015                "error": format!("Invalid rule ID: {}", id)
2016            })));
2017        }
2018    };
2019
2020    let body_transforms: Vec<BodyTransform> = request
2021        .body_transforms
2022        .iter()
2023        .map(|t| {
2024            let op = match t.operation.as_str() {
2025                "replace" => TransformOperation::Replace,
2026                "add" => TransformOperation::Add,
2027                "remove" => TransformOperation::Remove,
2028                _ => TransformOperation::Replace,
2029            };
2030            BodyTransform {
2031                path: t.path.clone(),
2032                replace: t.replace.clone(),
2033                operation: op,
2034            }
2035        })
2036        .collect();
2037
2038    let updated_rule = BodyTransformRule {
2039        pattern: request.pattern.clone(),
2040        status_codes: request.status_codes.clone(),
2041        body_transforms,
2042        enabled: request.enabled,
2043    };
2044
2045    let request_count = config.request_replacements.len();
2046
2047    if rule_id < request_count {
2048        // Update request rule
2049        config.request_replacements[rule_id] = updated_rule;
2050    } else if rule_id < request_count + config.response_replacements.len() {
2051        // Update response rule
2052        let response_idx = rule_id - request_count;
2053        config.response_replacements[response_idx] = updated_rule;
2054    } else {
2055        return Ok(Json(serde_json::json!({
2056            "error": format!("Rule ID {} not found", rule_id)
2057        })));
2058    }
2059
2060    Ok(Json(serde_json::json!({
2061        "id": rule_id,
2062        "message": "Rule updated successfully"
2063    })))
2064}
2065
2066/// Delete a proxy replacement rule
2067async fn delete_proxy_rule(
2068    State(state): State<ManagementState>,
2069    Path(id): Path<String>,
2070) -> Result<Json<serde_json::Value>, StatusCode> {
2071    let proxy_config = match &state.proxy_config {
2072        Some(config) => config,
2073        None => {
2074            return Ok(Json(serde_json::json!({
2075                "error": "Proxy not configured. Proxy config not available."
2076            })));
2077        }
2078    };
2079
2080    let mut config = proxy_config.write().await;
2081    let rule_id: usize = match id.parse() {
2082        Ok(id) => id,
2083        Err(_) => {
2084            return Ok(Json(serde_json::json!({
2085                "error": format!("Invalid rule ID: {}", id)
2086            })));
2087        }
2088    };
2089
2090    let request_count = config.request_replacements.len();
2091
2092    if rule_id < request_count {
2093        // Delete request rule
2094        config.request_replacements.remove(rule_id);
2095    } else if rule_id < request_count + config.response_replacements.len() {
2096        // Delete response rule
2097        let response_idx = rule_id - request_count;
2098        config.response_replacements.remove(response_idx);
2099    } else {
2100        return Ok(Json(serde_json::json!({
2101            "error": format!("Rule ID {} not found", rule_id)
2102        })));
2103    }
2104
2105    Ok(Json(serde_json::json!({
2106        "id": rule_id,
2107        "message": "Rule deleted successfully"
2108    })))
2109}
2110
2111/// Get recent intercepted requests/responses for inspection
2112/// This is a placeholder; a full implementation would track intercepted traffic.
2113async fn get_proxy_inspect(
2114    State(_state): State<ManagementState>,
2115    Query(params): Query<std::collections::HashMap<String, String>>,
2116) -> Result<Json<serde_json::Value>, StatusCode> {
2117    let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2118
2119    // Note: Request/response inspection would require:
2120    // 1. Storing intercepted requests/responses in ManagementState or a separate store
2121    // 2. Integrating with proxy middleware to capture traffic
2122    // 3. Implementing filtering and pagination for large volumes of traffic
2123    // For now, return an empty response structure indicating the feature is not yet implemented
2124    Ok(Json(serde_json::json!({
2125        "requests": [],
2126        "responses": [],
2127        "limit": limit,
2128        "total": 0,
2129        "message": "Request/response inspection not yet implemented. This endpoint will return intercepted traffic when proxy inspection is fully integrated."
2130    })))
2131}
2132
2133/// Build the management API router
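///
/// A minimal usage sketch (the `/__mockforge` mount path is illustrative):
///
/// ```rust,ignore
/// let app = axum::Router::new().nest("/__mockforge", management_router(state));
/// ```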
2134pub fn management_router(state: ManagementState) -> Router {
2135    let router = Router::new()
2136        .route("/health", get(health_check))
2137        .route("/stats", get(get_stats))
2138        .route("/config", get(get_config))
2139        .route("/config/validate", post(validate_config))
2140        .route("/config/bulk", post(bulk_update_config))
2141        .route("/mocks", get(list_mocks))
2142        .route("/mocks", post(create_mock))
2143        .route("/mocks/{id}", get(get_mock))
2144        .route("/mocks/{id}", put(update_mock))
2145        .route("/mocks/{id}", delete(delete_mock))
2146        .route("/export", get(export_mocks))
2147        .route("/import", post(import_mocks));
2148
2149    #[cfg(feature = "smtp")]
2150    let router = router
2151        .route("/smtp/mailbox", get(list_smtp_emails))
2152        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
2153        .route("/smtp/mailbox/{id}", get(get_smtp_email))
2154        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
2155        .route("/smtp/mailbox/search", get(search_smtp_emails));
2156
2157    #[cfg(not(feature = "smtp"))]
2158    let router = router;
2159
2160    // MQTT routes
2161    #[cfg(feature = "mqtt")]
2162    let router = router
2163        .route("/mqtt/stats", get(get_mqtt_stats))
2164        .route("/mqtt/clients", get(get_mqtt_clients))
2165        .route("/mqtt/topics", get(get_mqtt_topics))
2166        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
2167        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
2168        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2169        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2170
2171    #[cfg(not(feature = "mqtt"))]
2172    let router = router
2173        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2174        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2175
2176    #[cfg(feature = "kafka")]
2177    let router = router
2178        .route("/kafka/stats", get(get_kafka_stats))
2179        .route("/kafka/topics", get(get_kafka_topics))
2180        .route("/kafka/topics/{topic}", get(get_kafka_topic))
2181        .route("/kafka/groups", get(get_kafka_groups))
2182        .route("/kafka/groups/{group_id}", get(get_kafka_group))
2183        .route("/kafka/produce", post(produce_kafka_message))
2184        .route("/kafka/produce/batch", post(produce_kafka_batch))
2185        .route("/kafka/messages/stream", get(kafka_messages_stream));
2186
2187    #[cfg(not(feature = "kafka"))]
2188    let router = router;
2189
2190    // Migration pipeline routes
2191    let router = router
2192        .route("/migration/routes", get(get_migration_routes))
2193        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
2194        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
2195        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
2196        .route("/migration/groups/{group}", put(set_group_migration_mode))
2197        .route("/migration/groups", get(get_migration_groups))
2198        .route("/migration/status", get(get_migration_status));
2199
2200    // Proxy replacement rules routes
2201    let router = router
2202        .route("/proxy/rules", get(list_proxy_rules))
2203        .route("/proxy/rules", post(create_proxy_rule))
2204        .route("/proxy/rules/{id}", get(get_proxy_rule))
2205        .route("/proxy/rules/{id}", put(update_proxy_rule))
2206        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
2207        .route("/proxy/inspect", get(get_proxy_inspect));
2208
2209    // AI-powered features
2210    let router = router.route("/ai/generate-spec", post(generate_ai_spec));
2211
2212    // Snapshot diff endpoints
2213    let router = router.nest(
2214        "/snapshot-diff",
2215        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
2216    );
2217
2218    #[cfg(feature = "behavioral-cloning")]
2219    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));
2220
2221    let router = router
2222        .route("/mockai/learn", post(learn_from_examples))
2223        .route("/mockai/rules/explanations", get(list_rule_explanations))
2224        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
2225        .route("/chaos/config", get(get_chaos_config))
2226        .route("/chaos/config", post(update_chaos_config))
2227        .route("/network/profiles", get(list_network_profiles))
2228        .route("/network/profile/apply", post(apply_network_profile));
2229
2230    // State machine API routes
2231    let router =
2232        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());
2233
2234    router.with_state(state)
2235}
2236
2237#[cfg(feature = "kafka")]
2238#[derive(Debug, Clone, Serialize, Deserialize)]
2239pub struct KafkaBrokerStats {
2240    /// Number of topics
2241    pub topics: usize,
2242    /// Total number of partitions
2243    pub partitions: usize,
2244    /// Number of consumer groups
2245    pub consumer_groups: usize,
2246    /// Total messages produced
2247    pub messages_produced: u64,
2248    /// Total messages consumed
2249    pub messages_consumed: u64,
2250}
2251
2252#[cfg(feature = "kafka")]
2253#[derive(Debug, Clone, Serialize, Deserialize)]
2254pub struct KafkaTopicInfo {
2255    pub name: String,
2256    pub partitions: usize,
2257    pub replication_factor: i32,
2258}
2259
2260#[cfg(feature = "kafka")]
2261#[derive(Debug, Clone, Serialize, Deserialize)]
2262pub struct KafkaConsumerGroupInfo {
2263    pub group_id: String,
2264    pub members: usize,
2265    pub state: String,
2266}
2267
2268#[cfg(feature = "kafka")]
2269/// Get Kafka broker statistics
2270async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
2271    if let Some(broker) = &state.kafka_broker {
2272        let topics = broker.topics.read().await;
2273        let consumer_groups = broker.consumer_groups.read().await;
2274
2275        let total_partitions: usize = topics.values().map(|t| t.partitions.len()).sum();
2276        // Note: Metrics access removed as metrics field is private
2277        // TODO: Add public method to KafkaMockBroker to access metrics if needed
2278
2279        let stats = KafkaBrokerStats {
2280            topics: topics.len(),
2281            partitions: total_partitions,
2282            consumer_groups: consumer_groups.groups().len(),
2283            messages_produced: 0, // Metrics not accessible
2284            messages_consumed: 0, // Metrics not accessible
2285        };
2286
2287        Json(stats).into_response()
2288    } else {
2289        (
2290            StatusCode::SERVICE_UNAVAILABLE,
2291            Json(serde_json::json!({
2292                "error": "Kafka broker not available",
2293                "message": "Kafka broker is not enabled or not available."
2294            })),
2295        )
2296            .into_response()
2297    }
2298}
2299
2300#[cfg(feature = "kafka")]
2301/// List Kafka topics
2302async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
2303    if let Some(broker) = &state.kafka_broker {
2304        let topics = broker.topics.read().await;
2305        let topic_list: Vec<KafkaTopicInfo> = topics
2306            .iter()
2307            .map(|(name, topic)| KafkaTopicInfo {
2308                name: name.clone(),
2309                partitions: topic.partitions.len(),
2310                replication_factor: topic.config.replication_factor as i32,
2311            })
2312            .collect();
2313
2314        Json(serde_json::json!({
2315            "topics": topic_list
2316        }))
2317        .into_response()
2318    } else {
2319        (
2320            StatusCode::SERVICE_UNAVAILABLE,
2321            Json(serde_json::json!({
2322                "error": "Kafka broker not available",
2323                "message": "Kafka broker is not enabled or not available."
2324            })),
2325        )
2326            .into_response()
2327    }
2328}
2329
2330#[cfg(feature = "kafka")]
2331/// Get Kafka topic details
2332async fn get_kafka_topic(
2333    State(state): State<ManagementState>,
2334    Path(topic_name): Path<String>,
2335) -> impl IntoResponse {
2336    if let Some(broker) = &state.kafka_broker {
2337        let topics = broker.topics.read().await;
2338        if let Some(topic) = topics.get(&topic_name) {
2339            Json(serde_json::json!({
2340                "name": topic_name,
2341                "partitions": topic.partitions.len(),
2342                "replication_factor": topic.config.replication_factor,
2343                "partitions_detail": topic.partitions.iter().enumerate().map(|(idx, partition)| serde_json::json!({
2344                    "id": idx as i32,
2345                    "leader": 0,
2346                    "replicas": vec![0],
2347                    "message_count": partition.messages.len()
2348                })).collect::<Vec<_>>()
2349            })).into_response()
2350        } else {
2351            (
2352                StatusCode::NOT_FOUND,
2353                Json(serde_json::json!({
2354                    "error": "Topic not found",
2355                    "topic": topic_name
2356                })),
2357            )
2358                .into_response()
2359        }
2360    } else {
2361        (
2362            StatusCode::SERVICE_UNAVAILABLE,
2363            Json(serde_json::json!({
2364                "error": "Kafka broker not available",
2365                "message": "Kafka broker is not enabled or not available."
2366            })),
2367        )
2368            .into_response()
2369    }
2370}
2371
2372#[cfg(feature = "kafka")]
2373/// List Kafka consumer groups
2374async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
2375    if let Some(broker) = &state.kafka_broker {
2376        let consumer_groups = broker.consumer_groups.read().await;
2377        let groups: Vec<KafkaConsumerGroupInfo> = consumer_groups
2378            .groups()
2379            .iter()
2380            .map(|(group_id, group)| KafkaConsumerGroupInfo {
2381                group_id: group_id.clone(),
2382                members: group.members.len(),
2383                state: "Stable".to_string(), // Simplified - could be more detailed
2384            })
2385            .collect();
2386
2387        Json(serde_json::json!({
2388            "groups": groups
2389        }))
2390        .into_response()
2391    } else {
2392        (
2393            StatusCode::SERVICE_UNAVAILABLE,
2394            Json(serde_json::json!({
2395                "error": "Kafka broker not available",
2396                "message": "Kafka broker is not enabled or not available."
2397            })),
2398        )
2399            .into_response()
2400    }
2401}
2402
2403#[cfg(feature = "kafka")]
2404/// Get Kafka consumer group details
2405async fn get_kafka_group(
2406    State(state): State<ManagementState>,
2407    Path(group_id): Path<String>,
2408) -> impl IntoResponse {
2409    if let Some(broker) = &state.kafka_broker {
2410        let consumer_groups = broker.consumer_groups.read().await;
2411        if let Some(group) = consumer_groups.groups().get(&group_id) {
2412            Json(serde_json::json!({
2413                "group_id": group_id,
2414                "members": group.members.len(),
2415                "state": "Stable",
2416                "members_detail": group.members.iter().map(|(member_id, member)| serde_json::json!({
2417                    "member_id": member_id,
2418                    "client_id": member.client_id,
2419                    "assignments": member.assignment.iter().map(|a| serde_json::json!({
2420                        "topic": a.topic,
2421                        "partitions": a.partitions
2422                    })).collect::<Vec<_>>()
2423                })).collect::<Vec<_>>(),
2424                "offsets": group.offsets.iter().map(|((topic, partition), offset)| serde_json::json!({
2425                    "topic": topic,
2426                    "partition": partition,
2427                    "offset": offset
2428                })).collect::<Vec<_>>()
2429            })).into_response()
2430        } else {
2431            (
2432                StatusCode::NOT_FOUND,
2433                Json(serde_json::json!({
2434                    "error": "Consumer group not found",
2435                    "group_id": group_id
2436                })),
2437            )
2438                .into_response()
2439        }
2440    } else {
2441        (
2442            StatusCode::SERVICE_UNAVAILABLE,
2443            Json(serde_json::json!({
2444                "error": "Kafka broker not available",
2445                "message": "Kafka broker is not enabled or not available."
2446            })),
2447        )
2448            .into_response()
2449    }
2450}
2451
2452// ========== Kafka Produce Handler ==========
2453
2454#[cfg(feature = "kafka")]
2455#[derive(Debug, Deserialize)]
2456pub struct KafkaProduceRequest {
2457    /// Topic to produce to
2458    pub topic: String,
2459    /// Message key (optional)
2460    #[serde(default)]
2461    pub key: Option<String>,
2462    /// Message value (JSON string or plain string)
2463    pub value: String,
2464    /// Partition ID (optional, auto-assigned if not provided)
2465    #[serde(default)]
2466    pub partition: Option<i32>,
2467    /// Message headers (optional, key-value pairs)
2468    #[serde(default)]
2469    pub headers: Option<std::collections::HashMap<String, String>>,
2470}
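
// An illustrative body for POST /kafka/produce (only `topic` and `value` are
// required; `key`, `partition`, and `headers` are optional):
//
//   {
//     "topic": "orders",
//     "key": "order-42",
//     "value": "{\"status\":\"created\"}",
//     "headers": { "source": "mockforge" }
//   }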
2471
2472#[cfg(feature = "kafka")]
2473/// Produce a message to a Kafka topic
2474async fn produce_kafka_message(
2475    State(state): State<ManagementState>,
2476    Json(request): Json<KafkaProduceRequest>,
2477) -> impl IntoResponse {
2478    if let Some(broker) = &state.kafka_broker {
2479        let mut topics = broker.topics.write().await;
2480
2481        // Get or create the topic
2482        let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
2483            mockforge_kafka::topics::Topic::new(
2484                request.topic.clone(),
2485                mockforge_kafka::topics::TopicConfig::default(),
2486            )
2487        });
2488
2489        // Determine partition
2490        let partition_id = if let Some(partition) = request.partition {
2491            partition
2492        } else {
2493            topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes()))
2494        };
2495
2496        // Validate partition exists
2497        if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
2498            return (
2499                StatusCode::BAD_REQUEST,
2500                Json(serde_json::json!({
2501                    "error": "Invalid partition",
2502                    "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
2503                })),
2504            )
2505                .into_response();
2506        }
2507
2508        // Create the message
2509        let key_clone = request.key.clone();
2510        let headers_clone = request.headers.clone();
2511        let message = mockforge_kafka::partitions::KafkaMessage {
2512            offset: 0, // Will be set by partition.append
2513            timestamp: chrono::Utc::now().timestamp_millis(),
2514            key: key_clone.clone().map(|k| k.as_bytes().to_vec()),
2515            value: request.value.as_bytes().to_vec(),
2516            headers: headers_clone
2517                .clone()
2518                .unwrap_or_default()
2519                .into_iter()
2520                .map(|(k, v)| (k, v.as_bytes().to_vec()))
2521                .collect(),
2522        };
2523
2524        // Produce to partition
2525        match topic_entry.produce(partition_id, message).await {
2526            Ok(offset) => {
2527                // Note: Metrics recording removed as metrics field is private
2528                // TODO: Add public method to KafkaMockBroker to record metrics if needed
2529
2530                // Emit message event for real-time monitoring
2531                #[cfg(feature = "kafka")]
2532                {
2533                    let event = MessageEvent::Kafka(KafkaMessageEvent {
2534                        topic: request.topic.clone(),
2535                        key: key_clone,
2536                        value: request.value.clone(),
2537                        partition: partition_id,
2538                        offset,
2539                        headers: headers_clone,
2540                        timestamp: chrono::Utc::now().to_rfc3339(),
2541                    });
2542                    let _ = state.message_events.send(event);
2543                }
2544
2545                Json(serde_json::json!({
2546                    "success": true,
2547                    "message": format!("Message produced to topic '{}'", request.topic),
2548                    "topic": request.topic,
2549                    "partition": partition_id,
2550                    "offset": offset
2551                }))
2552                .into_response()
2553            }
2554            Err(e) => (
2555                StatusCode::INTERNAL_SERVER_ERROR,
2556                Json(serde_json::json!({
2557                    "error": "Failed to produce message",
2558                    "message": e.to_string()
2559                })),
2560            )
2561                .into_response(),
2562        }
2563    } else {
2564        (
2565            StatusCode::SERVICE_UNAVAILABLE,
2566            Json(serde_json::json!({
2567                "error": "Kafka broker not available",
2568                "message": "Kafka broker is not enabled or not available."
2569            })),
2570        )
2571            .into_response()
2572    }
2573}
2574
2575#[cfg(feature = "kafka")]
2576#[derive(Debug, Deserialize)]
2577pub struct KafkaBatchProduceRequest {
2578    /// List of messages to produce
2579    pub messages: Vec<KafkaProduceRequest>,
2580    /// Delay between messages in milliseconds
2581    #[serde(default = "default_delay")]
2582    pub delay_ms: u64,
2583}
2584
2585#[cfg(feature = "kafka")]
2586/// Produce multiple messages to Kafka topics
2587async fn produce_kafka_batch(
2588    State(state): State<ManagementState>,
2589    Json(request): Json<KafkaBatchProduceRequest>,
2590) -> impl IntoResponse {
2591    if let Some(broker) = &state.kafka_broker {
2592        if request.messages.is_empty() {
2593            return (
2594                StatusCode::BAD_REQUEST,
2595                Json(serde_json::json!({
2596                    "error": "Empty batch",
2597                    "message": "At least one message is required"
2598                })),
2599            )
2600                .into_response();
2601        }
2602
2603        let mut results = Vec::new();
2604
2605        for (index, msg_request) in request.messages.iter().enumerate() {
2606            let mut topics = broker.topics.write().await;
2607
2608            // Get or create the topic
2609            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
2610                mockforge_kafka::topics::Topic::new(
2611                    msg_request.topic.clone(),
2612                    mockforge_kafka::topics::TopicConfig::default(),
2613                )
2614            });
2615
2616            // Determine partition
2617            let partition_id = if let Some(partition) = msg_request.partition {
2618                partition
2619            } else {
2620                topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
2621            };
2622
2623            // Validate partition exists
2624            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
2625                results.push(serde_json::json!({
2626                    "index": index,
2627                    "success": false,
2628                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
2629                }));
2630                continue;
2631            }
2632
2633            // Create the message
2634            let message = mockforge_kafka::partitions::KafkaMessage {
2635                offset: 0,
2636                timestamp: chrono::Utc::now().timestamp_millis(),
2637                key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
2638                value: msg_request.value.as_bytes().to_vec(),
2639                headers: msg_request
2640                    .headers
2641                    .clone()
2642                    .unwrap_or_default()
2643                    .into_iter()
2644                    .map(|(k, v)| (k, v.as_bytes().to_vec()))
2645                    .collect(),
2646            };
2647
2648            // Produce to partition
2649            match topic_entry.produce(partition_id, message).await {
2650                Ok(offset) => {
2651                    // Note: Metrics recording removed as metrics field is private
2652                    // TODO: Add public method to KafkaMockBroker to record metrics if needed
2653
2654                    // Emit message event
2655                    let event = MessageEvent::Kafka(KafkaMessageEvent {
2656                        topic: msg_request.topic.clone(),
2657                        key: msg_request.key.clone(),
2658                        value: msg_request.value.clone(),
2659                        partition: partition_id,
2660                        offset,
2661                        headers: msg_request.headers.clone(),
2662                        timestamp: chrono::Utc::now().to_rfc3339(),
2663                    });
2664                    let _ = state.message_events.send(event);
2665
2666                    results.push(serde_json::json!({
2667                        "index": index,
2668                        "success": true,
2669                        "topic": msg_request.topic,
2670                        "partition": partition_id,
2671                        "offset": offset
2672                    }));
2673                }
2674                Err(e) => {
2675                    results.push(serde_json::json!({
2676                        "index": index,
2677                        "success": false,
2678                        "error": e.to_string()
2679                    }));
2680                }
2681            }
2682
2683            // Add delay between messages (except for the last one)
2684            if index < request.messages.len() - 1 && request.delay_ms > 0 {
2685                tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
2686            }
2687        }
2688
2689        let success_count =
2690            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();
2691
2692        Json(serde_json::json!({
2693            "success": true,
2694            "total": request.messages.len(),
2695            "succeeded": success_count,
2696            "failed": request.messages.len() - success_count,
2697            "results": results
2698        }))
2699        .into_response()
2700    } else {
2701        (
2702            StatusCode::SERVICE_UNAVAILABLE,
2703            Json(serde_json::json!({
2704                "error": "Kafka broker not available",
2705                "message": "Kafka broker is not enabled or not available."
2706            })),
2707        )
2708            .into_response()
2709    }
2710}
2711
2712// ========== Real-time Message Streaming (SSE) ==========
2713
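// Both handlers below expose plain Server-Sent Events streams. An optional
// `topic` query parameter substring-filters events, and events are named
// `mqtt_message` and `kafka_message` respectively (illustrative request:
// GET /mqtt/messages/stream?topic=sensors).
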
2714#[cfg(feature = "mqtt")]
2715/// SSE stream for MQTT messages
2716async fn mqtt_messages_stream(
2717    State(state): State<ManagementState>,
2718    Query(params): Query<std::collections::HashMap<String, String>>,
2719) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
2720    let rx = state.message_events.subscribe();
2721    let topic_filter = params.get("topic").cloned();
2722
2723    let stream = stream::unfold(rx, move |mut rx| {
2724        let topic_filter = topic_filter.clone();
2725
2726        async move {
2727            loop {
2728                match rx.recv().await {
2729                    Ok(MessageEvent::Mqtt(event)) => {
2730                        // Apply topic filter if specified
2731                        if let Some(filter) = &topic_filter {
2732                            if !event.topic.contains(filter) {
2733                                continue;
2734                            }
2735                        }
2736
2737                        let event_json = serde_json::json!({
2738                            "protocol": "mqtt",
2739                            "topic": event.topic,
2740                            "payload": event.payload,
2741                            "qos": event.qos,
2742                            "retain": event.retain,
2743                            "timestamp": event.timestamp,
2744                        });
2745
2746                        if let Ok(event_data) = serde_json::to_string(&event_json) {
2747                            let sse_event = Event::default().event("mqtt_message").data(event_data);
2748                            return Some((Ok(sse_event), rx));
2749                        }
2750                    }
2751                    #[cfg(feature = "kafka")]
2752                    Ok(MessageEvent::Kafka(_)) => {
2753                        // Skip Kafka events in MQTT stream
2754                        continue;
2755                    }
2756                    Err(broadcast::error::RecvError::Closed) => {
2757                        return None;
2758                    }
2759                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
2760                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
2761                        continue;
2762                    }
2763                }
2764            }
2765        }
2766    });
2767
2768    Sse::new(stream).keep_alive(
2769        axum::response::sse::KeepAlive::new()
2770            .interval(std::time::Duration::from_secs(15))
2771            .text("keep-alive-text"),
2772    )
2773}
2774
2775#[cfg(feature = "kafka")]
2776/// SSE stream for Kafka messages
2777async fn kafka_messages_stream(
2778    State(state): State<ManagementState>,
2779    Query(params): Query<std::collections::HashMap<String, String>>,
2780) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
2781    let rx = state.message_events.subscribe();
2782    let topic_filter = params.get("topic").cloned();
2783
2784    let stream = stream::unfold(rx, move |mut rx| {
2785        let topic_filter = topic_filter.clone();
2786
2787        async move {
2788            loop {
2789                match rx.recv().await {
2790                    #[cfg(feature = "mqtt")]
2791                    Ok(MessageEvent::Mqtt(_)) => {
2792                        // Skip MQTT events in Kafka stream
2793                        continue;
2794                    }
2795                    Ok(MessageEvent::Kafka(event)) => {
2796                        // Apply topic filter if specified
2797                        if let Some(filter) = &topic_filter {
2798                            if !event.topic.contains(filter) {
2799                                continue;
2800                            }
2801                        }
2802
2803                        let event_json = serde_json::json!({
2804                            "protocol": "kafka",
2805                            "topic": event.topic,
2806                            "key": event.key,
2807                            "value": event.value,
2808                            "partition": event.partition,
2809                            "offset": event.offset,
2810                            "headers": event.headers,
2811                            "timestamp": event.timestamp,
2812                        });
2813
2814                        if let Ok(event_data) = serde_json::to_string(&event_json) {
2815                            let sse_event =
2816                                Event::default().event("kafka_message").data(event_data);
2817                            return Some((Ok(sse_event), rx));
2818                        }
2819                    }
2820                    Err(broadcast::error::RecvError::Closed) => {
2821                        return None;
2822                    }
2823                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
2824                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
2825                        continue;
2826                    }
2827                }
2828            }
2829        }
2830    });
2831
2832    Sse::new(stream).keep_alive(
2833        axum::response::sse::KeepAlive::new()
2834            .interval(std::time::Duration::from_secs(15))
2835            .text("keep-alive-text"),
2836    )
2837}
2838
2839// ========== AI-Powered Features ==========
2840
2841/// Request for AI-powered API specification generation
2842#[derive(Debug, Deserialize)]
2843pub struct GenerateSpecRequest {
2844    /// Natural language description of the API to generate
2845    pub query: String,
2846    /// Type of specification to generate: "openapi", "graphql", or "asyncapi"
2847    pub spec_type: String,
2848    /// Optional API version (e.g., "3.0.0" for OpenAPI)
2849    pub api_version: Option<String>,
2850}
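
// An illustrative body for POST /ai/generate-spec:
//
//   {
//     "query": "A todo list API with users, lists, and tasks",
//     "spec_type": "openapi",
//     "api_version": "3.0.0"
//   }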
2851
2852/// Request for OpenAPI generation from recorded traffic
2853#[derive(Debug, Deserialize)]
2854pub struct GenerateOpenApiFromTrafficRequest {
2855    /// Path to recorder database (optional, defaults to ./recordings.db)
2856    #[serde(default)]
2857    pub database_path: Option<String>,
2858    /// Start time for filtering (ISO 8601 format, e.g., 2025-01-01T00:00:00Z)
2859    #[serde(default)]
2860    pub since: Option<String>,
2861    /// End time for filtering (ISO 8601 format)
2862    #[serde(default)]
2863    pub until: Option<String>,
2864    /// Path pattern filter (supports wildcards, e.g., /api/*)
2865    #[serde(default)]
2866    pub path_pattern: Option<String>,
2867    /// Minimum confidence score for including paths (0.0 to 1.0)
2868    #[serde(default = "default_min_confidence")]
2869    pub min_confidence: f64,
2870}
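
// An illustrative body for POST /mockai/generate-openapi (all fields are
// optional; `min_confidence` defaults to 0.7):
//
//   { "path_pattern": "/api/*", "since": "2025-01-01T00:00:00Z", "min_confidence": 0.8 }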
2871
2872fn default_min_confidence() -> f64 {
2873    0.7
2874}
2875
2876/// Generate API specification from natural language using AI
2877#[cfg(feature = "data-faker")]
2878async fn generate_ai_spec(
2879    State(_state): State<ManagementState>,
2880    Json(request): Json<GenerateSpecRequest>,
2881) -> impl IntoResponse {
2882    use mockforge_data::rag::{
2883        config::{EmbeddingProvider, LlmProvider, RagConfig},
2884        engine::RagEngine,
2885        storage::{DocumentStorage, StorageFactory},
2886    };
2887    use std::sync::Arc;
2888
2889    // Build RAG config from environment variables
2890    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
2891        .ok()
2892        .or_else(|| std::env::var("OPENAI_API_KEY").ok());
2893
2894    // Check if RAG is configured - require API key
2895    if api_key.is_none() {
2896        return (
2897            StatusCode::SERVICE_UNAVAILABLE,
2898            Json(serde_json::json!({
2899                "error": "AI service not configured",
2900                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
2901            })),
2902        )
2903            .into_response();
2904    }
2905
2906    // Build RAG configuration
2907    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
2908        .unwrap_or_else(|_| "openai".to_string())
2909        .to_lowercase();
2910
2911    let provider = match provider_str.as_str() {
2912        "openai" => LlmProvider::OpenAI,
2913        "anthropic" => LlmProvider::Anthropic,
2914        "ollama" => LlmProvider::Ollama,
2915        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
2916        _ => LlmProvider::OpenAI,
2917    };
2918
2919    let api_endpoint =
2920        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
2921            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
2922            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
2923            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
2924            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
2925        });
2926
2927    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
2928        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
2929        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
2930        LlmProvider::Ollama => "llama2".to_string(),
2931        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
2932    });
2933
2934    // Build RagConfig using default() and override fields
2935    let mut rag_config = RagConfig::default();
2936    rag_config.provider = provider;
2937    rag_config.api_endpoint = api_endpoint;
2938    rag_config.api_key = api_key;
2939    rag_config.model = model;
2940    rag_config.max_tokens = std::env::var("MOCKFORGE_RAG_MAX_TOKENS")
2941        .unwrap_or_else(|_| "4096".to_string())
2942        .parse()
2943        .unwrap_or(4096);
2944    rag_config.temperature = std::env::var("MOCKFORGE_RAG_TEMPERATURE")
2945        .unwrap_or_else(|_| "0.3".to_string())
2946        .parse()
2947        .unwrap_or(0.3); // Lower temperature for more structured output
2948    rag_config.timeout_secs = std::env::var("MOCKFORGE_RAG_TIMEOUT")
2949        .unwrap_or_else(|_| "60".to_string())
2950        .parse()
2951        .unwrap_or(60);
2952    rag_config.max_context_length = std::env::var("MOCKFORGE_RAG_CONTEXT_WINDOW")
2953        .unwrap_or_else(|_| "4000".to_string())
2954        .parse()
2955        .unwrap_or(4000);
2956
2957    // Build the prompt for spec generation
2958    let spec_type_label = match request.spec_type.as_str() {
2959        "openapi" => "OpenAPI 3.0",
2960        "graphql" => "GraphQL",
2961        "asyncapi" => "AsyncAPI",
2962        _ => "OpenAPI 3.0",
2963    };
2964
2965    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");
2966
2967    let prompt = format!(
2968        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.
2969
2970User Requirements:
2971{}
2972
2973Instructions:
29741. Generate a complete, valid {} specification
29752. Include all paths, operations, request/response schemas, and components
29763. Use realistic field names and data types
29774. Include proper descriptions and examples
29785. Follow {} best practices
29796. Return ONLY the specification, no additional explanation
29807. For OpenAPI, use version {}
2981
2982Return the specification in {} format."#,
2983        spec_type_label,
2984        request.query,
2985        spec_type_label,
2986        spec_type_label,
2987        api_version,
2988        if request.spec_type == "graphql" {
2989            "GraphQL SDL"
2990        } else {
2991            "YAML"
2992        }
2993    );
2994
2995    // Create in-memory storage for RAG engine
2996    // Note: StorageFactory::create_memory() returns Box<dyn DocumentStorage>, but
2997    // RagEngine::new takes an Arc<dyn DocumentStorage>, so construct InMemoryStorage
2998    // directly and wrap it in an Arc rather than converting the boxed value.
2999    use mockforge_data::rag::storage::InMemoryStorage;
3000    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());
3001
3002    // Create RAG engine
3003    let mut rag_engine = match RagEngine::new(rag_config.clone(), storage) {
3004        Ok(engine) => engine,
3005        Err(e) => {
3006            return (
3007                StatusCode::INTERNAL_SERVER_ERROR,
3008                Json(serde_json::json!({
3009                    "error": "Failed to initialize RAG engine",
3010                    "message": e.to_string()
3011                })),
3012            )
3013                .into_response();
3014        }
3015    };
3016
3017    // Generate using RAG engine
3018    match rag_engine.generate(&prompt, None).await {
3019        Ok(generated_text) => {
3020            // Try to extract just the YAML/JSON/SDL content if LLM added explanation
3021            let spec = if request.spec_type == "graphql" {
3022                // For GraphQL, extract SDL
3023                extract_graphql_schema(&generated_text)
3024            } else {
3025                // For OpenAPI/AsyncAPI, extract YAML
3026                extract_yaml_spec(&generated_text)
3027            };
3028
3029            Json(serde_json::json!({
3030                "success": true,
3031                "spec": spec,
3032                "spec_type": request.spec_type,
3033            }))
3034            .into_response()
3035        }
3036        Err(e) => (
3037            StatusCode::INTERNAL_SERVER_ERROR,
3038            Json(serde_json::json!({
3039                "error": "AI generation failed",
3040                "message": e.to_string()
3041            })),
3042        )
3043            .into_response(),
3044    }
3045}
3046
3047#[cfg(not(feature = "data-faker"))]
3048async fn generate_ai_spec(
3049    State(_state): State<ManagementState>,
3050    Json(_request): Json<GenerateSpecRequest>,
3051) -> impl IntoResponse {
3052    (
3053        StatusCode::NOT_IMPLEMENTED,
3054        Json(serde_json::json!({
3055            "error": "AI features not enabled",
3056            "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3057        })),
3058    )
3059        .into_response()
3060}
3061
3062/// Generate OpenAPI specification from recorded traffic
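///
/// Reads recorded HTTP exchanges from the recorder database (defaults to
/// `recordings.db` in the current directory when `database_path` is not given),
/// optionally filtered by `since`/`until` (RFC 3339 timestamps) and
/// `path_pattern`, and infers an OpenAPI document from up to 1000 exchanges.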
3063#[cfg(feature = "behavioral-cloning")]
3064async fn generate_openapi_from_traffic(
3065    State(_state): State<ManagementState>,
3066    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
3067) -> impl IntoResponse {
3068    use chrono::{DateTime, Utc};
3069    use mockforge_core::intelligent_behavior::{
3070        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
3071        IntelligentBehaviorConfig,
3072    };
3073    use mockforge_recorder::{
3074        database::RecorderDatabase,
3075        openapi_export::{QueryFilters, RecordingsToOpenApi},
3076    };
3077    use std::path::PathBuf;
3078
3079    // Determine database path
3080    let db_path = if let Some(ref path) = request.database_path {
3081        PathBuf::from(path)
3082    } else {
3083        std::env::current_dir()
3084            .unwrap_or_else(|_| PathBuf::from("."))
3085            .join("recordings.db")
3086    };
3087
3088    // Open database
3089    let db = match RecorderDatabase::new(&db_path).await {
3090        Ok(db) => db,
3091        Err(e) => {
3092            return (
3093                StatusCode::BAD_REQUEST,
3094                Json(serde_json::json!({
3095                    "error": "Database error",
3096                    "message": format!("Failed to open recorder database: {}", e)
3097                })),
3098            )
3099                .into_response();
3100        }
3101    };
3102
3103    // Parse time filters
3104    let since_dt = if let Some(ref since_str) = request.since {
3105        match DateTime::parse_from_rfc3339(since_str) {
3106            Ok(dt) => Some(dt.with_timezone(&Utc)),
3107            Err(e) => {
3108                return (
3109                    StatusCode::BAD_REQUEST,
3110                    Json(serde_json::json!({
3111                        "error": "Invalid date format",
3112                        "message": format!("Invalid 'since' value: {}. Use an RFC 3339 timestamp (e.g., 2025-01-01T00:00:00Z)", e)
3113                    })),
3114                )
3115                    .into_response();
3116            }
3117        }
3118    } else {
3119        None
3120    };
3121
3122    let until_dt = if let Some(ref until_str) = request.until {
3123        match DateTime::parse_from_rfc3339(until_str) {
3124            Ok(dt) => Some(dt.with_timezone(&Utc)),
3125            Err(e) => {
3126                return (
3127                    StatusCode::BAD_REQUEST,
3128                    Json(serde_json::json!({
3129                        "error": "Invalid date format",
3130                        "message": format!("Invalid 'until' value: {}. Use an RFC 3339 timestamp (e.g., 2025-01-01T00:00:00Z)", e)
3131                    })),
3132                )
3133                    .into_response();
3134            }
3135        }
3136    } else {
3137        None
3138    };
3139
3140    // Build query filters
3141    let query_filters = QueryFilters {
3142        since: since_dt,
3143        until: until_dt,
3144        path_pattern: request.path_pattern.clone(),
3145        min_status_code: None,
3146        max_requests: Some(1000),
3147    };
3148
3149    // Query HTTP exchanges
3150    // Note: RecordingsToOpenApi::query_http_exchanges returns mockforge-recorder's
3151    // HttpExchange type, so each record is converted to mockforge-core's HttpExchange
3152    // below to avoid type/version mismatches between the two crates.
3153    let exchanges_from_recorder =
3154        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
3155            Ok(exchanges) => exchanges,
3156            Err(e) => {
3157                return (
3158                    StatusCode::INTERNAL_SERVER_ERROR,
3159                    Json(serde_json::json!({
3160                        "error": "Query error",
3161                        "message": format!("Failed to query HTTP exchanges: {}", e)
3162                    })),
3163                )
3164                    .into_response();
3165            }
3166        };
3167
3168    if exchanges_from_recorder.is_empty() {
3169        return (
3170            StatusCode::NOT_FOUND,
3171            Json(serde_json::json!({
3172                "error": "No exchanges found",
3173                "message": "No HTTP exchanges found matching the specified filters"
3174            })),
3175        )
3176            .into_response();
3177    }
3178
3179    // Convert to local HttpExchange type to avoid version mismatch
3180    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
3181    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
3182        .into_iter()
3183        .map(|e| LocalHttpExchange {
3184            method: e.method,
3185            path: e.path,
3186            query_params: e.query_params,
3187            headers: e.headers,
3188            body: e.body,
3189            body_encoding: e.body_encoding,
3190            status_code: e.status_code,
3191            response_headers: e.response_headers,
3192            response_body: e.response_body,
3193            response_body_encoding: e.response_body_encoding,
3194            timestamp: e.timestamp,
3195        })
3196        .collect();
3197
3198    // Create OpenAPI generator config
3199    let behavior_config = IntelligentBehaviorConfig::default();
3200    let gen_config = OpenApiGenerationConfig {
3201        min_confidence: request.min_confidence,
3202        behavior_model: Some(behavior_config.behavior_model),
3203    };
3204
3205    // Generate OpenAPI spec
3206    let generator = OpenApiSpecGenerator::new(gen_config);
3207    let result = match generator.generate_from_exchanges(exchanges).await {
3208        Ok(result) => result,
3209        Err(e) => {
3210            return (
3211                StatusCode::INTERNAL_SERVER_ERROR,
3212                Json(serde_json::json!({
3213                    "error": "Generation error",
3214                    "message": format!("Failed to generate OpenAPI spec: {}", e)
3215                })),
3216            )
3217                .into_response();
3218        }
3219    };
3220
3221    // Prepare response
3222    let spec_json = if let Some(ref raw) = result.spec.raw_document {
3223        raw.clone()
3224    } else {
3225        match serde_json::to_value(&result.spec.spec) {
3226            Ok(json) => json,
3227            Err(e) => {
3228                return (
3229                    StatusCode::INTERNAL_SERVER_ERROR,
3230                    Json(serde_json::json!({
3231                        "error": "Serialization error",
3232                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
3233                    })),
3234                )
3235                    .into_response();
3236            }
3237        }
3238    };
3239
3240    // Build response with metadata
3241    let response = serde_json::json!({
3242        "spec": spec_json,
3243        "metadata": {
3244            "requests_analyzed": result.metadata.requests_analyzed,
3245            "paths_inferred": result.metadata.paths_inferred,
3246            "path_confidence": result.metadata.path_confidence,
3247            "generated_at": result.metadata.generated_at.to_rfc3339(),
3248            "duration_ms": result.metadata.duration_ms,
3249        }
3250    });
3251
3252    Json(response).into_response()
3253}
3254
3255/// List all rule explanations
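///
/// Supports optional `rule_type` and `min_confidence` query parameters for
/// filtering; results are sorted by confidence and then by generation time,
/// both descending.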
3256async fn list_rule_explanations(
3257    State(state): State<ManagementState>,
3258    Query(params): Query<std::collections::HashMap<String, String>>,
3259) -> impl IntoResponse {
3260    use mockforge_core::intelligent_behavior::RuleType;
3261
3262    let explanations = state.rule_explanations.read().await;
3263    let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3264
3265    // Filter by rule type if provided
3266    if let Some(rule_type_str) = params.get("rule_type") {
3267        if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3268            explanations_vec.retain(|e| e.rule_type == rule_type);
3269        }
3270    }
3271
3272    // Filter by minimum confidence if provided
3273    if let Some(min_confidence_str) = params.get("min_confidence") {
3274        if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3275            explanations_vec.retain(|e| e.confidence >= min_confidence);
3276        }
3277    }
3278
3279    // Sort by confidence (descending) and then by generated_at (descending)
3280    explanations_vec.sort_by(|a, b| {
3281        b.confidence
3282            .partial_cmp(&a.confidence)
3283            .unwrap_or(std::cmp::Ordering::Equal)
3284            .then_with(|| b.generated_at.cmp(&a.generated_at))
3285    });
3286
3287    Json(serde_json::json!({
3288        "explanations": explanations_vec,
3289        "total": explanations_vec.len(),
3290    }))
3291    .into_response()
3292}
3293
3294/// Get a specific rule explanation by ID
3295async fn get_rule_explanation(
3296    State(state): State<ManagementState>,
3297    Path(rule_id): Path<String>,
3298) -> impl IntoResponse {
3299    let explanations = state.rule_explanations.read().await;
3300
3301    match explanations.get(&rule_id) {
3302        Some(explanation) => Json(serde_json::json!({
3303            "explanation": explanation,
3304        }))
3305        .into_response(),
3306        None => (
3307            StatusCode::NOT_FOUND,
3308            Json(serde_json::json!({
3309                "error": "Rule explanation not found",
3310                "message": format!("No explanation found for rule ID: {}", rule_id)
3311            })),
3312        )
3313            .into_response(),
3314    }
3315}
3316
3317/// Request for learning from examples
3318#[derive(Debug, Deserialize)]
3319pub struct LearnFromExamplesRequest {
3320    /// Example request/response pairs to learn from
3321    pub examples: Vec<ExamplePairRequest>,
3322    /// Optional configuration override
3323    #[serde(default)]
3324    pub config: Option<serde_json::Value>,
3325}
3326
3327/// Example pair request format
3328#[derive(Debug, Deserialize)]
3329pub struct ExamplePairRequest {
3330    /// Request data (method, path, body, etc.)
3331    pub request: serde_json::Value,
3332    /// Response data (status_code, body, etc.)
3333    pub response: serde_json::Value,
3334}
3335
3336/// Learn behavioral rules from example pairs
3337///
3338/// This endpoint accepts example request/response pairs, generates behavioral rules
3339/// with explanations, and stores the explanations for later retrieval.
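///
/// Illustrative request body (hypothetical values), built here with
/// `serde_json::json!` to show the expected field names:
///
/// ```ignore
/// serde_json::json!({
///     "examples": [{
///         "request":  { "method": "POST", "path": "/orders", "body": { "sku": "abc" } },
///         "response": { "status_code": 201, "body": { "id": 1, "sku": "abc" } }
///     }]
/// })
/// ```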
3340async fn learn_from_examples(
3341    State(state): State<ManagementState>,
3342    Json(request): Json<LearnFromExamplesRequest>,
3343) -> impl IntoResponse {
3344    use mockforge_core::intelligent_behavior::{
3345        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3346        rule_generator::{ExamplePair, RuleGenerator},
3347    };
3348
3349    if request.examples.is_empty() {
3350        return (
3351            StatusCode::BAD_REQUEST,
3352            Json(serde_json::json!({
3353                "error": "No examples provided",
3354                "message": "At least one example pair is required"
3355            })),
3356        )
3357            .into_response();
3358    }
3359
3360    // Convert request examples to ExamplePair format
3361    let example_pairs: Result<Vec<ExamplePair>, String> = request
3362        .examples
3363        .into_iter()
3364        .enumerate()
3365        .map(|(idx, ex)| {
3366            // Parse request JSON to extract method, path, body, etc.
3367            let method = ex
3368                .request
3369                .get("method")
3370                .and_then(|v| v.as_str())
3371                .map(|s| s.to_string())
3372                .unwrap_or_else(|| "GET".to_string());
3373            let path = ex
3374                .request
3375                .get("path")
3376                .and_then(|v| v.as_str())
3377                .map(|s| s.to_string())
3378                .unwrap_or_else(|| "/".to_string());
3379            let request_body = ex.request.get("body").cloned();
3380            let query_params = ex
3381                .request
3382                .get("query_params")
3383                .and_then(|v| v.as_object())
3384                .map(|obj| {
3385                    obj.iter()
3386                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3387                        .collect()
3388                })
3389                .unwrap_or_default();
3390            let headers = ex
3391                .request
3392                .get("headers")
3393                .and_then(|v| v.as_object())
3394                .map(|obj| {
3395                    obj.iter()
3396                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3397                        .collect()
3398                })
3399                .unwrap_or_default();
3400
3401            // Parse response JSON to extract status, body, etc.
3402            let status = ex
3403                .response
3404                .get("status_code")
3405                .or_else(|| ex.response.get("status"))
3406                .and_then(|v| v.as_u64())
3407                .map(|n| n as u16)
3408                .unwrap_or(200);
3409            let response_body = ex.response.get("body").cloned();
3410
3411            Ok(ExamplePair {
3412                method,
3413                path,
3414                request: request_body,
3415                status,
3416                response: response_body,
3417                query_params,
3418                headers,
3419                metadata: {
3420                    let mut meta = std::collections::HashMap::new();
3421                    meta.insert("source".to_string(), "api".to_string());
3422                    meta.insert("example_index".to_string(), idx.to_string());
3423                    meta
3424                },
3425            })
3426        })
3427        .collect();
3428
3429    let example_pairs = match example_pairs {
3430        Ok(pairs) => pairs,
3431        Err(e) => {
3432            return (
3433                StatusCode::BAD_REQUEST,
3434                Json(serde_json::json!({
3435                    "error": "Invalid examples",
3436                    "message": e
3437                })),
3438            )
3439                .into_response();
3440        }
3441    };
3442
3443    // Create behavior config (use provided config or default)
3444    let behavior_config = if let Some(config_json) = request.config {
3445        // Try to deserialize custom config, fall back to default
3446        serde_json::from_value(config_json)
3447            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3448            .behavior_model
3449    } else {
3450        BehaviorModelConfig::default()
3451    };
3452
3453    // Create rule generator
3454    let generator = RuleGenerator::new(behavior_config);
3455
3456    // Generate rules with explanations
3457    let (rules, explanations) =
3458        match generator.generate_rules_with_explanations(example_pairs).await {
3459            Ok(result) => result,
3460            Err(e) => {
3461                return (
3462                    StatusCode::INTERNAL_SERVER_ERROR,
3463                    Json(serde_json::json!({
3464                        "error": "Rule generation failed",
3465                        "message": format!("Failed to generate rules: {}", e)
3466                    })),
3467                )
3468                    .into_response();
3469            }
3470        };
3471
3472    // Store explanations in ManagementState
3473    {
3474        let mut stored_explanations = state.rule_explanations.write().await;
3475        for explanation in &explanations {
3476            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3477        }
3478    }
3479
3480    // Prepare response
3481    let response = serde_json::json!({
3482        "success": true,
3483        "rules_generated": {
3484            "consistency_rules": rules.consistency_rules.len(),
3485            "schemas": rules.schemas.len(),
3486            "state_machines": rules.state_transitions.len(),
3487            "system_prompt": !rules.system_prompt.is_empty(),
3488        },
3489        "explanations": explanations.iter().map(|e| serde_json::json!({
3490            "rule_id": e.rule_id,
3491            "rule_type": e.rule_type,
3492            "confidence": e.confidence,
3493            "reasoning": e.reasoning,
3494        })).collect::<Vec<_>>(),
3495        "total_explanations": explanations.len(),
3496    });
3497
3498    Json(response).into_response()
3499}
3500
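/// Extract a YAML specification from LLM output, unwrapping Markdown code
/// fences when present and otherwise returning the trimmed text as-is.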
3501fn extract_yaml_spec(text: &str) -> String {
3502    // Try to find YAML code blocks
3503    if let Some(start) = text.find("```yaml") {
3504        let yaml_start = text[start + 7..].trim_start();
3505        if let Some(end) = yaml_start.find("```") {
3506            return yaml_start[..end].trim().to_string();
3507        }
3508    }
3509    if let Some(start) = text.find("```") {
3510        let content_start = text[start + 3..].trim_start();
3511        if let Some(end) = content_start.find("```") {
3512            return content_start[..end].trim().to_string();
3513        }
3514    }
3515
3516    // Check if it starts with openapi: or asyncapi:
3517    if text.trim_start().starts_with("openapi:") || text.trim_start().starts_with("asyncapi:") {
3518        return text.trim().to_string();
3519    }
3520
3521    // Return as-is if no code blocks found
3522    text.trim().to_string()
3523}
3524
3525/// Extract GraphQL schema from text content
3526fn extract_graphql_schema(text: &str) -> String {
3527    // Try to find GraphQL code blocks
3528    if let Some(start) = text.find("```graphql") {
3529        let schema_start = text[start + 10..].trim_start();
3530        if let Some(end) = schema_start.find("```") {
3531            return schema_start[..end].trim().to_string();
3532        }
3533    }
3534    if let Some(start) = text.find("```") {
3535        let content_start = text[start + 3..].trim_start();
3536        if let Some(end) = content_start.find("```") {
3537            return content_start[..end].trim().to_string();
3538        }
3539    }
3540
3541    // Check if it looks like GraphQL SDL (starts with type, schema, etc.)
3542    if text.trim_start().starts_with("type ") || text.trim_start().starts_with("schema ") {
3543        return text.trim().to_string();
3544    }
3545
3546    text.trim().to_string()
3547}
3548
3549// ========== Chaos Engineering Management ==========
3550
3551/// Get current chaos engineering configuration
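///
/// Returns the live configuration when the chaos API state is available;
/// otherwise (or when the `chaos` feature is disabled) a disabled default with
/// null sub-configurations is returned.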
3552async fn get_chaos_config(State(state): State<ManagementState>) -> impl IntoResponse {
3553    #[cfg(feature = "chaos")]
3554    {
3555        if let Some(chaos_state) = &state.chaos_api_state {
3556            let config = chaos_state.config.read().await;
3557            // Convert ChaosConfig to JSON response format
3558            Json(serde_json::json!({
3559                "enabled": config.enabled,
3560                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
3561                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
3562                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
3563                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
3564            }))
3565            .into_response()
3566        } else {
3567            // Chaos API not available, return default
3568            Json(serde_json::json!({
3569                "enabled": false,
3570                "latency": null,
3571                "fault_injection": null,
3572                "rate_limit": null,
3573                "traffic_shaping": null,
3574            }))
3575            .into_response()
3576        }
3577    }
3578    #[cfg(not(feature = "chaos"))]
3579    {
3580        // Chaos feature not enabled
3581        Json(serde_json::json!({
3582            "enabled": false,
3583            "latency": null,
3584            "fault_injection": null,
3585            "rate_limit": null,
3586            "traffic_shaping": null,
3587        }))
3588        .into_response()
3589    }
3590}
3591
3592/// Request to update chaos configuration
3593#[derive(Debug, Deserialize)]
3594pub struct ChaosConfigUpdate {
3595    /// Whether to enable chaos engineering
3596    pub enabled: Option<bool>,
3597    /// Latency configuration
3598    pub latency: Option<serde_json::Value>,
3599    /// Fault injection configuration
3600    pub fault_injection: Option<serde_json::Value>,
3601    /// Rate limiting configuration
3602    pub rate_limit: Option<serde_json::Value>,
3603    /// Traffic shaping configuration
3604    pub traffic_shaping: Option<serde_json::Value>,
3605}
3606
3607/// Update chaos engineering configuration
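///
/// All fields are optional; sub-configurations that fail to deserialize are
/// silently skipped rather than rejected, so callers that need strict
/// validation should read the configuration back afterwards.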
3608async fn update_chaos_config(
3609    State(state): State<ManagementState>,
3610    Json(config_update): Json<ChaosConfigUpdate>,
3611) -> impl IntoResponse {
3612    #[cfg(feature = "chaos")]
3613    {
3614        if let Some(chaos_state) = &state.chaos_api_state {
3615            use mockforge_chaos::config::{
3616                FaultInjectionConfig, LatencyConfig, RateLimitConfig,
3617                TrafficShapingConfig,
3618            };
3619
3620            let mut config = chaos_state.config.write().await;
3621
3622            // Update enabled flag if provided
3623            if let Some(enabled) = config_update.enabled {
3624                config.enabled = enabled;
3625            }
3626
3627            // Update latency config if provided
3628            if let Some(latency_json) = config_update.latency {
3629                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3630                    config.latency = Some(latency);
3631                }
3632            }
3633
3634            // Update fault injection config if provided
3635            if let Some(fault_json) = config_update.fault_injection {
3636                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3637                    config.fault_injection = Some(fault);
3638                }
3639            }
3640
3641            // Update rate limit config if provided
3642            if let Some(rate_json) = config_update.rate_limit {
3643                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3644                    config.rate_limit = Some(rate);
3645                }
3646            }
3647
3648            // Update traffic shaping config if provided
3649            if let Some(traffic_json) = config_update.traffic_shaping {
3650                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3651                    config.traffic_shaping = Some(traffic);
3652                }
3653            }
3654
3655            // Release the write lock; the middleware reads the shared config and
3656            // will pick up the new values on the next request
3657            drop(config);
3658
3659            info!("Chaos configuration updated successfully");
3660            Json(serde_json::json!({
3661                "success": true,
3662                "message": "Chaos configuration updated and applied"
3663            }))
3664            .into_response()
3665        } else {
3666            (
3667                StatusCode::SERVICE_UNAVAILABLE,
3668                Json(serde_json::json!({
3669                    "success": false,
3670                    "error": "Chaos API not available",
3671                    "message": "Chaos engineering is not enabled or configured"
3672                })),
3673            )
3674                .into_response()
3675        }
3676    }
3677    #[cfg(not(feature = "chaos"))]
3678    {
3679        (
3680            StatusCode::NOT_IMPLEMENTED,
3681            Json(serde_json::json!({
3682                "success": false,
3683                "error": "Chaos feature not enabled",
3684                "message": "Chaos engineering feature is not compiled into this build"
3685            })),
3686        )
3687            .into_response()
3688    }
3689}
3690
3691// ========== Network Profile Management ==========
3692
3693/// List available network profiles
3694async fn list_network_profiles() -> impl IntoResponse {
3695    use mockforge_core::network_profiles::NetworkProfileCatalog;
3696
3697    let catalog = NetworkProfileCatalog::default();
3698    let profiles: Vec<serde_json::Value> = catalog
3699        .list_profiles_with_description()
3700        .iter()
3701        .map(|(name, description)| {
3702            serde_json::json!({
3703                "name": name,
3704                "description": description,
3705            })
3706        })
3707        .collect();
3708
3709    Json(serde_json::json!({
3710        "profiles": profiles
3711    }))
3712    .into_response()
3713}
3714
3715/// Request to apply a network profile
3716#[derive(Debug, Deserialize)]
3717pub struct ApplyNetworkProfileRequest {
3718    /// Name of the network profile to apply
3719    pub profile_name: String,
3720}
3721
3722/// Apply a network profile
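///
/// Expects a JSON body of the form `{"profile_name": "<name>"}`; the name must
/// match a profile in the default `NetworkProfileCatalog`, otherwise a 404 is
/// returned.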
3723async fn apply_network_profile(
3724    State(state): State<ManagementState>,
3725    Json(request): Json<ApplyNetworkProfileRequest>,
3726) -> impl IntoResponse {
3727    use mockforge_core::network_profiles::NetworkProfileCatalog;
3728
3729    let catalog = NetworkProfileCatalog::default();
3730    if let Some(profile) = catalog.get(&request.profile_name) {
3731        // Apply profile to server configuration if available
3732        // NetworkProfile contains latency and traffic_shaping configs
3733        if let Some(server_config) = &state.server_config {
3734            let mut config = server_config.write().await;
3735
3736            // Apply network profile's traffic shaping to core config
3737            use mockforge_core::config::NetworkShapingConfig;
3738
3739            // Convert NetworkProfile's TrafficShapingConfig to NetworkShapingConfig
3740            // NetworkProfile uses mockforge_core::traffic_shaping::TrafficShapingConfig
3741            // which has bandwidth and burst_loss fields
3742            let network_shaping = NetworkShapingConfig {
3743                enabled: profile.traffic_shaping.bandwidth.enabled
3744                    || profile.traffic_shaping.burst_loss.enabled,
3745                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
3746                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3747                max_connections: 1000, // Default value
3748            };
3749
3750            // Update chaos config if it exists, or create it
3751            // Chaos config is in observability.chaos, not core.chaos
3752            if let Some(ref mut chaos) = config.observability.chaos {
3753                chaos.traffic_shaping = Some(network_shaping);
3754            } else {
3755                // Create minimal chaos config with traffic shaping
3756                use mockforge_core::config::ChaosEngConfig;
3757                config.observability.chaos = Some(ChaosEngConfig {
3758                    enabled: true,
3759                    latency: None,
3760                    fault_injection: None,
3761                    rate_limit: None,
3762                    traffic_shaping: Some(network_shaping),
3763                    scenario: None,
3764                });
3765            }
3766
3767            info!("Network profile '{}' applied to server configuration", request.profile_name);
3768        } else {
3769            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
3770        }
3771
3772        // Also update chaos API state if available
3773        #[cfg(feature = "chaos")]
3774        {
3775            if let Some(chaos_state) = &state.chaos_api_state {
3776                use mockforge_chaos::config::TrafficShapingConfig;
3777
3778                let mut chaos_config = chaos_state.config.write().await;
3779                // Apply profile's traffic shaping to chaos API state
3780                let chaos_traffic_shaping = TrafficShapingConfig {
3781                    enabled: profile.traffic_shaping.bandwidth.enabled
3782                        || profile.traffic_shaping.burst_loss.enabled,
3783                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
3784                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3785                    max_connections: 0,
3786                    connection_timeout_ms: 30000,
3787                };
3788                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
3789                chaos_config.enabled = true; // Enable chaos when applying a profile
3790                drop(chaos_config);
3791                info!("Network profile '{}' applied to chaos API state", request.profile_name);
3792            }
3793        }
3794
3795        Json(serde_json::json!({
3796            "success": true,
3797            "message": format!("Network profile '{}' applied", request.profile_name),
3798            "profile": {
3799                "name": profile.name,
3800                "description": profile.description,
3801            }
3802        }))
3803        .into_response()
3804    } else {
3805        (
3806            StatusCode::NOT_FOUND,
3807            Json(serde_json::json!({
3808                "error": "Profile not found",
3809                "message": format!("Network profile '{}' not found", request.profile_name)
3810            })),
3811        )
3812            .into_response()
3813    }
3814}
3815
3816/// Build the management API router with UI Builder support
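///
/// A minimal wiring sketch (illustrative only; assumes a `ServerConfig` value
/// is already available from configuration loading):
///
/// ```ignore
/// let state = ManagementState::new(None, None, 3000);
/// let app = management_router_with_ui_builder(state, server_config);
/// // Serve `app` with axum as usual; UI Builder routes are nested under /ui-builder.
/// ```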
3817pub fn management_router_with_ui_builder(
3818    state: ManagementState,
3819    server_config: mockforge_core::config::ServerConfig,
3820) -> Router {
3821    use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
3822
3823    // Create the base management router
3824    let management = management_router(state);
3825
3826    // Create UI Builder state and router
3827    let ui_builder_state = UIBuilderState::new(server_config);
3828    let ui_builder = create_ui_builder_router(ui_builder_state);
3829
3830    // Nest UI Builder under /ui-builder
3831    management.nest("/ui-builder", ui_builder)
3832}
3833
3834/// Build management router with spec import API
3835pub fn management_router_with_spec_import(state: ManagementState) -> Router {
3836    use crate::spec_import::{spec_import_router, SpecImportState};
3837
3838    // Create base management router
3839    let management = management_router(state);
3840
3841    // Merge with spec import router
3842    Router::new()
3843        .merge(management)
3844        .merge(spec_import_router(SpecImportState::new()))
3845}
3846
3847#[cfg(test)]
3848mod tests {
3849    use super::*;
3850
3851    #[tokio::test]
3852    async fn test_create_and_get_mock() {
3853        let state = ManagementState::new(None, None, 3000);
3854
3855        let mock = MockConfig {
3856            id: "test-1".to_string(),
3857            name: "Test Mock".to_string(),
3858            method: "GET".to_string(),
3859            path: "/test".to_string(),
3860            response: MockResponse {
3861                body: serde_json::json!({"message": "test"}),
3862                headers: None,
3863            },
3864            enabled: true,
3865            latency_ms: None,
3866            status_code: Some(200),
3867            request_match: None,
3868            priority: None,
3869            scenario: None,
3870            required_scenario_state: None,
3871            new_scenario_state: None,
3872        };
3873
3874        // Create mock
3875        {
3876            let mut mocks = state.mocks.write().await;
3877            mocks.push(mock.clone());
3878        }
3879
3880        // Get mock
3881        let mocks = state.mocks.read().await;
3882        let found = mocks.iter().find(|m| m.id == "test-1");
3883        assert!(found.is_some());
3884        assert_eq!(found.unwrap().name, "Test Mock");
3885    }
3886
3887    #[tokio::test]
3888    async fn test_server_stats() {
3889        let state = ManagementState::new(None, None, 3000);
3890
3891        // Add some mocks
3892        {
3893            let mut mocks = state.mocks.write().await;
3894            mocks.push(MockConfig {
3895                id: "1".to_string(),
3896                name: "Mock 1".to_string(),
3897                method: "GET".to_string(),
3898                path: "/test1".to_string(),
3899                response: MockResponse {
3900                    body: serde_json::json!({}),
3901                    headers: None,
3902                },
3903                enabled: true,
3904                latency_ms: None,
3905                status_code: Some(200),
3906                request_match: None,
3907                priority: None,
3908                scenario: None,
3909                required_scenario_state: None,
3910                new_scenario_state: None,
3911            });
3912            mocks.push(MockConfig {
3913                id: "2".to_string(),
3914                name: "Mock 2".to_string(),
3915                method: "POST".to_string(),
3916                path: "/test2".to_string(),
3917                response: MockResponse {
3918                    body: serde_json::json!({}),
3919                    headers: None,
3920                },
3921                enabled: false,
3922                latency_ms: None,
3923                status_code: Some(201),
3924                request_match: None,
3925                priority: None,
3926                scenario: None,
3927                required_scenario_state: None,
3928                new_scenario_state: None,
3929            });
3930        }
3931
3932        let mocks = state.mocks.read().await;
3933        assert_eq!(mocks.len(), 2);
3934        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
3935    }
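
    // Sanity check for the YAML extraction helper used by AI spec generation:
    // content inside a fenced ```yaml block should be unwrapped and trimmed,
    // and bare specs starting with `openapi:` should be returned unchanged.
    #[test]
    fn test_extract_yaml_spec_handles_code_fences() {
        let fenced = "Here is the spec:\n```yaml\nopenapi: 3.0.0\ninfo:\n  title: Demo\n```\nDone.";
        assert_eq!(extract_yaml_spec(fenced), "openapi: 3.0.0\ninfo:\n  title: Demo");

        let bare = "openapi: 3.0.0\ninfo:\n  title: Demo\n";
        assert_eq!(extract_yaml_spec(bare), "openapi: 3.0.0\ninfo:\n  title: Demo");
    }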
3936}