mockforge_http/
management.rs

//! Management API for MockForge
//!
//! Provides REST endpoints for controlling mocks, server configuration,
//! and integration with developer tools (VS Code extension, CI/CD, etc.)
5use axum::{
6    extract::{Path, Query, State},
7    http::StatusCode,
8    response::{
9        sse::{Event, Sse},
10        IntoResponse, Json,
11    },
12    routing::{delete, get, post, put},
13    Router,
14};
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18    BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23use std::convert::Infallible;
24use std::sync::Arc;
25use tokio::sync::{broadcast, RwLock};
26use tracing::*;
27
/// Message event types for real-time monitoring
///
/// Serialized with serde's adjacently-tagged representation, e.g.
/// `{"protocol": "mqtt", "data": { ... }}`. Variants are compiled in only
/// when the corresponding broker feature is enabled.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    #[cfg(feature = "mqtt")]
    /// MQTT message event
    Mqtt(MqttMessageEvent),
    #[cfg(feature = "kafka")]
    /// Kafka message event
    Kafka(KafkaMessageEvent),
}
40
#[cfg(feature = "mqtt")]
/// MQTT message event for real-time monitoring
///
/// Published on the management `message_events` broadcast channel
/// (wrapped in [`MessageEvent::Mqtt`]).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// MQTT topic name
    pub topic: String,
    /// Message payload content
    pub payload: String,
    /// Quality of Service level (0, 1, or 2)
    pub qos: u8,
    /// Whether the message is retained
    pub retain: bool,
    /// RFC3339 formatted timestamp
    pub timestamp: String,
}
56
#[cfg(feature = "kafka")]
/// Kafka message event for real-time monitoring
///
/// Published on the management `message_events` broadcast channel
/// (wrapped in [`MessageEvent::Kafka`]).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Kafka topic name
    pub topic: String,
    /// Optional message key
    pub key: Option<String>,
    /// Message value/payload
    pub value: String,
    /// Partition the message was written to
    pub partition: i32,
    /// Offset of the message within the partition
    pub offset: i64,
    /// Optional message headers
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// Message timestamp (appears to mirror `MqttMessageEvent`'s RFC3339
    /// format — confirm at the producing side)
    pub timestamp: String,
}
68
/// Mock configuration representation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockConfig {
    /// Unique identifier for the mock
    ///
    /// An empty id is omitted from serialization; `create_mock` assigns a
    /// UUID when the client did not supply one.
    #[serde(skip_serializing_if = "String::is_empty")]
    pub id: String,
    /// Human-readable name for the mock
    pub name: String,
    /// HTTP method (GET, POST, etc.) — matched case-insensitively
    pub method: String,
    /// API path pattern to match; supports `{param}` placeholders and `*`
    /// wildcards (see `path_matches_pattern`)
    pub path: String,
    /// Response configuration
    pub response: MockResponse,
    /// Whether this mock is currently enabled (defaults to `true` when
    /// omitted from the payload)
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Optional latency to inject in milliseconds
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_ms: Option<u64>,
    /// Optional HTTP status code override
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status_code: Option<u16>,
    /// Request matching criteria (headers, query params, body patterns)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_match: Option<RequestMatchCriteria>,
    /// Priority for mock ordering (higher priority mocks are matched first)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub priority: Option<i32>,
    /// Scenario name for stateful mocking
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scenario: Option<String>,
    /// Required scenario state for this mock to be active
    #[serde(skip_serializing_if = "Option::is_none")]
    pub required_scenario_state: Option<String>,
    /// New scenario state after this mock is matched
    #[serde(skip_serializing_if = "Option::is_none")]
    pub new_scenario_state: Option<String>,
}
108
/// Serde default for `MockConfig::enabled`: mocks are enabled unless
/// explicitly disabled.
fn default_true() -> bool {
    true
}
112
/// Mock response configuration
///
/// The payload (and optional headers) returned when a mock matches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Response body as JSON
    pub body: serde_json::Value,
    /// Optional custom response headers
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
122
/// Request matching criteria for advanced request matching
///
/// All configured criteria must hold for a mock to match (logical AND).
/// Header values and the body pattern are first interpreted as regular
/// expressions and fall back to exact string comparison when the pattern
/// does not compile; see `mock_matches_request`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RequestMatchCriteria {
    /// Headers that must be present and match (case-insensitive header names)
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub headers: std::collections::HashMap<String, String>,
    /// Query parameters that must be present and match (exact values)
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub query_params: std::collections::HashMap<String, String>,
    /// Request body pattern (supports exact match or regex)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub body_pattern: Option<String>,
    /// JSONPath expression for JSON body matching
    #[serde(skip_serializing_if = "Option::is_none")]
    pub json_path: Option<String>,
    /// XPath expression for XML body matching (currently a logged no-op)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub xpath: Option<String>,
    /// Custom matcher expression (e.g., "headers.content-type == \"application/json\"")
    #[serde(skip_serializing_if = "Option::is_none")]
    pub custom_matcher: Option<String>,
}
145
146/// Check if a request matches the given mock configuration
147///
148/// This function implements comprehensive request matching including:
149/// - Method and path matching
150/// - Header matching (with regex support)
151/// - Query parameter matching
152/// - Body pattern matching (exact, regex, JSONPath, XPath)
153/// - Custom matcher expressions
154pub fn mock_matches_request(
155    mock: &MockConfig,
156    method: &str,
157    path: &str,
158    headers: &std::collections::HashMap<String, String>,
159    query_params: &std::collections::HashMap<String, String>,
160    body: Option<&[u8]>,
161) -> bool {
162    use regex::Regex;
163
164    // Check if mock is enabled
165    if !mock.enabled {
166        return false;
167    }
168
169    // Check method (case-insensitive)
170    if mock.method.to_uppercase() != method.to_uppercase() {
171        return false;
172    }
173
174    // Check path pattern (supports wildcards and path parameters)
175    if !path_matches_pattern(&mock.path, path) {
176        return false;
177    }
178
179    // Check request matching criteria if present
180    if let Some(criteria) = &mock.request_match {
181        // Check headers
182        for (key, expected_value) in &criteria.headers {
183            let header_key_lower = key.to_lowercase();
184            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
185
186            if let Some((_, actual_value)) = found {
187                // Try regex match first, then exact match
188                if let Ok(re) = Regex::new(expected_value) {
189                    if !re.is_match(actual_value) {
190                        return false;
191                    }
192                } else if actual_value != expected_value {
193                    return false;
194                }
195            } else {
196                return false; // Header not found
197            }
198        }
199
200        // Check query parameters
201        for (key, expected_value) in &criteria.query_params {
202            if let Some(actual_value) = query_params.get(key) {
203                if actual_value != expected_value {
204                    return false;
205                }
206            } else {
207                return false; // Query param not found
208            }
209        }
210
211        // Check body pattern
212        if let Some(pattern) = &criteria.body_pattern {
213            if let Some(body_bytes) = body {
214                let body_str = String::from_utf8_lossy(body_bytes);
215                // Try regex first, then exact match
216                if let Ok(re) = Regex::new(pattern) {
217                    if !re.is_match(&body_str) {
218                        return false;
219                    }
220                } else if body_str.as_ref() != pattern {
221                    return false;
222                }
223            } else {
224                return false; // Body required but not present
225            }
226        }
227
228        // Check JSONPath (simplified implementation)
229        if let Some(json_path) = &criteria.json_path {
230            if let Some(body_bytes) = body {
231                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
232                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
233                        // Simple JSONPath check
234                        if !json_path_exists(&json_value, json_path) {
235                            return false;
236                        }
237                    }
238                }
239            }
240        }
241
242        // Check XPath (placeholder - requires XML/XPath library for full implementation)
243        if let Some(_xpath) = &criteria.xpath {
244            // XPath matching would require an XML/XPath library
245            // For now, this is a placeholder that warns but doesn't fail
246            tracing::warn!("XPath matching not yet fully implemented");
247        }
248
249        // Check custom matcher
250        if let Some(custom) = &criteria.custom_matcher {
251            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
252                return false;
253            }
254        }
255    }
256
257    true
258}
259
260/// Check if a path matches a pattern (supports wildcards and path parameters)
261fn path_matches_pattern(pattern: &str, path: &str) -> bool {
262    // Exact match
263    if pattern == path {
264        return true;
265    }
266
267    // Wildcard match
268    if pattern == "*" {
269        return true;
270    }
271
272    // Path parameter matching (e.g., /users/{id} matches /users/123)
273    let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
274    let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
275
276    if pattern_parts.len() != path_parts.len() {
277        // Check for wildcard patterns
278        if pattern.contains('*') {
279            return matches_wildcard_pattern(pattern, path);
280        }
281        return false;
282    }
283
284    for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
285        // Check for path parameters {param}
286        if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
287            continue; // Matches any value
288        }
289
290        if pattern_part != path_part {
291            return false;
292        }
293    }
294
295    true
296}
297
298/// Check if path matches a wildcard pattern
299fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
300    use regex::Regex;
301
302    // Convert pattern to regex
303    let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
304
305    if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
306        return re.is_match(path);
307    }
308
309    false
310}
311
312/// Check if a JSONPath exists in a JSON value (simplified implementation)
313fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
314    // Simple implementation - for full JSONPath support, use a library like jsonpath-rs
315    // This handles simple paths like $.field or $.field.subfield
316    if let Some(path) = json_path.strip_prefix("$.") {
317        let parts: Vec<&str> = path.split('.').collect();
318
319        let mut current = json;
320        for part in parts {
321            if let Some(obj) = current.as_object() {
322                if let Some(value) = obj.get(part) {
323                    current = value;
324                } else {
325                    return false;
326                }
327            } else {
328                return false;
329            }
330        }
331        true
332    } else {
333        // For complex JSONPath expressions, would need a proper JSONPath library
334        tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
335        false
336    }
337}
338
/// Evaluate a custom matcher expression
///
/// Supports three expression forms, detected in this order:
/// - Equality: `field == "value"` — fields: `method`, `path`,
///   `headers.<name>`, `query.<name>`
/// - Regex match: `field =~ "pattern"` — same fields
/// - Substring: `field contains "value"` — fields: `path`,
///   `headers.<name>`, `body`
///
/// Unknown field names or malformed expressions evaluate to `false`.
///
/// NOTE(review): operators are detected with substring `contains` checks,
/// so a quoted value embedding `==`, `=~`, or the word `contains` will be
/// mis-split (`parts.len() != 2` → `false`). Also, header lookups here are
/// case-sensitive, unlike the case-insensitive header matching in
/// `mock_matches_request` — confirm whether that asymmetry is intended.
fn evaluate_custom_matcher(
    expression: &str,
    method: &str,
    path: &str,
    headers: &std::collections::HashMap<String, String>,
    query_params: &std::collections::HashMap<String, String>,
    body: Option<&[u8]>,
) -> bool {
    use regex::Regex;

    let expr = expression.trim();

    // Handle equality expressions (field == "value")
    if expr.contains("==") {
        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        // Strip surrounding double or single quotes from the literal.
        let expected_value = parts[1].trim_matches('"').trim_matches('\'');

        match field {
            "method" => method == expected_value,
            "path" => path == expected_value,
            _ if field.starts_with("headers.") => {
                // Skip the "headers." prefix (8 chars) to get the header name.
                let header_name = &field[8..];
                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
            }
            _ if field.starts_with("query.") => {
                // Skip the "query." prefix (6 chars) to get the parameter name.
                let param_name = &field[6..];
                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
            }
            _ => false,
        }
    }
    // Handle regex match expressions (field =~ "pattern")
    else if expr.contains("=~") {
        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        let pattern = parts[1].trim_matches('"').trim_matches('\'');

        // An invalid regex makes the whole expression evaluate to false.
        if let Ok(re) = Regex::new(pattern) {
            match field {
                "method" => re.is_match(method),
                "path" => re.is_match(path),
                _ if field.starts_with("headers.") => {
                    let header_name = &field[8..];
                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
                }
                _ if field.starts_with("query.") => {
                    let param_name = &field[6..];
                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
                }
                _ => false,
            }
        } else {
            false
        }
    }
    // Handle contains expressions (field contains "value")
    else if expr.contains("contains") {
        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        let search_value = parts[1].trim_matches('"').trim_matches('\'');

        match field {
            "path" => path.contains(search_value),
            _ if field.starts_with("headers.") => {
                let header_name = &field[8..];
                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
            }
            _ if field.starts_with("body") => {
                // Body is decoded lossily; absent body never contains anything.
                if let Some(body_bytes) = body {
                    let body_str = String::from_utf8_lossy(body_bytes);
                    body_str.contains(search_value)
                } else {
                    false
                }
            }
            _ => false,
        }
    } else {
        // Unknown expression format
        tracing::warn!("Unknown custom matcher expression format: {}", expr);
        false
    }
}
436
/// Server statistics
///
/// Point-in-time snapshot of runtime counters reported by the management
/// API's stats endpoint (see `get_stats`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Server uptime in seconds
    pub uptime_seconds: u64,
    /// Total number of requests processed
    pub total_requests: u64,
    /// Number of active mock configurations
    pub active_mocks: usize,
    /// Number of currently enabled mocks
    pub enabled_mocks: usize,
    /// Number of registered API routes
    pub registered_routes: usize,
}
451
/// Server configuration info
///
/// NOTE: this is the management API's lightweight config summary and is
/// distinct from `mockforge_core::config::ServerConfig`, which holds the
/// full server configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// MockForge version string
    pub version: String,
    /// Server port number
    pub port: u16,
    /// Whether an OpenAPI spec is loaded
    pub has_openapi_spec: bool,
    /// Optional path to the OpenAPI spec file
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
465
/// Shared state for the management API
///
/// Cloning is inexpensive: mutable state is shared through `Arc`s, so
/// handler clones all observe the same data.
#[derive(Clone)]
pub struct ManagementState {
    /// Collection of mock configurations
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// Optional OpenAPI specification
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Optional path to the OpenAPI spec file
    pub spec_path: Option<String>,
    /// Server port number
    pub port: u16,
    /// Server start time for uptime calculation
    pub start_time: std::time::Instant,
    /// Counter for total requests processed
    pub request_counter: Arc<RwLock<u64>>,
    /// Optional proxy configuration for migration pipeline
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// Optional SMTP registry for email mocking
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// Optional MQTT broker for message mocking
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Optional Kafka broker for event streaming
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel for message events (MQTT & Kafka)
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// State machine manager for scenario state machines
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// Optional WebSocket broadcast channel for real-time updates
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Lifecycle hook registry for extensibility
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Rule explanations storage (in-memory for now)
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
    /// Optional chaos API state for chaos config management
    #[cfg(feature = "chaos")]
    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
    /// Optional server configuration for profile application
    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
}
517
impl ManagementState {
    /// Create a new management state
    ///
    /// All optional integrations (proxy config, SMTP/MQTT/Kafka brokers,
    /// lifecycle hooks, WebSocket broadcast, chaos state, server config)
    /// start unset; attach them afterwards with the builder-style
    /// `with_*` methods below.
    ///
    /// # Arguments
    /// * `spec` - Optional OpenAPI specification
    /// * `spec_path` - Optional path to the OpenAPI spec file
    /// * `port` - Server port number
    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
        Self {
            mocks: Arc::new(RwLock::new(Vec::new())),
            spec,
            spec_path,
            port,
            start_time: std::time::Instant::now(),
            request_counter: Arc::new(RwLock::new(0)),
            proxy_config: None,
            #[cfg(feature = "smtp")]
            smtp_registry: None,
            #[cfg(feature = "mqtt")]
            mqtt_broker: None,
            #[cfg(feature = "kafka")]
            kafka_broker: None,
            #[cfg(any(feature = "mqtt", feature = "kafka"))]
            message_events: {
                // Broadcast channel with a 1000-event buffer; the initial
                // receiver is dropped — consumers obtain their own via
                // `Sender::subscribe()`.
                let (tx, _) = broadcast::channel(1000);
                Arc::new(tx)
            },
            state_machine_manager: Arc::new(RwLock::new(
                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
            )),
            ws_broadcast: None,
            lifecycle_hooks: None,
            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
            #[cfg(feature = "chaos")]
            chaos_api_state: None,
            server_config: None,
        }
    }

    /// Add lifecycle hook registry to management state (builder-style)
    pub fn with_lifecycle_hooks(
        mut self,
        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
    ) -> Self {
        self.lifecycle_hooks = Some(hooks);
        self
    }

    /// Add WebSocket broadcast channel to management state (builder-style)
    pub fn with_ws_broadcast(
        mut self,
        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
    ) -> Self {
        self.ws_broadcast = Some(ws_broadcast);
        self
    }

    /// Add proxy configuration to management state (builder-style)
    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
        self.proxy_config = Some(proxy_config);
        self
    }

    #[cfg(feature = "smtp")]
    /// Add SMTP registry to management state (builder-style)
    pub fn with_smtp_registry(
        mut self,
        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
    ) -> Self {
        self.smtp_registry = Some(smtp_registry);
        self
    }

    #[cfg(feature = "mqtt")]
    /// Add MQTT broker to management state (builder-style)
    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
        self.mqtt_broker = Some(mqtt_broker);
        self
    }

    #[cfg(feature = "kafka")]
    /// Add Kafka broker to management state (builder-style)
    pub fn with_kafka_broker(
        mut self,
        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
    ) -> Self {
        self.kafka_broker = Some(kafka_broker);
        self
    }

    #[cfg(feature = "chaos")]
    /// Add chaos API state to management state (builder-style)
    pub fn with_chaos_api_state(
        mut self,
        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
    ) -> Self {
        self.chaos_api_state = Some(chaos_api_state);
        self
    }

    /// Add server configuration to management state (builder-style)
    pub fn with_server_config(
        mut self,
        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
    ) -> Self {
        self.server_config = Some(server_config);
        self
    }
}
627
628/// List all mocks
629async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
630    let mocks = state.mocks.read().await;
631    Json(serde_json::json!({
632        "mocks": *mocks,
633        "total": mocks.len(),
634        "enabled": mocks.iter().filter(|m| m.enabled).count()
635    }))
636}
637
638/// Get a specific mock by ID
639async fn get_mock(
640    State(state): State<ManagementState>,
641    Path(id): Path<String>,
642) -> Result<Json<MockConfig>, StatusCode> {
643    let mocks = state.mocks.read().await;
644    mocks
645        .iter()
646        .find(|m| m.id == id)
647        .cloned()
648        .map(Json)
649        .ok_or(StatusCode::NOT_FOUND)
650}
651
652/// Create a new mock
653async fn create_mock(
654    State(state): State<ManagementState>,
655    Json(mut mock): Json<MockConfig>,
656) -> Result<Json<MockConfig>, StatusCode> {
657    let mut mocks = state.mocks.write().await;
658
659    // Generate ID if not provided
660    if mock.id.is_empty() {
661        mock.id = uuid::Uuid::new_v4().to_string();
662    }
663
664    // Check for duplicate ID
665    if mocks.iter().any(|m| m.id == mock.id) {
666        return Err(StatusCode::CONFLICT);
667    }
668
669    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
670
671    // Invoke lifecycle hooks
672    if let Some(hooks) = &state.lifecycle_hooks {
673        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
674            id: mock.id.clone(),
675            name: mock.name.clone(),
676            config: serde_json::to_value(&mock).unwrap_or_default(),
677        };
678        hooks.invoke_mock_created(&event).await;
679    }
680
681    mocks.push(mock.clone());
682
683    // Broadcast WebSocket event
684    if let Some(tx) = &state.ws_broadcast {
685        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
686    }
687
688    Ok(Json(mock))
689}
690
691/// Update an existing mock
692async fn update_mock(
693    State(state): State<ManagementState>,
694    Path(id): Path<String>,
695    Json(updated_mock): Json<MockConfig>,
696) -> Result<Json<MockConfig>, StatusCode> {
697    let mut mocks = state.mocks.write().await;
698
699    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
700
701    // Get old mock for comparison
702    let old_mock = mocks[position].clone();
703
704    info!("Updating mock: {}", id);
705    mocks[position] = updated_mock.clone();
706
707    // Invoke lifecycle hooks
708    if let Some(hooks) = &state.lifecycle_hooks {
709        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
710            id: updated_mock.id.clone(),
711            name: updated_mock.name.clone(),
712            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
713        };
714        hooks.invoke_mock_updated(&event).await;
715
716        // Check if enabled state changed
717        if old_mock.enabled != updated_mock.enabled {
718            let state_event = if updated_mock.enabled {
719                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
720                    id: updated_mock.id.clone(),
721                }
722            } else {
723                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
724                    id: updated_mock.id.clone(),
725                }
726            };
727            hooks.invoke_mock_state_changed(&state_event).await;
728        }
729    }
730
731    // Broadcast WebSocket event
732    if let Some(tx) = &state.ws_broadcast {
733        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
734    }
735
736    Ok(Json(updated_mock))
737}
738
739/// Delete a mock
740async fn delete_mock(
741    State(state): State<ManagementState>,
742    Path(id): Path<String>,
743) -> Result<StatusCode, StatusCode> {
744    let mut mocks = state.mocks.write().await;
745
746    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
747
748    // Get mock info before deletion for lifecycle hooks
749    let deleted_mock = mocks[position].clone();
750
751    info!("Deleting mock: {}", id);
752    mocks.remove(position);
753
754    // Invoke lifecycle hooks
755    if let Some(hooks) = &state.lifecycle_hooks {
756        let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
757            id: deleted_mock.id.clone(),
758            name: deleted_mock.name.clone(),
759        };
760        hooks.invoke_mock_deleted(&event).await;
761    }
762
763    // Broadcast WebSocket event
764    if let Some(tx) = &state.ws_broadcast {
765        let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
766    }
767
768    Ok(StatusCode::NO_CONTENT)
769}
770
/// Request to validate configuration
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// Configuration to validate (as JSON)
    pub config: serde_json::Value,
    /// Format of the configuration ("json" or "yaml"; defaults to "json"
    /// when omitted)
    #[serde(default = "default_format")]
    pub format: String,
}
780
/// Serde default for `ValidateConfigRequest::format`: "json".
fn default_format() -> String {
    "json".to_string()
}
784
785/// Validate configuration without applying it
786async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
787    use mockforge_core::config::ServerConfig;
788
789    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
790        "yaml" | "yml" => {
791            let yaml_str = match serde_json::to_string(&request.config) {
792                Ok(s) => s,
793                Err(e) => {
794                    return (
795                        StatusCode::BAD_REQUEST,
796                        Json(serde_json::json!({
797                            "valid": false,
798                            "error": format!("Failed to convert to string: {}", e),
799                            "message": "Configuration validation failed"
800                        })),
801                    )
802                        .into_response();
803                }
804            };
805            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
806        }
807        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
808    };
809
810    match config_result {
811        Ok(_) => Json(serde_json::json!({
812            "valid": true,
813            "message": "Configuration is valid"
814        }))
815        .into_response(),
816        Err(e) => (
817            StatusCode::BAD_REQUEST,
818            Json(serde_json::json!({
819                "valid": false,
820                "error": format!("Invalid configuration: {}", e),
821                "message": "Configuration validation failed"
822            })),
823        )
824            .into_response(),
825    }
826}
827
/// Request for bulk configuration update
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    /// Partial configuration updates (only specified fields will be updated);
    /// must be a JSON object whose fields mirror `ServerConfig`
    pub updates: serde_json::Value,
}
834
835/// Bulk update configuration
836///
837/// This endpoint allows updating multiple configuration options at once.
838/// Only the specified fields in the updates object will be modified.
839///
840/// Configuration updates are applied to the server configuration if available
841/// in ManagementState. Changes take effect immediately for supported settings.
842async fn bulk_update_config(
843    State(state): State<ManagementState>,
844    Json(request): Json<BulkConfigUpdateRequest>,
845) -> impl IntoResponse {
846    // Validate the updates structure
847    if !request.updates.is_object() {
848        return (
849            StatusCode::BAD_REQUEST,
850            Json(serde_json::json!({
851                "error": "Invalid request",
852                "message": "Updates must be a JSON object"
853            })),
854        )
855            .into_response();
856    }
857
858    // Try to validate as partial ServerConfig
859    use mockforge_core::config::ServerConfig;
860
861    // Create a minimal valid config and try to merge updates
862    let base_config = ServerConfig::default();
863    let base_json = match serde_json::to_value(&base_config) {
864        Ok(v) => v,
865        Err(e) => {
866            return (
867                StatusCode::INTERNAL_SERVER_ERROR,
868                Json(serde_json::json!({
869                    "error": "Internal error",
870                    "message": format!("Failed to serialize base config: {}", e)
871                })),
872            )
873                .into_response();
874        }
875    };
876
877    // Merge updates into base config (simplified merge)
878    let mut merged = base_json.clone();
879    if let (Some(merged_obj), Some(updates_obj)) =
880        (merged.as_object_mut(), request.updates.as_object())
881    {
882        for (key, value) in updates_obj {
883            merged_obj.insert(key.clone(), value.clone());
884        }
885    }
886
887    // Validate the merged config
888    match serde_json::from_value::<ServerConfig>(merged) {
889        Ok(_) => {
890            // Config is valid
891            // Note: Runtime application of config changes would require:
892            // 1. Storing ServerConfig in ManagementState
893            // 2. Implementing hot-reload mechanism for server configuration
894            // 3. Updating router state and middleware based on new config
895            // For now, this endpoint only validates the configuration structure
896            Json(serde_json::json!({
897                "success": true,
898                "message": "Bulk configuration update validated successfully. Note: Runtime application requires ServerConfig in ManagementState and hot-reload support.",
899                "updates_received": request.updates,
900                "validated": true
901            }))
902            .into_response()
903        }
904        Err(e) => (
905            StatusCode::BAD_REQUEST,
906            Json(serde_json::json!({
907                "error": "Invalid configuration",
908                "message": format!("Configuration validation failed: {}", e),
909                "validated": false
910            })),
911        )
912            .into_response(),
913    }
914}
915
916/// Get server statistics
917async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
918    let mocks = state.mocks.read().await;
919    let request_count = *state.request_counter.read().await;
920
921    Json(ServerStats {
922        uptime_seconds: state.start_time.elapsed().as_secs(),
923        total_requests: request_count,
924        active_mocks: mocks.len(),
925        enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
926        registered_routes: mocks.len(), // This could be enhanced with actual route registry info
927    })
928}
929
930/// Get server configuration
931async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
932    Json(ServerConfig {
933        version: env!("CARGO_PKG_VERSION").to_string(),
934        port: state.port,
935        has_openapi_spec: state.spec.is_some(),
936        spec_path: state.spec_path.clone(),
937    })
938}
939
940/// Health check endpoint
941async fn health_check() -> Json<serde_json::Value> {
942    Json(serde_json::json!({
943        "status": "healthy",
944        "service": "mockforge-management",
945        "timestamp": chrono::Utc::now().to_rfc3339()
946    }))
947}
948
/// Export format for mock configurations
///
/// Serialized in lowercase (`"json"` / `"yaml"`), matching the `format`
/// query parameter accepted by the export endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// JSON format
    Json,
    /// YAML format
    Yaml,
}
958
959/// Export mocks in specified format
960async fn export_mocks(
961    State(state): State<ManagementState>,
962    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
963) -> Result<(StatusCode, String), StatusCode> {
964    let mocks = state.mocks.read().await;
965
966    let format = params
967        .get("format")
968        .map(|f| match f.as_str() {
969            "yaml" | "yml" => ExportFormat::Yaml,
970            _ => ExportFormat::Json,
971        })
972        .unwrap_or(ExportFormat::Json);
973
974    match format {
975        ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
976            .map(|json| (StatusCode::OK, json))
977            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
978        ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
979            .map(|yaml| (StatusCode::OK, yaml))
980            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
981    }
982}
983
984/// Import mocks from JSON/YAML
985async fn import_mocks(
986    State(state): State<ManagementState>,
987    Json(mocks): Json<Vec<MockConfig>>,
988) -> impl IntoResponse {
989    let mut current_mocks = state.mocks.write().await;
990    current_mocks.clear();
991    current_mocks.extend(mocks);
992    Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
993}
994
#[cfg(feature = "smtp")]
/// List SMTP emails in mailbox
///
/// Returns all captured emails, or 501 when no SMTP registry is attached to
/// the management state.
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(r) => r,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    match registry.get_emails() {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to retrieve emails",
                "message": e.to_string()
            })),
        ),
    }
}
1019
/// Get specific SMTP email
///
/// Looks up a single captured email by id. Responds 404 for an unknown id
/// and 501 when no SMTP registry is attached.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(r) => r,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    match registry.get_email_by_id(&id) {
        Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Email not found",
                "id": id
            })),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to retrieve email",
                "message": e.to_string()
            })),
        ),
    }
}
1054
/// Clear SMTP mailbox
///
/// Removes every captured email; 501 when no SMTP registry is attached.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(r) => r,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    match registry.clear_mailbox() {
        Ok(()) => (
            StatusCode::OK,
            Json(serde_json::json!({
                "message": "Mailbox cleared successfully"
            })),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to clear mailbox",
                "message": e.to_string()
            })),
        ),
    }
}
1084
/// Export SMTP mailbox
///
/// Currently unimplemented over HTTP — the SMTP server runs separately from
/// the HTTP server — so this always returns 501, echoing the requested
/// format back to the caller.
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    // Borrow the parameter when present; the previous code allocated a
    // default String (and then cloned) even when the parameter was supplied.
    let format = params.get("format").map(String::as_str).unwrap_or("json");
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "SMTP mailbox management not available via HTTP API",
            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
            "requested_format": format
        })),
    )
}
1100
/// Search SMTP emails
///
/// Builds `EmailSearchFilters` from query parameters (`sender`, `recipient`,
/// `subject`, `body`, RFC3339 `since`/`until`, boolean `regex` and
/// `case_sensitive`) and runs the search. 501 when no registry is attached.
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(r) => r,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    // Parse an optional RFC3339 query parameter into a UTC timestamp;
    // unparsable values are silently ignored (treated as absent).
    let parse_time = |key: &str| {
        params
            .get(key)
            .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
            .map(|dt| dt.with_timezone(&chrono::Utc))
    };
    // Boolean flags are true only for the literal string "true".
    let flag = |key: &str| params.get(key).map(|s| s == "true").unwrap_or(false);

    let filters = EmailSearchFilters {
        sender: params.get("sender").cloned(),
        recipient: params.get("recipient").cloned(),
        subject: params.get("subject").cloned(),
        body: params.get("body").cloned(),
        since: parse_time("since"),
        until: parse_time("until"),
        use_regex: flag("regex"),
        case_sensitive: flag("case_sensitive"),
    };

    match registry.search_emails(filters) {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to search emails",
                "message": e.to_string()
            })),
        ),
    }
}
1145
/// MQTT broker statistics
///
/// Snapshot returned by the MQTT stats endpoint; assembled from the broker's
/// client list, topic list, and aggregate topic stats.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of connected MQTT clients
    pub connected_clients: usize,
    /// Number of active MQTT topics
    pub active_topics: usize,
    /// Number of retained messages
    pub retained_messages: usize,
    /// Total number of subscriptions
    pub total_subscriptions: usize,
}
1159
/// MQTT management handlers
///
/// Returns broker statistics, or 503 when no broker is configured.
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let broker = match &state.mqtt_broker {
        Some(b) => b,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    // Gather metrics: clients and topics first, then the aggregate stats.
    let connected_clients = broker.get_connected_clients().await.len();
    let active_topics = broker.get_active_topics().await.len();
    let topic_stats = broker.get_topic_stats().await;

    Json(MqttBrokerStats {
        connected_clients,
        active_topics,
        retained_messages: topic_stats.retained_messages,
        total_subscriptions: topic_stats.total_subscriptions,
    })
    .into_response()
}
1180
#[cfg(feature = "mqtt")]
/// List currently connected MQTT clients, or 503 when no broker is configured.
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let clients = broker.get_connected_clients().await;
            Json(serde_json::json!({ "clients": clients })).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1193
