// mockforge_http/management.rs

1/// Management API for MockForge
2///
3/// Provides REST endpoints for controlling mocks, server configuration,
4/// and integration with developer tools (VS Code extension, CI/CD, etc.)
5use axum::{
6    extract::{Path, Query, State},
7    http::StatusCode,
8    response::{
9        sse::{Event, Sse},
10        IntoResponse, Json, Response,
11    },
12    routing::{delete, get, post, put},
13    Router,
14};
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18    BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23use std::convert::Infallible;
24use std::sync::Arc;
25use tokio::sync::{broadcast, RwLock};
26use tracing::*;
27
/// Message event types for real-time monitoring
///
/// Serialized with an adjacent tag so consumers receive
/// `{"protocol": "mqtt" | "kafka", "data": { ... }}`.
/// Variants are compiled in only when the matching feature is enabled.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    #[cfg(feature = "mqtt")]
    /// MQTT message event
    Mqtt(MqttMessageEvent),
    #[cfg(feature = "kafka")]
    /// Kafka message event
    Kafka(KafkaMessageEvent),
}
40
#[cfg(feature = "mqtt")]
/// MQTT message event for real-time monitoring
///
/// Snapshot of a single published MQTT message, broadcast to SSE/WebSocket
/// subscribers of the management API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// MQTT topic name
    pub topic: String,
    /// Message payload content (assumed UTF-8 text; binary payloads would need
    /// encoding — TODO confirm with the publisher side)
    pub payload: String,
    /// Quality of Service level (0, 1, or 2)
    pub qos: u8,
    /// Whether the message is retained
    pub retain: bool,
    /// RFC3339 formatted timestamp
    pub timestamp: String,
}
56
#[cfg(feature = "kafka")]
/// Kafka message event for real-time monitoring
///
/// Snapshot of a single produced Kafka record, broadcast to subscribers of
/// the management API (mirrors [`MqttMessageEvent`] for the Kafka protocol).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Kafka topic name
    pub topic: String,
    /// Optional record key
    pub key: Option<String>,
    /// Record value (assumed UTF-8 text — TODO confirm for binary payloads)
    pub value: String,
    /// Partition the record was written to
    pub partition: i32,
    /// Offset of the record within its partition
    pub offset: i64,
    /// Optional record headers
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// RFC3339 formatted timestamp
    pub timestamp: String,
}
68
69/// Mock configuration representation
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct MockConfig {
72    /// Unique identifier for the mock
73    #[serde(skip_serializing_if = "String::is_empty")]
74    pub id: String,
75    /// Human-readable name for the mock
76    pub name: String,
77    /// HTTP method (GET, POST, etc.)
78    pub method: String,
79    /// API path pattern to match
80    pub path: String,
81    /// Response configuration
82    pub response: MockResponse,
83    /// Whether this mock is currently enabled
84    #[serde(default = "default_true")]
85    pub enabled: bool,
86    /// Optional latency to inject in milliseconds
87    #[serde(skip_serializing_if = "Option::is_none")]
88    pub latency_ms: Option<u64>,
89    /// Optional HTTP status code override
90    #[serde(skip_serializing_if = "Option::is_none")]
91    pub status_code: Option<u16>,
92    /// Request matching criteria (headers, query params, body patterns)
93    #[serde(skip_serializing_if = "Option::is_none")]
94    pub request_match: Option<RequestMatchCriteria>,
95    /// Priority for mock ordering (higher priority mocks are matched first)
96    #[serde(skip_serializing_if = "Option::is_none")]
97    pub priority: Option<i32>,
98    /// Scenario name for stateful mocking
99    #[serde(skip_serializing_if = "Option::is_none")]
100    pub scenario: Option<String>,
101    /// Required scenario state for this mock to be active
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub required_scenario_state: Option<String>,
104    /// New scenario state after this mock is matched
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub new_scenario_state: Option<String>,
107}
108
/// Serde default helper: mocks are enabled unless explicitly disabled.
fn default_true() -> bool {
    true
}
112
/// Mock response configuration
///
/// Describes what the mock returns: a JSON body plus optional extra headers.
/// The status code lives on [`MockConfig::status_code`], not here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Response body as JSON
    pub body: serde_json::Value,
    /// Optional custom response headers
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
122
123/// Request matching criteria for advanced request matching
124#[derive(Debug, Clone, Serialize, Deserialize, Default)]
125pub struct RequestMatchCriteria {
126    /// Headers that must be present and match (case-insensitive header names)
127    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
128    pub headers: std::collections::HashMap<String, String>,
129    /// Query parameters that must be present and match
130    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
131    pub query_params: std::collections::HashMap<String, String>,
132    /// Request body pattern (supports exact match or regex)
133    #[serde(skip_serializing_if = "Option::is_none")]
134    pub body_pattern: Option<String>,
135    /// JSONPath expression for JSON body matching
136    #[serde(skip_serializing_if = "Option::is_none")]
137    pub json_path: Option<String>,
138    /// XPath expression for XML body matching
139    #[serde(skip_serializing_if = "Option::is_none")]
140    pub xpath: Option<String>,
141    /// Custom matcher expression (e.g., "headers.content-type == \"application/json\"")
142    #[serde(skip_serializing_if = "Option::is_none")]
143    pub custom_matcher: Option<String>,
144}
145
/// Check if a request matches the given mock configuration
///
/// This function implements comprehensive request matching including:
/// - Method and path matching
/// - Header matching (with regex support)
/// - Query parameter matching
/// - Body pattern matching (exact, regex, JSONPath, XPath)
/// - Custom matcher expressions
///
/// All criteria are AND-ed: the first failing check short-circuits to `false`.
/// Disabled mocks never match.
///
/// NOTE(review): for headers and body patterns, the expected value is first
/// compiled as a regex and the exact-match branch only runs when compilation
/// fails. Since most literal strings are valid regexes, matching is usually an
/// unanchored (substring) regex search, not an equality test — confirm this is
/// the intended contract.
pub fn mock_matches_request(
    mock: &MockConfig,
    method: &str,
    path: &str,
    headers: &std::collections::HashMap<String, String>,
    query_params: &std::collections::HashMap<String, String>,
    body: Option<&[u8]>,
) -> bool {
    use regex::Regex;

    // Check if mock is enabled
    if !mock.enabled {
        return false;
    }

    // Check method (case-insensitive)
    if mock.method.to_uppercase() != method.to_uppercase() {
        return false;
    }

    // Check path pattern (supports wildcards and path parameters)
    if !path_matches_pattern(&mock.path, path) {
        return false;
    }

    // Check request matching criteria if present
    if let Some(criteria) = &mock.request_match {
        // Check headers: names compared case-insensitively via a linear scan,
        // values via regex-then-exact (see NOTE above).
        for (key, expected_value) in &criteria.headers {
            let header_key_lower = key.to_lowercase();
            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);

            if let Some((_, actual_value)) = found {
                // Try regex match first, then exact match
                if let Ok(re) = Regex::new(expected_value) {
                    if !re.is_match(actual_value) {
                        return false;
                    }
                } else if actual_value != expected_value {
                    return false;
                }
            } else {
                return false; // Header not found
            }
        }

        // Check query parameters (exact string equality only, no regex)
        for (key, expected_value) in &criteria.query_params {
            if let Some(actual_value) = query_params.get(key) {
                if actual_value != expected_value {
                    return false;
                }
            } else {
                return false; // Query param not found
            }
        }

        // Check body pattern; a configured pattern requires a body to be present.
        if let Some(pattern) = &criteria.body_pattern {
            if let Some(body_bytes) = body {
                // Lossy decoding: invalid UTF-8 sequences become U+FFFD before matching.
                let body_str = String::from_utf8_lossy(body_bytes);
                // Try regex first, then exact match
                if let Ok(re) = Regex::new(pattern) {
                    if !re.is_match(&body_str) {
                        return false;
                    }
                } else if body_str.as_ref() != pattern {
                    return false;
                }
            } else {
                return false; // Body required but not present
            }
        }

        // Check JSONPath (simplified implementation)
        //
        // NOTE(review): unlike body_pattern, a JSONPath criterion does NOT fail
        // when the body is absent or is not valid UTF-8/JSON — it is silently
        // skipped. Confirm whether that leniency is intended.
        if let Some(json_path) = &criteria.json_path {
            if let Some(body_bytes) = body {
                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
                        // Simple JSONPath check
                        if !json_path_exists(&json_value, json_path) {
                            return false;
                        }
                    }
                }
            }
        }

        // Check XPath (placeholder - requires XML/XPath library for full implementation)
        if let Some(_xpath) = &criteria.xpath {
            // XPath matching would require an XML/XPath library
            // For now, this is a placeholder that warns but doesn't fail
            tracing::warn!("XPath matching not yet fully implemented");
        }

        // Check custom matcher
        if let Some(custom) = &criteria.custom_matcher {
            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
                return false;
            }
        }
    }

    true
}
259
260/// Check if a path matches a pattern (supports wildcards and path parameters)
261fn path_matches_pattern(pattern: &str, path: &str) -> bool {
262    // Exact match
263    if pattern == path {
264        return true;
265    }
266
267    // Wildcard match
268    if pattern == "*" {
269        return true;
270    }
271
272    // Path parameter matching (e.g., /users/{id} matches /users/123)
273    let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
274    let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
275
276    if pattern_parts.len() != path_parts.len() {
277        // Check for wildcard patterns
278        if pattern.contains('*') {
279            return matches_wildcard_pattern(pattern, path);
280        }
281        return false;
282    }
283
284    for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
285        // Check for path parameters {param}
286        if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
287            continue; // Matches any value
288        }
289
290        if pattern_part != path_part {
291            return false;
292        }
293    }
294
295    true
296}
297
298/// Check if path matches a wildcard pattern
299fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
300    use regex::Regex;
301
302    // Convert pattern to regex
303    let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
304
305    if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
306        return re.is_match(path);
307    }
308
309    false
310}
311
312/// Check if a JSONPath exists in a JSON value (simplified implementation)
313fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
314    // Simple implementation - for full JSONPath support, use a library like jsonpath-rs
315    // This handles simple paths like $.field or $.field.subfield
316    if json_path.starts_with("$.") {
317        let path = &json_path[2..];
318        let parts: Vec<&str> = path.split('.').collect();
319
320        let mut current = json;
321        for part in parts {
322            if let Some(obj) = current.as_object() {
323                if let Some(value) = obj.get(part) {
324                    current = value;
325                } else {
326                    return false;
327                }
328            } else {
329                return false;
330            }
331        }
332        true
333    } else {
334        // For complex JSONPath expressions, would need a proper JSONPath library
335        tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
336        false
337    }
338}
339
/// Evaluate a custom matcher expression
///
/// Supports three expression forms, detected in this order:
/// 1. Equality:  `field == "value"`
/// 2. Regex:     `field =~ "pattern"` (unanchored search)
/// 3. Contains:  `field contains "value"`
///
/// `field` may be `method`, `path`, `headers.<name>` (exact, case-sensitive
/// name lookup), `query.<name>`, or — for `contains` only — `body`.
/// Unknown fields or malformed expressions evaluate to `false`.
///
/// NOTE(review): the split is on the first keyword found anywhere in the
/// expression, so values containing `==`, `=~`, or the word `contains`
/// produce more than two parts and are rejected — confirm acceptable.
fn evaluate_custom_matcher(
    expression: &str,
    method: &str,
    path: &str,
    headers: &std::collections::HashMap<String, String>,
    query_params: &std::collections::HashMap<String, String>,
    body: Option<&[u8]>,
) -> bool {
    use regex::Regex;

    let expr = expression.trim();

    // Handle equality expressions (field == "value")
    if expr.contains("==") {
        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        // Accept either double- or single-quoted values (quotes are optional).
        let expected_value = parts[1].trim_matches('"').trim_matches('\'');

        match field {
            "method" => method == expected_value,
            "path" => path == expected_value,
            _ if field.starts_with("headers.") => {
                // Strip the "headers." prefix (8 chars) to get the header name.
                let header_name = &field[8..];
                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
            }
            _ if field.starts_with("query.") => {
                // Strip the "query." prefix (6 chars) to get the parameter name.
                let param_name = &field[6..];
                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
            }
            _ => false,
        }
    }
    // Handle regex match expressions (field =~ "pattern")
    else if expr.contains("=~") {
        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        let pattern = parts[1].trim_matches('"').trim_matches('\'');

        // An invalid regex makes the whole expression evaluate to false.
        if let Ok(re) = Regex::new(pattern) {
            match field {
                "method" => re.is_match(method),
                "path" => re.is_match(path),
                _ if field.starts_with("headers.") => {
                    let header_name = &field[8..];
                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
                }
                _ if field.starts_with("query.") => {
                    let param_name = &field[6..];
                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
                }
                _ => false,
            }
        } else {
            false
        }
    }
    // Handle contains expressions (field contains "value")
    else if expr.contains("contains") {
        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        let search_value = parts[1].trim_matches('"').trim_matches('\'');

        match field {
            "path" => path.contains(search_value),
            _ if field.starts_with("headers.") => {
                let header_name = &field[8..];
                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
            }
            _ if field.starts_with("body") => {
                // Body is decoded lossily; absent body means no match.
                if let Some(body_bytes) = body {
                    let body_str = String::from_utf8_lossy(body_bytes);
                    body_str.contains(search_value)
                } else {
                    false
                }
            }
            _ => false,
        }
    } else {
        // Unknown expression format
        tracing::warn!("Unknown custom matcher expression format: {}", expr);
        false
    }
}
437
/// Server statistics
///
/// Snapshot returned by the stats endpoint; values are computed at request
/// time from [`ManagementState`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Server uptime in seconds
    pub uptime_seconds: u64,
    /// Total number of requests processed
    pub total_requests: u64,
    /// Number of active mock configurations
    pub active_mocks: usize,
    /// Number of currently enabled mocks
    pub enabled_mocks: usize,
    /// Number of registered API routes (currently mirrors the mock count)
    pub registered_routes: usize,
}
452
/// Server configuration info
///
/// Read-only configuration summary returned by the config endpoint. Not to be
/// confused with `mockforge_core::config::ServerConfig` used for validation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// MockForge version string
    pub version: String,
    /// Server port number
    pub port: u16,
    /// Whether an OpenAPI spec is loaded
    pub has_openapi_spec: bool,
    /// Optional path to the OpenAPI spec file
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
466
/// Shared state for the management API
///
/// Cloning is cheap: all mutable members are behind `Arc`, so every clone
/// observes the same data. Several members are gated on cargo features.
#[derive(Clone)]
pub struct ManagementState {
    /// Collection of mock configurations
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// Optional OpenAPI specification
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Optional path to the OpenAPI spec file
    pub spec_path: Option<String>,
    /// Server port number
    pub port: u16,
    /// Server start time for uptime calculation
    pub start_time: std::time::Instant,
    /// Counter for total requests processed
    pub request_counter: Arc<RwLock<u64>>,
    /// Optional proxy configuration for migration pipeline
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// Optional SMTP registry for email mocking
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// Optional MQTT broker for message mocking
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Optional Kafka broker for event streaming
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel for message events (MQTT & Kafka)
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// State machine manager for scenario state machines
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// Optional WebSocket broadcast channel for real-time updates
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Lifecycle hook registry for extensibility
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Rule explanations storage (in-memory for now)
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
}
513
impl ManagementState {
    /// Create a new management state
    ///
    /// All optional integrations (proxy, SMTP, MQTT, Kafka, WebSocket
    /// broadcast, lifecycle hooks) start as `None` and can be attached with
    /// the `with_*` builder methods below.
    ///
    /// # Arguments
    /// * `spec` - Optional OpenAPI specification
    /// * `spec_path` - Optional path to the OpenAPI spec file
    /// * `port` - Server port number
    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
        Self {
            mocks: Arc::new(RwLock::new(Vec::new())),
            spec,
            spec_path,
            port,
            start_time: std::time::Instant::now(),
            request_counter: Arc::new(RwLock::new(0)),
            proxy_config: None,
            #[cfg(feature = "smtp")]
            smtp_registry: None,
            #[cfg(feature = "mqtt")]
            mqtt_broker: None,
            #[cfg(feature = "kafka")]
            kafka_broker: None,
            #[cfg(any(feature = "mqtt", feature = "kafka"))]
            message_events: {
                // Capacity 1000: slow subscribers lag (and drop oldest events)
                // rather than blocking publishers.
                let (tx, _) = broadcast::channel(1000);
                Arc::new(tx)
            },
            state_machine_manager: Arc::new(RwLock::new(
                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
            )),
            ws_broadcast: None,
            lifecycle_hooks: None,
            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
        }
    }

    /// Add lifecycle hook registry to management state (builder-style)
    pub fn with_lifecycle_hooks(
        mut self,
        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
    ) -> Self {
        self.lifecycle_hooks = Some(hooks);
        self
    }

    /// Add WebSocket broadcast channel to management state (builder-style)
    pub fn with_ws_broadcast(
        mut self,
        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
    ) -> Self {
        self.ws_broadcast = Some(ws_broadcast);
        self
    }

    /// Add proxy configuration to management state (builder-style)
    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
        self.proxy_config = Some(proxy_config);
        self
    }

    #[cfg(feature = "smtp")]
    /// Add SMTP registry to management state (builder-style)
    pub fn with_smtp_registry(
        mut self,
        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
    ) -> Self {
        self.smtp_registry = Some(smtp_registry);
        self
    }

    #[cfg(feature = "mqtt")]
    /// Add MQTT broker to management state (builder-style)
    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
        self.mqtt_broker = Some(mqtt_broker);
        self
    }

    #[cfg(feature = "kafka")]
    /// Add Kafka broker to management state (builder-style)
    pub fn with_kafka_broker(
        mut self,
        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
    ) -> Self {
        self.kafka_broker = Some(kafka_broker);
        self
    }
}
601
602/// List all mocks
603async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
604    let mocks = state.mocks.read().await;
605    Json(serde_json::json!({
606        "mocks": *mocks,
607        "total": mocks.len(),
608        "enabled": mocks.iter().filter(|m| m.enabled).count()
609    }))
610}
611
612/// Get a specific mock by ID
613async fn get_mock(
614    State(state): State<ManagementState>,
615    Path(id): Path<String>,
616) -> Result<Json<MockConfig>, StatusCode> {
617    let mocks = state.mocks.read().await;
618    mocks
619        .iter()
620        .find(|m| m.id == id)
621        .cloned()
622        .map(Json)
623        .ok_or(StatusCode::NOT_FOUND)
624}
625
/// Create a new mock
///
/// Generates a UUID for mocks submitted without an ID, rejects duplicate IDs
/// with `409 Conflict`, fires lifecycle hooks, stores the mock, and broadcasts
/// a WebSocket event. Returns the stored mock (including any generated ID).
async fn create_mock(
    State(state): State<ManagementState>,
    Json(mut mock): Json<MockConfig>,
) -> Result<Json<MockConfig>, StatusCode> {
    // Write lock held for the whole handler so the duplicate check and the
    // insert are atomic with respect to concurrent creates.
    let mut mocks = state.mocks.write().await;

    // Generate ID if not provided
    if mock.id.is_empty() {
        mock.id = uuid::Uuid::new_v4().to_string();
    }

    // Check for duplicate ID
    if mocks.iter().any(|m| m.id == mock.id) {
        return Err(StatusCode::CONFLICT);
    }

    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);

    // Invoke lifecycle hooks (before insertion; hook failures are not observed here)
    if let Some(hooks) = &state.lifecycle_hooks {
        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
            id: mock.id.clone(),
            name: mock.name.clone(),
            config: serde_json::to_value(&mock).unwrap_or_default(),
        };
        hooks.invoke_mock_created(&event).await;
    }

    mocks.push(mock.clone());

    // Broadcast WebSocket event; send errors (no subscribers) are ignored
    if let Some(tx) = &state.ws_broadcast {
        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
    }

    Ok(Json(mock))
}
664
/// Update an existing mock
///
/// Replaces the mock at `id` wholesale with the request body, fires an
/// `Updated` lifecycle event, and — when the `enabled` flag flipped — an
/// additional `Enabled`/`Disabled` event. Returns `404` for unknown IDs.
///
/// NOTE(review): the path `id` and `updated_mock.id` are not checked for
/// agreement; the body's ID is stored and used in the events — confirm that
/// ID rewrites via PUT are intended.
async fn update_mock(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
    Json(updated_mock): Json<MockConfig>,
) -> Result<Json<MockConfig>, StatusCode> {
    let mut mocks = state.mocks.write().await;

    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;

    // Get old mock for comparison
    let old_mock = mocks[position].clone();

    info!("Updating mock: {}", id);
    mocks[position] = updated_mock.clone();

    // Invoke lifecycle hooks
    if let Some(hooks) = &state.lifecycle_hooks {
        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
            id: updated_mock.id.clone(),
            name: updated_mock.name.clone(),
            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
        };
        hooks.invoke_mock_updated(&event).await;

        // Check if enabled state changed
        if old_mock.enabled != updated_mock.enabled {
            let state_event = if updated_mock.enabled {
                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
                    id: updated_mock.id.clone(),
                }
            } else {
                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
                    id: updated_mock.id.clone(),
                }
            };
            hooks.invoke_mock_state_changed(&state_event).await;
        }
    }

    // Broadcast WebSocket event; send errors (no subscribers) are ignored
    if let Some(tx) = &state.ws_broadcast {
        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
    }

    Ok(Json(updated_mock))
}
712
713/// Delete a mock
714async fn delete_mock(
715    State(state): State<ManagementState>,
716    Path(id): Path<String>,
717) -> Result<StatusCode, StatusCode> {
718    let mut mocks = state.mocks.write().await;
719
720    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
721
722    // Get mock info before deletion for lifecycle hooks
723    let deleted_mock = mocks[position].clone();
724
725    info!("Deleting mock: {}", id);
726    mocks.remove(position);
727
728    // Invoke lifecycle hooks
729    if let Some(hooks) = &state.lifecycle_hooks {
730        let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
731            id: deleted_mock.id.clone(),
732            name: deleted_mock.name.clone(),
733        };
734        hooks.invoke_mock_deleted(&event).await;
735    }
736
737    // Broadcast WebSocket event
738    if let Some(tx) = &state.ws_broadcast {
739        let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
740    }
741
742    Ok(StatusCode::NO_CONTENT)
743}
744
/// Request to validate configuration
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// Configuration to validate (as JSON)
    pub config: serde_json::Value,
    /// Format of the configuration ("json" or "yaml"); defaults to "json"
    #[serde(default = "default_format")]
    pub format: String,
}
754
/// Serde default helper: configurations are treated as JSON unless stated otherwise.
fn default_format() -> String {
    String::from("json")
}
758
759/// Validate configuration without applying it
760async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
761    use mockforge_core::config::ServerConfig;
762
763    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
764        "yaml" | "yml" => {
765            let yaml_str = match serde_json::to_string(&request.config) {
766                Ok(s) => s,
767                Err(e) => {
768                    return (
769                        StatusCode::BAD_REQUEST,
770                        Json(serde_json::json!({
771                            "valid": false,
772                            "error": format!("Failed to convert to string: {}", e),
773                            "message": "Configuration validation failed"
774                        })),
775                    )
776                        .into_response();
777                }
778            };
779            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
780        }
781        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
782    };
783
784    match config_result {
785        Ok(_) => Json(serde_json::json!({
786            "valid": true,
787            "message": "Configuration is valid"
788        }))
789        .into_response(),
790        Err(e) => (
791            StatusCode::BAD_REQUEST,
792            Json(serde_json::json!({
793                "valid": false,
794                "error": format!("Invalid configuration: {}", e),
795                "message": "Configuration validation failed"
796            })),
797        )
798            .into_response(),
799    }
800}
801
/// Request for bulk configuration update
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    /// Partial configuration updates (only specified fields will be updated);
    /// must be a JSON object — top-level keys are merged shallowly
    pub updates: serde_json::Value,
}
808
809/// Bulk update configuration
810///
811/// This endpoint allows updating multiple configuration options at once.
812/// Only the specified fields in the updates object will be modified.
813///
814/// Note: This endpoint validates the configuration structure. Full runtime
815/// application of configuration changes would require architectural changes
816/// to store and apply ServerConfig in ManagementState. For now, this provides
817/// validation and acknowledgment of the update request.
818async fn bulk_update_config(
819    State(_state): State<ManagementState>,
820    Json(request): Json<BulkConfigUpdateRequest>,
821) -> impl IntoResponse {
822    // Validate the updates structure
823    if !request.updates.is_object() {
824        return (
825            StatusCode::BAD_REQUEST,
826            Json(serde_json::json!({
827                "error": "Invalid request",
828                "message": "Updates must be a JSON object"
829            })),
830        )
831            .into_response();
832    }
833
834    // Try to validate as partial ServerConfig
835    use mockforge_core::config::ServerConfig;
836
837    // Create a minimal valid config and try to merge updates
838    let base_config = ServerConfig::default();
839    let base_json = match serde_json::to_value(&base_config) {
840        Ok(v) => v,
841        Err(e) => {
842            return (
843                StatusCode::INTERNAL_SERVER_ERROR,
844                Json(serde_json::json!({
845                    "error": "Internal error",
846                    "message": format!("Failed to serialize base config: {}", e)
847                })),
848            )
849                .into_response();
850        }
851    };
852
853    // Merge updates into base config (simplified merge)
854    let mut merged = base_json.clone();
855    if let (Some(merged_obj), Some(updates_obj)) =
856        (merged.as_object_mut(), request.updates.as_object())
857    {
858        for (key, value) in updates_obj {
859            merged_obj.insert(key.clone(), value.clone());
860        }
861    }
862
863    // Validate the merged config
864    match serde_json::from_value::<ServerConfig>(merged) {
865        Ok(_) => {
866            // Config is valid
867            // TODO: Apply config to server when ServerConfig is stored in ManagementState
868            Json(serde_json::json!({
869                "success": true,
870                "message": "Bulk configuration update validated successfully. Note: Runtime application requires ServerConfig in ManagementState.",
871                "updates_received": request.updates,
872                "validated": true
873            }))
874            .into_response()
875        }
876        Err(e) => (
877            StatusCode::BAD_REQUEST,
878            Json(serde_json::json!({
879                "error": "Invalid configuration",
880                "message": format!("Configuration validation failed: {}", e),
881                "validated": false
882            })),
883        )
884            .into_response(),
885    }
886}
887
888/// Get server statistics
889async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
890    let mocks = state.mocks.read().await;
891    let request_count = *state.request_counter.read().await;
892
893    Json(ServerStats {
894        uptime_seconds: state.start_time.elapsed().as_secs(),
895        total_requests: request_count,
896        active_mocks: mocks.len(),
897        enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
898        registered_routes: mocks.len(), // This could be enhanced with actual route registry info
899    })
900}
901
902/// Get server configuration
903async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
904    Json(ServerConfig {
905        version: env!("CARGO_PKG_VERSION").to_string(),
906        port: state.port,
907        has_openapi_spec: state.spec.is_some(),
908        spec_path: state.spec_path.clone(),
909    })
910}
911
912/// Health check endpoint
913async fn health_check() -> Json<serde_json::Value> {
914    Json(serde_json::json!({
915        "status": "healthy",
916        "service": "mockforge-management",
917        "timestamp": chrono::Utc::now().to_rfc3339()
918    }))
919}
920
/// Export format for mock configurations
///
/// Serialized in lowercase (`"json"` / `"yaml"`) per the serde rename rule below.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// JSON format
    Json,
    /// YAML format
    Yaml,
}
930
931/// Export mocks in specified format
932async fn export_mocks(
933    State(state): State<ManagementState>,
934    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
935) -> Result<(StatusCode, String), StatusCode> {
936    let mocks = state.mocks.read().await;
937
938    let format = params
939        .get("format")
940        .map(|f| match f.as_str() {
941            "yaml" | "yml" => ExportFormat::Yaml,
942            _ => ExportFormat::Json,
943        })
944        .unwrap_or(ExportFormat::Json);
945
946    match format {
947        ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
948            .map(|json| (StatusCode::OK, json))
949            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
950        ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
951            .map(|yaml| (StatusCode::OK, yaml))
952            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
953    }
954}
955
956/// Import mocks from JSON/YAML
957async fn import_mocks(
958    State(state): State<ManagementState>,
959    Json(mocks): Json<Vec<MockConfig>>,
960) -> impl IntoResponse {
961    let mut current_mocks = state.mocks.write().await;
962    current_mocks.clear();
963    current_mocks.extend(mocks);
964    Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
965}
966
#[cfg(feature = "smtp")]
/// List SMTP emails in mailbox
///
/// Returns the full mailbox as JSON, 500 on registry errors, or 501 when
/// no SMTP registry is wired into the management state.
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.smtp_registry.as_ref() {
        Some(registry) => match registry.get_emails() {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve emails",
                    "message": e.to_string()
                })),
            ),
        },
        None => (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        ),
    }
}
991
/// Get specific SMTP email
///
/// Looks up one email by id: 200 with the email, 404 if unknown, 500 on
/// registry errors, 501 when SMTP is not available.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    match state.smtp_registry.as_ref() {
        None => (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        ),
        Some(registry) => match registry.get_email_by_id(&id) {
            Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
            Ok(None) => (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Email not found",
                    "id": id
                })),
            ),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve email",
                    "message": e.to_string()
                })),
            ),
        },
    }
}
1026
/// Clear SMTP mailbox
///
/// Deletes every stored email: 200 on success, 500 on registry errors,
/// 501 when SMTP is not available.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.smtp_registry.as_ref() {
        None => (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        ),
        Some(registry) => match registry.clear_mailbox() {
            Ok(()) => (
                StatusCode::OK,
                Json(serde_json::json!({
                    "message": "Mailbox cleared successfully"
                })),
            ),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to clear mailbox",
                    "message": e.to_string()
                })),
            ),
        },
    }
}
1056
/// Export SMTP mailbox
///
/// Not implemented over the HTTP API: always responds 501 and echoes the
/// requested format (`"json"` when the `format` query parameter is absent).
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    // Borrow the parameter instead of allocating a temporary String for the
    // default (the original `unwrap_or(&"json".to_string()).clone()` allocated
    // unconditionally).
    let format = params.get("format").map(String::as_str).unwrap_or("json");
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "SMTP mailbox management not available via HTTP API",
            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
            "requested_format": format
        })),
    )
}
1072
/// Search SMTP emails
///
/// Query parameters: `sender`, `recipient`, `subject`, `body` (text filters),
/// `since` / `until` (RFC3339 timestamps; invalid values are silently ignored),
/// and the boolean flags `regex` and `case_sensitive` (enabled only by the
/// exact string "true"). Responds 500 on search errors, 501 when SMTP is
/// not available.
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        // Parse an RFC3339 timestamp parameter into UTC; None on absence or parse failure.
        let parse_time = |key: &str| {
            params
                .get(key)
                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
                .map(|dt| dt.with_timezone(&chrono::Utc))
        };
        // Boolean flags are enabled only by the literal string "true".
        let flag = |key: &str| params.get(key).map(|s| s == "true").unwrap_or(false);

        let filters = EmailSearchFilters {
            sender: params.get("sender").cloned(),
            recipient: params.get("recipient").cloned(),
            subject: params.get("subject").cloned(),
            body: params.get("body").cloned(),
            since: parse_time("since"),
            until: parse_time("until"),
            use_regex: flag("regex"),
            case_sensitive: flag("case_sensitive"),
        };

        match smtp_registry.search_emails(filters) {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to search emails",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1117
/// MQTT broker statistics
///
/// Snapshot of broker state returned as JSON by the MQTT stats endpoint.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of connected MQTT clients
    pub connected_clients: usize,
    /// Number of active MQTT topics
    pub active_topics: usize,
    /// Number of retained messages
    pub retained_messages: usize,
    /// Total number of subscriptions
    pub total_subscriptions: usize,
}
1131
/// MQTT management handlers
///
/// Returns broker statistics as JSON, or 503 when no broker is configured.
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    let connected = broker.get_connected_clients().await.len();
    let topics = broker.get_active_topics().await.len();
    let topic_stats = broker.get_topic_stats().await;

    Json(MqttBrokerStats {
        connected_clients: connected,
        active_topics: topics,
        retained_messages: topic_stats.retained_messages,
        total_subscriptions: topic_stats.total_subscriptions,
    })
    .into_response()
}
1152
#[cfg(feature = "mqtt")]
/// List the ids of currently connected MQTT clients, or 503 when no broker
/// is configured.
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.mqtt_broker.as_ref() {
        Some(broker) => {
            let connected = broker.get_connected_clients().await;
            Json(serde_json::json!({
                "clients": connected
            }))
            .into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1165
#[cfg(feature = "mqtt")]
/// List the broker's active MQTT topics, or 503 when no broker is configured.
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.mqtt_broker.as_ref() {
        Some(broker) => {
            let active = broker.get_active_topics().await;
            Json(serde_json::json!({
                "topics": active
            }))
            .into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1178
#[cfg(feature = "mqtt")]
/// Force-disconnect a client by id: 200 on success, 500 when the broker
/// reports an error, 503 when no broker is configured.
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response(),
        Err(e) => {
            (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
                .into_response()
        }
    }
}
1198
1199// ========== MQTT Publish Handler ==========
1200
#[cfg(feature = "mqtt")]
/// Request to publish a single MQTT message
///
/// NOTE(review): the publish handler currently extracts these fields from a
/// raw `serde_json::Value` instead of deserializing this type — consider
/// unifying so the two cannot drift apart.
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Topic to publish to
    pub topic: String,
    /// Message payload (string or JSON)
    pub payload: String,
    /// QoS level (0, 1, or 2); defaults to 0 when omitted
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Whether to retain the message; defaults to false when omitted
    #[serde(default)]
    pub retain: bool,
}
1216
#[cfg(feature = "mqtt")]
/// Serde default for [`MqttPublishRequest::qos`]: QoS level 0.
fn default_qos() -> u8 {
    0
}
1221
#[cfg(feature = "mqtt")]
/// Publish a message to an MQTT topic (only compiled when mqtt feature is enabled)
///
/// Accepts a raw JSON body with `topic` and `payload` (required strings),
/// plus optional `qos` (default 0) and `retain` (default false). Responds
/// 400 on missing fields or QoS > 2, 503 when no broker is configured,
/// 500 on publish failure, and 200 on success. Successful publishes are
/// also forwarded to the real-time monitoring channel.
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Pull the fields out of the raw JSON body by hand.
    let topic = request.get("topic").and_then(|v| v.as_str()).map(str::to_string);
    let payload = request.get("payload").and_then(|v| v.as_str()).map(str::to_string);
    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    // Both topic and payload must be present string fields.
    let (topic, payload) = match (topic, payload) {
        (Some(t), Some(p)) => (t, p),
        _ => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid request",
                    "message": "Missing required fields: topic and payload"
                })),
            );
        }
    };

    match state.mqtt_broker.as_ref() {
        None => (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        ),
        Some(broker) => {
            // Reject QoS levels outside the valid 0..=2 range.
            if qos > 2 {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid QoS",
                        "message": "QoS must be 0, 1, or 2"
                    })),
                );
            }

            let client_id = "mockforge-management-api".to_string();
            let outcome = broker
                .handle_publish(&client_id, &topic, payload.as_bytes().to_vec(), qos, retain)
                .await;

            match outcome {
                Ok(_) => {
                    // Emit a monitoring event; a send error only means no
                    // subscriber is listening, so it is deliberately ignored.
                    let event = MessageEvent::Mqtt(MqttMessageEvent {
                        topic: topic.clone(),
                        payload: payload.clone(),
                        qos,
                        retain,
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    (
                        StatusCode::OK,
                        Json(serde_json::json!({
                            "success": true,
                            "message": format!("Message published to topic '{}'", topic),
                            "topic": topic,
                            "qos": qos,
                            "retain": retain
                        })),
                    )
                }
                Err(e) => (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Failed to publish message",
                        "message": format!("{}", e)
                    })),
                ),
            }
        }
    }
}
1309
1310#[cfg(not(feature = "mqtt"))]
1311/// Publish a message to an MQTT topic (stub when mqtt feature is disabled)
1312async fn publish_mqtt_message_handler(
1313    State(_state): State<ManagementState>,
1314    Json(_request): Json<serde_json::Value>,
1315) -> impl IntoResponse {
1316    (
1317        StatusCode::SERVICE_UNAVAILABLE,
1318        Json(serde_json::json!({
1319            "error": "MQTT feature not enabled",
1320            "message": "MQTT support is not compiled into this build"
1321        })),
1322    )
1323}
1324
#[cfg(feature = "mqtt")]
/// Request to publish multiple MQTT messages
///
/// NOTE(review): like `MqttPublishRequest`, the batch handler currently
/// parses raw JSON rather than deserializing this type — verify before
/// relying on this struct's defaults.
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// List of messages to publish
    pub messages: Vec<MqttPublishRequest>,
    /// Delay between messages in milliseconds (defaults to 100 when omitted)
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1335
#[cfg(feature = "mqtt")]
/// Serde default for [`MqttBatchPublishRequest::delay_ms`]: 100 ms between messages.
fn default_delay() -> u64 {
    100
}
1340
#[cfg(feature = "mqtt")]
/// Publish multiple messages to MQTT topics (only compiled when mqtt feature is enabled)
///
/// Accepts a raw JSON body of the shape `{ "messages": [...], "delay_ms": u64 }`.
/// Each message is published independently: a per-message validation or publish
/// failure is recorded in the `results` array and processing continues with the
/// next message rather than aborting the batch. Responds 400 when `messages` is
/// missing or empty, 503 when no broker is configured, and 200 otherwise with
/// per-message success/failure details and aggregate counts.
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Extract fields from JSON manually
    let messages_json = request.get("messages").and_then(|v| v.as_array());
    // delay_ms defaults to 100 when absent or not a u64.
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    if messages_json.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required field: messages"
            })),
        );
    }

    let messages_json = messages_json.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        if messages_json.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            );
        }

        // One result entry per input message, in input order.
        let mut results = Vec::new();
        let client_id = "mockforge-management-api".to_string();

        for (index, msg_json) in messages_json.iter().enumerate() {
            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

            // Record a failure for this entry but keep processing the rest.
            if topic.is_none() || payload.is_none() {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }

            let topic = topic.unwrap();
            let payload = payload.unwrap();

            // Validate QoS
            if qos > 2 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Invalid QoS (must be 0, 1, or 2)"
                }));
                continue;
            }

            // Convert payload to bytes
            let payload_bytes = payload.as_bytes().to_vec();

            let publish_result = broker
                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
                .await
                .map_err(|e| format!("{}", e));

            match publish_result {
                Ok(_) => {
                    // Emit message event
                    // (a send error only means no monitor is subscribed, so it is ignored)
                    let event = MessageEvent::Mqtt(MqttMessageEvent {
                        topic: topic.clone(),
                        payload: payload.clone(),
                        qos,
                        retain,
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": topic,
                        "qos": qos
                    }));
                }
                Err(error_msg) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": error_msg
                    }));
                }
            }

            // Add delay between messages (except for the last one)
            if index < messages_json.len() - 1 && delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        (
            StatusCode::OK,
            Json(serde_json::json!({
                "success": true,
                "total": messages_json.len(),
                "succeeded": success_count,
                "failed": messages_json.len() - success_count,
                "results": results
            })),
        )
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1470
1471#[cfg(not(feature = "mqtt"))]
1472/// Publish multiple messages to MQTT topics (stub when mqtt feature is disabled)
1473async fn publish_mqtt_batch_handler(
1474    State(_state): State<ManagementState>,
1475    Json(_request): Json<serde_json::Value>,
1476) -> impl IntoResponse {
1477    (
1478        StatusCode::SERVICE_UNAVAILABLE,
1479        Json(serde_json::json!({
1480            "error": "MQTT feature not enabled",
1481            "message": "MQTT support is not compiled into this build"
1482        })),
1483    )
1484}
1485
1486// Migration pipeline handlers
1487
/// Request to set migration mode
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    /// Desired mode as text: "mock", "shadow", "real", or "auto"
    /// (validated case-insensitively by the consuming handlers).
    mode: String,
}
1493
1494/// Get all migration routes
1495async fn get_migration_routes(
1496    State(state): State<ManagementState>,
1497) -> Result<Json<serde_json::Value>, StatusCode> {
1498    let proxy_config = match &state.proxy_config {
1499        Some(config) => config,
1500        None => {
1501            return Ok(Json(serde_json::json!({
1502                "error": "Migration not configured. Proxy config not available."
1503            })));
1504        }
1505    };
1506
1507    let config = proxy_config.read().await;
1508    let routes = config.get_migration_routes();
1509
1510    Ok(Json(serde_json::json!({
1511        "routes": routes
1512    })))
1513}
1514
1515/// Toggle a route's migration mode
1516async fn toggle_route_migration(
1517    State(state): State<ManagementState>,
1518    Path(pattern): Path<String>,
1519) -> Result<Json<serde_json::Value>, StatusCode> {
1520    let proxy_config = match &state.proxy_config {
1521        Some(config) => config,
1522        None => {
1523            return Ok(Json(serde_json::json!({
1524                "error": "Migration not configured. Proxy config not available."
1525            })));
1526        }
1527    };
1528
1529    let mut config = proxy_config.write().await;
1530    let new_mode = match config.toggle_route_migration(&pattern) {
1531        Some(mode) => mode,
1532        None => {
1533            return Ok(Json(serde_json::json!({
1534                "error": format!("Route pattern not found: {}", pattern)
1535            })));
1536        }
1537    };
1538
1539    Ok(Json(serde_json::json!({
1540        "pattern": pattern,
1541        "mode": format!("{:?}", new_mode).to_lowercase()
1542    })))
1543}
1544
1545/// Set a route's migration mode explicitly
1546async fn set_route_migration_mode(
1547    State(state): State<ManagementState>,
1548    Path(pattern): Path<String>,
1549    Json(request): Json<SetMigrationModeRequest>,
1550) -> Result<Json<serde_json::Value>, StatusCode> {
1551    let proxy_config = match &state.proxy_config {
1552        Some(config) => config,
1553        None => {
1554            return Ok(Json(serde_json::json!({
1555                "error": "Migration not configured. Proxy config not available."
1556            })));
1557        }
1558    };
1559
1560    use mockforge_core::proxy::config::MigrationMode;
1561    let mode = match request.mode.to_lowercase().as_str() {
1562        "mock" => MigrationMode::Mock,
1563        "shadow" => MigrationMode::Shadow,
1564        "real" => MigrationMode::Real,
1565        "auto" => MigrationMode::Auto,
1566        _ => {
1567            return Ok(Json(serde_json::json!({
1568                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1569            })));
1570        }
1571    };
1572
1573    let mut config = proxy_config.write().await;
1574    let updated = config.update_rule_migration_mode(&pattern, mode);
1575
1576    if !updated {
1577        return Ok(Json(serde_json::json!({
1578            "error": format!("Route pattern not found: {}", pattern)
1579        })));
1580    }
1581
1582    Ok(Json(serde_json::json!({
1583        "pattern": pattern,
1584        "mode": format!("{:?}", mode).to_lowercase()
1585    })))
1586}
1587
1588/// Toggle a group's migration mode
1589async fn toggle_group_migration(
1590    State(state): State<ManagementState>,
1591    Path(group): Path<String>,
1592) -> Result<Json<serde_json::Value>, StatusCode> {
1593    let proxy_config = match &state.proxy_config {
1594        Some(config) => config,
1595        None => {
1596            return Ok(Json(serde_json::json!({
1597                "error": "Migration not configured. Proxy config not available."
1598            })));
1599        }
1600    };
1601
1602    let mut config = proxy_config.write().await;
1603    let new_mode = config.toggle_group_migration(&group);
1604
1605    Ok(Json(serde_json::json!({
1606        "group": group,
1607        "mode": format!("{:?}", new_mode).to_lowercase()
1608    })))
1609}
1610
1611/// Set a group's migration mode explicitly
1612async fn set_group_migration_mode(
1613    State(state): State<ManagementState>,
1614    Path(group): Path<String>,
1615    Json(request): Json<SetMigrationModeRequest>,
1616) -> Result<Json<serde_json::Value>, StatusCode> {
1617    let proxy_config = match &state.proxy_config {
1618        Some(config) => config,
1619        None => {
1620            return Ok(Json(serde_json::json!({
1621                "error": "Migration not configured. Proxy config not available."
1622            })));
1623        }
1624    };
1625
1626    use mockforge_core::proxy::config::MigrationMode;
1627    let mode = match request.mode.to_lowercase().as_str() {
1628        "mock" => MigrationMode::Mock,
1629        "shadow" => MigrationMode::Shadow,
1630        "real" => MigrationMode::Real,
1631        "auto" => MigrationMode::Auto,
1632        _ => {
1633            return Ok(Json(serde_json::json!({
1634                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1635            })));
1636        }
1637    };
1638
1639    let mut config = proxy_config.write().await;
1640    config.update_group_migration_mode(&group, mode);
1641
1642    Ok(Json(serde_json::json!({
1643        "group": group,
1644        "mode": format!("{:?}", mode).to_lowercase()
1645    })))
1646}
1647
1648/// Get all migration groups
1649async fn get_migration_groups(
1650    State(state): State<ManagementState>,
1651) -> Result<Json<serde_json::Value>, StatusCode> {
1652    let proxy_config = match &state.proxy_config {
1653        Some(config) => config,
1654        None => {
1655            return Ok(Json(serde_json::json!({
1656                "error": "Migration not configured. Proxy config not available."
1657            })));
1658        }
1659    };
1660
1661    let config = proxy_config.read().await;
1662    let groups = config.get_migration_groups();
1663
1664    // Convert to JSON-serializable format
1665    let groups_json: serde_json::Map<String, serde_json::Value> = groups
1666        .into_iter()
1667        .map(|(name, info)| {
1668            (
1669                name,
1670                serde_json::json!({
1671                    "name": info.name,
1672                    "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1673                    "route_count": info.route_count
1674                }),
1675            )
1676        })
1677        .collect();
1678
1679    Ok(Json(serde_json::json!(groups_json)))
1680}
1681
1682/// Get overall migration status
1683async fn get_migration_status(
1684    State(state): State<ManagementState>,
1685) -> Result<Json<serde_json::Value>, StatusCode> {
1686    let proxy_config = match &state.proxy_config {
1687        Some(config) => config,
1688        None => {
1689            return Ok(Json(serde_json::json!({
1690                "error": "Migration not configured. Proxy config not available."
1691            })));
1692        }
1693    };
1694
1695    let config = proxy_config.read().await;
1696    let routes = config.get_migration_routes();
1697    let groups = config.get_migration_groups();
1698
1699    let mut mock_count = 0;
1700    let mut shadow_count = 0;
1701    let mut real_count = 0;
1702    let mut auto_count = 0;
1703
1704    for route in &routes {
1705        match route.migration_mode {
1706            mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1707            mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1708            mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1709            mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1710        }
1711    }
1712
1713    Ok(Json(serde_json::json!({
1714        "total_routes": routes.len(),
1715        "mock_routes": mock_count,
1716        "shadow_routes": shadow_count,
1717        "real_routes": real_count,
1718        "auto_routes": auto_count,
1719        "total_groups": groups.len(),
1720        "migration_enabled": config.migration_enabled
1721    })))
1722}
1723
1724// ========== Proxy Replacement Rules Management ==========
1725
/// Request body for creating/updating proxy replacement rules
#[derive(Debug, Deserialize, Serialize)]
pub struct ProxyRuleRequest {
    /// URL pattern to match (supports wildcards like "/api/users/*")
    pub pattern: String,
    /// Rule type: "request" or "response"
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Optional status code filter for response rules (empty = match all)
    #[serde(default)]
    pub status_codes: Vec<u16>,
    /// Body transformations to apply
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether this rule is enabled (defaults to true when omitted)
    #[serde(default = "default_true")]
    pub enabled: bool,
}
1743
/// Request body for individual body transformations
#[derive(Debug, Deserialize, Serialize)]
pub struct BodyTransformRequest {
    /// JSONPath expression to target (e.g., "$.userId", "$.email")
    pub path: String,
    /// Replacement value (supports template expansion like "{{uuid}}", "{{faker.email}}")
    pub replace: String,
    /// Operation to perform: "replace", "add", or "remove"; any unrecognized
    /// value (including the empty serde default) is treated as "replace"
    #[serde(default)]
    pub operation: String,
}
1755
/// Response format for proxy rules
#[derive(Debug, Serialize)]
pub struct ProxyRuleResponse {
    /// Rule ID (positional index: request rules first, then response rules)
    pub id: usize,
    /// URL pattern
    pub pattern: String,
    /// Rule type ("request" or "response")
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status codes (populated for response rules; empty for request rules)
    pub status_codes: Vec<u16>,
    /// Body transformations
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether enabled
    pub enabled: bool,
}
1773
1774/// List all proxy replacement rules
1775async fn list_proxy_rules(
1776    State(state): State<ManagementState>,
1777) -> Result<Json<serde_json::Value>, StatusCode> {
1778    let proxy_config = match &state.proxy_config {
1779        Some(config) => config,
1780        None => {
1781            return Ok(Json(serde_json::json!({
1782                "error": "Proxy not configured. Proxy config not available."
1783            })));
1784        }
1785    };
1786
1787    let config = proxy_config.read().await;
1788
1789    let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1790
1791    // Add request replacement rules
1792    for (idx, rule) in config.request_replacements.iter().enumerate() {
1793        rules.push(ProxyRuleResponse {
1794            id: idx,
1795            pattern: rule.pattern.clone(),
1796            rule_type: "request".to_string(),
1797            status_codes: Vec::new(),
1798            body_transforms: rule
1799                .body_transforms
1800                .iter()
1801                .map(|t| BodyTransformRequest {
1802                    path: t.path.clone(),
1803                    replace: t.replace.clone(),
1804                    operation: format!("{:?}", t.operation).to_lowercase(),
1805                })
1806                .collect(),
1807            enabled: rule.enabled,
1808        });
1809    }
1810
1811    // Add response replacement rules
1812    let request_count = config.request_replacements.len();
1813    for (idx, rule) in config.response_replacements.iter().enumerate() {
1814        rules.push(ProxyRuleResponse {
1815            id: request_count + idx,
1816            pattern: rule.pattern.clone(),
1817            rule_type: "response".to_string(),
1818            status_codes: rule.status_codes.clone(),
1819            body_transforms: rule
1820                .body_transforms
1821                .iter()
1822                .map(|t| BodyTransformRequest {
1823                    path: t.path.clone(),
1824                    replace: t.replace.clone(),
1825                    operation: format!("{:?}", t.operation).to_lowercase(),
1826                })
1827                .collect(),
1828            enabled: rule.enabled,
1829        });
1830    }
1831
1832    Ok(Json(serde_json::json!({
1833        "rules": rules
1834    })))
1835}
1836
1837/// Create a new proxy replacement rule
1838async fn create_proxy_rule(
1839    State(state): State<ManagementState>,
1840    Json(request): Json<ProxyRuleRequest>,
1841) -> Result<Json<serde_json::Value>, StatusCode> {
1842    let proxy_config = match &state.proxy_config {
1843        Some(config) => config,
1844        None => {
1845            return Ok(Json(serde_json::json!({
1846                "error": "Proxy not configured. Proxy config not available."
1847            })));
1848        }
1849    };
1850
1851    // Validate request
1852    if request.body_transforms.is_empty() {
1853        return Ok(Json(serde_json::json!({
1854            "error": "At least one body transform is required"
1855        })));
1856    }
1857
1858    let body_transforms: Vec<BodyTransform> = request
1859        .body_transforms
1860        .iter()
1861        .map(|t| {
1862            let op = match t.operation.as_str() {
1863                "replace" => TransformOperation::Replace,
1864                "add" => TransformOperation::Add,
1865                "remove" => TransformOperation::Remove,
1866                _ => TransformOperation::Replace,
1867            };
1868            BodyTransform {
1869                path: t.path.clone(),
1870                replace: t.replace.clone(),
1871                operation: op,
1872            }
1873        })
1874        .collect();
1875
1876    let new_rule = BodyTransformRule {
1877        pattern: request.pattern.clone(),
1878        status_codes: request.status_codes.clone(),
1879        body_transforms,
1880        enabled: request.enabled,
1881    };
1882
1883    let mut config = proxy_config.write().await;
1884
1885    let rule_id = if request.rule_type == "request" {
1886        config.request_replacements.push(new_rule);
1887        config.request_replacements.len() - 1
1888    } else if request.rule_type == "response" {
1889        config.response_replacements.push(new_rule);
1890        config.request_replacements.len() + config.response_replacements.len() - 1
1891    } else {
1892        return Ok(Json(serde_json::json!({
1893            "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
1894        })));
1895    };
1896
1897    Ok(Json(serde_json::json!({
1898        "id": rule_id,
1899        "message": "Rule created successfully"
1900    })))
1901}
1902
1903/// Get a specific proxy replacement rule
1904async fn get_proxy_rule(
1905    State(state): State<ManagementState>,
1906    Path(id): Path<String>,
1907) -> Result<Json<serde_json::Value>, StatusCode> {
1908    let proxy_config = match &state.proxy_config {
1909        Some(config) => config,
1910        None => {
1911            return Ok(Json(serde_json::json!({
1912                "error": "Proxy not configured. Proxy config not available."
1913            })));
1914        }
1915    };
1916
1917    let config = proxy_config.read().await;
1918    let rule_id: usize = match id.parse() {
1919        Ok(id) => id,
1920        Err(_) => {
1921            return Ok(Json(serde_json::json!({
1922                "error": format!("Invalid rule ID: {}", id)
1923            })));
1924        }
1925    };
1926
1927    let request_count = config.request_replacements.len();
1928
1929    if rule_id < request_count {
1930        // Request rule
1931        let rule = &config.request_replacements[rule_id];
1932        Ok(Json(serde_json::json!({
1933            "id": rule_id,
1934            "pattern": rule.pattern,
1935            "type": "request",
1936            "status_codes": [],
1937            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1938                "path": t.path,
1939                "replace": t.replace,
1940                "operation": format!("{:?}", t.operation).to_lowercase()
1941            })).collect::<Vec<_>>(),
1942            "enabled": rule.enabled
1943        })))
1944    } else if rule_id < request_count + config.response_replacements.len() {
1945        // Response rule
1946        let response_idx = rule_id - request_count;
1947        let rule = &config.response_replacements[response_idx];
1948        Ok(Json(serde_json::json!({
1949            "id": rule_id,
1950            "pattern": rule.pattern,
1951            "type": "response",
1952            "status_codes": rule.status_codes,
1953            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1954                "path": t.path,
1955                "replace": t.replace,
1956                "operation": format!("{:?}", t.operation).to_lowercase()
1957            })).collect::<Vec<_>>(),
1958            "enabled": rule.enabled
1959        })))
1960    } else {
1961        Ok(Json(serde_json::json!({
1962            "error": format!("Rule ID {} not found", rule_id)
1963        })))
1964    }
1965}
1966
1967/// Update a proxy replacement rule
1968async fn update_proxy_rule(
1969    State(state): State<ManagementState>,
1970    Path(id): Path<String>,
1971    Json(request): Json<ProxyRuleRequest>,
1972) -> Result<Json<serde_json::Value>, StatusCode> {
1973    let proxy_config = match &state.proxy_config {
1974        Some(config) => config,
1975        None => {
1976            return Ok(Json(serde_json::json!({
1977                "error": "Proxy not configured. Proxy config not available."
1978            })));
1979        }
1980    };
1981
1982    let mut config = proxy_config.write().await;
1983    let rule_id: usize = match id.parse() {
1984        Ok(id) => id,
1985        Err(_) => {
1986            return Ok(Json(serde_json::json!({
1987                "error": format!("Invalid rule ID: {}", id)
1988            })));
1989        }
1990    };
1991
1992    let body_transforms: Vec<BodyTransform> = request
1993        .body_transforms
1994        .iter()
1995        .map(|t| {
1996            let op = match t.operation.as_str() {
1997                "replace" => TransformOperation::Replace,
1998                "add" => TransformOperation::Add,
1999                "remove" => TransformOperation::Remove,
2000                _ => TransformOperation::Replace,
2001            };
2002            BodyTransform {
2003                path: t.path.clone(),
2004                replace: t.replace.clone(),
2005                operation: op,
2006            }
2007        })
2008        .collect();
2009
2010    let updated_rule = BodyTransformRule {
2011        pattern: request.pattern.clone(),
2012        status_codes: request.status_codes.clone(),
2013        body_transforms,
2014        enabled: request.enabled,
2015    };
2016
2017    let request_count = config.request_replacements.len();
2018
2019    if rule_id < request_count {
2020        // Update request rule
2021        config.request_replacements[rule_id] = updated_rule;
2022    } else if rule_id < request_count + config.response_replacements.len() {
2023        // Update response rule
2024        let response_idx = rule_id - request_count;
2025        config.response_replacements[response_idx] = updated_rule;
2026    } else {
2027        return Ok(Json(serde_json::json!({
2028            "error": format!("Rule ID {} not found", rule_id)
2029        })));
2030    }
2031
2032    Ok(Json(serde_json::json!({
2033        "id": rule_id,
2034        "message": "Rule updated successfully"
2035    })))
2036}
2037
2038/// Delete a proxy replacement rule
2039async fn delete_proxy_rule(
2040    State(state): State<ManagementState>,
2041    Path(id): Path<String>,
2042) -> Result<Json<serde_json::Value>, StatusCode> {
2043    let proxy_config = match &state.proxy_config {
2044        Some(config) => config,
2045        None => {
2046            return Ok(Json(serde_json::json!({
2047                "error": "Proxy not configured. Proxy config not available."
2048            })));
2049        }
2050    };
2051
2052    let mut config = proxy_config.write().await;
2053    let rule_id: usize = match id.parse() {
2054        Ok(id) => id,
2055        Err(_) => {
2056            return Ok(Json(serde_json::json!({
2057                "error": format!("Invalid rule ID: {}", id)
2058            })));
2059        }
2060    };
2061
2062    let request_count = config.request_replacements.len();
2063
2064    if rule_id < request_count {
2065        // Delete request rule
2066        config.request_replacements.remove(rule_id);
2067    } else if rule_id < request_count + config.response_replacements.len() {
2068        // Delete response rule
2069        let response_idx = rule_id - request_count;
2070        config.response_replacements.remove(response_idx);
2071    } else {
2072        return Ok(Json(serde_json::json!({
2073            "error": format!("Rule ID {} not found", rule_id)
2074        })));
2075    }
2076
2077    Ok(Json(serde_json::json!({
2078        "id": rule_id,
2079        "message": "Rule deleted successfully"
2080    })))
2081}
2082
2083/// Get recent intercepted requests/responses for inspection
2084/// This is a placeholder - in a full implementation, you'd track intercepted traffic
2085async fn get_proxy_inspect(
2086    State(_state): State<ManagementState>,
2087    Query(params): Query<std::collections::HashMap<String, String>>,
2088) -> Result<Json<serde_json::Value>, StatusCode> {
2089    let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2090
2091    // TODO: In a full implementation, this would return actual intercepted requests/responses
2092    // For now, return a placeholder response
2093    Ok(Json(serde_json::json!({
2094        "requests": [],
2095        "responses": [],
2096        "limit": limit,
2097        "message": "Request/response inspection not yet implemented. This endpoint will return intercepted traffic in a future version."
2098    })))
2099}
2100
/// Build the management API router
///
/// Assembles every management endpoint and attaches the shared
/// `ManagementState`. Protocol-specific route groups (SMTP, MQTT, Kafka)
/// are compiled in only when their cargo feature is enabled; the
/// `#[cfg(not(...))] let router = router;` rebindings keep the variable
/// defined in every feature combination.
pub fn management_router(state: ManagementState) -> Router {
    // Core endpoints: health/stats, config management, mock CRUD, and
    // import/export.
    let router = Router::new()
        .route("/health", get(health_check))
        .route("/stats", get(get_stats))
        .route("/config", get(get_config))
        .route("/config/validate", post(validate_config))
        .route("/config/bulk", post(bulk_update_config))
        .route("/mocks", get(list_mocks))
        .route("/mocks", post(create_mock))
        .route("/mocks/{id}", get(get_mock))
        .route("/mocks/{id}", put(update_mock))
        .route("/mocks/{id}", delete(delete_mock))
        .route("/export", get(export_mocks))
        .route("/import", post(import_mocks));

    // SMTP mailbox endpoints, only with the "smtp" feature.
    #[cfg(feature = "smtp")]
    let router = router
        .route("/smtp/mailbox", get(list_smtp_emails))
        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
        .route("/smtp/mailbox/{id}", get(get_smtp_email))
        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
        .route("/smtp/mailbox/search", get(search_smtp_emails));

    #[cfg(not(feature = "smtp"))]
    let router = router;

    // MQTT routes
    #[cfg(feature = "mqtt")]
    let router = router
        .route("/mqtt/stats", get(get_mqtt_stats))
        .route("/mqtt/clients", get(get_mqtt_clients))
        .route("/mqtt/topics", get(get_mqtt_topics))
        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // NOTE(review): the publish routes are also registered when the "mqtt"
    // feature is off — presumably the handlers report "not available" in
    // that build; confirm against the handler definitions.
    #[cfg(not(feature = "mqtt"))]
    let router = router
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Kafka broker endpoints, only with the "kafka" feature.
    #[cfg(feature = "kafka")]
    let router = router
        .route("/kafka/stats", get(get_kafka_stats))
        .route("/kafka/topics", get(get_kafka_topics))
        .route("/kafka/topics/{topic}", get(get_kafka_topic))
        .route("/kafka/groups", get(get_kafka_groups))
        .route("/kafka/groups/{group_id}", get(get_kafka_group))
        .route("/kafka/produce", post(produce_kafka_message))
        .route("/kafka/produce/batch", post(produce_kafka_batch))
        .route("/kafka/messages/stream", get(kafka_messages_stream));

    #[cfg(not(feature = "kafka"))]
    let router = router;

    // Migration pipeline routes
    let router = router
        .route("/migration/routes", get(get_migration_routes))
        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
        .route("/migration/groups/{group}", put(set_group_migration_mode))
        .route("/migration/groups", get(get_migration_groups))
        .route("/migration/status", get(get_migration_status));

    // Proxy replacement rules routes
    let router = router
        .route("/proxy/rules", get(list_proxy_rules))
        .route("/proxy/rules", post(create_proxy_rule))
        .route("/proxy/rules/{id}", get(get_proxy_rule))
        .route("/proxy/rules/{id}", put(update_proxy_rule))
        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
        .route("/proxy/inspect", get(get_proxy_inspect));

    // AI-powered features
    let router = router
        .route("/ai/generate-spec", post(generate_ai_spec))
        .route("/mockai/generate-openapi", post(generate_openapi_from_traffic))
        .route("/mockai/learn", post(learn_from_examples))
        .route("/mockai/rules/explanations", get(list_rule_explanations))
        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
        .route("/chaos/config", get(get_chaos_config))
        .route("/chaos/config", post(update_chaos_config))
        .route("/network/profiles", get(list_network_profiles))
        .route("/network/profile/apply", post(apply_network_profile));

    // State machine API routes
    let router =
        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());

    router.with_state(state)
}
2195
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Aggregate broker statistics returned by the `/kafka/stats` endpoint
pub struct KafkaBrokerStats {
    /// Number of topics
    pub topics: usize,
    /// Total number of partitions
    pub partitions: usize,
    /// Number of consumer groups
    pub consumer_groups: usize,
    /// Total messages produced
    pub messages_produced: u64,
    /// Total messages consumed
    pub messages_consumed: u64,
}
2210
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Summary of a Kafka topic returned by the `/kafka/topics` listing
pub struct KafkaTopicInfo {
    /// Topic name
    pub name: String,
    /// Number of partitions in the topic
    pub partitions: usize,
    /// Configured replication factor
    pub replication_factor: i32,
}
2218
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Summary of a Kafka consumer group returned by the `/kafka/groups` listing
pub struct KafkaConsumerGroupInfo {
    /// Consumer group identifier
    pub group_id: String,
    /// Number of active members in the group
    pub members: usize,
    /// Group state (currently always reported as "Stable")
    pub state: String,
}
2226
#[cfg(feature = "kafka")]
/// Get Kafka broker statistics
///
/// Reports topic/partition/group counts and produce/consume totals from a
/// metrics snapshot; responds 503 when no broker is running.
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    // Without a broker we can only report the service as unavailable.
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let consumer_groups = broker.consumer_groups.read().await;
    let snapshot = broker.metrics.clone().snapshot();

    let stats = KafkaBrokerStats {
        topics: topics.len(),
        partitions: topics.values().map(|t| t.partitions.len()).sum(),
        consumer_groups: consumer_groups.groups().len(),
        messages_produced: snapshot.messages_produced_total,
        messages_consumed: snapshot.messages_consumed_total,
    };

    Json(stats).into_response()
}
2260
#[cfg(feature = "kafka")]
/// List Kafka topics
///
/// Returns a `topics` array of name/partition-count/replication-factor
/// summaries; responds 503 when no broker is running.
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let mut topic_list = Vec::with_capacity(topics.len());
    for (topic_name, topic) in topics.iter() {
        topic_list.push(KafkaTopicInfo {
            name: topic_name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor,
        });
    }

    Json(serde_json::json!({ "topics": topic_list })).into_response()
}
2290
#[cfg(feature = "kafka")]
/// Get Kafka topic details
///
/// Returns topic metadata plus per-partition detail; 404 for an unknown
/// topic, 503 when no broker is running.
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let Some(topic) = topics.get(&topic_name) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Topic not found",
                "topic": topic_name
            })),
        )
            .into_response();
    };

    // Per-partition detail; leader and replica set are hard-coded to
    // broker 0 in this response.
    let partitions_detail: Vec<_> = topic
        .partitions
        .iter()
        .enumerate()
        .map(|(idx, partition)| {
            serde_json::json!({
                "id": idx as i32,
                "leader": 0,
                "replicas": vec![0],
                "message_count": partition.messages.len()
            })
        })
        .collect();

    Json(serde_json::json!({
        "name": topic_name,
        "partitions": topic.partitions.len(),
        "replication_factor": topic.config.replication_factor,
        "partitions_detail": partitions_detail
    }))
    .into_response()
}
2332
#[cfg(feature = "kafka")]
/// List Kafka consumer groups
///
/// Returns a `groups` array of group summaries; responds 503 when no
/// broker is running.
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;
    let mut groups = Vec::new();
    for (group_id, group) in consumer_groups.groups().iter() {
        groups.push(KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            // Simplified - could be more detailed
            state: "Stable".to_string(),
        });
    }

    Json(serde_json::json!({ "groups": groups })).into_response()
}
2363
#[cfg(feature = "kafka")]
/// Get Kafka consumer group details
///
/// Returns membership, each member's partition assignments, and the group's
/// committed offsets per (topic, partition); 404 for an unknown group,
/// 503 when no broker is running.
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let consumer_groups = broker.consumer_groups.read().await;
        if let Some(group) = consumer_groups.groups().get(&group_id) {
            // Group state is reported as a hard-coded "Stable".
            Json(serde_json::json!({
                "group_id": group_id,
                "members": group.members.len(),
                "state": "Stable",
                "members_detail": group.members.iter().map(|(member_id, member)| serde_json::json!({
                    "member_id": member_id,
                    "client_id": member.client_id,
                    "assignments": member.assignment.iter().map(|a| serde_json::json!({
                        "topic": a.topic,
                        "partitions": a.partitions
                    })).collect::<Vec<_>>()
                })).collect::<Vec<_>>(),
                // Offsets are keyed by (topic, partition) pairs.
                "offsets": group.offsets.iter().map(|((topic, partition), offset)| serde_json::json!({
                    "topic": topic,
                    "partition": partition,
                    "offset": offset
                })).collect::<Vec<_>>()
            })).into_response()
        } else {
            (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Consumer group not found",
                    "group_id": group_id
                })),
            )
                .into_response()
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2412
2413// ========== Kafka Produce Handler ==========
2414
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
/// Request body for producing a single message via `/kafka/produce`
pub struct KafkaProduceRequest {
    /// Topic to produce to
    pub topic: String,
    /// Message key (optional)
    #[serde(default)]
    pub key: Option<String>,
    /// Message value (JSON string or plain string)
    pub value: String,
    /// Partition ID (optional, auto-assigned if not provided)
    #[serde(default)]
    pub partition: Option<i32>,
    /// Message headers (optional, key-value pairs)
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2432
#[cfg(feature = "kafka")]
/// Produce a message to a Kafka topic
///
/// Creates the topic on demand with default configuration, picks a partition
/// (explicit `partition` or key-based assignment), appends the message, and
/// broadcasts a `MessageEvent::Kafka` for SSE stream subscribers. Returns
/// 400 for an out-of-range partition and 503 when no broker is running.
///
/// Fixes in this revision: `request.key` and `request.headers` were moved
/// while building the `KafkaMessage` and then used again when building the
/// monitoring event (use-after-move); they are now borrowed/cloned instead.
/// The redundant inner `#[cfg(feature = "kafka")]` block was removed — the
/// whole function is already feature-gated.
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let mut topics = broker.topics.write().await;