#[cfg(feature = "mqtt")]
/// List active MQTT topics, or 503 when no broker is configured.
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let topics = broker.get_active_topics().await;
            Json(serde_json::json!({ "topics": topics })).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1206
#[cfg(feature = "mqtt")]
/// Forcibly disconnect a client by id; 503 when no broker is configured.
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let broker = match &state.mqtt_broker {
        Some(b) => b,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response(),
        Err(e) => {
            (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
                .into_response()
        }
    }
}
1226
1227// ========== MQTT Publish Handler ==========
1228
#[cfg(feature = "mqtt")]
/// Request to publish a single MQTT message
///
/// NOTE(review): `publish_mqtt_message_handler` currently parses a raw
/// `serde_json::Value` instead of deserializing this type; this struct is
/// used as the element type of `MqttBatchPublishRequest::messages`.
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Topic to publish to
    pub topic: String,
    /// Message payload (string or JSON)
    pub payload: String,
    /// QoS level (0, 1, or 2)
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Whether to retain the message
    #[serde(default)]
    pub retain: bool,
}
1244
#[cfg(feature = "mqtt")]
/// Serde default for `MqttPublishRequest::qos`: QoS 0 (at-most-once).
fn default_qos() -> u8 {
    0
}
1249
#[cfg(feature = "mqtt")]
/// Publish a message to an MQTT topic (only compiled when mqtt feature is enabled)
///
/// Accepts a raw JSON body with required `topic` and `payload` string fields
/// plus optional `qos` (default 0) and `retain` (default false). On success
/// the publish is also broadcast to SSE message-event subscribers.
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Optional fields fall back to their defaults when missing or mistyped.
    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    // Both topic and payload must be present as strings.
    let (topic, payload) = match (
        request.get("topic").and_then(|v| v.as_str()),
        request.get("payload").and_then(|v| v.as_str()),
    ) {
        (Some(t), Some(p)) => (t, p),
        _ => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid request",
                    "message": "Missing required fields: topic and payload"
                })),
            );
        }
    };

    let broker = match &state.mqtt_broker {
        Some(b) => b,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "MQTT broker not available",
                    "message": "MQTT broker is not enabled or not available."
                })),
            );
        }
    };

    // MQTT defines only QoS levels 0, 1, and 2.
    if qos > 2 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid QoS",
                "message": "QoS must be 0, 1, or 2"
            })),
        );
    }

    // All management-API publishes use a single synthetic client id.
    let client_id = "mockforge-management-api".to_string();
    let outcome = broker
        .handle_publish(&client_id, topic, payload.as_bytes().to_vec(), qos, retain)
        .await
        .map_err(|e| format!("{}", e));

    match outcome {
        Ok(_) => {
            // Broadcast to real-time monitors; a send error simply means
            // there are no subscribers, so it is ignored.
            let _ = state.message_events.send(MessageEvent::Mqtt(MqttMessageEvent {
                topic: topic.to_string(),
                payload: payload.to_string(),
                qos,
                retain,
                timestamp: chrono::Utc::now().to_rfc3339(),
            }));

            (
                StatusCode::OK,
                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message published to topic '{}'", topic),
                    "topic": topic,
                    "qos": qos,
                    "retain": retain
                })),
            )
        }
        Err(error_msg) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to publish message",
                "message": error_msg
            })),
        ),
    }
}
1337
#[cfg(not(feature = "mqtt"))]
/// Publish a message to an MQTT topic (stub when mqtt feature is disabled)
///
/// Mirrors the feature-enabled handler's signature so routing code compiles
/// unchanged; always answers 503 explaining that MQTT is compiled out.
async fn publish_mqtt_message_handler(
    State(_state): State<ManagementState>,
    Json(_request): Json<serde_json::Value>,
) -> impl IntoResponse {
    (
        StatusCode::SERVICE_UNAVAILABLE,
        Json(serde_json::json!({
            "error": "MQTT feature not enabled",
            "message": "MQTT support is not compiled into this build"
        })),
    )
}
1352
#[cfg(feature = "mqtt")]
/// Request to publish multiple MQTT messages
///
/// The batch handler applies `delay_ms` between consecutive messages (but not
/// after the last one).
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// List of messages to publish
    pub messages: Vec<MqttPublishRequest>,
    /// Delay between messages in milliseconds
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1363
#[cfg(feature = "mqtt")]
/// Serde default for `MqttBatchPublishRequest::delay_ms`: 100 ms between messages.
fn default_delay() -> u64 {
    100
}
1368
#[cfg(feature = "mqtt")]
/// Publish multiple messages to MQTT topics (only compiled when mqtt feature is enabled)
///
/// Accepts a raw JSON body: a required `messages` array (objects with
/// `topic`/`payload` and optional `qos`/`retain`) and an optional `delay_ms`
/// (default 100) applied between consecutive publishes. Messages are
/// processed independently: per-message failures are recorded in the
/// `results` array instead of aborting the batch.
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Extract fields from JSON manually
    let messages_json = request.get("messages").and_then(|v| v.as_array());
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    if messages_json.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required field: messages"
            })),
        );
    }

    let messages_json = messages_json.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        // Reject empty batches up front; this also keeps the
        // `len() - 1` delay arithmetic below safe from underflow.
        if messages_json.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            );
        }

        let mut results = Vec::new();
        // All management-API publishes share one synthetic client id.
        let client_id = "mockforge-management-api".to_string();

        for (index, msg_json) in messages_json.iter().enumerate() {
            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

            // Invalid entries produce a per-index failure record and are skipped.
            if topic.is_none() || payload.is_none() {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }

            let topic = topic.unwrap();
            let payload = payload.unwrap();

            // Validate QoS
            if qos > 2 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Invalid QoS (must be 0, 1, or 2)"
                }));
                continue;
            }

            // Convert payload to bytes
            let payload_bytes = payload.as_bytes().to_vec();

            let publish_result = broker
                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
                .await
                .map_err(|e| format!("{}", e));

            match publish_result {
                Ok(_) => {
                    // Emit message event so SSE monitors observe the publish;
                    // a send error (no subscribers) is deliberately ignored.
                    let event = MessageEvent::Mqtt(MqttMessageEvent {
                        topic: topic.clone(),
                        payload: payload.clone(),
                        qos,
                        retain,
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": topic,
                        "qos": qos
                    }));
                }
                Err(error_msg) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": error_msg
                    }));
                }
            }

            // Add delay between messages (except for the last one)
            if index < messages_json.len() - 1 && delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        // Top-level "success" means the batch was processed end-to-end, not
        // that every message succeeded; check "succeeded"/"failed"/"results".
        (
            StatusCode::OK,
            Json(serde_json::json!({
                "success": true,
                "total": messages_json.len(),
                "succeeded": success_count,
                "failed": messages_json.len() - success_count,
                "results": results
            })),
        )
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1498
#[cfg(not(feature = "mqtt"))]
/// Publish multiple messages to MQTT topics (stub when mqtt feature is disabled)
///
/// Mirrors the feature-enabled handler's signature so routing code compiles
/// unchanged; always answers 503 explaining that MQTT is compiled out.
async fn publish_mqtt_batch_handler(
    State(_state): State<ManagementState>,
    Json(_request): Json<serde_json::Value>,
) -> impl IntoResponse {
    (
        StatusCode::SERVICE_UNAVAILABLE,
        Json(serde_json::json!({
            "error": "MQTT feature not enabled",
            "message": "MQTT support is not compiled into this build"
        })),
    )
}
1513
1514// Migration pipeline handlers
1515
/// Request to set migration mode
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    // One of "mock", "shadow", "real", or "auto" (matched case-insensitively).
    mode: String,
}
1521
1522/// Get all migration routes
1523async fn get_migration_routes(
1524    State(state): State<ManagementState>,
1525) -> Result<Json<serde_json::Value>, StatusCode> {
1526    let proxy_config = match &state.proxy_config {
1527        Some(config) => config,
1528        None => {
1529            return Ok(Json(serde_json::json!({
1530                "error": "Migration not configured. Proxy config not available."
1531            })));
1532        }
1533    };
1534
1535    let config = proxy_config.read().await;
1536    let routes = config.get_migration_routes();
1537
1538    Ok(Json(serde_json::json!({
1539        "routes": routes
1540    })))
1541}
1542
1543/// Toggle a route's migration mode
1544async fn toggle_route_migration(
1545    State(state): State<ManagementState>,
1546    Path(pattern): Path<String>,
1547) -> Result<Json<serde_json::Value>, StatusCode> {
1548    let proxy_config = match &state.proxy_config {
1549        Some(config) => config,
1550        None => {
1551            return Ok(Json(serde_json::json!({
1552                "error": "Migration not configured. Proxy config not available."
1553            })));
1554        }
1555    };
1556
1557    let mut config = proxy_config.write().await;
1558    let new_mode = match config.toggle_route_migration(&pattern) {
1559        Some(mode) => mode,
1560        None => {
1561            return Ok(Json(serde_json::json!({
1562                "error": format!("Route pattern not found: {}", pattern)
1563            })));
1564        }
1565    };
1566
1567    Ok(Json(serde_json::json!({
1568        "pattern": pattern,
1569        "mode": format!("{:?}", new_mode).to_lowercase()
1570    })))
1571}
1572
1573/// Set a route's migration mode explicitly
1574async fn set_route_migration_mode(
1575    State(state): State<ManagementState>,
1576    Path(pattern): Path<String>,
1577    Json(request): Json<SetMigrationModeRequest>,
1578) -> Result<Json<serde_json::Value>, StatusCode> {
1579    let proxy_config = match &state.proxy_config {
1580        Some(config) => config,
1581        None => {
1582            return Ok(Json(serde_json::json!({
1583                "error": "Migration not configured. Proxy config not available."
1584            })));
1585        }
1586    };
1587
1588    use mockforge_core::proxy::config::MigrationMode;
1589    let mode = match request.mode.to_lowercase().as_str() {
1590        "mock" => MigrationMode::Mock,
1591        "shadow" => MigrationMode::Shadow,
1592        "real" => MigrationMode::Real,
1593        "auto" => MigrationMode::Auto,
1594        _ => {
1595            return Ok(Json(serde_json::json!({
1596                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1597            })));
1598        }
1599    };
1600
1601    let mut config = proxy_config.write().await;
1602    let updated = config.update_rule_migration_mode(&pattern, mode);
1603
1604    if !updated {
1605        return Ok(Json(serde_json::json!({
1606            "error": format!("Route pattern not found: {}", pattern)
1607        })));
1608    }
1609
1610    Ok(Json(serde_json::json!({
1611        "pattern": pattern,
1612        "mode": format!("{:?}", mode).to_lowercase()
1613    })))
1614}
1615
1616/// Toggle a group's migration mode
1617async fn toggle_group_migration(
1618    State(state): State<ManagementState>,
1619    Path(group): Path<String>,
1620) -> Result<Json<serde_json::Value>, StatusCode> {
1621    let proxy_config = match &state.proxy_config {
1622        Some(config) => config,
1623        None => {
1624            return Ok(Json(serde_json::json!({
1625                "error": "Migration not configured. Proxy config not available."
1626            })));
1627        }
1628    };
1629
1630    let mut config = proxy_config.write().await;
1631    let new_mode = config.toggle_group_migration(&group);
1632
1633    Ok(Json(serde_json::json!({
1634        "group": group,
1635        "mode": format!("{:?}", new_mode).to_lowercase()
1636    })))
1637}
1638
1639/// Set a group's migration mode explicitly
1640async fn set_group_migration_mode(
1641    State(state): State<ManagementState>,
1642    Path(group): Path<String>,
1643    Json(request): Json<SetMigrationModeRequest>,
1644) -> Result<Json<serde_json::Value>, StatusCode> {
1645    let proxy_config = match &state.proxy_config {
1646        Some(config) => config,
1647        None => {
1648            return Ok(Json(serde_json::json!({
1649                "error": "Migration not configured. Proxy config not available."
1650            })));
1651        }
1652    };
1653
1654    use mockforge_core::proxy::config::MigrationMode;
1655    let mode = match request.mode.to_lowercase().as_str() {
1656        "mock" => MigrationMode::Mock,
1657        "shadow" => MigrationMode::Shadow,
1658        "real" => MigrationMode::Real,
1659        "auto" => MigrationMode::Auto,
1660        _ => {
1661            return Ok(Json(serde_json::json!({
1662                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1663            })));
1664        }
1665    };
1666
1667    let mut config = proxy_config.write().await;
1668    config.update_group_migration_mode(&group, mode);
1669
1670    Ok(Json(serde_json::json!({
1671        "group": group,
1672        "mode": format!("{:?}", mode).to_lowercase()
1673    })))
1674}
1675
1676/// Get all migration groups
1677async fn get_migration_groups(
1678    State(state): State<ManagementState>,
1679) -> Result<Json<serde_json::Value>, StatusCode> {
1680    let proxy_config = match &state.proxy_config {
1681        Some(config) => config,
1682        None => {
1683            return Ok(Json(serde_json::json!({
1684                "error": "Migration not configured. Proxy config not available."
1685            })));
1686        }
1687    };
1688
1689    let config = proxy_config.read().await;
1690    let groups = config.get_migration_groups();
1691
1692    // Convert to JSON-serializable format
1693    let groups_json: serde_json::Map<String, serde_json::Value> = groups
1694        .into_iter()
1695        .map(|(name, info)| {
1696            (
1697                name,
1698                serde_json::json!({
1699                    "name": info.name,
1700                    "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1701                    "route_count": info.route_count
1702                }),
1703            )
1704        })
1705        .collect();
1706
1707    Ok(Json(serde_json::json!(groups_json)))
1708}
1709
1710/// Get overall migration status
1711async fn get_migration_status(
1712    State(state): State<ManagementState>,
1713) -> Result<Json<serde_json::Value>, StatusCode> {
1714    let proxy_config = match &state.proxy_config {
1715        Some(config) => config,
1716        None => {
1717            return Ok(Json(serde_json::json!({
1718                "error": "Migration not configured. Proxy config not available."
1719            })));
1720        }
1721    };
1722
1723    let config = proxy_config.read().await;
1724    let routes = config.get_migration_routes();
1725    let groups = config.get_migration_groups();
1726
1727    let mut mock_count = 0;
1728    let mut shadow_count = 0;
1729    let mut real_count = 0;
1730    let mut auto_count = 0;
1731
1732    for route in &routes {
1733        match route.migration_mode {
1734            mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1735            mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1736            mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1737            mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1738        }
1739    }
1740
1741    Ok(Json(serde_json::json!({
1742        "total_routes": routes.len(),
1743        "mock_routes": mock_count,
1744        "shadow_routes": shadow_count,
1745        "real_routes": real_count,
1746        "auto_routes": auto_count,
1747        "total_groups": groups.len(),
1748        "migration_enabled": config.migration_enabled
1749    })))
1750}
1751
1752// ========== Proxy Replacement Rules Management ==========
1753
/// Request body for creating/updating proxy replacement rules
///
/// Shared by POST /proxy/rules and PUT /proxy/rules/{id}. `status_codes`
/// is stored for both kinds but only surfaced for response rules by the
/// read handlers; `enabled` defaults to true when omitted.
#[derive(Debug, Deserialize, Serialize)]
pub struct ProxyRuleRequest {
    /// URL pattern to match (supports wildcards like "/api/users/*")
    pub pattern: String,
    /// Rule type: "request" or "response" (any other value is rejected)
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Optional status code filter for response rules (empty when omitted)
    #[serde(default)]
    pub status_codes: Vec<u16>,
    /// Body transformations to apply
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether this rule is enabled (defaults to true when omitted)
    #[serde(default = "default_true")]
    pub enabled: bool,
}
1771
/// Request body for individual body transformations
///
/// `operation` is compared case-sensitively against "replace", "add" and
/// "remove" by the rule handlers; any other value — including the empty
/// string produced by the serde default — falls back to "replace".
#[derive(Debug, Deserialize, Serialize)]
pub struct BodyTransformRequest {
    /// JSONPath expression to target (e.g., "$.userId", "$.email")
    pub path: String,
    /// Replacement value (supports template expansion like "{{uuid}}", "{{faker.email}}")
    pub replace: String,
    /// Operation to perform: "replace", "add", or "remove" (default: "replace")
    #[serde(default)]
    pub operation: String,
}
1783
/// Response format for proxy rules
///
/// Rule ids are positional: request rules occupy indices
/// `0..request_count` and response rules follow at `request_count..`.
/// Because ids are indices rather than stable identifiers, deleting a rule
/// shifts the ids of all later rules.
#[derive(Debug, Serialize)]
pub struct ProxyRuleResponse {
    /// Rule ID (index into the combined request+response rule list)
    pub id: usize,
    /// URL pattern the rule matches
    pub pattern: String,
    /// Rule type ("request" or "response"), serialized as "type"
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status codes filter (always empty for request rules)
    pub status_codes: Vec<u16>,
    /// Body transformations applied by this rule
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether enabled
    pub enabled: bool,
}
1801
1802/// List all proxy replacement rules
1803async fn list_proxy_rules(
1804    State(state): State<ManagementState>,
1805) -> Result<Json<serde_json::Value>, StatusCode> {
1806    let proxy_config = match &state.proxy_config {
1807        Some(config) => config,
1808        None => {
1809            return Ok(Json(serde_json::json!({
1810                "error": "Proxy not configured. Proxy config not available."
1811            })));
1812        }
1813    };
1814
1815    let config = proxy_config.read().await;
1816
1817    let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1818
1819    // Add request replacement rules
1820    for (idx, rule) in config.request_replacements.iter().enumerate() {
1821        rules.push(ProxyRuleResponse {
1822            id: idx,
1823            pattern: rule.pattern.clone(),
1824            rule_type: "request".to_string(),
1825            status_codes: Vec::new(),
1826            body_transforms: rule
1827                .body_transforms
1828                .iter()
1829                .map(|t| BodyTransformRequest {
1830                    path: t.path.clone(),
1831                    replace: t.replace.clone(),
1832                    operation: format!("{:?}", t.operation).to_lowercase(),
1833                })
1834                .collect(),
1835            enabled: rule.enabled,
1836        });
1837    }
1838
1839    // Add response replacement rules
1840    let request_count = config.request_replacements.len();
1841    for (idx, rule) in config.response_replacements.iter().enumerate() {
1842        rules.push(ProxyRuleResponse {
1843            id: request_count + idx,
1844            pattern: rule.pattern.clone(),
1845            rule_type: "response".to_string(),
1846            status_codes: rule.status_codes.clone(),
1847            body_transforms: rule
1848                .body_transforms
1849                .iter()
1850                .map(|t| BodyTransformRequest {
1851                    path: t.path.clone(),
1852                    replace: t.replace.clone(),
1853                    operation: format!("{:?}", t.operation).to_lowercase(),
1854                })
1855                .collect(),
1856            enabled: rule.enabled,
1857        });
1858    }
1859
1860    Ok(Json(serde_json::json!({
1861        "rules": rules
1862    })))
1863}
1864
1865/// Create a new proxy replacement rule
1866async fn create_proxy_rule(
1867    State(state): State<ManagementState>,
1868    Json(request): Json<ProxyRuleRequest>,
1869) -> Result<Json<serde_json::Value>, StatusCode> {
1870    let proxy_config = match &state.proxy_config {
1871        Some(config) => config,
1872        None => {
1873            return Ok(Json(serde_json::json!({
1874                "error": "Proxy not configured. Proxy config not available."
1875            })));
1876        }
1877    };
1878
1879    // Validate request
1880    if request.body_transforms.is_empty() {
1881        return Ok(Json(serde_json::json!({
1882            "error": "At least one body transform is required"
1883        })));
1884    }
1885
1886    let body_transforms: Vec<BodyTransform> = request
1887        .body_transforms
1888        .iter()
1889        .map(|t| {
1890            let op = match t.operation.as_str() {
1891                "replace" => TransformOperation::Replace,
1892                "add" => TransformOperation::Add,
1893                "remove" => TransformOperation::Remove,
1894                _ => TransformOperation::Replace,
1895            };
1896            BodyTransform {
1897                path: t.path.clone(),
1898                replace: t.replace.clone(),
1899                operation: op,
1900            }
1901        })
1902        .collect();
1903
1904    let new_rule = BodyTransformRule {
1905        pattern: request.pattern.clone(),
1906        status_codes: request.status_codes.clone(),
1907        body_transforms,
1908        enabled: request.enabled,
1909    };
1910
1911    let mut config = proxy_config.write().await;
1912
1913    let rule_id = if request.rule_type == "request" {
1914        config.request_replacements.push(new_rule);
1915        config.request_replacements.len() - 1
1916    } else if request.rule_type == "response" {
1917        config.response_replacements.push(new_rule);
1918        config.request_replacements.len() + config.response_replacements.len() - 1
1919    } else {
1920        return Ok(Json(serde_json::json!({
1921            "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
1922        })));
1923    };
1924
1925    Ok(Json(serde_json::json!({
1926        "id": rule_id,
1927        "message": "Rule created successfully"
1928    })))
1929}
1930
1931/// Get a specific proxy replacement rule
1932async fn get_proxy_rule(
1933    State(state): State<ManagementState>,
1934    Path(id): Path<String>,
1935) -> Result<Json<serde_json::Value>, StatusCode> {
1936    let proxy_config = match &state.proxy_config {
1937        Some(config) => config,
1938        None => {
1939            return Ok(Json(serde_json::json!({
1940                "error": "Proxy not configured. Proxy config not available."
1941            })));
1942        }
1943    };
1944
1945    let config = proxy_config.read().await;
1946    let rule_id: usize = match id.parse() {
1947        Ok(id) => id,
1948        Err(_) => {
1949            return Ok(Json(serde_json::json!({
1950                "error": format!("Invalid rule ID: {}", id)
1951            })));
1952        }
1953    };
1954
1955    let request_count = config.request_replacements.len();
1956
1957    if rule_id < request_count {
1958        // Request rule
1959        let rule = &config.request_replacements[rule_id];
1960        Ok(Json(serde_json::json!({
1961            "id": rule_id,
1962            "pattern": rule.pattern,
1963            "type": "request",
1964            "status_codes": [],
1965            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1966                "path": t.path,
1967                "replace": t.replace,
1968                "operation": format!("{:?}", t.operation).to_lowercase()
1969            })).collect::<Vec<_>>(),
1970            "enabled": rule.enabled
1971        })))
1972    } else if rule_id < request_count + config.response_replacements.len() {
1973        // Response rule
1974        let response_idx = rule_id - request_count;
1975        let rule = &config.response_replacements[response_idx];
1976        Ok(Json(serde_json::json!({
1977            "id": rule_id,
1978            "pattern": rule.pattern,
1979            "type": "response",
1980            "status_codes": rule.status_codes,
1981            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1982                "path": t.path,
1983                "replace": t.replace,
1984                "operation": format!("{:?}", t.operation).to_lowercase()
1985            })).collect::<Vec<_>>(),
1986            "enabled": rule.enabled
1987        })))
1988    } else {
1989        Ok(Json(serde_json::json!({
1990            "error": format!("Rule ID {} not found", rule_id)
1991        })))
1992    }
1993}
1994
1995/// Update a proxy replacement rule
1996async fn update_proxy_rule(
1997    State(state): State<ManagementState>,
1998    Path(id): Path<String>,
1999    Json(request): Json<ProxyRuleRequest>,
2000) -> Result<Json<serde_json::Value>, StatusCode> {
2001    let proxy_config = match &state.proxy_config {
2002        Some(config) => config,
2003        None => {
2004            return Ok(Json(serde_json::json!({
2005                "error": "Proxy not configured. Proxy config not available."
2006            })));
2007        }
2008    };
2009
2010    let mut config = proxy_config.write().await;
2011    let rule_id: usize = match id.parse() {
2012        Ok(id) => id,
2013        Err(_) => {
2014            return Ok(Json(serde_json::json!({
2015                "error": format!("Invalid rule ID: {}", id)
2016            })));
2017        }
2018    };
2019
2020    let body_transforms: Vec<BodyTransform> = request
2021        .body_transforms
2022        .iter()
2023        .map(|t| {
2024            let op = match t.operation.as_str() {
2025                "replace" => TransformOperation::Replace,
2026                "add" => TransformOperation::Add,
2027                "remove" => TransformOperation::Remove,
2028                _ => TransformOperation::Replace,
2029            };
2030            BodyTransform {
2031                path: t.path.clone(),
2032                replace: t.replace.clone(),
2033                operation: op,
2034            }
2035        })
2036        .collect();
2037
2038    let updated_rule = BodyTransformRule {
2039        pattern: request.pattern.clone(),
2040        status_codes: request.status_codes.clone(),
2041        body_transforms,
2042        enabled: request.enabled,
2043    };
2044
2045    let request_count = config.request_replacements.len();
2046
2047    if rule_id < request_count {
2048        // Update request rule
2049        config.request_replacements[rule_id] = updated_rule;
2050    } else if rule_id < request_count + config.response_replacements.len() {
2051        // Update response rule
2052        let response_idx = rule_id - request_count;
2053        config.response_replacements[response_idx] = updated_rule;
2054    } else {
2055        return Ok(Json(serde_json::json!({
2056            "error": format!("Rule ID {} not found", rule_id)
2057        })));
2058    }
2059
2060    Ok(Json(serde_json::json!({
2061        "id": rule_id,
2062        "message": "Rule updated successfully"
2063    })))
2064}
2065
2066/// Delete a proxy replacement rule
2067async fn delete_proxy_rule(
2068    State(state): State<ManagementState>,
2069    Path(id): Path<String>,
2070) -> Result<Json<serde_json::Value>, StatusCode> {
2071    let proxy_config = match &state.proxy_config {
2072        Some(config) => config,
2073        None => {
2074            return Ok(Json(serde_json::json!({
2075                "error": "Proxy not configured. Proxy config not available."
2076            })));
2077        }
2078    };
2079
2080    let mut config = proxy_config.write().await;
2081    let rule_id: usize = match id.parse() {
2082        Ok(id) => id,
2083        Err(_) => {
2084            return Ok(Json(serde_json::json!({
2085                "error": format!("Invalid rule ID: {}", id)
2086            })));
2087        }
2088    };
2089
2090    let request_count = config.request_replacements.len();
2091
2092    if rule_id < request_count {
2093        // Delete request rule
2094        config.request_replacements.remove(rule_id);
2095    } else if rule_id < request_count + config.response_replacements.len() {
2096        // Delete response rule
2097        let response_idx = rule_id - request_count;
2098        config.response_replacements.remove(response_idx);
2099    } else {
2100        return Ok(Json(serde_json::json!({
2101            "error": format!("Rule ID {} not found", rule_id)
2102        })));
2103    }
2104
2105    Ok(Json(serde_json::json!({
2106        "id": rule_id,
2107        "message": "Rule deleted successfully"
2108    })))
2109}
2110
2111/// Get recent intercepted requests/responses for inspection
2112/// This is a placeholder - in a full implementation, you'd track intercepted traffic
2113async fn get_proxy_inspect(
2114    State(_state): State<ManagementState>,
2115    Query(params): Query<std::collections::HashMap<String, String>>,
2116) -> Result<Json<serde_json::Value>, StatusCode> {
2117    let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2118
2119    // Note: Request/response inspection would require:
2120    // 1. Storing intercepted requests/responses in ManagementState or a separate store
2121    // 2. Integrating with proxy middleware to capture traffic
2122    // 3. Implementing filtering and pagination for large volumes of traffic
2123    // For now, return an empty response structure indicating the feature is not yet implemented
2124    Ok(Json(serde_json::json!({
2125        "requests": [],
2126        "responses": [],
2127        "limit": limit,
2128        "total": 0,
2129        "message": "Request/response inspection not yet implemented. This endpoint will return intercepted traffic when proxy inspection is fully integrated."
2130    })))
2131}
2132
/// Build the management API router
///
/// Assembles every management endpoint into a single `Router` and attaches
/// the shared `ManagementState`. SMTP, MQTT, and Kafka route groups are
/// registered only when the corresponding cargo feature is enabled; the
/// `#[cfg(not(...))]` arms re-bind `router` so the `let` chain compiles in
/// every feature combination.
pub fn management_router(state: ManagementState) -> Router {
    // Core endpoints: health, stats, config, and mock CRUD/import/export.
    let router = Router::new()
        .route("/health", get(health_check))
        .route("/stats", get(get_stats))
        .route("/config", get(get_config))
        .route("/config/validate", post(validate_config))
        .route("/config/bulk", post(bulk_update_config))
        .route("/mocks", get(list_mocks))
        .route("/mocks", post(create_mock))
        .route("/mocks/{id}", get(get_mock))
        .route("/mocks/{id}", put(update_mock))
        .route("/mocks/{id}", delete(delete_mock))
        .route("/export", get(export_mocks))
        .route("/import", post(import_mocks));

    // SMTP mailbox inspection routes (only with the "smtp" feature).
    #[cfg(feature = "smtp")]
    let router = router
        .route("/smtp/mailbox", get(list_smtp_emails))
        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
        .route("/smtp/mailbox/{id}", get(get_smtp_email))
        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
        .route("/smtp/mailbox/search", get(search_smtp_emails));

    #[cfg(not(feature = "smtp"))]
    let router = router;

    // MQTT routes
    #[cfg(feature = "mqtt")]
    let router = router
        .route("/mqtt/stats", get(get_mqtt_stats))
        .route("/mqtt/clients", get(get_mqtt_clients))
        .route("/mqtt/topics", get(get_mqtt_topics))
        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // NOTE(review): unlike SMTP/Kafka, the MQTT publish endpoints are also
    // registered WITHOUT the "mqtt" feature — presumably their handlers
    // report the feature as unavailable; confirm against their definitions.
    #[cfg(not(feature = "mqtt"))]
    let router = router
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Kafka broker inspection and produce routes (only with the "kafka" feature).
    #[cfg(feature = "kafka")]
    let router = router
        .route("/kafka/stats", get(get_kafka_stats))
        .route("/kafka/topics", get(get_kafka_topics))
        .route("/kafka/topics/{topic}", get(get_kafka_topic))
        .route("/kafka/groups", get(get_kafka_groups))
        .route("/kafka/groups/{group_id}", get(get_kafka_group))
        .route("/kafka/produce", post(produce_kafka_message))
        .route("/kafka/produce/batch", post(produce_kafka_batch))
        .route("/kafka/messages/stream", get(kafka_messages_stream));

    #[cfg(not(feature = "kafka"))]
    let router = router;

    // Migration pipeline routes
    let router = router
        .route("/migration/routes", get(get_migration_routes))
        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
        .route("/migration/groups/{group}", put(set_group_migration_mode))
        .route("/migration/groups", get(get_migration_groups))
        .route("/migration/status", get(get_migration_status));

    // Proxy replacement rules routes
    let router = router
        .route("/proxy/rules", get(list_proxy_rules))
        .route("/proxy/rules", post(create_proxy_rule))
        .route("/proxy/rules/{id}", get(get_proxy_rule))
        .route("/proxy/rules/{id}", put(update_proxy_rule))
        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
        .route("/proxy/inspect", get(get_proxy_inspect));

    // AI-powered features
    let router = router.route("/ai/generate-spec", post(generate_ai_spec));

    // Snapshot diff endpoints (nested sub-router sharing the same state).
    let router = router.nest(
        "/snapshot-diff",
        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
    );

    // Traffic-based OpenAPI generation ("behavioral-cloning" feature only).
    #[cfg(feature = "behavioral-cloning")]
    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));

    // MockAI learning/explanations, chaos config, and network profiles.
    let router = router
        .route("/mockai/learn", post(learn_from_examples))
        .route("/mockai/rules/explanations", get(list_rule_explanations))
        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
        .route("/chaos/config", get(get_chaos_config))
        .route("/chaos/config", post(update_chaos_config))
        .route("/network/profiles", get(list_network_profiles))
        .route("/network/profile/apply", post(apply_network_profile));

    // State machine API routes
    let router =
        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());

    router.with_state(state)
}
2236
#[cfg(feature = "kafka")]
/// Aggregate statistics for the mock Kafka broker
///
/// Returned by GET /kafka/stats. The message counters are currently always
/// zero because the broker's metrics field is private (see get_kafka_stats).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// Number of topics
    pub topics: usize,
    /// Total number of partitions
    pub partitions: usize,
    /// Number of consumer groups
    pub consumer_groups: usize,
    /// Total messages produced (currently always 0 — metrics not accessible)
    pub messages_produced: u64,
    /// Total messages consumed (currently always 0 — metrics not accessible)
    pub messages_consumed: u64,
}
2251
#[cfg(feature = "kafka")]
/// Summary of a Kafka topic for the management API
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name
    pub name: String,
    /// Number of partitions in the topic
    pub partitions: usize,
    /// Configured replication factor
    pub replication_factor: i32,
}
2259
#[cfg(feature = "kafka")]
/// Summary of a Kafka consumer group for the management API
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group identifier
    pub group_id: String,
    /// Number of members currently in the group
    pub members: usize,
    /// Group state (currently always reported as "Stable")
    pub state: String,
}
2267
#[cfg(feature = "kafka")]
/// Get Kafka broker statistics
///
/// Reports topic/partition/consumer-group counts from the in-memory broker.
/// Message counters are hard-coded to zero because the broker's metrics
/// field is private. Responds 503 when no broker is configured.
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let consumer_groups = broker.consumer_groups.read().await;

    // Sum partition counts across all topics.
    let partitions: usize = topics.values().map(|topic| topic.partitions.len()).sum();

    // messages_produced/consumed stay 0 until KafkaMockBroker exposes its
    // (currently private) metrics through a public accessor.
    Json(KafkaBrokerStats {
        topics: topics.len(),
        partitions,
        consumer_groups: consumer_groups.groups().len(),
        messages_produced: 0,
        messages_consumed: 0,
    })
    .into_response()
}
2299
#[cfg(feature = "kafka")]
/// List Kafka topics
///
/// Returns a summary (name, partition count, replication factor) for every
/// topic known to the mock broker. Responds 503 when no broker is configured.
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let topic_list: Vec<KafkaTopicInfo> = topics
        .iter()
        .map(|(name, topic)| KafkaTopicInfo {
            name: name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor as i32,
        })
        .collect();

    Json(serde_json::json!({ "topics": topic_list })).into_response()
}
2329
#[cfg(feature = "kafka")]
/// Get Kafka topic details
///
/// Returns the topic's partition count, replication factor, and a
/// per-partition breakdown. Responds 404 for unknown topics and 503 when
/// no Kafka broker is configured on the management state.
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let topics = broker.topics.read().await;
        if let Some(topic) = topics.get(&topic_name) {
            Json(serde_json::json!({
                "name": topic_name,
                "partitions": topic.partitions.len(),
                "replication_factor": topic.config.replication_factor,
                // Leader/replicas are hard-coded to broker 0 — presumably the
                // mock does not model multiple brokers; confirm before relying
                // on these fields.
                "partitions_detail": topic.partitions.iter().enumerate().map(|(idx, partition)| serde_json::json!({
                    "id": idx as i32,
                    "leader": 0,
                    "replicas": vec![0],
                    "message_count": partition.messages.len()
                })).collect::<Vec<_>>()
            })).into_response()
        } else {
            (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Topic not found",
                    "topic": topic_name
                })),
            )
                .into_response()
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2371
#[cfg(feature = "kafka")]
/// List Kafka consumer groups
///
/// Returns one summary entry per consumer group known to the mock broker.
/// Responds 503 when no Kafka broker is configured on the management state.
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;
    let groups: Vec<KafkaConsumerGroupInfo> = consumer_groups
        .groups()
        .iter()
        .map(|(group_id, group)| KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            // Rebalance state is not tracked; every group reports "Stable".
            state: "Stable".to_string(),
        })
        .collect();

    Json(serde_json::json!({ "groups": groups })).into_response()
}
2402
#[cfg(feature = "kafka")]
/// Get Kafka consumer group details
///
/// Returns member assignments and committed offsets for a single consumer
/// group. Responds 404 for unknown groups and 503 when no Kafka broker is
/// configured on the management state.
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let consumer_groups = broker.consumer_groups.read().await;
        if let Some(group) = consumer_groups.groups().get(&group_id) {
            Json(serde_json::json!({
                "group_id": group_id,
                "members": group.members.len(),
                // Rebalance state is not tracked; always reported as "Stable".
                "state": "Stable",
                "members_detail": group.members.iter().map(|(member_id, member)| serde_json::json!({
                    "member_id": member_id,
                    "client_id": member.client_id,
                    "assignments": member.assignment.iter().map(|a| serde_json::json!({
                        "topic": a.topic,
                        "partitions": a.partitions
                    })).collect::<Vec<_>>()
                })).collect::<Vec<_>>(),
                // Committed offsets are keyed by (topic, partition) pairs.
                "offsets": group.offsets.iter().map(|((topic, partition), offset)| serde_json::json!({
                    "topic": topic,
                    "partition": partition,
                    "offset": offset
                })).collect::<Vec<_>>()
            })).into_response()
        } else {
            (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Consumer group not found",
                    "group_id": group_id
                })),
            )
                .into_response()
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2451
2452// ========== Kafka Produce Handler ==========
2453
#[cfg(feature = "kafka")]
/// Request body for producing a single message to a Kafka topic
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Topic to produce to (created on demand if it does not exist)
    pub topic: String,
    /// Message key (optional; used for partition assignment when no
    /// explicit partition is given)
    #[serde(default)]
    pub key: Option<String>,
    /// Message value (JSON string or plain string)
    pub value: String,
    /// Partition ID (optional, auto-assigned from the key if not provided)
    #[serde(default)]
    pub partition: Option<i32>,
    /// Message headers (optional, key-value pairs)
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2471
#[cfg(feature = "kafka")]
/// Produce a message to a Kafka topic
///
/// Creates the topic on demand, picks a partition (the caller-supplied
/// `partition`, or key-based assignment when absent), appends the message,
/// and broadcasts a `MessageEvent::Kafka` so SSE monitors see it in real
/// time. Responds 400 for an out-of-range partition, 500 when the append
/// fails, and 503 when no Kafka broker is configured.
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let mut topics = broker.topics.write().await;

    // Get or create the topic with default settings.
    let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
        mockforge_kafka::topics::Topic::new(
            request.topic.clone(),
            mockforge_kafka::topics::TopicConfig::default(),
        )
    });

    // Use the caller-supplied partition, or derive one from the key.
    let partition_id = match request.partition {
        Some(partition) => partition,
        None => topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes())),
    };

    // Reject partitions outside the topic's current range.
    if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            })),
        )
            .into_response();
    }

    // Build the broker-internal message. Borrow from `request` so each
    // field is cloned at most once (the original cloned key/headers twice).
    let message = mockforge_kafka::partitions::KafkaMessage {
        offset: 0, // Assigned by the partition on append.
        timestamp: chrono::Utc::now().timestamp_millis(),
        key: request.key.as_ref().map(|k| k.as_bytes().to_vec()),
        value: request.value.as_bytes().to_vec(),
        headers: request
            .headers
            .clone()
            .unwrap_or_default()
            .into_iter()
            .map(|(k, v)| (k, v.as_bytes().to_vec()))
            .collect(),
    };

    // Produce to partition.
    match topic_entry.produce(partition_id, message).await {
        Ok(offset) => {
            // Metrics recording is omitted: KafkaMockBroker keeps its metrics
            // private. TODO: record via a public accessor once one exists.

            // Emit a monitoring event. A send error only means there are no
            // active SSE subscribers, so it is deliberately ignored. (The
            // function is already gated on the "kafka" feature, so no inner
            // cfg block is needed here.)
            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: request.topic.clone(),
                key: request.key.clone(),
                value: request.value.clone(),
                partition: partition_id,
                offset,
                headers: request.headers.clone(),
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", request.topic),
                "topic": request.topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2571
#[cfg(feature = "kafka")]
/// Request body for producing a batch of Kafka messages in a single call.
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// List of messages to produce
    pub messages: Vec<KafkaProduceRequest>,
    /// Delay between messages in milliseconds
    /// (applied between consecutive messages only; default comes from `default_delay`)
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2581
#[cfg(feature = "kafka")]
/// Produce multiple messages to Kafka topics
///
/// Messages are produced sequentially in request order. Per-message failures
/// (invalid partition, append error) are reported in the `results` array
/// without aborting the rest of the batch. An optional `delay_ms` pause is
/// inserted between consecutive messages. Returns 400 for an empty batch and
/// 503 when no Kafka broker is available.
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        if request.messages.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            )
                .into_response();
        }

        let mut results = Vec::with_capacity(request.messages.len());

        for (index, msg_request) in request.messages.iter().enumerate() {
            // Hold the topics write lock only while appending. The guard is
            // dropped at the end of this block, BEFORE the inter-message
            // delay, so other producers are not blocked for the whole batch.
            let produce_outcome: Result<(i32, i64), String> = {
                let mut topics = broker.topics.write().await;

                // Get or create the topic
                let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                    mockforge_kafka::topics::Topic::new(
                        msg_request.topic.clone(),
                        mockforge_kafka::topics::TopicConfig::default(),
                    )
                });

                // Partition: explicit in the request, or key-based assignment
                let partition_id = if let Some(partition) = msg_request.partition {
                    partition
                } else {
                    topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
                };

                // Validate partition exists before building the message
                if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
                    Err(format!(
                        "Invalid partition {} (topic has {} partitions)",
                        partition_id,
                        topic_entry.partitions.len()
                    ))
                } else {
                    let message = mockforge_kafka::partitions::KafkaMessage {
                        offset: 0, // Will be set by partition.append
                        timestamp: chrono::Utc::now().timestamp_millis(),
                        key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
                        value: msg_request.value.as_bytes().to_vec(),
                        headers: msg_request
                            .headers
                            .clone()
                            .unwrap_or_default()
                            .into_iter()
                            .map(|(k, v)| (k, v.as_bytes().to_vec()))
                            .collect(),
                    };

                    topic_entry
                        .produce(partition_id, message)
                        .await
                        .map(|offset| (partition_id, offset))
                        .map_err(|e| e.to_string())
                }
            };

            match produce_outcome {
                Ok((partition_id, offset)) => {
                    // Note: Metrics recording omitted as the broker's metrics
                    // field is private.
                    // TODO: Add public method to KafkaMockBroker to record metrics if needed

                    // Emit message event for real-time monitoring
                    let event = MessageEvent::Kafka(KafkaMessageEvent {
                        topic: msg_request.topic.clone(),
                        key: msg_request.key.clone(),
                        value: msg_request.value.clone(),
                        partition: partition_id,
                        offset,
                        headers: msg_request.headers.clone(),
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": msg_request.topic,
                        "partition": partition_id,
                        "offset": offset
                    }));
                }
                Err(error) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": error
                    }));
                }
            }

            // Add delay between messages (except after the last one)
            if index < request.messages.len() - 1 && request.delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        Json(serde_json::json!({
            "success": true,
            "total": request.messages.len(),
            "succeeded": success_count,
            "failed": request.messages.len() - success_count,
            "results": results
        }))
        .into_response()
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2708
2709// ========== Real-time Message Streaming (SSE) ==========
2710
#[cfg(feature = "mqtt")]
/// SSE stream for MQTT messages
///
/// Subscribes to the shared message-event broadcast channel and forwards only
/// MQTT events as `mqtt_message` SSE events. An optional `?topic=` query
/// parameter keeps only events whose topic CONTAINS the given substring.
/// The stream ends when the broadcast channel closes; lagged receivers log a
/// warning and continue. A keep-alive comment is sent every 15 seconds.
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // Each SSE client gets its own broadcast receiver.
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    // unfold: repeatedly pull from the receiver until a forwardable MQTT
    // event arrives, yielding (event, receiver) to keep the state threaded.
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => {
                        // Apply topic filter if specified (substring match)
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "mqtt",
                            "topic": event.topic,
                            "payload": event.payload,
                            "qos": event.qos,
                            "retain": event.retain,
                            "timestamp": event.timestamp,
                        });

                        // Serialization failure silently drops the event and
                        // keeps waiting for the next one.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event = Event::default().event("mqtt_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => {
                        // Skip Kafka events in MQTT stream
                        continue;
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        // All senders dropped: terminate the SSE stream.
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
2771
#[cfg(feature = "kafka")]
/// SSE stream for Kafka messages
///
/// Subscribes to the shared message-event broadcast channel and forwards only
/// Kafka events as `kafka_message` SSE events. An optional `?topic=` query
/// parameter keeps only events whose topic CONTAINS the given substring.
/// The stream ends when the broadcast channel closes; lagged receivers log a
/// warning and continue. A keep-alive comment is sent every 15 seconds.
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // No `mut` needed: the receiver is moved into the unfold closure, which
    // rebinds it mutably (the previous `let mut rx` triggered unused_mut).
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => {
                        // Skip MQTT events in Kafka stream
                        continue;
                    }
                    Ok(MessageEvent::Kafka(event)) => {
                        // Apply topic filter if specified (substring match)
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "kafka",
                            "topic": event.topic,
                            "key": event.key,
                            "value": event.value,
                            "partition": event.partition,
                            "offset": event.offset,
                            "headers": event.headers,
                            "timestamp": event.timestamp,
                        });

                        // Serialization failure drops the event and keeps waiting.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event =
                                Event::default().event("kafka_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        // All senders dropped: terminate the SSE stream.
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
2835
2836// ========== AI-Powered Features ==========
2837
/// Request for AI-powered API specification generation
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Natural language description of the API to generate
    pub query: String,
    /// Type of specification to generate: "openapi", "graphql", or "asyncapi"
    /// (unrecognized values are treated as OpenAPI by the handler)
    pub spec_type: String,
    /// Optional API version (e.g., "3.0.0" for OpenAPI); defaults to "3.0.0"
    pub api_version: Option<String>,
}
2848
/// Request for OpenAPI generation from recorded traffic
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to recorder database (optional, defaults to ./recordings.db)
    #[serde(default)]
    pub database_path: Option<String>,
    /// Start time for filtering (ISO 8601 format, e.g., 2025-01-01T00:00:00Z)
    #[serde(default)]
    pub since: Option<String>,
    /// End time for filtering (ISO 8601 format)
    #[serde(default)]
    pub until: Option<String>,
    /// Path pattern filter (supports wildcards, e.g., /api/*)
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence score for including paths (0.0 to 1.0)
    /// Defaults to 0.7 via `default_min_confidence`.
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
2868
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`.
fn default_min_confidence() -> f64 {
    const DEFAULT_MIN_CONFIDENCE: f64 = 0.7;
    DEFAULT_MIN_CONFIDENCE
}
2872
/// Generate API specification from natural language using AI
///
/// Reads RAG/LLM settings from environment variables (API key, provider,
/// endpoint, model, token/temperature/timeout/context limits), builds a
/// spec-generation prompt from the request, and runs it through the RAG
/// engine backed by an in-memory document store. Returns 503 when no API key
/// is configured and 500 on engine or generation failures.
#[cfg(feature = "data-faker")]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::{DocumentStorage, InMemoryStorage},
    };
    use std::sync::Arc;

    // Read a numeric env var, falling back to `default` when the variable is
    // missing or unparseable (same semantics for every limit below).
    fn env_or<T: std::str::FromStr>(var: &str, default: T) -> T {
        std::env::var(var).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
    }

    // API key may come from the MockForge-specific variable or the generic
    // OpenAI one; without a key the AI endpoint cannot be used at all.
    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    // Check if RAG is configured - require API key
    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    }

    // Resolve the LLM provider; unknown values fall back to OpenAI.
    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    // Per-provider defaults for endpoint and model, overridable via env.
    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    // Build the config in one expression with functional update instead of
    // mutating a Default (avoids clippy::field_reassign_with_default).
    let rag_config = RagConfig {
        provider,
        api_endpoint,
        api_key,
        model,
        max_tokens: env_or("MOCKFORGE_RAG_MAX_TOKENS", 4096),
        // Lower temperature for more structured output
        temperature: env_or("MOCKFORGE_RAG_TEMPERATURE", 0.3),
        timeout_secs: env_or("MOCKFORGE_RAG_TIMEOUT", 60),
        max_context_length: env_or("MOCKFORGE_RAG_CONTEXT_WINDOW", 4000),
        ..RagConfig::default()
    };

    // Build the prompt for spec generation
    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        if request.spec_type == "graphql" {
            "GraphQL SDL"
        } else {
            "YAML"
        }
    );

    // In-memory document storage: spec generation needs no persisted corpus.
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    // Create RAG engine
    let mut rag_engine = match RagEngine::new(rag_config.clone(), storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    // Generate using RAG engine
    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Try to extract just the YAML/JSON/SDL content if LLM added explanation
            let spec = if request.spec_type == "graphql" {
                // For GraphQL, extract SDL
                extract_graphql_schema(&generated_text)
            } else {
                // For OpenAPI/AsyncAPI, extract YAML
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
3043
3044#[cfg(not(feature = "data-faker"))]
3045async fn generate_ai_spec(
3046    State(_state): State<ManagementState>,
3047    Json(_request): Json<GenerateSpecRequest>,
3048) -> impl IntoResponse {
3049    (
3050        StatusCode::NOT_IMPLEMENTED,
3051        Json(serde_json::json!({
3052            "error": "AI features not enabled",
3053            "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3054        })),
3055    )
3056        .into_response()
3057}
3058
/// Generate OpenAPI specification from recorded traffic
///
/// Loads recorded HTTP exchanges from the recorder database (path taken from
/// the request, defaulting to ./recordings.db in the current working
/// directory), applies optional time/path filters, and infers an OpenAPI spec
/// from the matched exchanges (capped at 1000). Responds 400 on bad input,
/// 404 when no exchanges match, and 500 on query/generation/serialization
/// failures. The JSON response carries the spec plus generation metadata.
#[cfg(feature = "behavioral-cloning")]
async fn generate_openapi_from_traffic(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
) -> impl IntoResponse {
    use chrono::{DateTime, Utc};
    use mockforge_core::intelligent_behavior::{
        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
        IntelligentBehaviorConfig,
    };
    use mockforge_recorder::{
        database::RecorderDatabase,
        openapi_export::{QueryFilters, RecordingsToOpenApi},
    };
    use std::path::PathBuf;

    // Determine database path: explicit request path wins; otherwise
    // ./recordings.db relative to the current working directory.
    let db_path = if let Some(ref path) = request.database_path {
        PathBuf::from(path)
    } else {
        std::env::current_dir()
            .unwrap_or_else(|_| PathBuf::from("."))
            .join("recordings.db")
    };

    // Open database (a missing/unreadable database is reported as 400)
    let db = match RecorderDatabase::new(&db_path).await {
        Ok(db) => db,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Database error",
                    "message": format!("Failed to open recorder database: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Parse time filters (RFC 3339, normalized to UTC); invalid input → 400
    let since_dt = if let Some(ref since_str) = request.since {
        match DateTime::parse_from_rfc3339(since_str) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
                    })),
                )
                    .into_response();
            }
        }
    } else {
        None
    };

    let until_dt = if let Some(ref until_str) = request.until {
        match DateTime::parse_from_rfc3339(until_str) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
                    })),
                )
                    .into_response();
            }
        }
    } else {
        None
    };

    // Build query filters
    let query_filters = QueryFilters {
        since: since_dt,
        until: until_dt,
        path_pattern: request.path_pattern.clone(),
        min_status_code: None,
        // Cap the scan at 1000 recorded requests
        max_requests: Some(1000),
    };

    // Query HTTP exchanges
    // Note: We need to convert from mockforge-recorder's HttpExchange to mockforge-core's HttpExchange
    // to avoid version mismatch issues. The converter returns the version from mockforge-recorder's
    // dependency, so we need to manually convert to the local version.
    let exchanges_from_recorder =
        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
            Ok(exchanges) => exchanges,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Query error",
                        "message": format!("Failed to query HTTP exchanges: {}", e)
                    })),
                )
                    .into_response();
            }
        };

    // Nothing recorded in the requested window → 404
    if exchanges_from_recorder.is_empty() {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "No exchanges found",
                "message": "No HTTP exchanges found matching the specified filters"
            })),
        )
            .into_response();
    }

    // Convert to local HttpExchange type to avoid version mismatch
    // (field-by-field copy between structurally identical types)
    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
        .into_iter()
        .map(|e| LocalHttpExchange {
            method: e.method,
            path: e.path,
            query_params: e.query_params,
            headers: e.headers,
            body: e.body,
            body_encoding: e.body_encoding,
            status_code: e.status_code,
            response_headers: e.response_headers,
            response_body: e.response_body,
            response_body_encoding: e.response_body_encoding,
            timestamp: e.timestamp,
        })
        .collect();

    // Create OpenAPI generator config using the request's confidence floor
    // and the default behavior model.
    let behavior_config = IntelligentBehaviorConfig::default();
    let gen_config = OpenApiGenerationConfig {
        min_confidence: request.min_confidence,
        behavior_model: Some(behavior_config.behavior_model),
    };

    // Generate OpenAPI spec
    let generator = OpenApiSpecGenerator::new(gen_config);
    let result = match generator.generate_from_exchanges(exchanges).await {
        Ok(result) => result,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Generation error",
                    "message": format!("Failed to generate OpenAPI spec: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Prepare response: prefer the raw document if the generator kept one,
    // otherwise serialize the structured spec.
    let spec_json = if let Some(ref raw) = result.spec.raw_document {
        raw.clone()
    } else {
        match serde_json::to_value(&result.spec.spec) {
            Ok(json) => json,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Serialization error",
                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
                    })),
                )
                    .into_response();
            }
        }
    };

    // Build response with metadata
    let response = serde_json::json!({
        "spec": spec_json,
        "metadata": {
            "requests_analyzed": result.metadata.requests_analyzed,
            "paths_inferred": result.metadata.paths_inferred,
            "path_confidence": result.metadata.path_confidence,
            "generated_at": result.metadata.generated_at.to_rfc3339(),
            "duration_ms": result.metadata.duration_ms,
        }
    });

    Json(response).into_response()
}
3251
3252/// List all rule explanations
3253async fn list_rule_explanations(
3254    State(state): State<ManagementState>,
3255    Query(params): Query<std::collections::HashMap<String, String>>,
3256) -> impl IntoResponse {
3257    use mockforge_core::intelligent_behavior::RuleType;
3258
3259    let explanations = state.rule_explanations.read().await;
3260    let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3261
3262    // Filter by rule type if provided
3263    if let Some(rule_type_str) = params.get("rule_type") {
3264        if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3265            explanations_vec.retain(|e| e.rule_type == rule_type);
3266        }
3267    }
3268
3269    // Filter by minimum confidence if provided
3270    if let Some(min_confidence_str) = params.get("min_confidence") {
3271        if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3272            explanations_vec.retain(|e| e.confidence >= min_confidence);
3273        }
3274    }
3275
3276    // Sort by confidence (descending) and then by generated_at (descending)
3277    explanations_vec.sort_by(|a, b| {
3278        b.confidence
3279            .partial_cmp(&a.confidence)
3280            .unwrap_or(std::cmp::Ordering::Equal)
3281            .then_with(|| b.generated_at.cmp(&a.generated_at))
3282    });
3283
3284    Json(serde_json::json!({
3285        "explanations": explanations_vec,
3286        "total": explanations_vec.len(),
3287    }))
3288    .into_response()
3289}
3290
3291/// Get a specific rule explanation by ID
3292async fn get_rule_explanation(
3293    State(state): State<ManagementState>,
3294    Path(rule_id): Path<String>,
3295) -> impl IntoResponse {
3296    let explanations = state.rule_explanations.read().await;
3297
3298    match explanations.get(&rule_id) {
3299        Some(explanation) => Json(serde_json::json!({
3300            "explanation": explanation,
3301        }))
3302        .into_response(),
3303        None => (
3304            StatusCode::NOT_FOUND,
3305            Json(serde_json::json!({
3306                "error": "Rule explanation not found",
3307                "message": format!("No explanation found for rule ID: {}", rule_id)
3308            })),
3309        )
3310            .into_response(),
3311    }
3312}
3313
/// Request for learning from examples
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example request/response pairs to learn from (at least one is required;
    /// the handler rejects an empty list with 400)
    pub examples: Vec<ExamplePairRequest>,
    /// Optional configuration override (free-form JSON)
    /// NOTE(review): consumed by the handler beyond this view — confirm schema
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3323
/// Example pair request format
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request data (method, path, body, etc.)
    /// Recognized keys: "method" (default "GET"), "path" (default "/"),
    /// "body", "query_params", "headers" — all optional JSON fields.
    pub request: serde_json::Value,
    /// Response data (status_code, body, etc.)
    pub response: serde_json::Value,
}
3332
3333/// Learn behavioral rules from example pairs
3334///
3335/// This endpoint accepts example request/response pairs, generates behavioral rules
3336/// with explanations, and stores the explanations for later retrieval.
3337async fn learn_from_examples(
3338    State(state): State<ManagementState>,
3339    Json(request): Json<LearnFromExamplesRequest>,
3340) -> impl IntoResponse {
3341    use mockforge_core::intelligent_behavior::{
3342        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3343        rule_generator::{ExamplePair, RuleGenerator},
3344    };
3345
3346    if request.examples.is_empty() {
3347        return (
3348            StatusCode::BAD_REQUEST,
3349            Json(serde_json::json!({
3350                "error": "No examples provided",
3351                "message": "At least one example pair is required"
3352            })),
3353        )
3354            .into_response();
3355    }
3356
3357    // Convert request examples to ExamplePair format
3358    let example_pairs: Result<Vec<ExamplePair>, String> = request
3359        .examples
3360        .into_iter()
3361        .enumerate()
3362        .map(|(idx, ex)| {
3363            // Parse request JSON to extract method, path, body, etc.
3364            let method = ex
3365                .request
3366                .get("method")
3367                .and_then(|v| v.as_str())
3368                .map(|s| s.to_string())
3369                .unwrap_or_else(|| "GET".to_string());
3370            let path = ex
3371                .request
3372                .get("path")
3373                .and_then(|v| v.as_str())
3374                .map(|s| s.to_string())
3375                .unwrap_or_else(|| "/".to_string());
3376            let request_body = ex.request.get("body").cloned();
3377            let query_params = ex
3378                .request
3379                .get("query_params")
3380                .and_then(|v| v.as_object())
3381                .map(|obj| {
3382                    obj.iter()
3383                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3384                        .collect()
3385                })
3386                .unwrap_or_default();
3387            let headers = ex
3388                .request
3389                .get("headers")
3390                .and_then(|v| v.as_object())
3391                .map(|obj| {
3392                    obj.iter()
3393                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3394                        .collect()
3395                })
3396                .unwrap_or_default();
3397
3398            // Parse response JSON to extract status, body, etc.
3399            let status = ex
3400                .response
3401                .get("status_code")
3402                .or_else(|| ex.response.get("status"))
3403                .and_then(|v| v.as_u64())
3404                .map(|n| n as u16)
3405                .unwrap_or(200);
3406            let response_body = ex.response.get("body").cloned();
3407
3408            Ok(ExamplePair {
3409                method,
3410                path,
3411                request: request_body,
3412                status,
3413                response: response_body,
3414                query_params,
3415                headers,
3416                metadata: {
3417                    let mut meta = std::collections::HashMap::new();
3418                    meta.insert("source".to_string(), "api".to_string());
3419                    meta.insert("example_index".to_string(), idx.to_string());
3420                    meta
3421                },
3422            })
3423        })
3424        .collect();
3425
3426    let example_pairs = match example_pairs {
3427        Ok(pairs) => pairs,
3428        Err(e) => {
3429            return (
3430                StatusCode::BAD_REQUEST,
3431                Json(serde_json::json!({
3432                    "error": "Invalid examples",
3433                    "message": e
3434                })),
3435            )
3436                .into_response();
3437        }
3438    };
3439
3440    // Create behavior config (use provided config or default)
3441    let behavior_config = if let Some(config_json) = request.config {
3442        // Try to deserialize custom config, fall back to default
3443        serde_json::from_value(config_json)
3444            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3445            .behavior_model
3446    } else {
3447        BehaviorModelConfig::default()
3448    };
3449
3450    // Create rule generator
3451    let generator = RuleGenerator::new(behavior_config);
3452
3453    // Generate rules with explanations
3454    let (rules, explanations) =
3455        match generator.generate_rules_with_explanations(example_pairs).await {
3456            Ok(result) => result,
3457            Err(e) => {
3458                return (
3459                    StatusCode::INTERNAL_SERVER_ERROR,
3460                    Json(serde_json::json!({
3461                        "error": "Rule generation failed",
3462                        "message": format!("Failed to generate rules: {}", e)
3463                    })),
3464                )
3465                    .into_response();
3466            }
3467        };
3468
3469    // Store explanations in ManagementState
3470    {
3471        let mut stored_explanations = state.rule_explanations.write().await;
3472        for explanation in &explanations {
3473            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3474        }
3475    }
3476
3477    // Prepare response
3478    let response = serde_json::json!({
3479        "success": true,
3480        "rules_generated": {
3481            "consistency_rules": rules.consistency_rules.len(),
3482            "schemas": rules.schemas.len(),
3483            "state_machines": rules.state_transitions.len(),
3484            "system_prompt": !rules.system_prompt.is_empty(),
3485        },
3486        "explanations": explanations.iter().map(|e| serde_json::json!({
3487            "rule_id": e.rule_id,
3488            "rule_type": e.rule_type,
3489            "confidence": e.confidence,
3490            "reasoning": e.reasoning,
3491        })).collect::<Vec<_>>(),
3492        "total_explanations": explanations.len(),
3493    });
3494
3495    Json(response).into_response()
3496}
3497
/// Extract a YAML specification from free-form text.
///
/// Preference order: the content of a fenced ```` ```yaml ```` block, then the
/// content of any fenced ```` ``` ```` block, and finally the trimmed input
/// itself (which also covers raw documents starting with `openapi:` or
/// `asyncapi:`).
fn extract_yaml_spec(text: &str) -> String {
    // Try the yaml-tagged fence first, then any fence; a fence only counts
    // when a closing ``` follows it.
    for fence in ["```yaml", "```"].iter().copied() {
        if let Some(open) = text.find(fence) {
            let body = text[open + fence.len()..].trim_start();
            if let Some(close) = body.find("```") {
                return body[..close].trim().to_string();
            }
        }
    }

    // No complete code fence: return the trimmed input as-is (raw specs
    // beginning with `openapi:`/`asyncapi:` are already in this form).
    text.trim().to_string()
}
3521
/// Extract GraphQL schema from text content.
///
/// Preference order: the content of a fenced ```` ```graphql ```` block, then
/// the content of any fenced ```` ``` ```` block, and finally the trimmed
/// input itself (which also covers raw SDL starting with `type`/`schema`).
fn extract_graphql_schema(text: &str) -> String {
    // Try the graphql-tagged fence first, then any fence; a fence only
    // counts when a closing ``` follows it.
    for fence in ["```graphql", "```"].iter().copied() {
        if let Some(open) = text.find(fence) {
            let body = text[open + fence.len()..].trim_start();
            if let Some(close) = body.find("```") {
                return body[..close].trim().to_string();
            }
        }
    }

    // No complete code fence: return the trimmed input as-is (bare SDL is
    // already in this form).
    text.trim().to_string()
}
3545
3546// ========== Chaos Engineering Management ==========
3547
3548/// Get current chaos engineering configuration
3549async fn get_chaos_config(State(state): State<ManagementState>) -> impl IntoResponse {
3550    #[cfg(feature = "chaos")]
3551    {
3552        if let Some(chaos_state) = &state.chaos_api_state {
3553            let config = chaos_state.config.read().await;
3554            // Convert ChaosConfig to JSON response format
3555            Json(serde_json::json!({
3556                "enabled": config.enabled,
3557                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
3558                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
3559                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
3560                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
3561            }))
3562            .into_response()
3563        } else {
3564            // Chaos API not available, return default
3565            Json(serde_json::json!({
3566                "enabled": false,
3567                "latency": null,
3568                "fault_injection": null,
3569                "rate_limit": null,
3570                "traffic_shaping": null,
3571            }))
3572            .into_response()
3573        }
3574    }
3575    #[cfg(not(feature = "chaos"))]
3576    {
3577        // Chaos feature not enabled
3578        Json(serde_json::json!({
3579            "enabled": false,
3580            "latency": null,
3581            "fault_injection": null,
3582            "rate_limit": null,
3583            "traffic_shaping": null,
3584        }))
3585        .into_response()
3586    }
3587}
3588
/// Request to update chaos configuration
///
/// All fields are optional; a field left as `None` leaves the corresponding
/// section of the active configuration untouched. Section payloads are raw
/// JSON and are deserialized by the update handler into the matching
/// `mockforge_chaos` config types.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Whether to enable chaos engineering (global on/off flag)
    pub enabled: Option<bool>,
    /// Latency configuration (JSON shape of `LatencyConfig`)
    pub latency: Option<serde_json::Value>,
    /// Fault injection configuration (JSON shape of `FaultInjectionConfig`)
    pub fault_injection: Option<serde_json::Value>,
    /// Rate limiting configuration (JSON shape of `RateLimitConfig`)
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic shaping configuration (JSON shape of `TrafficShapingConfig`)
    pub traffic_shaping: Option<serde_json::Value>,
}
3603
3604/// Update chaos engineering configuration
3605async fn update_chaos_config(
3606    State(state): State<ManagementState>,
3607    Json(config_update): Json<ChaosConfigUpdate>,
3608) -> impl IntoResponse {
3609    #[cfg(feature = "chaos")]
3610    {
3611        if let Some(chaos_state) = &state.chaos_api_state {
3612            use mockforge_chaos::config::{
3613                ChaosConfig, FaultInjectionConfig, LatencyConfig, RateLimitConfig,
3614                TrafficShapingConfig,
3615            };
3616
3617            let mut config = chaos_state.config.write().await;
3618
3619            // Update enabled flag if provided
3620            if let Some(enabled) = config_update.enabled {
3621                config.enabled = enabled;
3622            }
3623
3624            // Update latency config if provided
3625            if let Some(latency_json) = config_update.latency {
3626                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3627                    config.latency = Some(latency);
3628                }
3629            }
3630
3631            // Update fault injection config if provided
3632            if let Some(fault_json) = config_update.fault_injection {
3633                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3634                    config.fault_injection = Some(fault);
3635                }
3636            }
3637
3638            // Update rate limit config if provided
3639            if let Some(rate_json) = config_update.rate_limit {
3640                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3641                    config.rate_limit = Some(rate);
3642                }
3643            }
3644
3645            // Update traffic shaping config if provided
3646            if let Some(traffic_json) = config_update.traffic_shaping {
3647                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3648                    config.traffic_shaping = Some(traffic);
3649                }
3650            }
3651
3652            // Reinitialize middleware injectors with new config
3653            // The middleware will pick up the changes on the next request
3654            drop(config);
3655
3656            info!("Chaos configuration updated successfully");
3657            Json(serde_json::json!({
3658                "success": true,
3659                "message": "Chaos configuration updated and applied"
3660            }))
3661            .into_response()
3662        } else {
3663            (
3664                StatusCode::SERVICE_UNAVAILABLE,
3665                Json(serde_json::json!({
3666                    "success": false,
3667                    "error": "Chaos API not available",
3668                    "message": "Chaos engineering is not enabled or configured"
3669                })),
3670            )
3671                .into_response()
3672        }
3673    }
3674    #[cfg(not(feature = "chaos"))]
3675    {
3676        (
3677            StatusCode::NOT_IMPLEMENTED,
3678            Json(serde_json::json!({
3679                "success": false,
3680                "error": "Chaos feature not enabled",
3681                "message": "Chaos engineering feature is not compiled into this build"
3682            })),
3683        )
3684            .into_response()
3685    }
3686}
3687
3688// ========== Network Profile Management ==========
3689
3690/// List available network profiles
3691async fn list_network_profiles() -> impl IntoResponse {
3692    use mockforge_core::network_profiles::NetworkProfileCatalog;
3693
3694    let catalog = NetworkProfileCatalog::default();
3695    let profiles: Vec<serde_json::Value> = catalog
3696        .list_profiles_with_description()
3697        .iter()
3698        .map(|(name, description)| {
3699            serde_json::json!({
3700                "name": name,
3701                "description": description,
3702            })
3703        })
3704        .collect();
3705
3706    Json(serde_json::json!({
3707        "profiles": profiles
3708    }))
3709    .into_response()
3710}
3711
/// Request to apply a network profile from the built-in catalog.
#[derive(Debug, Deserialize)]
pub struct ApplyNetworkProfileRequest {
    /// Name of the network profile to apply; must match a catalog entry
    /// (e.g. one of the names returned by the profile-listing endpoint)
    pub profile_name: String,
}
3718
/// Apply a network profile
///
/// Looks up `profile_name` in the built-in catalog; on a hit, the profile's
/// traffic-shaping settings are copied into the shared server configuration
/// (when available) and into the live chaos API state (when the `chaos`
/// feature is compiled in). Returns `404 Not Found` for an unknown profile.
async fn apply_network_profile(
    State(state): State<ManagementState>,
    Json(request): Json<ApplyNetworkProfileRequest>,
) -> impl IntoResponse {
    use mockforge_core::network_profiles::NetworkProfileCatalog;

    let catalog = NetworkProfileCatalog::default();
    if let Some(profile) = catalog.get(&request.profile_name) {
        // Apply profile to server configuration if available
        // NetworkProfile contains latency and traffic_shaping configs
        if let Some(server_config) = &state.server_config {
            let mut config = server_config.write().await;

            // Apply network profile's traffic shaping to core config
            use mockforge_core::config::NetworkShapingConfig;

            // Convert NetworkProfile's TrafficShapingConfig to NetworkShapingConfig
            // NetworkProfile uses mockforge_core::traffic_shaping::TrafficShapingConfig
            // which has bandwidth and burst_loss fields
            let network_shaping = NetworkShapingConfig {
                // Shaping is active if either bandwidth capping or burst loss is on.
                enabled: profile.traffic_shaping.bandwidth.enabled
                    || profile.traffic_shaping.burst_loss.enabled,
                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                max_connections: 1000, // Default value
            };

            // Update chaos config if it exists, or create it
            // Chaos config is in observability.chaos, not core.chaos
            if let Some(ref mut chaos) = config.observability.chaos {
                chaos.traffic_shaping = Some(network_shaping);
            } else {
                // Create minimal chaos config with traffic shaping
                use mockforge_core::config::ChaosEngConfig;
                config.observability.chaos = Some(ChaosEngConfig {
                    enabled: true,
                    latency: None,
                    fault_injection: None,
                    rate_limit: None,
                    traffic_shaping: Some(network_shaping),
                    scenario: None,
                });
            }

            info!("Network profile '{}' applied to server configuration", request.profile_name);
        } else {
            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
        }

        // Also update chaos API state if available
        #[cfg(feature = "chaos")]
        {
            if let Some(chaos_state) = &state.chaos_api_state {
                use mockforge_chaos::config::TrafficShapingConfig;

                let mut chaos_config = chaos_state.config.write().await;
                // Apply profile's traffic shaping to chaos API state
                let chaos_traffic_shaping = TrafficShapingConfig {
                    enabled: profile.traffic_shaping.bandwidth.enabled
                        || profile.traffic_shaping.burst_loss.enabled,
                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                    // NOTE(review): the core config above uses max_connections: 1000
                    // while this uses 0 — confirm whether 0 means "unlimited" here
                    // and whether the mismatch is intentional.
                    max_connections: 0,
                    connection_timeout_ms: 30000,
                };
                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
                chaos_config.enabled = true; // Enable chaos when applying a profile
                // Release the write lock before logging/responding.
                drop(chaos_config);
                info!("Network profile '{}' applied to chaos API state", request.profile_name);
            }
        }

        Json(serde_json::json!({
            "success": true,
            "message": format!("Network profile '{}' applied", request.profile_name),
            "profile": {
                "name": profile.name,
                "description": profile.description,
            }
        }))
        .into_response()
    } else {
        (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Profile not found",
                "message": format!("Network profile '{}' not found", request.profile_name)
            })),
        )
            .into_response()
    }
}
3812
3813/// Build the management API router with UI Builder support
3814pub fn management_router_with_ui_builder(
3815    state: ManagementState,
3816    server_config: mockforge_core::config::ServerConfig,
3817) -> Router {
3818    use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
3819
3820    // Create the base management router
3821    let management = management_router(state);
3822
3823    // Create UI Builder state and router
3824    let ui_builder_state = UIBuilderState::new(server_config);
3825    let ui_builder = create_ui_builder_router(ui_builder_state);
3826
3827    // Nest UI Builder under /ui-builder
3828    management.nest("/ui-builder", ui_builder)
3829}
3830
3831/// Build management router with spec import API
3832pub fn management_router_with_spec_import(state: ManagementState) -> Router {
3833    use crate::spec_import::{spec_import_router, SpecImportState};
3834
3835    // Create base management router
3836    let management = management_router(state);
3837
3838    // Merge with spec import router
3839    Router::new()
3840        .merge(management)
3841        .merge(spec_import_router(SpecImportState::new()))
3842}
3843
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a minimal enabled `MockConfig` with an empty JSON body and
    /// status 200; tests mutate the fields they care about afterwards.
    fn sample_mock(id: &str, name: &str, method: &str, path: &str) -> MockConfig {
        MockConfig {
            id: id.to_string(),
            name: name.to_string(),
            method: method.to_string(),
            path: path.to_string(),
            response: MockResponse {
                body: serde_json::json!({}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        }
    }

    /// A mock pushed into state can be found again by its ID.
    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        let mut mock = sample_mock("test-1", "Test Mock", "GET", "/test");
        mock.response.body = serde_json::json!({"message": "test"});

        // Create mock
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(mock.clone());
        }

        // Get mock
        let mocks = state.mocks.read().await;
        let found = mocks.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    /// Verifies mock bookkeeping (total and enabled counts), i.e. the data
    /// the server-stats endpoint reports — no stats endpoint is called here.
    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        // Add one enabled and one disabled mock.
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(sample_mock("1", "Mock 1", "GET", "/test1"));

            let mut disabled = sample_mock("2", "Mock 2", "POST", "/test2");
            disabled.enabled = false;
            disabled.status_code = Some(201);
            mocks.push(disabled);
        }

        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }
}