    // Get or create the topic with default configuration.
    let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
        crate::topics::Topic::new(request.topic.clone(), crate::topics::TopicConfig::default())
    });

    // An explicit partition wins; otherwise assign from the message key.
    let partition_id = match request.partition {
        Some(partition) => partition,
        None => topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes())),
    };

    // Reject partitions outside the topic's actual range.
    if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            })),
        )
            .into_response();
    }

    // Build the message; borrow/clone the request fields so they remain
    // available for the monitoring event below.
    let message = crate::partitions::KafkaMessage {
        offset: 0, // Will be set by partition.append
        timestamp: chrono::Utc::now().timestamp_millis(),
        key: request.key.as_ref().map(|k| k.as_bytes().to_vec()),
        value: request.value.as_bytes().to_vec(),
        headers: request
            .headers
            .clone()
            .unwrap_or_default()
            .into_iter()
            .map(|(k, v)| (k, v.as_bytes().to_vec()))
            .collect(),
    };

    match topic_entry.produce(partition_id, message).await {
        Ok(offset) => {
            // Record metrics
            broker.metrics.record_messages_produced(1);

            // Emit message event for real-time monitoring; a send error only
            // means no subscriber is currently listening.
            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: request.topic.clone(),
                key: request.key.clone(),
                value: request.value.clone(),
                partition: partition_id,
                offset,
                headers: request.headers.clone(),
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", request.topic),
                "topic": request.topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2530
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
/// Request body for producing multiple messages via `/kafka/produce/batch`
pub struct KafkaBatchProduceRequest {
    /// List of messages to produce
    pub messages: Vec<KafkaProduceRequest>,
    /// Delay between messages in milliseconds (default supplied by the
    /// file-level `default_delay` helper)
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2540
#[cfg(feature = "kafka")]
/// Produce multiple messages to Kafka topics
///
/// Produces each message in the batch sequentially, optionally sleeping
/// `delay_ms` milliseconds between messages. Individual message failures do
/// not abort the batch; a per-message result list plus aggregate counts is
/// returned. Responds 400 for an empty batch and 503 when no broker is
/// running.
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        if request.messages.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            )
                .into_response();
        }

        let mut results = Vec::with_capacity(request.messages.len());

        for (index, msg_request) in request.messages.iter().enumerate() {
            // Scope the topics write lock so it is released before the
            // inter-message delay below; holding it across the sleep would
            // block every other producer for `delay_ms` per message.
            {
                let mut topics = broker.topics.write().await;

                // Get or create the topic
                let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                    crate::topics::Topic::new(
                        msg_request.topic.clone(),
                        crate::topics::TopicConfig::default(),
                    )
                });

                // Use the explicit partition if given, otherwise let the
                // topic assign one from the message key.
                let partition_id = if let Some(partition) = msg_request.partition {
                    partition
                } else {
                    topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
                };

                // Validate partition exists; record the failure and move on.
                // `continue` also skips the inter-message delay.
                if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                    }));
                    continue;
                }

                // Create the message (the real offset is assigned on produce)
                let message = crate::partitions::KafkaMessage {
                    offset: 0,
                    timestamp: chrono::Utc::now().timestamp_millis(),
                    key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
                    value: msg_request.value.as_bytes().to_vec(),
                    headers: msg_request
                        .headers
                        .clone()
                        .unwrap_or_default()
                        .into_iter()
                        .map(|(k, v)| (k, v.as_bytes().to_vec()))
                        .collect(),
                };

                // Produce to partition
                match topic_entry.produce(partition_id, message).await {
                    Ok(offset) => {
                        broker.metrics.record_messages_produced(1);

                        // Emit message event for SSE subscribers
                        let event = MessageEvent::Kafka(KafkaMessageEvent {
                            topic: msg_request.topic.clone(),
                            key: msg_request.key.clone(),
                            value: msg_request.value.clone(),
                            partition: partition_id,
                            offset,
                            headers: msg_request.headers.clone(),
                            timestamp: chrono::Utc::now().to_rfc3339(),
                        });
                        let _ = state.message_events.send(event);

                        results.push(serde_json::json!({
                            "index": index,
                            "success": true,
                            "topic": msg_request.topic,
                            "partition": partition_id,
                            "offset": offset
                        }));
                    }
                    Err(e) => {
                        results.push(serde_json::json!({
                            "index": index,
                            "success": false,
                            "error": e.to_string()
                        }));
                    }
                }
            }

            // Add delay between messages (except after the last one)
            if index < request.messages.len() - 1 && request.delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        Json(serde_json::json!({
            "success": true,
            "total": request.messages.len(),
            "succeeded": success_count,
            "failed": request.messages.len() - success_count,
            "results": results
        }))
        .into_response()
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2666
2667// ========== Real-time Message Streaming (SSE) ==========
2668
#[cfg(feature = "mqtt")]
/// SSE stream for MQTT messages
///
/// Subscribes to the shared message-event broadcast channel and forwards
/// MQTT events as Server-Sent Events named `mqtt_message`. The optional
/// `topic` query parameter restricts the stream to topics containing that
/// substring. Kafka events on the shared channel are skipped. A keep-alive
/// comment is sent every 15 seconds.
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // The receiver is moved straight into the unfold state, so no `mut`
    // binding is needed here (avoids an unused_mut warning).
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => {
                        // Apply topic filter if specified
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "mqtt",
                            "topic": event.topic,
                            "payload": event.payload,
                            "qos": event.qos,
                            "retain": event.retain,
                            "timestamp": event.timestamp,
                        });

                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event = Event::default().event("mqtt_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => {
                        // Skip Kafka events in MQTT stream
                        continue;
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        // All senders dropped: end the SSE stream
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        // Receiver fell behind the broadcast buffer; log and resume
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
2729
#[cfg(feature = "kafka")]
/// SSE stream for Kafka messages
///
/// Subscribes to the shared message-event broadcast channel and forwards
/// Kafka events as Server-Sent Events named `kafka_message`. The optional
/// `topic` query parameter restricts the stream to topics containing that
/// substring. MQTT events on the shared channel are skipped. A keep-alive
/// comment is sent every 15 seconds.
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // The receiver is moved straight into the unfold state, so no `mut`
    // binding is needed here (avoids an unused_mut warning).
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => {
                        // Skip MQTT events in Kafka stream
                        continue;
                    }
                    Ok(MessageEvent::Kafka(event)) => {
                        // Apply topic filter if specified
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "kafka",
                            "topic": event.topic,
                            "key": event.key,
                            "value": event.value,
                            "partition": event.partition,
                            "offset": event.offset,
                            "headers": event.headers,
                            "timestamp": event.timestamp,
                        });

                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event =
                                Event::default().event("kafka_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        // All senders dropped: end the SSE stream
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        // Receiver fell behind the broadcast buffer; log and resume
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
2793
2794// ========== AI-Powered Features ==========
2795
/// Request for AI-powered API specification generation
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Natural language description of the API to generate
    pub query: String,
    /// Type of specification to generate: "openapi", "graphql", or "asyncapi".
    /// Unrecognized values are treated as OpenAPI by the handler.
    pub spec_type: String,
    /// Optional API version (e.g., "3.0.0" for OpenAPI); defaults to "3.0.0"
    pub api_version: Option<String>,
}
2806
/// Request for OpenAPI generation from recorded traffic
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to recorder database (optional, defaults to ./recordings.db)
    #[serde(default)]
    pub database_path: Option<String>,
    /// Start time for filtering (ISO 8601 format, e.g., 2025-01-01T00:00:00Z)
    #[serde(default)]
    pub since: Option<String>,
    /// End time for filtering (ISO 8601 format)
    #[serde(default)]
    pub until: Option<String>,
    /// Path pattern filter (supports wildcards, e.g., /api/*)
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence score for including paths (0.0 to 1.0);
    /// defaults to 0.7 via `default_min_confidence`
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
2826
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`:
/// include only paths inferred with at least 70% confidence.
fn default_min_confidence() -> f64 {
    const DEFAULT_MIN_CONFIDENCE: f64 = 0.7;
    DEFAULT_MIN_CONFIDENCE
}
2830
/// Generate API specification from natural language using AI
///
/// Reads LLM provider configuration from `MOCKFORGE_RAG_*` environment
/// variables; an API key (`MOCKFORGE_RAG_API_KEY` or `OPENAI_API_KEY`) is
/// required and its absence yields 503. Builds a prompt for the requested
/// spec type ("openapi", "graphql", or "asyncapi"; anything else falls back
/// to OpenAPI), runs it through the RAG engine backed by in-memory storage,
/// and returns the extracted specification text. Returns 500 on engine
/// initialization or generation failure.
#[cfg(feature = "data-faker")]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::{DocumentStorage, InMemoryStorage},
    };

    // Parse a numeric environment variable, falling back to `default` when
    // the variable is unset or fails to parse.
    fn env_parse<T: std::str::FromStr>(var: &str, default: T) -> T {
        std::env::var(var).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
    }

    // Build RAG config from environment variables
    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    // Check if RAG is configured - require API key
    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    }

    // Resolve the LLM provider (unknown values fall back to OpenAI)
    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    // Per-provider endpoint/model defaults, overridable via environment
    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    // Struct-update syntax instead of mutating a default instance
    // (clippy::field_reassign_with_default)
    let rag_config = RagConfig {
        provider,
        api_endpoint,
        api_key,
        model,
        max_tokens: env_parse("MOCKFORGE_RAG_MAX_TOKENS", 4096),
        // Lower temperature for more structured output
        temperature: env_parse("MOCKFORGE_RAG_TEMPERATURE", 0.3),
        timeout_secs: env_parse("MOCKFORGE_RAG_TIMEOUT", 60),
        max_context_length: env_parse("MOCKFORGE_RAG_CONTEXT_WINDOW", 4000),
        ..RagConfig::default()
    };

    // Build the prompt for spec generation
    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        if request.spec_type == "graphql" {
            "GraphQL SDL"
        } else {
            "YAML"
        }
    );

    // In-memory document storage is sufficient for a one-shot generation;
    // nothing needs to persist beyond this request.
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    // Create RAG engine
    let mut rag_engine = match RagEngine::new(rag_config, storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    // Generate using RAG engine
    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Try to extract just the YAML/JSON/SDL content if LLM added explanation
            let spec = if request.spec_type == "graphql" {
                // For GraphQL, extract SDL
                extract_graphql_schema(&generated_text)
            } else {
                // For OpenAPI/AsyncAPI, extract YAML
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
3001
3002#[cfg(not(feature = "data-faker"))]
3003async fn generate_ai_spec(
3004    State(_state): State<ManagementState>,
3005    Json(_request): Json<GenerateSpecRequest>,
3006) -> impl IntoResponse {
3007    (
3008        StatusCode::NOT_IMPLEMENTED,
3009        Json(serde_json::json!({
3010            "error": "AI features not enabled",
3011            "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3012        })),
3013    )
3014        .into_response()
3015}
3016
3017/// Generate OpenAPI specification from recorded traffic
3018async fn generate_openapi_from_traffic(
3019    State(_state): State<ManagementState>,
3020    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
3021) -> impl IntoResponse {
3022    use chrono::{DateTime, Utc};
3023    use mockforge_core::intelligent_behavior::{
3024        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
3025        IntelligentBehaviorConfig,
3026    };
3027    use mockforge_recorder::{
3028        database::RecorderDatabase,
3029        openapi_export::{QueryFilters, RecordingsToOpenApi},
3030    };
3031    use std::path::PathBuf;
3032
3033    // Determine database path
3034    let db_path = if let Some(ref path) = request.database_path {
3035        PathBuf::from(path)
3036    } else {
3037        std::env::current_dir()
3038            .unwrap_or_else(|_| PathBuf::from("."))
3039            .join("recordings.db")
3040    };
3041
3042    // Open database
3043    let db = match RecorderDatabase::new(&db_path).await {
3044        Ok(db) => db,
3045        Err(e) => {
3046            return (
3047                StatusCode::BAD_REQUEST,
3048                Json(serde_json::json!({
3049                    "error": "Database error",
3050                    "message": format!("Failed to open recorder database: {}", e)
3051                })),
3052            )
3053                .into_response();
3054        }
3055    };
3056
3057    // Parse time filters
3058    let since_dt = if let Some(ref since_str) = request.since {
3059        match DateTime::parse_from_rfc3339(since_str) {
3060            Ok(dt) => Some(dt.with_timezone(&Utc)),
3061            Err(e) => {
3062                return (
3063                    StatusCode::BAD_REQUEST,
3064                    Json(serde_json::json!({
3065                        "error": "Invalid date format",
3066                        "message": format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3067                    })),
3068                )
3069                    .into_response();
3070            }
3071        }
3072    } else {
3073        None
3074    };
3075
3076    let until_dt = if let Some(ref until_str) = request.until {
3077        match DateTime::parse_from_rfc3339(until_str) {
3078            Ok(dt) => Some(dt.with_timezone(&Utc)),
3079            Err(e) => {
3080                return (
3081                    StatusCode::BAD_REQUEST,
3082                    Json(serde_json::json!({
3083                        "error": "Invalid date format",
3084                        "message": format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3085                    })),
3086                )
3087                    .into_response();
3088            }
3089        }
3090    } else {
3091        None
3092    };
3093
3094    // Build query filters
3095    let query_filters = QueryFilters {
3096        since: since_dt,
3097        until: until_dt,
3098        path_pattern: request.path_pattern.clone(),
3099        min_status_code: None,
3100        max_requests: Some(1000),
3101    };
3102
3103    // Query HTTP exchanges
3104    let exchanges = match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await
3105    {
3106        Ok(exchanges) => exchanges,
3107        Err(e) => {
3108            return (
3109                StatusCode::INTERNAL_SERVER_ERROR,
3110                Json(serde_json::json!({
3111                    "error": "Query error",
3112                    "message": format!("Failed to query HTTP exchanges: {}", e)
3113                })),
3114            )
3115                .into_response();
3116        }
3117    };
3118
3119    if exchanges.is_empty() {
3120        return (
3121            StatusCode::NOT_FOUND,
3122            Json(serde_json::json!({
3123                "error": "No exchanges found",
3124                "message": "No HTTP exchanges found matching the specified filters"
3125            })),
3126        )
3127            .into_response();
3128    }
3129
3130    // Create OpenAPI generator config
3131    let behavior_config = IntelligentBehaviorConfig::default();
3132    let gen_config = OpenApiGenerationConfig {
3133        min_confidence: request.min_confidence,
3134        behavior_model: Some(behavior_config.behavior_model),
3135    };
3136
3137    // Generate OpenAPI spec
3138    let generator = OpenApiSpecGenerator::new(gen_config);
3139    let result = match generator.generate_from_exchanges(exchanges).await {
3140        Ok(result) => result,
3141        Err(e) => {
3142            return (
3143                StatusCode::INTERNAL_SERVER_ERROR,
3144                Json(serde_json::json!({
3145                    "error": "Generation error",
3146                    "message": format!("Failed to generate OpenAPI spec: {}", e)
3147                })),
3148            )
3149                .into_response();
3150        }
3151    };
3152
3153    // Prepare response
3154    let spec_json = if let Some(ref raw) = result.spec.raw_document {
3155        raw.clone()
3156    } else {
3157        match serde_json::to_value(&result.spec.spec) {
3158            Ok(json) => json,
3159            Err(e) => {
3160                return (
3161                    StatusCode::INTERNAL_SERVER_ERROR,
3162                    Json(serde_json::json!({
3163                        "error": "Serialization error",
3164                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
3165                    })),
3166                )
3167                    .into_response();
3168            }
3169        }
3170    };
3171
3172    // Build response with metadata
3173    let response = serde_json::json!({
3174        "spec": spec_json,
3175        "metadata": {
3176            "requests_analyzed": result.metadata.requests_analyzed,
3177            "paths_inferred": result.metadata.paths_inferred,
3178            "path_confidence": result.metadata.path_confidence,
3179            "generated_at": result.metadata.generated_at.to_rfc3339(),
3180            "duration_ms": result.metadata.duration_ms,
3181        }
3182    });
3183
3184    Json(response).into_response()
3185}
3186
3187/// List all rule explanations
3188async fn list_rule_explanations(
3189    State(state): State<ManagementState>,
3190    Query(params): Query<std::collections::HashMap<String, String>>,
3191) -> impl IntoResponse {
3192    use mockforge_core::intelligent_behavior::RuleType;
3193
3194    let explanations = state.rule_explanations.read().await;
3195    let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3196
3197    // Filter by rule type if provided
3198    if let Some(rule_type_str) = params.get("rule_type") {
3199        if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3200            explanations_vec.retain(|e| e.rule_type == rule_type);
3201        }
3202    }
3203
3204    // Filter by minimum confidence if provided
3205    if let Some(min_confidence_str) = params.get("min_confidence") {
3206        if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3207            explanations_vec.retain(|e| e.confidence >= min_confidence);
3208        }
3209    }
3210
3211    // Sort by confidence (descending) and then by generated_at (descending)
3212    explanations_vec.sort_by(|a, b| {
3213        b.confidence
3214            .partial_cmp(&a.confidence)
3215            .unwrap_or(std::cmp::Ordering::Equal)
3216            .then_with(|| b.generated_at.cmp(&a.generated_at))
3217    });
3218
3219    Json(serde_json::json!({
3220        "explanations": explanations_vec,
3221        "total": explanations_vec.len(),
3222    }))
3223    .into_response()
3224}
3225
3226/// Get a specific rule explanation by ID
3227async fn get_rule_explanation(
3228    State(state): State<ManagementState>,
3229    Path(rule_id): Path<String>,
3230) -> impl IntoResponse {
3231    let explanations = state.rule_explanations.read().await;
3232
3233    match explanations.get(&rule_id) {
3234        Some(explanation) => Json(serde_json::json!({
3235            "explanation": explanation,
3236        }))
3237        .into_response(),
3238        None => (
3239            StatusCode::NOT_FOUND,
3240            Json(serde_json::json!({
3241                "error": "Rule explanation not found",
3242                "message": format!("No explanation found for rule ID: {}", rule_id)
3243            })),
3244        )
3245            .into_response(),
3246    }
3247}
3248
/// Request for learning from examples
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example request/response pairs to learn from (must be non-empty;
    /// an empty list is rejected with 400)
    pub examples: Vec<ExamplePairRequest>,
    /// Optional configuration override; when it fails to deserialize as an
    /// `IntelligentBehaviorConfig`, the handler falls back to defaults
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3258
/// Example pair request format
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request data as free-form JSON; the handler reads the keys
    /// "method", "path", "body", "query_params", and "headers"
    pub request: serde_json::Value,
    /// Response data as free-form JSON; the handler reads "status_code"
    /// (or "status") and "body"
    pub response: serde_json::Value,
}
3267
3268/// Learn behavioral rules from example pairs
3269///
3270/// This endpoint accepts example request/response pairs, generates behavioral rules
3271/// with explanations, and stores the explanations for later retrieval.
3272async fn learn_from_examples(
3273    State(state): State<ManagementState>,
3274    Json(request): Json<LearnFromExamplesRequest>,
3275) -> impl IntoResponse {
3276    use mockforge_core::intelligent_behavior::{
3277        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3278        rule_generator::{ExamplePair, RuleGenerator},
3279    };
3280
3281    if request.examples.is_empty() {
3282        return (
3283            StatusCode::BAD_REQUEST,
3284            Json(serde_json::json!({
3285                "error": "No examples provided",
3286                "message": "At least one example pair is required"
3287            })),
3288        )
3289            .into_response();
3290    }
3291
3292    // Convert request examples to ExamplePair format
3293    let example_pairs: Result<Vec<ExamplePair>, String> = request
3294        .examples
3295        .into_iter()
3296        .enumerate()
3297        .map(|(idx, ex)| {
3298            // Parse request JSON to extract method, path, body, etc.
3299            let method = ex
3300                .request
3301                .get("method")
3302                .and_then(|v| v.as_str())
3303                .map(|s| s.to_string())
3304                .unwrap_or_else(|| "GET".to_string());
3305            let path = ex
3306                .request
3307                .get("path")
3308                .and_then(|v| v.as_str())
3309                .map(|s| s.to_string())
3310                .unwrap_or_else(|| "/".to_string());
3311            let request_body = ex.request.get("body").cloned();
3312            let query_params = ex
3313                .request
3314                .get("query_params")
3315                .and_then(|v| v.as_object())
3316                .map(|obj| {
3317                    obj.iter()
3318                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3319                        .collect()
3320                })
3321                .unwrap_or_default();
3322            let headers = ex
3323                .request
3324                .get("headers")
3325                .and_then(|v| v.as_object())
3326                .map(|obj| {
3327                    obj.iter()
3328                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3329                        .collect()
3330                })
3331                .unwrap_or_default();
3332
3333            // Parse response JSON to extract status, body, etc.
3334            let status = ex
3335                .response
3336                .get("status_code")
3337                .or_else(|| ex.response.get("status"))
3338                .and_then(|v| v.as_u64())
3339                .map(|n| n as u16)
3340                .unwrap_or(200);
3341            let response_body = ex.response.get("body").cloned();
3342
3343            Ok(ExamplePair {
3344                method,
3345                path,
3346                request: request_body,
3347                status,
3348                response: response_body,
3349                query_params,
3350                headers,
3351                metadata: {
3352                    let mut meta = std::collections::HashMap::new();
3353                    meta.insert("source".to_string(), "api".to_string());
3354                    meta.insert("example_index".to_string(), idx.to_string());
3355                    meta
3356                },
3357            })
3358        })
3359        .collect();
3360
3361    let example_pairs = match example_pairs {
3362        Ok(pairs) => pairs,
3363        Err(e) => {
3364            return (
3365                StatusCode::BAD_REQUEST,
3366                Json(serde_json::json!({
3367                    "error": "Invalid examples",
3368                    "message": e
3369                })),
3370            )
3371                .into_response();
3372        }
3373    };
3374
3375    // Create behavior config (use provided config or default)
3376    let behavior_config = if let Some(config_json) = request.config {
3377        // Try to deserialize custom config, fall back to default
3378        serde_json::from_value(config_json)
3379            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3380            .behavior_model
3381    } else {
3382        BehaviorModelConfig::default()
3383    };
3384
3385    // Create rule generator
3386    let generator = RuleGenerator::new(behavior_config);
3387
3388    // Generate rules with explanations
3389    let (rules, explanations) =
3390        match generator.generate_rules_with_explanations(example_pairs).await {
3391            Ok(result) => result,
3392            Err(e) => {
3393                return (
3394                    StatusCode::INTERNAL_SERVER_ERROR,
3395                    Json(serde_json::json!({
3396                        "error": "Rule generation failed",
3397                        "message": format!("Failed to generate rules: {}", e)
3398                    })),
3399                )
3400                    .into_response();
3401            }
3402        };
3403
3404    // Store explanations in ManagementState
3405    {
3406        let mut stored_explanations = state.rule_explanations.write().await;
3407        for explanation in &explanations {
3408            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3409        }
3410    }
3411
3412    // Prepare response
3413    let response = serde_json::json!({
3414        "success": true,
3415        "rules_generated": {
3416            "consistency_rules": rules.consistency_rules.len(),
3417            "schemas": rules.schemas.len(),
3418            "state_machines": rules.state_transitions.len(),
3419            "system_prompt": !rules.system_prompt.is_empty(),
3420        },
3421        "explanations": explanations.iter().map(|e| serde_json::json!({
3422            "rule_id": e.rule_id,
3423            "rule_type": e.rule_type,
3424            "confidence": e.confidence,
3425            "reasoning": e.reasoning,
3426        })).collect::<Vec<_>>(),
3427        "total_explanations": explanations.len(),
3428    });
3429
3430    Json(response).into_response()
3431}
3432
/// Extract an OpenAPI/AsyncAPI YAML spec from free-form text (e.g. an LLM reply).
///
/// Resolution order:
/// 1. An explicit ```` ```yaml ```` fenced code block.
/// 2. Any fenced code block — the fence's language tag line (e.g. `yml`) is
///    stripped so it does not leak into the returned spec.
/// 3. Text that already starts with `openapi:` / `asyncapi:`.
/// 4. Otherwise the trimmed input is returned as-is.
fn extract_yaml_spec(text: &str) -> String {
    // 1. Prefer an explicit ```yaml fenced block.
    if let Some(start) = text.find("```yaml") {
        let yaml_start = text[start + 7..].trim_start();
        if let Some(end) = yaml_start.find("```") {
            return yaml_start[..end].trim().to_string();
        }
    }
    // 2. Fall back to any fenced block. Skip the remainder of the fence line
    //    so a language tag (```yml, ```json, ...) is not included in the spec.
    if let Some(start) = text.find("```") {
        let after_fence = &text[start + 3..];
        let content = match after_fence.find('\n') {
            Some(nl) => &after_fence[nl + 1..],
            None => after_fence,
        };
        if let Some(end) = content.find("```") {
            return content[..end].trim().to_string();
        }
    }

    // 3. Check if it starts with openapi: or asyncapi:
    if text.trim_start().starts_with("openapi:") || text.trim_start().starts_with("asyncapi:") {
        return text.trim().to_string();
    }

    // 4. Return as-is if no code blocks found
    text.trim().to_string()
}
3456
/// Extract a GraphQL SDL schema from free-form text (e.g. an LLM reply).
///
/// Resolution order:
/// 1. An explicit ```` ```graphql ```` fenced code block.
/// 2. Any fenced code block — the fence's language tag line (e.g. `gql`) is
///    stripped so it does not leak into the returned schema.
/// 3. Text that already looks like SDL (starts with `type ` or `schema `).
/// 4. Otherwise the trimmed input is returned as-is.
fn extract_graphql_schema(text: &str) -> String {
    // 1. Prefer an explicit ```graphql fenced block.
    if let Some(start) = text.find("```graphql") {
        let schema_start = text[start + 10..].trim_start();
        if let Some(end) = schema_start.find("```") {
            return schema_start[..end].trim().to_string();
        }
    }
    // 2. Fall back to any fenced block. Skip the remainder of the fence line
    //    so a language tag (```gql, ```sdl, ...) is not included in the schema.
    if let Some(start) = text.find("```") {
        let after_fence = &text[start + 3..];
        let content = match after_fence.find('\n') {
            Some(nl) => &after_fence[nl + 1..],
            None => after_fence,
        };
        if let Some(end) = content.find("```") {
            return content[..end].trim().to_string();
        }
    }

    // 3. Check if it looks like GraphQL SDL (starts with type, schema, etc.)
    if text.trim_start().starts_with("type ") || text.trim_start().starts_with("schema ") {
        return text.trim().to_string();
    }

    text.trim().to_string()
}
3479
3480// ========== Chaos Engineering Management ==========
3481
3482/// Get current chaos engineering configuration
3483async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
3484    // TODO: Get from state when chaos config is stored
3485    Json(serde_json::json!({
3486        "enabled": false,
3487        "latency": null,
3488        "fault_injection": null,
3489        "rate_limit": null,
3490        "traffic_shaping": null,
3491    }))
3492    .into_response()
3493}
3494
/// Request to update chaos configuration
///
/// All fields are optional (`Option`), allowing partial updates: callers
/// may send only the sections they want to change. The per-section payloads
/// are accepted as free-form JSON values rather than typed structs.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Whether to enable chaos engineering
    pub enabled: Option<bool>,
    /// Latency configuration (free-form JSON)
    pub latency: Option<serde_json::Value>,
    /// Fault injection configuration (free-form JSON)
    pub fault_injection: Option<serde_json::Value>,
    /// Rate limiting configuration (free-form JSON)
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic shaping configuration (free-form JSON)
    pub traffic_shaping: Option<serde_json::Value>,
}
3509
3510/// Update chaos engineering configuration
3511async fn update_chaos_config(
3512    State(_state): State<ManagementState>,
3513    Json(config): Json<ChaosConfigUpdate>,
3514) -> impl IntoResponse {
3515    // TODO: Apply chaos config to server
3516    Json(serde_json::json!({
3517        "success": true,
3518        "message": "Chaos configuration updated"
3519    }))
3520    .into_response()
3521}
3522
3523// ========== Network Profile Management ==========
3524
3525/// List available network profiles
3526async fn list_network_profiles() -> impl IntoResponse {
3527    use mockforge_core::network_profiles::NetworkProfileCatalog;
3528
3529    let catalog = NetworkProfileCatalog::default();
3530    let profiles: Vec<serde_json::Value> = catalog
3531        .list_profiles_with_description()
3532        .iter()
3533        .map(|(name, description)| {
3534            serde_json::json!({
3535                "name": name,
3536                "description": description,
3537            })
3538        })
3539        .collect();
3540
3541    Json(serde_json::json!({
3542        "profiles": profiles
3543    }))
3544    .into_response()
3545}
3546
3547#[derive(Debug, Deserialize)]
3548/// Request to apply a network profile
3549pub struct ApplyNetworkProfileRequest {
3550    /// Name of the network profile to apply
3551    pub profile_name: String,
3552}
3553
3554/// Apply a network profile
3555async fn apply_network_profile(
3556    State(_state): State<ManagementState>,
3557    Json(request): Json<ApplyNetworkProfileRequest>,
3558) -> impl IntoResponse {
3559    use mockforge_core::network_profiles::NetworkProfileCatalog;
3560
3561    let catalog = NetworkProfileCatalog::default();
3562    if let Some(profile) = catalog.get(&request.profile_name) {
3563        // TODO: Apply profile to server configuration
3564        Json(serde_json::json!({
3565            "success": true,
3566            "message": format!("Network profile '{}' applied", request.profile_name),
3567            "profile": {
3568                "name": profile.name,
3569                "description": profile.description,
3570            }
3571        }))
3572        .into_response()
3573    } else {
3574        (
3575            StatusCode::NOT_FOUND,
3576            Json(serde_json::json!({
3577                "error": "Profile not found",
3578                "message": format!("Network profile '{}' not found", request.profile_name)
3579            })),
3580        )
3581            .into_response()
3582    }
3583}
3584
3585/// Build the management API router with UI Builder support
3586pub fn management_router_with_ui_builder(
3587    state: ManagementState,
3588    server_config: mockforge_core::config::ServerConfig,
3589) -> Router {
3590    use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
3591
3592    // Create the base management router
3593    let management = management_router(state);
3594
3595    // Create UI Builder state and router
3596    let ui_builder_state = UIBuilderState::new(server_config);
3597    let ui_builder = create_ui_builder_router(ui_builder_state);
3598
3599    // Nest UI Builder under /ui-builder
3600    management.nest("/ui-builder", ui_builder)
3601}
3602
3603/// Build management router with spec import API
3604pub fn management_router_with_spec_import(state: ManagementState) -> Router {
3605    use crate::spec_import::{spec_import_router, SpecImportState};
3606
3607    // Create base management router
3608    let management = management_router(state);
3609
3610    // Merge with spec import router
3611    Router::new()
3612        .merge(management)
3613        .merge(spec_import_router(SpecImportState::new()))
3614}
3615
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `MockConfig` from the fields the tests care about; all other
    /// fields are left unset. Removes the triplicated 13-field struct literal.
    fn sample_mock(
        id: &str,
        name: &str,
        method: &str,
        path: &str,
        enabled: bool,
        status_code: u16,
        body: serde_json::Value,
    ) -> MockConfig {
        MockConfig {
            id: id.to_string(),
            name: name.to_string(),
            method: method.to_string(),
            path: path.to_string(),
            response: MockResponse {
                body,
                headers: None,
            },
            enabled,
            latency_ms: None,
            status_code: Some(status_code),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        }
    }

    /// A mock pushed into state can be found again by id.
    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        let mock = sample_mock(
            "test-1",
            "Test Mock",
            "GET",
            "/test",
            true,
            200,
            serde_json::json!({"message": "test"}),
        );

        // Create mock
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(mock.clone());
        }

        // Get mock
        let mocks = state.mocks.read().await;
        let found = mocks.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    /// Enabled/disabled counts reflect the stored mocks.
    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        // Add one enabled and one disabled mock
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(sample_mock("1", "Mock 1", "GET", "/test1", true, 200, serde_json::json!({})));
            mocks.push(sample_mock("2", "Mock 2", "POST", "/test2", false, 201, serde_json::json!({})));
        }

        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }
}