mockforge_http/
management.rs

1/// Management API for MockForge
2///
3/// Provides REST endpoints for controlling mocks, server configuration,
4/// and integration with developer tools (VS Code extension, CI/CD, etc.)
5use axum::{
6    extract::{Path, Query, State},
7    http::StatusCode,
8    response::{
9        sse::{Event, Sse},
10        IntoResponse, Json,
11    },
12    routing::{delete, get, post, put},
13    Router,
14};
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18    BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23use std::convert::Infallible;
24use std::sync::Arc;
25use tokio::sync::{broadcast, RwLock};
26use tracing::*;
27
/// Message event types for real-time monitoring
///
/// Serialized with an adjacent tag, e.g.
/// `{"protocol": "mqtt", "data": { ... }}` — the `protocol` field selects
/// the variant (lowercased) and `data` carries the payload struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    #[cfg(feature = "mqtt")]
    /// MQTT message event
    Mqtt(MqttMessageEvent),
    #[cfg(feature = "kafka")]
    /// Kafka message event
    Kafka(KafkaMessageEvent),
}
40
#[cfg(feature = "mqtt")]
/// MQTT message event for real-time monitoring
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// MQTT topic name
    pub topic: String,
    /// Message payload content (stored as a `String`; presumably UTF-8 text —
    /// TODO confirm how binary payloads are converted upstream)
    pub payload: String,
    /// Quality of Service level (0, 1, or 2)
    pub qos: u8,
    /// Whether the message is retained
    pub retain: bool,
    /// RFC3339 formatted timestamp
    pub timestamp: String,
}
56
#[cfg(feature = "kafka")]
/// Kafka message event for real-time monitoring
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Kafka topic name
    pub topic: String,
    /// Optional record key
    pub key: Option<String>,
    /// Record value (stored as a `String`; presumably UTF-8 text — TODO
    /// confirm how binary record values are converted upstream)
    pub value: String,
    /// Partition the record belongs to
    pub partition: i32,
    /// Offset of the record within its partition
    pub offset: i64,
    /// Optional record headers
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// RFC3339 formatted timestamp
    pub timestamp: String,
}
68
/// Mock configuration representation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockConfig {
    /// Unique identifier for the mock (auto-generated by `create_mock` when
    /// empty; omitted from serialized output while empty)
    #[serde(skip_serializing_if = "String::is_empty")]
    pub id: String,
    /// Human-readable name for the mock
    pub name: String,
    /// HTTP method (GET, POST, etc.) — compared case-insensitively at match time
    pub method: String,
    /// API path pattern to match (supports `{param}` segments and `*` wildcards)
    pub path: String,
    /// Response configuration
    pub response: MockResponse,
    /// Whether this mock is currently enabled (defaults to `true` when absent)
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Optional latency to inject in milliseconds
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_ms: Option<u64>,
    /// Optional HTTP status code override
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status_code: Option<u16>,
    /// Request matching criteria (headers, query params, body patterns)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_match: Option<RequestMatchCriteria>,
    /// Priority for mock ordering (higher priority mocks are matched first)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub priority: Option<i32>,
    /// Scenario name for stateful mocking
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scenario: Option<String>,
    /// Required scenario state for this mock to be active
    #[serde(skip_serializing_if = "Option::is_none")]
    pub required_scenario_state: Option<String>,
    /// New scenario state after this mock is matched
    #[serde(skip_serializing_if = "Option::is_none")]
    pub new_scenario_state: Option<String>,
}
108
/// Serde default for `MockConfig::enabled`: mocks are enabled unless
/// explicitly disabled.
fn default_true() -> bool {
    true
}
112
/// Mock response configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Response body as JSON
    pub body: serde_json::Value,
    /// Optional custom response headers (header name -> value)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
122
/// Request matching criteria for advanced request matching
///
/// Evaluated by `mock_matches_request`; all present criteria must match.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RequestMatchCriteria {
    /// Headers that must be present and match (case-insensitive header names).
    /// Values are tried as regexes first; a value that compiles as a regex is
    /// matched unanchored, otherwise compared for exact equality.
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub headers: std::collections::HashMap<String, String>,
    /// Query parameters that must be present and match (exact equality)
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub query_params: std::collections::HashMap<String, String>,
    /// Request body pattern (supports exact match or regex)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub body_pattern: Option<String>,
    /// JSONPath expression for JSON body matching (dotted subset only)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub json_path: Option<String>,
    /// XPath expression for XML body matching (not yet implemented)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub xpath: Option<String>,
    /// Custom matcher expression (e.g., "headers.content-type == \"application/json\"")
    #[serde(skip_serializing_if = "Option::is_none")]
    pub custom_matcher: Option<String>,
}
145
146/// Check if a request matches the given mock configuration
147///
148/// This function implements comprehensive request matching including:
149/// - Method and path matching
150/// - Header matching (with regex support)
151/// - Query parameter matching
152/// - Body pattern matching (exact, regex, JSONPath, XPath)
153/// - Custom matcher expressions
154pub fn mock_matches_request(
155    mock: &MockConfig,
156    method: &str,
157    path: &str,
158    headers: &std::collections::HashMap<String, String>,
159    query_params: &std::collections::HashMap<String, String>,
160    body: Option<&[u8]>,
161) -> bool {
162    use regex::Regex;
163
164    // Check if mock is enabled
165    if !mock.enabled {
166        return false;
167    }
168
169    // Check method (case-insensitive)
170    if mock.method.to_uppercase() != method.to_uppercase() {
171        return false;
172    }
173
174    // Check path pattern (supports wildcards and path parameters)
175    if !path_matches_pattern(&mock.path, path) {
176        return false;
177    }
178
179    // Check request matching criteria if present
180    if let Some(criteria) = &mock.request_match {
181        // Check headers
182        for (key, expected_value) in &criteria.headers {
183            let header_key_lower = key.to_lowercase();
184            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
185
186            if let Some((_, actual_value)) = found {
187                // Try regex match first, then exact match
188                if let Ok(re) = Regex::new(expected_value) {
189                    if !re.is_match(actual_value) {
190                        return false;
191                    }
192                } else if actual_value != expected_value {
193                    return false;
194                }
195            } else {
196                return false; // Header not found
197            }
198        }
199
200        // Check query parameters
201        for (key, expected_value) in &criteria.query_params {
202            if let Some(actual_value) = query_params.get(key) {
203                if actual_value != expected_value {
204                    return false;
205                }
206            } else {
207                return false; // Query param not found
208            }
209        }
210
211        // Check body pattern
212        if let Some(pattern) = &criteria.body_pattern {
213            if let Some(body_bytes) = body {
214                let body_str = String::from_utf8_lossy(body_bytes);
215                // Try regex first, then exact match
216                if let Ok(re) = Regex::new(pattern) {
217                    if !re.is_match(&body_str) {
218                        return false;
219                    }
220                } else if body_str.as_ref() != pattern {
221                    return false;
222                }
223            } else {
224                return false; // Body required but not present
225            }
226        }
227
228        // Check JSONPath (simplified implementation)
229        if let Some(json_path) = &criteria.json_path {
230            if let Some(body_bytes) = body {
231                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
232                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
233                        // Simple JSONPath check
234                        if !json_path_exists(&json_value, json_path) {
235                            return false;
236                        }
237                    }
238                }
239            }
240        }
241
242        // Check XPath (placeholder - requires XML/XPath library for full implementation)
243        if let Some(_xpath) = &criteria.xpath {
244            // XPath matching would require an XML/XPath library
245            // For now, this is a placeholder that warns but doesn't fail
246            tracing::warn!("XPath matching not yet fully implemented");
247        }
248
249        // Check custom matcher
250        if let Some(custom) = &criteria.custom_matcher {
251            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
252                return false;
253            }
254        }
255    }
256
257    true
258}
259
/// Check if a path matches a pattern (supports wildcards and path parameters)
///
/// Supported pattern features:
/// - Exact match: `/users/list`
/// - Path parameters: `/users/{id}` matches `/users/123`
/// - Glob wildcards: `*` matches any run of characters (including across
///   segments when the segment counts differ), `?` matches exactly one
fn path_matches_pattern(pattern: &str, path: &str) -> bool {
    // Exact match or the match-everything pattern.
    if pattern == path || pattern == "*" {
        return true;
    }

    let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
    let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();

    if pattern_parts.len() != path_parts.len() {
        // A wildcard may span multiple segments (`/api/*` vs `/api/v1/x`).
        if pattern.contains('*') {
            return matches_wildcard_pattern(pattern, path);
        }
        return false;
    }

    for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
        // `{param}` segments match any single segment value.
        if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
            continue;
        }

        // Bug fix: wildcard segments were previously compared literally when
        // the segment counts were equal, so `/users/*` failed to match
        // `/users/123`. Delegate such segments to the glob matcher.
        if pattern_part.contains('*') || pattern_part.contains('?') {
            if matches_wildcard_pattern(pattern_part, path_part) {
                continue;
            }
            return false;
        }

        if pattern_part != path_part {
            return false;
        }
    }

    true
}

/// Check if path matches a wildcard pattern
///
/// Glob semantics: `*` matches any (possibly empty) run of characters and
/// `?` matches exactly one character; every other character is literal.
///
/// Bug fix: the previous implementation converted the pattern to a regex
/// without escaping metacharacters, so e.g. `.` in `/v1.0/*` matched any
/// character, and `?` was translated to `.?` (zero-or-one) instead of
/// exactly one character. This byte-wise matcher handles both correctly and
/// avoids recompiling a regex on every call.
fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
    // Classic iterative glob match with single-star backtracking.
    // Operates on bytes; multi-byte UTF-8 characters match literally, though
    // `?` consumes exactly one byte of such a character.
    let (p, s) = (pattern.as_bytes(), path.as_bytes());
    let (mut pi, mut si) = (0usize, 0usize);
    // Last `*` position in the pattern and the input position it was tried at.
    let mut backtrack: Option<(usize, usize)> = None;

    while si < s.len() {
        if pi < p.len() && (p[pi] == b'?' || p[pi] == s[si]) {
            pi += 1;
            si += 1;
        } else if pi < p.len() && p[pi] == b'*' {
            // Tentatively let `*` match nothing; remember where to backtrack.
            backtrack = Some((pi, si));
            pi += 1;
        } else if let Some((star_pi, star_si)) = backtrack {
            // Mismatch after a `*`: let the star absorb one more byte.
            pi = star_pi + 1;
            si = star_si + 1;
            backtrack = Some((star_pi, star_si + 1));
        } else {
            return false;
        }
    }

    // Trailing `*`s may match the empty string.
    while pi < p.len() && p[pi] == b'*' {
        pi += 1;
    }
    pi == p.len()
}
311
312/// Check if a JSONPath exists in a JSON value (simplified implementation)
313fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
314    // Simple implementation - for full JSONPath support, use a library like jsonpath-rs
315    // This handles simple paths like $.field or $.field.subfield
316    if let Some(path) = json_path.strip_prefix("$.") {
317        let parts: Vec<&str> = path.split('.').collect();
318
319        let mut current = json;
320        for part in parts {
321            if let Some(obj) = current.as_object() {
322                if let Some(value) = obj.get(part) {
323                    current = value;
324                } else {
325                    return false;
326                }
327            } else {
328                return false;
329            }
330        }
331        true
332    } else {
333        // For complex JSONPath expressions, would need a proper JSONPath library
334        tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
335        false
336    }
337}
338
339/// Evaluate a custom matcher expression
340fn evaluate_custom_matcher(
341    expression: &str,
342    method: &str,
343    path: &str,
344    headers: &std::collections::HashMap<String, String>,
345    query_params: &std::collections::HashMap<String, String>,
346    body: Option<&[u8]>,
347) -> bool {
348    use regex::Regex;
349
350    let expr = expression.trim();
351
352    // Handle equality expressions (field == "value")
353    if expr.contains("==") {
354        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
355        if parts.len() != 2 {
356            return false;
357        }
358
359        let field = parts[0];
360        let expected_value = parts[1].trim_matches('"').trim_matches('\'');
361
362        match field {
363            "method" => method == expected_value,
364            "path" => path == expected_value,
365            _ if field.starts_with("headers.") => {
366                let header_name = &field[8..];
367                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
368            }
369            _ if field.starts_with("query.") => {
370                let param_name = &field[6..];
371                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
372            }
373            _ => false,
374        }
375    }
376    // Handle regex match expressions (field =~ "pattern")
377    else if expr.contains("=~") {
378        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
379        if parts.len() != 2 {
380            return false;
381        }
382
383        let field = parts[0];
384        let pattern = parts[1].trim_matches('"').trim_matches('\'');
385
386        if let Ok(re) = Regex::new(pattern) {
387            match field {
388                "method" => re.is_match(method),
389                "path" => re.is_match(path),
390                _ if field.starts_with("headers.") => {
391                    let header_name = &field[8..];
392                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
393                }
394                _ if field.starts_with("query.") => {
395                    let param_name = &field[6..];
396                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
397                }
398                _ => false,
399            }
400        } else {
401            false
402        }
403    }
404    // Handle contains expressions (field contains "value")
405    else if expr.contains("contains") {
406        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
407        if parts.len() != 2 {
408            return false;
409        }
410
411        let field = parts[0];
412        let search_value = parts[1].trim_matches('"').trim_matches('\'');
413
414        match field {
415            "path" => path.contains(search_value),
416            _ if field.starts_with("headers.") => {
417                let header_name = &field[8..];
418                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
419            }
420            _ if field.starts_with("body") => {
421                if let Some(body_bytes) = body {
422                    let body_str = String::from_utf8_lossy(body_bytes);
423                    body_str.contains(search_value)
424                } else {
425                    false
426                }
427            }
428            _ => false,
429        }
430    } else {
431        // Unknown expression format
432        tracing::warn!("Unknown custom matcher expression format: {}", expr);
433        false
434    }
435}
436
/// Server statistics
///
/// Point-in-time snapshot assembled per request by the stats handler.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Server uptime in seconds (elapsed since `ManagementState::start_time`)
    pub uptime_seconds: u64,
    /// Total number of requests processed
    pub total_requests: u64,
    /// Number of active mock configurations
    pub active_mocks: usize,
    /// Number of currently enabled mocks
    pub enabled_mocks: usize,
    /// Number of registered API routes
    pub registered_routes: usize,
}
451
/// Server configuration info
///
/// Note: distinct from `mockforge_core::config::ServerConfig`; this is the
/// lightweight summary exposed over the management API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// MockForge version string
    pub version: String,
    /// Server port number
    pub port: u16,
    /// Whether an OpenAPI spec is loaded
    pub has_openapi_spec: bool,
    /// Optional path to the OpenAPI spec file
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
465
/// Shared state for the management API
///
/// Cheap to `Clone`: all mutable or heavyweight state lives behind `Arc`s,
/// so clones handed to each request handler share the same underlying data.
#[derive(Clone)]
pub struct ManagementState {
    /// Collection of mock configurations
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// Optional OpenAPI specification
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Optional path to the OpenAPI spec file
    pub spec_path: Option<String>,
    /// Server port number
    pub port: u16,
    /// Server start time for uptime calculation
    pub start_time: std::time::Instant,
    /// Counter for total requests processed
    pub request_counter: Arc<RwLock<u64>>,
    /// Optional proxy configuration for migration pipeline
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// Optional SMTP registry for email mocking
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// Optional MQTT broker for message mocking
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Optional Kafka broker for event streaming
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel for message events (MQTT & Kafka)
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// State machine manager for scenario state machines
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// Optional WebSocket broadcast channel for real-time updates
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Lifecycle hook registry for extensibility
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Rule explanations storage (in-memory for now)
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
    /// Optional chaos API state for chaos config management
    #[cfg(feature = "chaos")]
    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
    /// Optional server configuration for profile application
    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
}
517
impl ManagementState {
    /// Create a new management state
    ///
    /// All optional integrations (proxy, SMTP, MQTT, Kafka, chaos, lifecycle
    /// hooks, WebSocket broadcast, server config) start as `None`/empty and
    /// are attached with the builder-style `with_*` methods below, each of
    /// which consumes and returns `self`.
    ///
    /// # Arguments
    /// * `spec` - Optional OpenAPI specification
    /// * `spec_path` - Optional path to the OpenAPI spec file
    /// * `port` - Server port number
    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
        Self {
            mocks: Arc::new(RwLock::new(Vec::new())),
            spec,
            spec_path,
            port,
            start_time: std::time::Instant::now(),
            request_counter: Arc::new(RwLock::new(0)),
            proxy_config: None,
            #[cfg(feature = "smtp")]
            smtp_registry: None,
            #[cfg(feature = "mqtt")]
            mqtt_broker: None,
            #[cfg(feature = "kafka")]
            kafka_broker: None,
            #[cfg(any(feature = "mqtt", feature = "kafka"))]
            message_events: {
                // Bounded broadcast channel (capacity 1000). The initial
                // receiver handle is unused and dropped; consumers obtain
                // receivers later via `subscribe()` on the stored sender.
                let (tx, _) = broadcast::channel(1000);
                Arc::new(tx)
            },
            state_machine_manager: Arc::new(RwLock::new(
                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
            )),
            ws_broadcast: None,
            lifecycle_hooks: None,
            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
            #[cfg(feature = "chaos")]
            chaos_api_state: None,
            server_config: None,
        }
    }

    /// Add lifecycle hook registry to management state
    pub fn with_lifecycle_hooks(
        mut self,
        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
    ) -> Self {
        self.lifecycle_hooks = Some(hooks);
        self
    }

    /// Add WebSocket broadcast channel to management state
    pub fn with_ws_broadcast(
        mut self,
        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
    ) -> Self {
        self.ws_broadcast = Some(ws_broadcast);
        self
    }

    /// Add proxy configuration to management state
    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
        self.proxy_config = Some(proxy_config);
        self
    }

    #[cfg(feature = "smtp")]
    /// Add SMTP registry to management state
    pub fn with_smtp_registry(
        mut self,
        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
    ) -> Self {
        self.smtp_registry = Some(smtp_registry);
        self
    }

    #[cfg(feature = "mqtt")]
    /// Add MQTT broker to management state
    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
        self.mqtt_broker = Some(mqtt_broker);
        self
    }

    #[cfg(feature = "kafka")]
    /// Add Kafka broker to management state
    pub fn with_kafka_broker(
        mut self,
        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
    ) -> Self {
        self.kafka_broker = Some(kafka_broker);
        self
    }

    #[cfg(feature = "chaos")]
    /// Add chaos API state to management state
    pub fn with_chaos_api_state(
        mut self,
        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
    ) -> Self {
        self.chaos_api_state = Some(chaos_api_state);
        self
    }

    /// Add server configuration to management state
    pub fn with_server_config(
        mut self,
        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
    ) -> Self {
        self.server_config = Some(server_config);
        self
    }
}
627
628/// List all mocks
629async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
630    let mocks = state.mocks.read().await;
631    Json(serde_json::json!({
632        "mocks": *mocks,
633        "total": mocks.len(),
634        "enabled": mocks.iter().filter(|m| m.enabled).count()
635    }))
636}
637
638/// Get a specific mock by ID
639async fn get_mock(
640    State(state): State<ManagementState>,
641    Path(id): Path<String>,
642) -> Result<Json<MockConfig>, StatusCode> {
643    let mocks = state.mocks.read().await;
644    mocks
645        .iter()
646        .find(|m| m.id == id)
647        .cloned()
648        .map(Json)
649        .ok_or(StatusCode::NOT_FOUND)
650}
651
/// Create a new mock
///
/// Generates a UUID when `id` is empty and rejects duplicate ids with
/// `409 Conflict`. On success the mock is appended to the shared list,
/// lifecycle hooks fire, and a WebSocket event is broadcast.
async fn create_mock(
    State(state): State<ManagementState>,
    Json(mut mock): Json<MockConfig>,
) -> Result<Json<MockConfig>, StatusCode> {
    // Write lock is held for the whole handler so the duplicate-id check and
    // the push are atomic with respect to concurrent handlers.
    let mut mocks = state.mocks.write().await;

    // Generate ID if not provided
    if mock.id.is_empty() {
        mock.id = uuid::Uuid::new_v4().to_string();
    }

    // Check for duplicate ID
    if mocks.iter().any(|m| m.id == mock.id) {
        return Err(StatusCode::CONFLICT);
    }

    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);

    // Invoke lifecycle hooks
    // NOTE(review): the `Created` hook fires before the mock is pushed to the
    // list below — confirm hooks don't expect to see it in `state.mocks`.
    if let Some(hooks) = &state.lifecycle_hooks {
        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
            id: mock.id.clone(),
            name: mock.name.clone(),
            config: serde_json::to_value(&mock).unwrap_or_default(),
        };
        hooks.invoke_mock_created(&event).await;
    }

    mocks.push(mock.clone());

    // Broadcast WebSocket event
    // Send errors (e.g. no active subscribers) are intentionally ignored.
    if let Some(tx) = &state.ws_broadcast {
        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
    }

    Ok(Json(mock))
}
690
/// Update an existing mock
///
/// Replaces the stored mock wholesale with the request body, fires lifecycle
/// hooks (including `Enabled`/`Disabled` when the `enabled` flag changed),
/// and broadcasts a WebSocket update event. Returns `404` for unknown ids.
async fn update_mock(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
    Json(updated_mock): Json<MockConfig>,
) -> Result<Json<MockConfig>, StatusCode> {
    let mut mocks = state.mocks.write().await;

    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;

    // Get old mock for comparison
    let old_mock = mocks[position].clone();

    info!("Updating mock: {}", id);
    mocks[position] = updated_mock.clone();

    // Invoke lifecycle hooks
    // NOTE(review): events use `updated_mock.id`, which may differ from the
    // path `id` when the body carries a different id — this effectively
    // renames the mock and could create duplicates; confirm intended.
    if let Some(hooks) = &state.lifecycle_hooks {
        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
            id: updated_mock.id.clone(),
            name: updated_mock.name.clone(),
            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
        };
        hooks.invoke_mock_updated(&event).await;

        // Check if enabled state changed
        if old_mock.enabled != updated_mock.enabled {
            let state_event = if updated_mock.enabled {
                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
                    id: updated_mock.id.clone(),
                }
            } else {
                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
                    id: updated_mock.id.clone(),
                }
            };
            hooks.invoke_mock_state_changed(&state_event).await;
        }
    }

    // Broadcast WebSocket event
    // Send errors (e.g. no active subscribers) are intentionally ignored.
    if let Some(tx) = &state.ws_broadcast {
        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
    }

    Ok(Json(updated_mock))
}
738
/// Delete a mock
///
/// Removes the mock from the shared list, fires the `Deleted` lifecycle
/// hook, broadcasts a WebSocket event, and returns `204 No Content`.
/// Unknown ids yield `404 Not Found`.
async fn delete_mock(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> Result<StatusCode, StatusCode> {
    let mut mocks = state.mocks.write().await;

    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;

    // Get mock info before deletion for lifecycle hooks
    let deleted_mock = mocks[position].clone();

    info!("Deleting mock: {}", id);
    mocks.remove(position);

    // Invoke lifecycle hooks
    if let Some(hooks) = &state.lifecycle_hooks {
        let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
            id: deleted_mock.id.clone(),
            name: deleted_mock.name.clone(),
        };
        hooks.invoke_mock_deleted(&event).await;
    }

    // Broadcast WebSocket event
    // Send errors (e.g. no active subscribers) are intentionally ignored.
    if let Some(tx) = &state.ws_broadcast {
        let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
    }

    Ok(StatusCode::NO_CONTENT)
}
770
/// Request to validate configuration
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// Configuration to validate (as JSON)
    pub config: serde_json::Value,
    /// Format of the configuration ("json" or "yaml"); defaults to "json"
    /// when omitted
    #[serde(default = "default_format")]
    pub format: String,
}
780
/// Serde default for `ValidateConfigRequest::format`.
fn default_format() -> String {
    String::from("json")
}
784
/// Validate configuration without applying it
///
/// Accepts the configuration as JSON (`format: "json"`, the default) or
/// YAML (`"yaml"`/`"yml"`). Returns `200` with `valid: true` when the
/// payload deserializes into `ServerConfig`, otherwise `400` with the
/// parse error message.
async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
    use mockforge_core::config::ServerConfig;

    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
        "yaml" | "yml" => {
            // NOTE(review): `request.config` already arrived as parsed JSON,
            // so this re-serializes it to a JSON string and feeds that to the
            // YAML parser (legal, since JSON is a YAML subset). Both branches
            // therefore validate the same JSON structure — confirm intended.
            let yaml_str = match serde_json::to_string(&request.config) {
                Ok(s) => s,
                Err(e) => {
                    return (
                        StatusCode::BAD_REQUEST,
                        Json(serde_json::json!({
                            "valid": false,
                            "error": format!("Failed to convert to string: {}", e),
                            "message": "Configuration validation failed"
                        })),
                    )
                        .into_response();
                }
            };
            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
        }
        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
    };

    match config_result {
        Ok(_) => Json(serde_json::json!({
            "valid": true,
            "message": "Configuration is valid"
        }))
        .into_response(),
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "valid": false,
                "error": format!("Invalid configuration: {}", e),
                "message": "Configuration validation failed"
            })),
        )
            .into_response(),
    }
}
827
/// Request for bulk configuration update
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    /// Partial configuration updates (only specified fields will be updated).
    /// Must be a JSON object; top-level keys replace the corresponding base
    /// config keys wholesale (shallow merge).
    pub updates: serde_json::Value,
}
834
/// Bulk update configuration
///
/// This endpoint allows updating multiple configuration options at once.
/// Only the specified fields in the updates object will be modified.
///
/// Configuration updates are applied to the server configuration if available
/// in ManagementState. Changes take effect immediately for supported settings.
///
/// NOTE(review): as written this handler only *validates* the merged config
/// and never writes it back — `state` (and `state.server_config`) is unused.
async fn bulk_update_config(
    State(state): State<ManagementState>,
    Json(request): Json<BulkConfigUpdateRequest>,
) -> impl IntoResponse {
    // Validate the updates structure
    if !request.updates.is_object() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Updates must be a JSON object"
            })),
        )
            .into_response();
    }

    // Try to validate as partial ServerConfig
    use mockforge_core::config::ServerConfig;

    // Create a minimal valid config and try to merge updates
    let base_config = ServerConfig::default();
    let base_json = match serde_json::to_value(&base_config) {
        Ok(v) => v,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Internal error",
                    "message": format!("Failed to serialize base config: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Merge updates into base config (simplified merge)
    // Shallow merge: each top-level key in `updates` replaces the entire
    // corresponding key in the base config; nested objects are NOT merged
    // recursively, so a partial nested object wipes its siblings.
    let mut merged = base_json.clone();
    if let (Some(merged_obj), Some(updates_obj)) =
        (merged.as_object_mut(), request.updates.as_object())
    {
        for (key, value) in updates_obj {
            merged_obj.insert(key.clone(), value.clone());
        }
    }

    // Validate the merged config
    match serde_json::from_value::<ServerConfig>(merged) {
        Ok(_) => {
            // Config is valid
            // Note: Runtime application of config changes would require:
            // 1. Storing ServerConfig in ManagementState
            // 2. Implementing hot-reload mechanism for server configuration
            // 3. Updating router state and middleware based on new config
            // For now, this endpoint only validates the configuration structure
            Json(serde_json::json!({
                "success": true,
                "message": "Bulk configuration update validated successfully. Note: Runtime application requires ServerConfig in ManagementState and hot-reload support.",
                "updates_received": request.updates,
                "validated": true
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid configuration",
                "message": format!("Configuration validation failed: {}", e),
                "validated": false
            })),
        )
            .into_response(),
    }
}
915
916/// Get server statistics
917async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
918    let mocks = state.mocks.read().await;
919    let request_count = *state.request_counter.read().await;
920
921    Json(ServerStats {
922        uptime_seconds: state.start_time.elapsed().as_secs(),
923        total_requests: request_count,
924        active_mocks: mocks.len(),
925        enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
926        registered_routes: mocks.len(), // This could be enhanced with actual route registry info
927    })
928}
929
930/// Get server configuration
931async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
932    Json(ServerConfig {
933        version: env!("CARGO_PKG_VERSION").to_string(),
934        port: state.port,
935        has_openapi_spec: state.spec.is_some(),
936        spec_path: state.spec_path.clone(),
937    })
938}
939
940/// Health check endpoint
941async fn health_check() -> Json<serde_json::Value> {
942    Json(serde_json::json!({
943        "status": "healthy",
944        "service": "mockforge-management",
945        "timestamp": chrono::Utc::now().to_rfc3339()
946    }))
947}
948
/// Export format for mock configurations
///
/// Serialized as lowercase strings (`"json"` / `"yaml"`) per `rename_all`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// JSON format
    Json,
    /// YAML format
    Yaml,
}
958
959/// Export mocks in specified format
960async fn export_mocks(
961    State(state): State<ManagementState>,
962    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
963) -> Result<(StatusCode, String), StatusCode> {
964    let mocks = state.mocks.read().await;
965
966    let format = params
967        .get("format")
968        .map(|f| match f.as_str() {
969            "yaml" | "yml" => ExportFormat::Yaml,
970            _ => ExportFormat::Json,
971        })
972        .unwrap_or(ExportFormat::Json);
973
974    match format {
975        ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
976            .map(|json| (StatusCode::OK, json))
977            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
978        ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
979            .map(|yaml| (StatusCode::OK, yaml))
980            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
981    }
982}
983
984/// Import mocks from JSON/YAML
985async fn import_mocks(
986    State(state): State<ManagementState>,
987    Json(mocks): Json<Vec<MockConfig>>,
988) -> impl IntoResponse {
989    let mut current_mocks = state.mocks.write().await;
990    current_mocks.clear();
991    current_mocks.extend(mocks);
992    Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
993}
994
#[cfg(feature = "smtp")]
/// List SMTP emails in mailbox
///
/// Returns all captured emails as JSON, 501 when no SMTP registry is wired
/// into the management state, or 500 when the registry lookup fails.
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(smtp_registry) = state.smtp_registry.as_ref() else {
        return (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        );
    };

    match smtp_registry.get_emails() {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to retrieve emails",
                "message": e.to_string()
            })),
        ),
    }
}
1019
/// Get specific SMTP email
///
/// Looks up a single email by id: 200 with the email, 404 when no email has
/// that id, 500 on registry errors, and 501 when SMTP is not available.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let Some(smtp_registry) = state.smtp_registry.as_ref() else {
        return (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        );
    };

    // Distinguish "lookup failed" (500) from "no such email" (404).
    match smtp_registry.get_email_by_id(&id) {
        Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Email not found",
                "id": id
            })),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to retrieve email",
                "message": e.to_string()
            })),
        ),
    }
}
1054
/// Clear SMTP mailbox
///
/// Deletes all captured emails. Answers 501 when SMTP is unavailable and
/// 500 when the registry reports a failure.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(smtp_registry) = state.smtp_registry.as_ref() else {
        return (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        );
    };

    match smtp_registry.clear_mailbox() {
        Ok(()) => (
            StatusCode::OK,
            Json(serde_json::json!({
                "message": "Mailbox cleared successfully"
            })),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to clear mailbox",
                "message": e.to_string()
            })),
        ),
    }
}
1084
/// Export SMTP mailbox
///
/// Currently unimplemented over HTTP: the response always echoes the
/// requested format back with 501 and directs callers to the CLI.
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    // Clone the provided value, or lazily build the "json" default — the
    // previous `unwrap_or(&"json".to_string()).clone()` allocated on every
    // request even when the parameter was present.
    let format = params.get("format").cloned().unwrap_or_else(|| "json".to_string());
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "SMTP mailbox management not available via HTTP API",
            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
            "requested_format": format
        })),
    )
}
1100
/// Search SMTP emails
///
/// Builds `EmailSearchFilters` from query parameters (`sender`, `recipient`,
/// `subject`, `body`, RFC3339 `since`/`until`, and boolean `regex` /
/// `case_sensitive` flags) and runs the search against the SMTP registry.
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let Some(smtp_registry) = state.smtp_registry.as_ref() else {
        return (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        );
    };

    // Parse an RFC3339 query parameter into UTC; unparseable values are
    // silently treated as absent (same as before).
    let parse_time = |key: &str| {
        params
            .get(key)
            .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
            .map(|dt| dt.with_timezone(&chrono::Utc))
    };
    // A flag is true only for the literal string "true".
    let flag = |key: &str| params.get(key).map(|s| s == "true").unwrap_or(false);

    let filters = EmailSearchFilters {
        sender: params.get("sender").cloned(),
        recipient: params.get("recipient").cloned(),
        subject: params.get("subject").cloned(),
        body: params.get("body").cloned(),
        since: parse_time("since"),
        until: parse_time("until"),
        use_regex: flag("regex"),
        case_sensitive: flag("case_sensitive"),
    };

    match smtp_registry.search_emails(filters) {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to search emails",
                "message": e.to_string()
            })),
        ),
    }
}
1145
/// MQTT broker statistics
///
/// Snapshot assembled by `get_mqtt_stats` from the live broker state.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of connected MQTT clients
    pub connected_clients: usize,
    /// Number of active MQTT topics
    pub active_topics: usize,
    /// Number of retained messages
    pub retained_messages: usize,
    /// Total number of subscriptions
    pub total_subscriptions: usize,
}
1159
/// MQTT management handlers
///
/// Returns a `MqttBrokerStats` snapshot, or 503 when no broker is attached.
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    let client_count = broker.get_connected_clients().await.len();
    let topic_count = broker.get_active_topics().await.len();
    let stats = broker.get_topic_stats().await;

    Json(MqttBrokerStats {
        connected_clients: client_count,
        active_topics: topic_count,
        retained_messages: stats.retained_messages,
        total_subscriptions: stats.total_subscriptions,
    })
    .into_response()
}
1180
#[cfg(feature = "mqtt")]
/// List the ids of currently connected MQTT clients, or 503 without a broker.
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    let clients = broker.get_connected_clients().await;
    Json(serde_json::json!({ "clients": clients })).into_response()
}
1193
#[cfg(feature = "mqtt")]
/// List the broker's active MQTT topics, or 503 without a broker.
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    let topics = broker.get_active_topics().await;
    Json(serde_json::json!({ "topics": topics })).into_response()
}
1206
#[cfg(feature = "mqtt")]
/// Forcibly disconnect one MQTT client by id.
///
/// Responds with plain text: 200 on success, 500 on broker errors, 503 when
/// no broker is available.
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response(),
        Err(e) => {
            (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
                .into_response()
        }
    }
}
1226
1227// ========== MQTT Publish Handler ==========
1228
#[cfg(feature = "mqtt")]
/// Request to publish a single MQTT message
///
/// NOTE(review): the publish handler currently extracts these fields from a
/// raw `serde_json::Value` rather than deserializing this struct — keep the
/// two in sync if fields change.
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Topic to publish to
    pub topic: String,
    /// Message payload (string or JSON)
    pub payload: String,
    /// QoS level (0, 1, or 2)
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Whether to retain the message
    #[serde(default)]
    pub retain: bool,
}
1244
#[cfg(feature = "mqtt")]
/// Default QoS level (0) for deserialized publish requests.
fn default_qos() -> u8 {
    0
}
1249
#[cfg(feature = "mqtt")]
/// Publish a message to an MQTT topic (only compiled when mqtt feature is enabled)
///
/// Expects a JSON body with required `topic` and `payload` strings, plus
/// optional `qos` (default 0) and `retain` (default false). On success the
/// message is also forwarded to the SSE monitoring stream.
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Extract fields from the raw JSON manually so a malformed body yields
    // our own 400 payload instead of axum's default rejection.
    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    // Both topic and payload are mandatory; bail out early when either is
    // missing or not a string (replaces the is_none() + unwrap() pattern).
    let (Some(topic), Some(payload)) = (
        request.get("topic").and_then(|v| v.as_str()).map(str::to_string),
        request.get("payload").and_then(|v| v.as_str()).map(str::to_string),
    ) else {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required fields: topic and payload"
            })),
        );
    };

    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        );
    };

    // Reject QoS values outside the MQTT-defined range 0..=2.
    // (As before, this check runs only when a broker is available.)
    if qos > 2 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid QoS",
                "message": "QoS must be 0, 1, or 2"
            })),
        );
    }

    // Convert payload to bytes; all management-API publishes share one
    // synthetic client id.
    let payload_bytes = payload.as_bytes().to_vec();
    let client_id = "mockforge-management-api".to_string();

    match broker.handle_publish(&client_id, &topic, payload_bytes, qos, retain).await {
        Ok(_) => {
            // Emit message event for real-time monitoring; send errors just
            // mean no SSE subscriber is currently listening.
            let event = MessageEvent::Mqtt(MqttMessageEvent {
                topic: topic.clone(),
                payload,
                qos,
                retain,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            (
                StatusCode::OK,
                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message published to topic '{}'", topic),
                    "topic": topic,
                    "qos": qos,
                    "retain": retain
                })),
            )
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to publish message",
                "message": e.to_string()
            })),
        ),
    }
}
1337
#[cfg(not(feature = "mqtt"))]
/// Publish a message to an MQTT topic (stub when mqtt feature is disabled)
///
/// Always answers 503 because MQTT support is not compiled into this build.
async fn publish_mqtt_message_handler(
    State(_state): State<ManagementState>,
    Json(_request): Json<serde_json::Value>,
) -> impl IntoResponse {
    (
        StatusCode::SERVICE_UNAVAILABLE,
        Json(serde_json::json!({
            "error": "MQTT feature not enabled",
            "message": "MQTT support is not compiled into this build"
        })),
    )
}
1352
#[cfg(feature = "mqtt")]
/// Request to publish multiple MQTT messages
///
/// NOTE(review): the batch handler parses the incoming JSON manually rather
/// than deserializing this struct — keep the two in sync if fields change.
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// List of messages to publish
    pub messages: Vec<MqttPublishRequest>,
    /// Delay between messages in milliseconds
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1363
#[cfg(feature = "mqtt")]
/// Default inter-message delay (100 ms) for batch publish requests.
fn default_delay() -> u64 {
    100
}
1368
#[cfg(feature = "mqtt")]
/// Publish multiple messages to MQTT topics (only compiled when mqtt feature is enabled)
///
/// Body shape: `{ "messages": [ { "topic", "payload", "qos"?, "retain"? }, ... ],
/// "delay_ms"?: u64 }`. Each message is published independently: per-message
/// failures (missing fields, bad QoS, broker errors) are recorded in the
/// `results` array without aborting the batch, and aggregate counts are
/// returned alongside them.
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Extract fields from JSON manually (keeps error responses in our own
    // format rather than axum's default rejection).
    let messages_json = request.get("messages").and_then(|v| v.as_array());
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    if messages_json.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required field: messages"
            })),
        );
    }

    let messages_json = messages_json.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        if messages_json.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            );
        }

        let mut results = Vec::new();
        // All management-API publishes share one synthetic client id.
        let client_id = "mockforge-management-api".to_string();

        for (index, msg_json) in messages_json.iter().enumerate() {
            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

            // Missing fields: record a per-message failure and keep going.
            if topic.is_none() || payload.is_none() {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }

            let topic = topic.unwrap();
            let payload = payload.unwrap();

            // Validate QoS
            if qos > 2 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Invalid QoS (must be 0, 1, or 2)"
                }));
                continue;
            }

            // Convert payload to bytes
            let payload_bytes = payload.as_bytes().to_vec();

            let publish_result = broker
                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
                .await
                .map_err(|e| format!("{}", e));

            match publish_result {
                Ok(_) => {
                    // Emit message event for the SSE monitoring stream; a
                    // send error just means nobody is subscribed.
                    let event = MessageEvent::Mqtt(MqttMessageEvent {
                        topic: topic.clone(),
                        payload: payload.clone(),
                        qos,
                        retain,
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": topic,
                        "qos": qos
                    }));
                }
                Err(error_msg) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": error_msg
                    }));
                }
            }

            // Add delay between messages (except for the last one)
            if index < messages_json.len() - 1 && delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            }
        }

        // Count per-message successes for the aggregate summary.
        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        (
            StatusCode::OK,
            Json(serde_json::json!({
                "success": true,
                "total": messages_json.len(),
                "succeeded": success_count,
                "failed": messages_json.len() - success_count,
                "results": results
            })),
        )
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1498
#[cfg(not(feature = "mqtt"))]
/// Publish multiple messages to MQTT topics (stub when mqtt feature is disabled)
///
/// Always answers 503 because MQTT support is not compiled into this build.
async fn publish_mqtt_batch_handler(
    State(_state): State<ManagementState>,
    Json(_request): Json<serde_json::Value>,
) -> impl IntoResponse {
    (
        StatusCode::SERVICE_UNAVAILABLE,
        Json(serde_json::json!({
            "error": "MQTT feature not enabled",
            "message": "MQTT support is not compiled into this build"
        })),
    )
}
1513
1514// Migration pipeline handlers
1515
/// Request to set migration mode
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    // One of "mock", "shadow", "real", "auto" (matched case-insensitively).
    mode: String,
}
1521
1522/// Get all migration routes
1523async fn get_migration_routes(
1524    State(state): State<ManagementState>,
1525) -> Result<Json<serde_json::Value>, StatusCode> {
1526    let proxy_config = match &state.proxy_config {
1527        Some(config) => config,
1528        None => {
1529            return Ok(Json(serde_json::json!({
1530                "error": "Migration not configured. Proxy config not available."
1531            })));
1532        }
1533    };
1534
1535    let config = proxy_config.read().await;
1536    let routes = config.get_migration_routes();
1537
1538    Ok(Json(serde_json::json!({
1539        "routes": routes
1540    })))
1541}
1542
1543/// Toggle a route's migration mode
1544async fn toggle_route_migration(
1545    State(state): State<ManagementState>,
1546    Path(pattern): Path<String>,
1547) -> Result<Json<serde_json::Value>, StatusCode> {
1548    let proxy_config = match &state.proxy_config {
1549        Some(config) => config,
1550        None => {
1551            return Ok(Json(serde_json::json!({
1552                "error": "Migration not configured. Proxy config not available."
1553            })));
1554        }
1555    };
1556
1557    let mut config = proxy_config.write().await;
1558    let new_mode = match config.toggle_route_migration(&pattern) {
1559        Some(mode) => mode,
1560        None => {
1561            return Ok(Json(serde_json::json!({
1562                "error": format!("Route pattern not found: {}", pattern)
1563            })));
1564        }
1565    };
1566
1567    Ok(Json(serde_json::json!({
1568        "pattern": pattern,
1569        "mode": format!("{:?}", new_mode).to_lowercase()
1570    })))
1571}
1572
1573/// Set a route's migration mode explicitly
1574async fn set_route_migration_mode(
1575    State(state): State<ManagementState>,
1576    Path(pattern): Path<String>,
1577    Json(request): Json<SetMigrationModeRequest>,
1578) -> Result<Json<serde_json::Value>, StatusCode> {
1579    let proxy_config = match &state.proxy_config {
1580        Some(config) => config,
1581        None => {
1582            return Ok(Json(serde_json::json!({
1583                "error": "Migration not configured. Proxy config not available."
1584            })));
1585        }
1586    };
1587
1588    use mockforge_core::proxy::config::MigrationMode;
1589    let mode = match request.mode.to_lowercase().as_str() {
1590        "mock" => MigrationMode::Mock,
1591        "shadow" => MigrationMode::Shadow,
1592        "real" => MigrationMode::Real,
1593        "auto" => MigrationMode::Auto,
1594        _ => {
1595            return Ok(Json(serde_json::json!({
1596                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1597            })));
1598        }
1599    };
1600
1601    let mut config = proxy_config.write().await;
1602    let updated = config.update_rule_migration_mode(&pattern, mode);
1603
1604    if !updated {
1605        return Ok(Json(serde_json::json!({
1606            "error": format!("Route pattern not found: {}", pattern)
1607        })));
1608    }
1609
1610    Ok(Json(serde_json::json!({
1611        "pattern": pattern,
1612        "mode": format!("{:?}", mode).to_lowercase()
1613    })))
1614}
1615
1616/// Toggle a group's migration mode
1617async fn toggle_group_migration(
1618    State(state): State<ManagementState>,
1619    Path(group): Path<String>,
1620) -> Result<Json<serde_json::Value>, StatusCode> {
1621    let proxy_config = match &state.proxy_config {
1622        Some(config) => config,
1623        None => {
1624            return Ok(Json(serde_json::json!({
1625                "error": "Migration not configured. Proxy config not available."
1626            })));
1627        }
1628    };
1629
1630    let mut config = proxy_config.write().await;
1631    let new_mode = config.toggle_group_migration(&group);
1632
1633    Ok(Json(serde_json::json!({
1634        "group": group,
1635        "mode": format!("{:?}", new_mode).to_lowercase()
1636    })))
1637}
1638
1639/// Set a group's migration mode explicitly
1640async fn set_group_migration_mode(
1641    State(state): State<ManagementState>,
1642    Path(group): Path<String>,
1643    Json(request): Json<SetMigrationModeRequest>,
1644) -> Result<Json<serde_json::Value>, StatusCode> {
1645    let proxy_config = match &state.proxy_config {
1646        Some(config) => config,
1647        None => {
1648            return Ok(Json(serde_json::json!({
1649                "error": "Migration not configured. Proxy config not available."
1650            })));
1651        }
1652    };
1653
1654    use mockforge_core::proxy::config::MigrationMode;
1655    let mode = match request.mode.to_lowercase().as_str() {
1656        "mock" => MigrationMode::Mock,
1657        "shadow" => MigrationMode::Shadow,
1658        "real" => MigrationMode::Real,
1659        "auto" => MigrationMode::Auto,
1660        _ => {
1661            return Ok(Json(serde_json::json!({
1662                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1663            })));
1664        }
1665    };
1666
1667    let mut config = proxy_config.write().await;
1668    config.update_group_migration_mode(&group, mode);
1669
1670    Ok(Json(serde_json::json!({
1671        "group": group,
1672        "mode": format!("{:?}", mode).to_lowercase()
1673    })))
1674}
1675
1676/// Get all migration groups
1677async fn get_migration_groups(
1678    State(state): State<ManagementState>,
1679) -> Result<Json<serde_json::Value>, StatusCode> {
1680    let proxy_config = match &state.proxy_config {
1681        Some(config) => config,
1682        None => {
1683            return Ok(Json(serde_json::json!({
1684                "error": "Migration not configured. Proxy config not available."
1685            })));
1686        }
1687    };
1688
1689    let config = proxy_config.read().await;
1690    let groups = config.get_migration_groups();
1691
1692    // Convert to JSON-serializable format
1693    let groups_json: serde_json::Map<String, serde_json::Value> = groups
1694        .into_iter()
1695        .map(|(name, info)| {
1696            (
1697                name,
1698                serde_json::json!({
1699                    "name": info.name,
1700                    "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1701                    "route_count": info.route_count
1702                }),
1703            )
1704        })
1705        .collect();
1706
1707    Ok(Json(serde_json::json!(groups_json)))
1708}
1709
1710/// Get overall migration status
1711async fn get_migration_status(
1712    State(state): State<ManagementState>,
1713) -> Result<Json<serde_json::Value>, StatusCode> {
1714    let proxy_config = match &state.proxy_config {
1715        Some(config) => config,
1716        None => {
1717            return Ok(Json(serde_json::json!({
1718                "error": "Migration not configured. Proxy config not available."
1719            })));
1720        }
1721    };
1722
1723    let config = proxy_config.read().await;
1724    let routes = config.get_migration_routes();
1725    let groups = config.get_migration_groups();
1726
1727    let mut mock_count = 0;
1728    let mut shadow_count = 0;
1729    let mut real_count = 0;
1730    let mut auto_count = 0;
1731
1732    for route in &routes {
1733        match route.migration_mode {
1734            mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1735            mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1736            mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1737            mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1738        }
1739    }
1740
1741    Ok(Json(serde_json::json!({
1742        "total_routes": routes.len(),
1743        "mock_routes": mock_count,
1744        "shadow_routes": shadow_count,
1745        "real_routes": real_count,
1746        "auto_routes": auto_count,
1747        "total_groups": groups.len(),
1748        "migration_enabled": config.migration_enabled
1749    })))
1750}
1751
1752// ========== Proxy Replacement Rules Management ==========
1753
/// Request body for creating/updating proxy replacement rules
///
/// Deserialized from JSON; the `type` field selects whether the rule applies
/// to outgoing requests or incoming responses.
#[derive(Debug, Deserialize, Serialize)]
pub struct ProxyRuleRequest {
    /// URL pattern to match (supports wildcards like "/api/users/*")
    pub pattern: String,
    /// Rule type: "request" or "response"
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Optional status code filter for response rules
    /// (request rules are always listed with an empty status-code list)
    #[serde(default)]
    pub status_codes: Vec<u16>,
    /// Body transformations to apply; at least one is required when creating a rule
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether this rule is enabled (defaults to true when omitted)
    #[serde(default = "default_true")]
    pub enabled: bool,
}
1771
/// Request body for individual body transformations
#[derive(Debug, Deserialize, Serialize)]
pub struct BodyTransformRequest {
    /// JSONPath expression to target (e.g., "$.userId", "$.email")
    pub path: String,
    /// Replacement value (supports template expansion like "{{uuid}}", "{{faker.email}}")
    pub replace: String,
    /// Operation to perform: "replace", "add", or "remove".
    /// Unrecognized or empty values fall back to "replace" in the rule handlers.
    #[serde(default)]
    pub operation: String,
}
1783
/// Response format for proxy rules
///
/// IDs are positional: request rules occupy indices `0..request_count` and
/// response rules follow, so IDs shift when rules are created or deleted.
#[derive(Debug, Serialize)]
pub struct ProxyRuleResponse {
    /// Rule ID (index in the array)
    pub id: usize,
    /// URL pattern
    pub pattern: String,
    /// Rule type ("request" or "response")
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status codes (for response rules; empty for request rules)
    pub status_codes: Vec<u16>,
    /// Body transformations
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether enabled
    pub enabled: bool,
}
1801
1802/// List all proxy replacement rules
1803async fn list_proxy_rules(
1804    State(state): State<ManagementState>,
1805) -> Result<Json<serde_json::Value>, StatusCode> {
1806    let proxy_config = match &state.proxy_config {
1807        Some(config) => config,
1808        None => {
1809            return Ok(Json(serde_json::json!({
1810                "error": "Proxy not configured. Proxy config not available."
1811            })));
1812        }
1813    };
1814
1815    let config = proxy_config.read().await;
1816
1817    let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1818
1819    // Add request replacement rules
1820    for (idx, rule) in config.request_replacements.iter().enumerate() {
1821        rules.push(ProxyRuleResponse {
1822            id: idx,
1823            pattern: rule.pattern.clone(),
1824            rule_type: "request".to_string(),
1825            status_codes: Vec::new(),
1826            body_transforms: rule
1827                .body_transforms
1828                .iter()
1829                .map(|t| BodyTransformRequest {
1830                    path: t.path.clone(),
1831                    replace: t.replace.clone(),
1832                    operation: format!("{:?}", t.operation).to_lowercase(),
1833                })
1834                .collect(),
1835            enabled: rule.enabled,
1836        });
1837    }
1838
1839    // Add response replacement rules
1840    let request_count = config.request_replacements.len();
1841    for (idx, rule) in config.response_replacements.iter().enumerate() {
1842        rules.push(ProxyRuleResponse {
1843            id: request_count + idx,
1844            pattern: rule.pattern.clone(),
1845            rule_type: "response".to_string(),
1846            status_codes: rule.status_codes.clone(),
1847            body_transforms: rule
1848                .body_transforms
1849                .iter()
1850                .map(|t| BodyTransformRequest {
1851                    path: t.path.clone(),
1852                    replace: t.replace.clone(),
1853                    operation: format!("{:?}", t.operation).to_lowercase(),
1854                })
1855                .collect(),
1856            enabled: rule.enabled,
1857        });
1858    }
1859
1860    Ok(Json(serde_json::json!({
1861        "rules": rules
1862    })))
1863}
1864
1865/// Create a new proxy replacement rule
1866async fn create_proxy_rule(
1867    State(state): State<ManagementState>,
1868    Json(request): Json<ProxyRuleRequest>,
1869) -> Result<Json<serde_json::Value>, StatusCode> {
1870    let proxy_config = match &state.proxy_config {
1871        Some(config) => config,
1872        None => {
1873            return Ok(Json(serde_json::json!({
1874                "error": "Proxy not configured. Proxy config not available."
1875            })));
1876        }
1877    };
1878
1879    // Validate request
1880    if request.body_transforms.is_empty() {
1881        return Ok(Json(serde_json::json!({
1882            "error": "At least one body transform is required"
1883        })));
1884    }
1885
1886    let body_transforms: Vec<BodyTransform> = request
1887        .body_transforms
1888        .iter()
1889        .map(|t| {
1890            let op = match t.operation.as_str() {
1891                "replace" => TransformOperation::Replace,
1892                "add" => TransformOperation::Add,
1893                "remove" => TransformOperation::Remove,
1894                _ => TransformOperation::Replace,
1895            };
1896            BodyTransform {
1897                path: t.path.clone(),
1898                replace: t.replace.clone(),
1899                operation: op,
1900            }
1901        })
1902        .collect();
1903
1904    let new_rule = BodyTransformRule {
1905        pattern: request.pattern.clone(),
1906        status_codes: request.status_codes.clone(),
1907        body_transforms,
1908        enabled: request.enabled,
1909    };
1910
1911    let mut config = proxy_config.write().await;
1912
1913    let rule_id = if request.rule_type == "request" {
1914        config.request_replacements.push(new_rule);
1915        config.request_replacements.len() - 1
1916    } else if request.rule_type == "response" {
1917        config.response_replacements.push(new_rule);
1918        config.request_replacements.len() + config.response_replacements.len() - 1
1919    } else {
1920        return Ok(Json(serde_json::json!({
1921            "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
1922        })));
1923    };
1924
1925    Ok(Json(serde_json::json!({
1926        "id": rule_id,
1927        "message": "Rule created successfully"
1928    })))
1929}
1930
1931/// Get a specific proxy replacement rule
1932async fn get_proxy_rule(
1933    State(state): State<ManagementState>,
1934    Path(id): Path<String>,
1935) -> Result<Json<serde_json::Value>, StatusCode> {
1936    let proxy_config = match &state.proxy_config {
1937        Some(config) => config,
1938        None => {
1939            return Ok(Json(serde_json::json!({
1940                "error": "Proxy not configured. Proxy config not available."
1941            })));
1942        }
1943    };
1944
1945    let config = proxy_config.read().await;
1946    let rule_id: usize = match id.parse() {
1947        Ok(id) => id,
1948        Err(_) => {
1949            return Ok(Json(serde_json::json!({
1950                "error": format!("Invalid rule ID: {}", id)
1951            })));
1952        }
1953    };
1954
1955    let request_count = config.request_replacements.len();
1956
1957    if rule_id < request_count {
1958        // Request rule
1959        let rule = &config.request_replacements[rule_id];
1960        Ok(Json(serde_json::json!({
1961            "id": rule_id,
1962            "pattern": rule.pattern,
1963            "type": "request",
1964            "status_codes": [],
1965            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1966                "path": t.path,
1967                "replace": t.replace,
1968                "operation": format!("{:?}", t.operation).to_lowercase()
1969            })).collect::<Vec<_>>(),
1970            "enabled": rule.enabled
1971        })))
1972    } else if rule_id < request_count + config.response_replacements.len() {
1973        // Response rule
1974        let response_idx = rule_id - request_count;
1975        let rule = &config.response_replacements[response_idx];
1976        Ok(Json(serde_json::json!({
1977            "id": rule_id,
1978            "pattern": rule.pattern,
1979            "type": "response",
1980            "status_codes": rule.status_codes,
1981            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1982                "path": t.path,
1983                "replace": t.replace,
1984                "operation": format!("{:?}", t.operation).to_lowercase()
1985            })).collect::<Vec<_>>(),
1986            "enabled": rule.enabled
1987        })))
1988    } else {
1989        Ok(Json(serde_json::json!({
1990            "error": format!("Rule ID {} not found", rule_id)
1991        })))
1992    }
1993}
1994
1995/// Update a proxy replacement rule
1996async fn update_proxy_rule(
1997    State(state): State<ManagementState>,
1998    Path(id): Path<String>,
1999    Json(request): Json<ProxyRuleRequest>,
2000) -> Result<Json<serde_json::Value>, StatusCode> {
2001    let proxy_config = match &state.proxy_config {
2002        Some(config) => config,
2003        None => {
2004            return Ok(Json(serde_json::json!({
2005                "error": "Proxy not configured. Proxy config not available."
2006            })));
2007        }
2008    };
2009
2010    let mut config = proxy_config.write().await;
2011    let rule_id: usize = match id.parse() {
2012        Ok(id) => id,
2013        Err(_) => {
2014            return Ok(Json(serde_json::json!({
2015                "error": format!("Invalid rule ID: {}", id)
2016            })));
2017        }
2018    };
2019
2020    let body_transforms: Vec<BodyTransform> = request
2021        .body_transforms
2022        .iter()
2023        .map(|t| {
2024            let op = match t.operation.as_str() {
2025                "replace" => TransformOperation::Replace,
2026                "add" => TransformOperation::Add,
2027                "remove" => TransformOperation::Remove,
2028                _ => TransformOperation::Replace,
2029            };
2030            BodyTransform {
2031                path: t.path.clone(),
2032                replace: t.replace.clone(),
2033                operation: op,
2034            }
2035        })
2036        .collect();
2037
2038    let updated_rule = BodyTransformRule {
2039        pattern: request.pattern.clone(),
2040        status_codes: request.status_codes.clone(),
2041        body_transforms,
2042        enabled: request.enabled,
2043    };
2044
2045    let request_count = config.request_replacements.len();
2046
2047    if rule_id < request_count {
2048        // Update request rule
2049        config.request_replacements[rule_id] = updated_rule;
2050    } else if rule_id < request_count + config.response_replacements.len() {
2051        // Update response rule
2052        let response_idx = rule_id - request_count;
2053        config.response_replacements[response_idx] = updated_rule;
2054    } else {
2055        return Ok(Json(serde_json::json!({
2056            "error": format!("Rule ID {} not found", rule_id)
2057        })));
2058    }
2059
2060    Ok(Json(serde_json::json!({
2061        "id": rule_id,
2062        "message": "Rule updated successfully"
2063    })))
2064}
2065
2066/// Delete a proxy replacement rule
2067async fn delete_proxy_rule(
2068    State(state): State<ManagementState>,
2069    Path(id): Path<String>,
2070) -> Result<Json<serde_json::Value>, StatusCode> {
2071    let proxy_config = match &state.proxy_config {
2072        Some(config) => config,
2073        None => {
2074            return Ok(Json(serde_json::json!({
2075                "error": "Proxy not configured. Proxy config not available."
2076            })));
2077        }
2078    };
2079
2080    let mut config = proxy_config.write().await;
2081    let rule_id: usize = match id.parse() {
2082        Ok(id) => id,
2083        Err(_) => {
2084            return Ok(Json(serde_json::json!({
2085                "error": format!("Invalid rule ID: {}", id)
2086            })));
2087        }
2088    };
2089
2090    let request_count = config.request_replacements.len();
2091
2092    if rule_id < request_count {
2093        // Delete request rule
2094        config.request_replacements.remove(rule_id);
2095    } else if rule_id < request_count + config.response_replacements.len() {
2096        // Delete response rule
2097        let response_idx = rule_id - request_count;
2098        config.response_replacements.remove(response_idx);
2099    } else {
2100        return Ok(Json(serde_json::json!({
2101            "error": format!("Rule ID {} not found", rule_id)
2102        })));
2103    }
2104
2105    Ok(Json(serde_json::json!({
2106        "id": rule_id,
2107        "message": "Rule deleted successfully"
2108    })))
2109}
2110
2111/// Get recent intercepted requests/responses for inspection
2112/// This is a placeholder - in a full implementation, you'd track intercepted traffic
2113async fn get_proxy_inspect(
2114    State(_state): State<ManagementState>,
2115    Query(params): Query<std::collections::HashMap<String, String>>,
2116) -> Result<Json<serde_json::Value>, StatusCode> {
2117    let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2118
2119    // Note: Request/response inspection would require:
2120    // 1. Storing intercepted requests/responses in ManagementState or a separate store
2121    // 2. Integrating with proxy middleware to capture traffic
2122    // 3. Implementing filtering and pagination for large volumes of traffic
2123    // For now, return an empty response structure indicating the feature is not yet implemented
2124    Ok(Json(serde_json::json!({
2125        "requests": [],
2126        "responses": [],
2127        "limit": limit,
2128        "total": 0,
2129        "message": "Request/response inspection not yet implemented. This endpoint will return intercepted traffic when proxy inspection is fully integrated."
2130    })))
2131}
2132
/// Build the management API router
///
/// Assembles every management endpoint onto a single axum `Router` and
/// attaches the shared `ManagementState`. Feature-gated sections (SMTP,
/// MQTT, Kafka, behavioral cloning) compile in only when the matching cargo
/// feature is enabled; each `#[cfg(not(...))]` arm re-binds `router` so the
/// builder chain stays valid under every feature combination.
pub fn management_router(state: ManagementState) -> Router {
    // Core endpoints: health, stats, config validation, mock CRUD, import/export.
    let router = Router::new()
        .route("/health", get(health_check))
        .route("/stats", get(get_stats))
        .route("/config", get(get_config))
        .route("/config/validate", post(validate_config))
        .route("/config/bulk", post(bulk_update_config))
        .route("/mocks", get(list_mocks))
        .route("/mocks", post(create_mock))
        .route("/mocks/{id}", get(get_mock))
        .route("/mocks/{id}", put(update_mock))
        .route("/mocks/{id}", delete(delete_mock))
        .route("/export", get(export_mocks))
        .route("/import", post(import_mocks));

    // SMTP mailbox inspection endpoints (feature-gated).
    #[cfg(feature = "smtp")]
    let router = router
        .route("/smtp/mailbox", get(list_smtp_emails))
        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
        .route("/smtp/mailbox/{id}", get(get_smtp_email))
        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
        .route("/smtp/mailbox/search", get(search_smtp_emails));

    // No-op rebinding keeps `router` defined when the feature is off.
    #[cfg(not(feature = "smtp"))]
    let router = router;

    // MQTT routes
    #[cfg(feature = "mqtt")]
    let router = router
        .route("/mqtt/stats", get(get_mqtt_stats))
        .route("/mqtt/clients", get(get_mqtt_clients))
        .route("/mqtt/topics", get(get_mqtt_topics))
        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // NOTE(review): the publish handlers are still routed when the mqtt
    // feature is off — presumably stubs that report MQTT as unavailable;
    // confirm against the handler definitions.
    #[cfg(not(feature = "mqtt"))]
    let router = router
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Kafka broker administration endpoints (feature-gated).
    #[cfg(feature = "kafka")]
    let router = router
        .route("/kafka/stats", get(get_kafka_stats))
        .route("/kafka/topics", get(get_kafka_topics))
        .route("/kafka/topics/{topic}", get(get_kafka_topic))
        .route("/kafka/groups", get(get_kafka_groups))
        .route("/kafka/groups/{group_id}", get(get_kafka_group))
        .route("/kafka/produce", post(produce_kafka_message))
        .route("/kafka/produce/batch", post(produce_kafka_batch))
        .route("/kafka/messages/stream", get(kafka_messages_stream));

    // No-op rebinding keeps `router` defined when the feature is off.
    #[cfg(not(feature = "kafka"))]
    let router = router;

    // Migration pipeline routes
    let router = router
        .route("/migration/routes", get(get_migration_routes))
        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
        .route("/migration/groups/{group}", put(set_group_migration_mode))
        .route("/migration/groups", get(get_migration_groups))
        .route("/migration/status", get(get_migration_status));

    // Proxy replacement rules routes
    let router = router
        .route("/proxy/rules", get(list_proxy_rules))
        .route("/proxy/rules", post(create_proxy_rule))
        .route("/proxy/rules/{id}", get(get_proxy_rule))
        .route("/proxy/rules/{id}", put(update_proxy_rule))
        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
        .route("/proxy/inspect", get(get_proxy_inspect));

    // AI-powered features
    let router = router.route("/ai/generate-spec", post(generate_ai_spec));

    // Snapshot diff endpoints (nested sub-router sharing the same state).
    let router = router.nest(
        "/snapshot-diff",
        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
    );

    #[cfg(feature = "behavioral-cloning")]
    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));

    // MockAI learning/explanation, chaos, and network profile endpoints.
    let router = router
        .route("/mockai/learn", post(learn_from_examples))
        .route("/mockai/rules/explanations", get(list_rule_explanations))
        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
        .route("/chaos/config", get(get_chaos_config))
        .route("/chaos/config", post(update_chaos_config))
        .route("/network/profiles", get(list_network_profiles))
        .route("/network/profile/apply", post(apply_network_profile));

    // State machine API routes
    let router =
        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());

    router.with_state(state)
}
2236
#[cfg(feature = "kafka")]
/// Aggregate statistics for the mock Kafka broker, returned by the
/// `/kafka/stats` management endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// Number of topics
    pub topics: usize,
    /// Total number of partitions
    pub partitions: usize,
    /// Number of consumer groups
    pub consumer_groups: usize,
    /// Total messages produced
    pub messages_produced: u64,
    /// Total messages consumed
    pub messages_consumed: u64,
}
2251
#[cfg(feature = "kafka")]
/// Summary of a single Kafka topic, used in the `/kafka/topics` listing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name
    pub name: String,
    /// Number of partitions in the topic
    pub partitions: usize,
    /// Configured replication factor
    pub replication_factor: i32,
}
2259
#[cfg(feature = "kafka")]
/// Summary of a Kafka consumer group, used in the `/kafka/groups` listing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group identifier
    pub group_id: String,
    /// Number of members currently in the group
    pub members: usize,
    /// Group state (currently always reported as "Stable" by the handlers)
    pub state: String,
}
2267
#[cfg(feature = "kafka")]
/// Get Kafka broker statistics
///
/// Reports topic/partition/consumer-group counts plus produced and consumed
/// message totals from the broker's metrics snapshot. Responds with 503 when
/// the broker is not available.
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let consumer_groups = broker.consumer_groups.read().await;

    // Message totals come from the broker's metrics snapshot.
    let metrics_snapshot = broker.metrics().snapshot();

    Json(KafkaBrokerStats {
        topics: topics.len(),
        partitions: topics.values().map(|t| t.partitions.len()).sum(),
        consumer_groups: consumer_groups.groups().len(),
        messages_produced: metrics_snapshot.messages_produced_total,
        messages_consumed: metrics_snapshot.messages_consumed_total,
    })
    .into_response()
}
2300
#[cfg(feature = "kafka")]
/// List Kafka topics
///
/// Returns `{ "topics": [...] }` with one summary entry per topic, or 503
/// when the broker is not available.
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;

    // Project each topic into its API summary.
    let summaries: Vec<KafkaTopicInfo> = topics
        .iter()
        .map(|(name, topic)| KafkaTopicInfo {
            name: name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor as i32,
        })
        .collect();

    Json(serde_json::json!({
        "topics": summaries
    }))
    .into_response()
}
2330
#[cfg(feature = "kafka")]
/// Get Kafka topic details
///
/// Returns partition-level detail for one topic; 404 when the topic does
/// not exist, 503 when the broker is not available.
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let Some(topic) = topics.get(&topic_name) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Topic not found",
                "topic": topic_name
            })),
        )
            .into_response();
    };

    // Leader/replicas are hard-coded to broker 0 in this mock.
    let partitions_detail: Vec<_> = topic
        .partitions
        .iter()
        .enumerate()
        .map(|(idx, partition)| {
            serde_json::json!({
                "id": idx as i32,
                "leader": 0,
                "replicas": vec![0],
                "message_count": partition.messages.len()
            })
        })
        .collect();

    Json(serde_json::json!({
        "name": topic_name,
        "partitions": topic.partitions.len(),
        "replication_factor": topic.config.replication_factor,
        "partitions_detail": partitions_detail
    }))
    .into_response()
}
2372
#[cfg(feature = "kafka")]
/// List Kafka consumer groups
///
/// Returns `{ "groups": [...] }` with one summary entry per consumer group,
/// or 503 when the broker is not available.
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;

    let summaries: Vec<KafkaConsumerGroupInfo> = consumer_groups
        .groups()
        .iter()
        .map(|(group_id, group)| KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            // Simplified - could be more detailed
            state: "Stable".to_string(),
        })
        .collect();

    Json(serde_json::json!({
        "groups": summaries
    }))
    .into_response()
}
2403
#[cfg(feature = "kafka")]
/// Get Kafka consumer group details
///
/// Returns membership, partition assignments, and committed offsets for one
/// consumer group; 404 when the group is unknown, 503 when the broker is
/// not available.
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;
    let Some(group) = consumer_groups.groups().get(&group_id) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Consumer group not found",
                "group_id": group_id
            })),
        )
            .into_response();
    };

    // Per-member view including each member's topic/partition assignments.
    let members_detail: Vec<_> = group
        .members
        .iter()
        .map(|(member_id, member)| {
            serde_json::json!({
                "member_id": member_id,
                "client_id": member.client_id,
                "assignments": member.assignment.iter().map(|a| serde_json::json!({
                    "topic": a.topic,
                    "partitions": a.partitions
                })).collect::<Vec<_>>()
            })
        })
        .collect();

    // Committed offsets, keyed internally by (topic, partition).
    let offsets: Vec<_> = group
        .offsets
        .iter()
        .map(|((topic, partition), offset)| {
            serde_json::json!({
                "topic": topic,
                "partition": partition,
                "offset": offset
            })
        })
        .collect();

    Json(serde_json::json!({
        "group_id": group_id,
        "members": group.members.len(),
        "state": "Stable",
        "members_detail": members_detail,
        "offsets": offsets
    }))
    .into_response()
}
2452
2453// ========== Kafka Produce Handler ==========
2454
#[cfg(feature = "kafka")]
/// Request body for producing a single message via `/kafka/produce`.
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Topic to produce to
    pub topic: String,
    /// Message key (optional)
    #[serde(default)]
    pub key: Option<String>,
    /// Message value (JSON string or plain string)
    pub value: String,
    /// Partition ID (optional, auto-assigned from the key if not provided)
    #[serde(default)]
    pub partition: Option<i32>,
    /// Message headers (optional, key-value pairs)
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2472
#[cfg(feature = "kafka")]
/// Produce a message to a Kafka topic
///
/// Creates the topic on demand, picks a partition (explicit or derived from
/// the key), appends the message, records metrics, and broadcasts a
/// monitoring event. Returns 400 for an out-of-range partition, 500 on
/// append failure, and 503 when the broker is not available.
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let mut topics = broker.topics.write().await;

    // Get or create the topic with default configuration.
    let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
        mockforge_kafka::topics::Topic::new(
            request.topic.clone(),
            mockforge_kafka::topics::TopicConfig::default(),
        )
    });

    // Use the caller-supplied partition, or derive one from the message key.
    let partition_id = match request.partition {
        Some(partition) => partition,
        None => topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes())),
    };

    // Reject partitions outside the topic's range.
    if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            })),
        )
            .into_response();
    }

    // Build the message; the real offset is assigned by the partition log,
    // so 0 here is just a placeholder. Borrow from `request` instead of the
    // original's extra key/headers clones.
    let message = mockforge_kafka::partitions::KafkaMessage {
        offset: 0,
        timestamp: chrono::Utc::now().timestamp_millis(),
        key: request.key.as_ref().map(|k| k.as_bytes().to_vec()),
        value: request.value.as_bytes().to_vec(),
        headers: request
            .headers
            .clone()
            .unwrap_or_default()
            .into_iter()
            .map(|(k, v)| (k, v.as_bytes().to_vec()))
            .collect(),
    };

    match topic_entry.produce(partition_id, message).await {
        Ok(offset) => {
            // Record metrics for the successful production. `broker` is
            // already proven Some above; no re-check needed.
            broker.metrics().record_messages_produced(1);

            // Emit an event for the real-time monitoring stream; a send
            // error only means there are no subscribers, so ignore it.
            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: request.topic.clone(),
                key: request.key.clone(),
                value: request.value.clone(),
                partition: partition_id,
                offset,
                headers: request.headers.clone(),
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", request.topic),
                "topic": request.topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2577
#[cfg(feature = "kafka")]
/// Request body for producing a batch of Kafka messages via the management API.
///
/// Messages are produced sequentially in the order given; see
/// `produce_kafka_batch` for per-message error reporting.
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// List of messages to produce
    pub messages: Vec<KafkaProduceRequest>,
    /// Delay between messages in milliseconds
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2587
#[cfg(feature = "kafka")]
/// Produce multiple messages to Kafka topics
///
/// Processes the batch sequentially, optionally sleeping `delay_ms` between
/// messages. Per-message failures (invalid partition, produce error) are
/// reported in the `results` array instead of aborting the whole batch.
/// Returns 400 for an empty batch and 503 when no broker is available.
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        if request.messages.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            )
                .into_response();
        }

        let mut results = Vec::new();

        for (index, msg_request) in request.messages.iter().enumerate() {
            let mut topics = broker.topics.write().await;

            // Get or create the topic
            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                mockforge_kafka::topics::Topic::new(
                    msg_request.topic.clone(),
                    mockforge_kafka::topics::TopicConfig::default(),
                )
            });

            // Determine partition: explicit from the request, else key-based assignment
            let partition_id = if let Some(partition) = msg_request.partition {
                partition
            } else {
                topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
            };

            // Validate partition exists
            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                }));
                continue;
            }

            // Create the message (offset 0 is a placeholder; the partition
            // assigns the real offset on append)
            let message = mockforge_kafka::partitions::KafkaMessage {
                offset: 0,
                timestamp: chrono::Utc::now().timestamp_millis(),
                key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
                value: msg_request.value.as_bytes().to_vec(),
                headers: msg_request
                    .headers
                    .clone()
                    .unwrap_or_default()
                    .into_iter()
                    .map(|(k, v)| (k, v.as_bytes().to_vec()))
                    .collect(),
            };

            // Produce to partition
            match topic_entry.produce(partition_id, message).await {
                Ok(offset) => {
                    // Record metrics for successful message production
                    // (`broker` is already bound by the enclosing `if let`)
                    broker.metrics().record_messages_produced(1);

                    // Emit message event for real-time monitoring (SSE)
                    let event = MessageEvent::Kafka(KafkaMessageEvent {
                        topic: msg_request.topic.clone(),
                        key: msg_request.key.clone(),
                        value: msg_request.value.clone(),
                        partition: partition_id,
                        offset,
                        headers: msg_request.headers.clone(),
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": msg_request.topic,
                        "partition": partition_id,
                        "offset": offset
                    }));
                }
                Err(e) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": e.to_string()
                    }));
                }
            }

            // Release the topics write lock before the inter-message delay so
            // other producers are not blocked while this batch sleeps.
            drop(topics);

            // Add delay between messages (except for the last one)
            if index < request.messages.len() - 1 && request.delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        Json(serde_json::json!({
            "success": true,
            "total": request.messages.len(),
            "succeeded": success_count,
            "failed": request.messages.len() - success_count,
            "results": results
        }))
        .into_response()
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2716
2717// ========== Real-time Message Streaming (SSE) ==========
2718
#[cfg(feature = "mqtt")]
/// SSE stream for MQTT messages
///
/// Subscribes to the shared broadcast channel of message events and forwards
/// only MQTT events as `mqtt_message` SSE events. An optional `topic` query
/// parameter restricts the stream to topics containing that substring.
/// The stream ends when the broadcast channel closes; lagged receivers log a
/// warning and keep going.
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    // Drive the broadcast receiver as an SSE stream: each unfold step loops
    // until it finds an event worth emitting, or returns None on close.
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => {
                        // Apply topic filter if specified (substring match)
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "mqtt",
                            "topic": event.topic,
                            "payload": event.payload,
                            "qos": event.qos,
                            "retain": event.retain,
                            "timestamp": event.timestamp,
                        });

                        // Serialization failure is silently skipped; the loop
                        // simply waits for the next event.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event = Event::default().event("mqtt_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => {
                        // Skip Kafka events in MQTT stream
                        continue;
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        // Channel closed: end the SSE stream.
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Keep-alive comments every 15s so proxies don't drop the idle connection.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
2779
#[cfg(feature = "kafka")]
/// SSE stream for Kafka messages
///
/// Subscribes to the shared broadcast channel of message events and forwards
/// only Kafka events as `kafka_message` SSE events. An optional `topic` query
/// parameter restricts the stream to topics containing that substring.
/// The stream ends when the broadcast channel closes; lagged receivers log a
/// warning and keep going.
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // `rx` is moved into the unfold state below and only mutated there, so no
    // `mut` here (mirrors mqtt_messages_stream; avoids an unused_mut warning).
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => {
                        // Skip MQTT events in Kafka stream
                        continue;
                    }
                    Ok(MessageEvent::Kafka(event)) => {
                        // Apply topic filter if specified (substring match)
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "kafka",
                            "topic": event.topic,
                            "key": event.key,
                            "value": event.value,
                            "partition": event.partition,
                            "offset": event.offset,
                            "headers": event.headers,
                            "timestamp": event.timestamp,
                        });

                        // Serialization failure is silently skipped; the loop
                        // simply waits for the next event.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event =
                                Event::default().event("kafka_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        // Channel closed: end the SSE stream.
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Keep-alive comments every 15s so proxies don't drop the idle connection.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
2843
2844// ========== AI-Powered Features ==========
2845
/// Request for AI-powered API specification generation
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Natural language description of the API to generate
    pub query: String,
    /// Type of specification to generate: "openapi", "graphql", or "asyncapi"
    /// (unrecognized values fall back to OpenAPI 3.0)
    pub spec_type: String,
    /// Optional API version (e.g., "3.0.0" for OpenAPI); defaults to "3.0.0"
    pub api_version: Option<String>,
}
2856
/// Request for OpenAPI generation from recorded traffic
///
/// All fields are optional; an empty body generates a spec from every
/// recorded exchange in the default database.
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to recorder database (optional, defaults to ./recordings.db)
    #[serde(default)]
    pub database_path: Option<String>,
    /// Start time for filtering (ISO 8601 format, e.g., 2025-01-01T00:00:00Z)
    #[serde(default)]
    pub since: Option<String>,
    /// End time for filtering (ISO 8601 format)
    #[serde(default)]
    pub until: Option<String>,
    /// Path pattern filter (supports wildcards, e.g., /api/*)
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence score for including paths (0.0 to 1.0, default 0.7)
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
2876
/// Serde default for `min_confidence`: include paths scored at 0.7 or above.
fn default_min_confidence() -> f64 {
    const DEFAULT_MIN_CONFIDENCE: f64 = 0.7;
    DEFAULT_MIN_CONFIDENCE
}
2880
/// Generate API specification from natural language using AI
///
/// Builds a RAG (retrieval-augmented generation) configuration from
/// environment variables, prompts the configured LLM with the user's
/// requirements, and returns the extracted specification text.
///
/// Responds 503 when no API key is configured (`MOCKFORGE_RAG_API_KEY` or
/// `OPENAI_API_KEY`), 500 when the RAG engine fails to initialize or
/// generation fails.
#[cfg(feature = "data-faker")]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::DocumentStorage,
    };
    use std::sync::Arc;

    // Build RAG config from environment variables
    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    // Check if RAG is configured - require API key
    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    }

    // Resolve the LLM provider (unknown values fall back to OpenAI)
    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    // Per-provider default endpoint/model, overridable via env vars
    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    // Build RagConfig with struct-update syntax rather than mutating a
    // default instance (avoids clippy::field_reassign_with_default)
    let rag_config = RagConfig {
        provider,
        api_endpoint,
        api_key,
        model,
        max_tokens: std::env::var("MOCKFORGE_RAG_MAX_TOKENS")
            .unwrap_or_else(|_| "4096".to_string())
            .parse()
            .unwrap_or(4096),
        temperature: std::env::var("MOCKFORGE_RAG_TEMPERATURE")
            .unwrap_or_else(|_| "0.3".to_string())
            .parse()
            .unwrap_or(0.3), // Lower temperature for more structured output
        timeout_secs: std::env::var("MOCKFORGE_RAG_TIMEOUT")
            .unwrap_or_else(|_| "60".to_string())
            .parse()
            .unwrap_or(60),
        max_context_length: std::env::var("MOCKFORGE_RAG_CONTEXT_WINDOW")
            .unwrap_or_else(|_| "4000".to_string())
            .parse()
            .unwrap_or(4000),
        ..RagConfig::default()
    };

    // Build the prompt for spec generation
    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        if request.spec_type == "graphql" {
            "GraphQL SDL"
        } else {
            "YAML"
        }
    );

    // Create in-memory storage for the RAG engine; this one-shot generation
    // has no documents to retrieve, so no persistent storage is needed.
    use mockforge_data::rag::storage::InMemoryStorage;
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    // Create RAG engine
    let mut rag_engine = match RagEngine::new(rag_config.clone(), storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    // Generate using RAG engine
    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Try to extract just the YAML/JSON/SDL content if LLM added explanation
            let spec = if request.spec_type == "graphql" {
                // For GraphQL, extract SDL
                extract_graphql_schema(&generated_text)
            } else {
                // For OpenAPI/AsyncAPI, extract YAML
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
3051
3052#[cfg(not(feature = "data-faker"))]
3053async fn generate_ai_spec(
3054    State(_state): State<ManagementState>,
3055    Json(_request): Json<GenerateSpecRequest>,
3056) -> impl IntoResponse {
3057    (
3058        StatusCode::NOT_IMPLEMENTED,
3059        Json(serde_json::json!({
3060            "error": "AI features not enabled",
3061            "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3062        })),
3063    )
3064        .into_response()
3065}
3066
/// Generate OpenAPI specification from recorded traffic
///
/// Opens the recorder database, queries recorded HTTP exchanges (optionally
/// filtered by time window and path pattern, capped at 1000 requests), and
/// feeds them to the OpenAPI spec generator. Responds with the generated
/// spec plus generation metadata, or an error status: 400 for a bad database
/// path or date format, 404 when no exchanges match, 500 for query,
/// generation, or serialization failures.
#[cfg(feature = "behavioral-cloning")]
async fn generate_openapi_from_traffic(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
) -> impl IntoResponse {
    use chrono::{DateTime, Utc};
    use mockforge_core::intelligent_behavior::{
        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
        IntelligentBehaviorConfig,
    };
    use mockforge_recorder::{
        database::RecorderDatabase,
        openapi_export::{QueryFilters, RecordingsToOpenApi},
    };
    use std::path::PathBuf;

    // Determine database path (defaults to ./recordings.db in the CWD)
    let db_path = if let Some(ref path) = request.database_path {
        PathBuf::from(path)
    } else {
        std::env::current_dir()
            .unwrap_or_else(|_| PathBuf::from("."))
            .join("recordings.db")
    };

    // Open database
    let db = match RecorderDatabase::new(&db_path).await {
        Ok(db) => db,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Database error",
                    "message": format!("Failed to open recorder database: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Parse time filters (RFC 3339, normalized to UTC)
    let since_dt = if let Some(ref since_str) = request.since {
        match DateTime::parse_from_rfc3339(since_str) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
                    })),
                )
                    .into_response();
            }
        }
    } else {
        None
    };

    let until_dt = if let Some(ref until_str) = request.until {
        match DateTime::parse_from_rfc3339(until_str) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
                    })),
                )
                    .into_response();
            }
        }
    } else {
        None
    };

    // Build query filters (max_requests caps the analysis workload at 1000)
    let query_filters = QueryFilters {
        since: since_dt,
        until: until_dt,
        path_pattern: request.path_pattern.clone(),
        min_status_code: None,
        max_requests: Some(1000),
    };

    // Query HTTP exchanges
    // Note: We need to convert from mockforge-recorder's HttpExchange to mockforge-core's HttpExchange
    // to avoid version mismatch issues. The converter returns the version from mockforge-recorder's
    // dependency, so we need to manually convert to the local version.
    let exchanges_from_recorder =
        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
            Ok(exchanges) => exchanges,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Query error",
                        "message": format!("Failed to query HTTP exchanges: {}", e)
                    })),
                )
                    .into_response();
            }
        };

    if exchanges_from_recorder.is_empty() {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "No exchanges found",
                "message": "No HTTP exchanges found matching the specified filters"
            })),
        )
            .into_response();
    }

    // Convert to local HttpExchange type to avoid version mismatch
    // (field-by-field copy between structurally identical types)
    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
        .into_iter()
        .map(|e| LocalHttpExchange {
            method: e.method,
            path: e.path,
            query_params: e.query_params,
            headers: e.headers,
            body: e.body,
            body_encoding: e.body_encoding,
            status_code: e.status_code,
            response_headers: e.response_headers,
            response_body: e.response_body,
            response_body_encoding: e.response_body_encoding,
            timestamp: e.timestamp,
        })
        .collect();

    // Create OpenAPI generator config, honoring the caller's confidence floor
    let behavior_config = IntelligentBehaviorConfig::default();
    let gen_config = OpenApiGenerationConfig {
        min_confidence: request.min_confidence,
        behavior_model: Some(behavior_config.behavior_model),
    };

    // Generate OpenAPI spec
    let generator = OpenApiSpecGenerator::new(gen_config);
    let result = match generator.generate_from_exchanges(exchanges).await {
        Ok(result) => result,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Generation error",
                    "message": format!("Failed to generate OpenAPI spec: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Prepare response: prefer the generator's raw document when present,
    // otherwise serialize the structured spec.
    let spec_json = if let Some(ref raw) = result.spec.raw_document {
        raw.clone()
    } else {
        match serde_json::to_value(&result.spec.spec) {
            Ok(json) => json,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Serialization error",
                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
                    })),
                )
                    .into_response();
            }
        }
    };

    // Build response with metadata
    let response = serde_json::json!({
        "spec": spec_json,
        "metadata": {
            "requests_analyzed": result.metadata.requests_analyzed,
            "paths_inferred": result.metadata.paths_inferred,
            "path_confidence": result.metadata.path_confidence,
            "generated_at": result.metadata.generated_at.to_rfc3339(),
            "duration_ms": result.metadata.duration_ms,
        }
    });

    Json(response).into_response()
}
3259
3260/// List all rule explanations
3261async fn list_rule_explanations(
3262    State(state): State<ManagementState>,
3263    Query(params): Query<std::collections::HashMap<String, String>>,
3264) -> impl IntoResponse {
3265    use mockforge_core::intelligent_behavior::RuleType;
3266
3267    let explanations = state.rule_explanations.read().await;
3268    let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3269
3270    // Filter by rule type if provided
3271    if let Some(rule_type_str) = params.get("rule_type") {
3272        if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3273            explanations_vec.retain(|e| e.rule_type == rule_type);
3274        }
3275    }
3276
3277    // Filter by minimum confidence if provided
3278    if let Some(min_confidence_str) = params.get("min_confidence") {
3279        if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3280            explanations_vec.retain(|e| e.confidence >= min_confidence);
3281        }
3282    }
3283
3284    // Sort by confidence (descending) and then by generated_at (descending)
3285    explanations_vec.sort_by(|a, b| {
3286        b.confidence
3287            .partial_cmp(&a.confidence)
3288            .unwrap_or(std::cmp::Ordering::Equal)
3289            .then_with(|| b.generated_at.cmp(&a.generated_at))
3290    });
3291
3292    Json(serde_json::json!({
3293        "explanations": explanations_vec,
3294        "total": explanations_vec.len(),
3295    }))
3296    .into_response()
3297}
3298
3299/// Get a specific rule explanation by ID
3300async fn get_rule_explanation(
3301    State(state): State<ManagementState>,
3302    Path(rule_id): Path<String>,
3303) -> impl IntoResponse {
3304    let explanations = state.rule_explanations.read().await;
3305
3306    match explanations.get(&rule_id) {
3307        Some(explanation) => Json(serde_json::json!({
3308            "explanation": explanation,
3309        }))
3310        .into_response(),
3311        None => (
3312            StatusCode::NOT_FOUND,
3313            Json(serde_json::json!({
3314                "error": "Rule explanation not found",
3315                "message": format!("No explanation found for rule ID: {}", rule_id)
3316            })),
3317        )
3318            .into_response(),
3319    }
3320}
3321
/// Request for learning from examples
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example request/response pairs to learn from (at least one is required)
    pub examples: Vec<ExamplePairRequest>,
    /// Optional configuration override
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3331
/// Example pair request format
///
/// Both fields are free-form JSON objects; the learner extracts well-known
/// keys from them (e.g. `method`, `path`, `body`, `query_params`, `headers`
/// on the request side), falling back to defaults for anything missing.
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request data (method, path, body, etc.)
    pub request: serde_json::Value,
    /// Response data (status_code, body, etc.)
    pub response: serde_json::Value,
}
3340
3341/// Learn behavioral rules from example pairs
3342///
3343/// This endpoint accepts example request/response pairs, generates behavioral rules
3344/// with explanations, and stores the explanations for later retrieval.
3345async fn learn_from_examples(
3346    State(state): State<ManagementState>,
3347    Json(request): Json<LearnFromExamplesRequest>,
3348) -> impl IntoResponse {
3349    use mockforge_core::intelligent_behavior::{
3350        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3351        rule_generator::{ExamplePair, RuleGenerator},
3352    };
3353
3354    if request.examples.is_empty() {
3355        return (
3356            StatusCode::BAD_REQUEST,
3357            Json(serde_json::json!({
3358                "error": "No examples provided",
3359                "message": "At least one example pair is required"
3360            })),
3361        )
3362            .into_response();
3363    }
3364
3365    // Convert request examples to ExamplePair format
3366    let example_pairs: Result<Vec<ExamplePair>, String> = request
3367        .examples
3368        .into_iter()
3369        .enumerate()
3370        .map(|(idx, ex)| {
3371            // Parse request JSON to extract method, path, body, etc.
3372            let method = ex
3373                .request
3374                .get("method")
3375                .and_then(|v| v.as_str())
3376                .map(|s| s.to_string())
3377                .unwrap_or_else(|| "GET".to_string());
3378            let path = ex
3379                .request
3380                .get("path")
3381                .and_then(|v| v.as_str())
3382                .map(|s| s.to_string())
3383                .unwrap_or_else(|| "/".to_string());
3384            let request_body = ex.request.get("body").cloned();
3385            let query_params = ex
3386                .request
3387                .get("query_params")
3388                .and_then(|v| v.as_object())
3389                .map(|obj| {
3390                    obj.iter()
3391                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3392                        .collect()
3393                })
3394                .unwrap_or_default();
3395            let headers = ex
3396                .request
3397                .get("headers")
3398                .and_then(|v| v.as_object())
3399                .map(|obj| {
3400                    obj.iter()
3401                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3402                        .collect()
3403                })
3404                .unwrap_or_default();
3405
3406            // Parse response JSON to extract status, body, etc.
3407            let status = ex
3408                .response
3409                .get("status_code")
3410                .or_else(|| ex.response.get("status"))
3411                .and_then(|v| v.as_u64())
3412                .map(|n| n as u16)
3413                .unwrap_or(200);
3414            let response_body = ex.response.get("body").cloned();
3415
3416            Ok(ExamplePair {
3417                method,
3418                path,
3419                request: request_body,
3420                status,
3421                response: response_body,
3422                query_params,
3423                headers,
3424                metadata: {
3425                    let mut meta = std::collections::HashMap::new();
3426                    meta.insert("source".to_string(), "api".to_string());
3427                    meta.insert("example_index".to_string(), idx.to_string());
3428                    meta
3429                },
3430            })
3431        })
3432        .collect();
3433
3434    let example_pairs = match example_pairs {
3435        Ok(pairs) => pairs,
3436        Err(e) => {
3437            return (
3438                StatusCode::BAD_REQUEST,
3439                Json(serde_json::json!({
3440                    "error": "Invalid examples",
3441                    "message": e
3442                })),
3443            )
3444                .into_response();
3445        }
3446    };
3447
3448    // Create behavior config (use provided config or default)
3449    let behavior_config = if let Some(config_json) = request.config {
3450        // Try to deserialize custom config, fall back to default
3451        serde_json::from_value(config_json)
3452            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3453            .behavior_model
3454    } else {
3455        BehaviorModelConfig::default()
3456    };
3457
3458    // Create rule generator
3459    let generator = RuleGenerator::new(behavior_config);
3460
3461    // Generate rules with explanations
3462    let (rules, explanations) =
3463        match generator.generate_rules_with_explanations(example_pairs).await {
3464            Ok(result) => result,
3465            Err(e) => {
3466                return (
3467                    StatusCode::INTERNAL_SERVER_ERROR,
3468                    Json(serde_json::json!({
3469                        "error": "Rule generation failed",
3470                        "message": format!("Failed to generate rules: {}", e)
3471                    })),
3472                )
3473                    .into_response();
3474            }
3475        };
3476
3477    // Store explanations in ManagementState
3478    {
3479        let mut stored_explanations = state.rule_explanations.write().await;
3480        for explanation in &explanations {
3481            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3482        }
3483    }
3484
3485    // Prepare response
3486    let response = serde_json::json!({
3487        "success": true,
3488        "rules_generated": {
3489            "consistency_rules": rules.consistency_rules.len(),
3490            "schemas": rules.schemas.len(),
3491            "state_machines": rules.state_transitions.len(),
3492            "system_prompt": !rules.system_prompt.is_empty(),
3493        },
3494        "explanations": explanations.iter().map(|e| serde_json::json!({
3495            "rule_id": e.rule_id,
3496            "rule_type": e.rule_type,
3497            "confidence": e.confidence,
3498            "reasoning": e.reasoning,
3499        })).collect::<Vec<_>>(),
3500        "total_explanations": explanations.len(),
3501    });
3502
3503    Json(response).into_response()
3504}
3505
/// Extract an OpenAPI/AsyncAPI YAML spec from free-form text.
///
/// Preference order: the first ```` ```yaml ```` fenced block, then the first
/// generic ```` ``` ```` fenced block, then the raw text trimmed (which also
/// covers text that already starts with `openapi:` or `asyncapi:`).
fn extract_yaml_spec(text: &str) -> String {
    // Contents of the first fenced block opened by `fence`, if it is closed.
    fn fenced(text: &str, fence: &str) -> Option<String> {
        let after = text.find(fence)? + fence.len();
        let body = text[after..].trim_start();
        let end = body.find("```")?;
        Some(body[..end].trim().to_string())
    }

    if let Some(block) = fenced(text, "```yaml") {
        return block;
    }
    if let Some(block) = fenced(text, "```") {
        return block;
    }

    // No (closed) code fences found: hand back the trimmed text as-is.
    text.trim().to_string()
}
3529
/// Extract GraphQL schema from text content
///
/// Preference order: the first ```` ```graphql ```` fenced block, then the
/// first generic ```` ``` ```` fenced block, then the raw text trimmed
/// (which also covers SDL that already starts with `type ` or `schema `).
fn extract_graphql_schema(text: &str) -> String {
    // Contents of the first fenced block opened by `fence`, if it is closed.
    fn fenced(text: &str, fence: &str) -> Option<String> {
        let after = text.find(fence)? + fence.len();
        let body = text[after..].trim_start();
        let end = body.find("```")?;
        Some(body[..end].trim().to_string())
    }

    if let Some(block) = fenced(text, "```graphql") {
        return block;
    }
    if let Some(block) = fenced(text, "```") {
        return block;
    }

    // No (closed) code fences found: hand back the trimmed text as-is.
    text.trim().to_string()
}
3553
3554// ========== Chaos Engineering Management ==========
3555
3556/// Get current chaos engineering configuration
3557async fn get_chaos_config(State(state): State<ManagementState>) -> impl IntoResponse {
3558    #[cfg(feature = "chaos")]
3559    {
3560        if let Some(chaos_state) = &state.chaos_api_state {
3561            let config = chaos_state.config.read().await;
3562            // Convert ChaosConfig to JSON response format
3563            Json(serde_json::json!({
3564                "enabled": config.enabled,
3565                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
3566                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
3567                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
3568                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
3569            }))
3570            .into_response()
3571        } else {
3572            // Chaos API not available, return default
3573            Json(serde_json::json!({
3574                "enabled": false,
3575                "latency": null,
3576                "fault_injection": null,
3577                "rate_limit": null,
3578                "traffic_shaping": null,
3579            }))
3580            .into_response()
3581        }
3582    }
3583    #[cfg(not(feature = "chaos"))]
3584    {
3585        // Chaos feature not enabled
3586        Json(serde_json::json!({
3587            "enabled": false,
3588            "latency": null,
3589            "fault_injection": null,
3590            "rate_limit": null,
3591            "traffic_shaping": null,
3592        }))
3593        .into_response()
3594    }
3595}
3596
/// Request to update chaos configuration
///
/// All fields are optional; omitted fields leave the corresponding part of
/// the current configuration untouched. Sub-configs are raw JSON and are
/// deserialized into the matching chaos config types when applied.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Whether to enable chaos engineering
    pub enabled: Option<bool>,
    /// Latency configuration (deserialized as `LatencyConfig`)
    pub latency: Option<serde_json::Value>,
    /// Fault injection configuration (deserialized as `FaultInjectionConfig`)
    pub fault_injection: Option<serde_json::Value>,
    /// Rate limiting configuration (deserialized as `RateLimitConfig`)
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic shaping configuration (deserialized as `TrafficShapingConfig`)
    pub traffic_shaping: Option<serde_json::Value>,
}
3611
3612/// Update chaos engineering configuration
3613async fn update_chaos_config(
3614    State(state): State<ManagementState>,
3615    Json(config_update): Json<ChaosConfigUpdate>,
3616) -> impl IntoResponse {
3617    #[cfg(feature = "chaos")]
3618    {
3619        if let Some(chaos_state) = &state.chaos_api_state {
3620            use mockforge_chaos::config::{
3621                ChaosConfig, FaultInjectionConfig, LatencyConfig, RateLimitConfig,
3622                TrafficShapingConfig,
3623            };
3624
3625            let mut config = chaos_state.config.write().await;
3626
3627            // Update enabled flag if provided
3628            if let Some(enabled) = config_update.enabled {
3629                config.enabled = enabled;
3630            }
3631
3632            // Update latency config if provided
3633            if let Some(latency_json) = config_update.latency {
3634                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3635                    config.latency = Some(latency);
3636                }
3637            }
3638
3639            // Update fault injection config if provided
3640            if let Some(fault_json) = config_update.fault_injection {
3641                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3642                    config.fault_injection = Some(fault);
3643                }
3644            }
3645
3646            // Update rate limit config if provided
3647            if let Some(rate_json) = config_update.rate_limit {
3648                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3649                    config.rate_limit = Some(rate);
3650                }
3651            }
3652
3653            // Update traffic shaping config if provided
3654            if let Some(traffic_json) = config_update.traffic_shaping {
3655                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3656                    config.traffic_shaping = Some(traffic);
3657                }
3658            }
3659
3660            // Reinitialize middleware injectors with new config
3661            // The middleware will pick up the changes on the next request
3662            drop(config);
3663
3664            info!("Chaos configuration updated successfully");
3665            Json(serde_json::json!({
3666                "success": true,
3667                "message": "Chaos configuration updated and applied"
3668            }))
3669            .into_response()
3670        } else {
3671            (
3672                StatusCode::SERVICE_UNAVAILABLE,
3673                Json(serde_json::json!({
3674                    "success": false,
3675                    "error": "Chaos API not available",
3676                    "message": "Chaos engineering is not enabled or configured"
3677                })),
3678            )
3679                .into_response()
3680        }
3681    }
3682    #[cfg(not(feature = "chaos"))]
3683    {
3684        (
3685            StatusCode::NOT_IMPLEMENTED,
3686            Json(serde_json::json!({
3687                "success": false,
3688                "error": "Chaos feature not enabled",
3689                "message": "Chaos engineering feature is not compiled into this build"
3690            })),
3691        )
3692            .into_response()
3693    }
3694}
3695
3696// ========== Network Profile Management ==========
3697
3698/// List available network profiles
3699async fn list_network_profiles() -> impl IntoResponse {
3700    use mockforge_core::network_profiles::NetworkProfileCatalog;
3701
3702    let catalog = NetworkProfileCatalog::default();
3703    let profiles: Vec<serde_json::Value> = catalog
3704        .list_profiles_with_description()
3705        .iter()
3706        .map(|(name, description)| {
3707            serde_json::json!({
3708                "name": name,
3709                "description": description,
3710            })
3711        })
3712        .collect();
3713
3714    Json(serde_json::json!({
3715        "profiles": profiles
3716    }))
3717    .into_response()
3718}
3719
#[derive(Debug, Deserialize)]
/// Request to apply a network profile
pub struct ApplyNetworkProfileRequest {
    /// Name of the network profile to apply; must match an entry in the
    /// `NetworkProfileCatalog` or the request is rejected with 404
    pub profile_name: String,
}
3726
3727/// Apply a network profile
3728async fn apply_network_profile(
3729    State(state): State<ManagementState>,
3730    Json(request): Json<ApplyNetworkProfileRequest>,
3731) -> impl IntoResponse {
3732    use mockforge_core::network_profiles::NetworkProfileCatalog;
3733
3734    let catalog = NetworkProfileCatalog::default();
3735    if let Some(profile) = catalog.get(&request.profile_name) {
3736        // Apply profile to server configuration if available
3737        // NetworkProfile contains latency and traffic_shaping configs
3738        if let Some(server_config) = &state.server_config {
3739            let mut config = server_config.write().await;
3740
3741            // Apply network profile's traffic shaping to core config
3742            use mockforge_core::config::NetworkShapingConfig;
3743
3744            // Convert NetworkProfile's TrafficShapingConfig to NetworkShapingConfig
3745            // NetworkProfile uses mockforge_core::traffic_shaping::TrafficShapingConfig
3746            // which has bandwidth and burst_loss fields
3747            let network_shaping = NetworkShapingConfig {
3748                enabled: profile.traffic_shaping.bandwidth.enabled
3749                    || profile.traffic_shaping.burst_loss.enabled,
3750                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
3751                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3752                max_connections: 1000, // Default value
3753            };
3754
3755            // Update chaos config if it exists, or create it
3756            // Chaos config is in observability.chaos, not core.chaos
3757            if let Some(ref mut chaos) = config.observability.chaos {
3758                chaos.traffic_shaping = Some(network_shaping);
3759            } else {
3760                // Create minimal chaos config with traffic shaping
3761                use mockforge_core::config::ChaosEngConfig;
3762                config.observability.chaos = Some(ChaosEngConfig {
3763                    enabled: true,
3764                    latency: None,
3765                    fault_injection: None,
3766                    rate_limit: None,
3767                    traffic_shaping: Some(network_shaping),
3768                    scenario: None,
3769                });
3770            }
3771
3772            info!("Network profile '{}' applied to server configuration", request.profile_name);
3773        } else {
3774            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
3775        }
3776
3777        // Also update chaos API state if available
3778        #[cfg(feature = "chaos")]
3779        {
3780            if let Some(chaos_state) = &state.chaos_api_state {
3781                use mockforge_chaos::config::TrafficShapingConfig;
3782
3783                let mut chaos_config = chaos_state.config.write().await;
3784                // Apply profile's traffic shaping to chaos API state
3785                let chaos_traffic_shaping = TrafficShapingConfig {
3786                    enabled: profile.traffic_shaping.bandwidth.enabled
3787                        || profile.traffic_shaping.burst_loss.enabled,
3788                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
3789                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3790                    max_connections: 0,
3791                    connection_timeout_ms: 30000,
3792                };
3793                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
3794                chaos_config.enabled = true; // Enable chaos when applying a profile
3795                drop(chaos_config);
3796                info!("Network profile '{}' applied to chaos API state", request.profile_name);
3797            }
3798        }
3799
3800        Json(serde_json::json!({
3801            "success": true,
3802            "message": format!("Network profile '{}' applied", request.profile_name),
3803            "profile": {
3804                "name": profile.name,
3805                "description": profile.description,
3806            }
3807        }))
3808        .into_response()
3809    } else {
3810        (
3811            StatusCode::NOT_FOUND,
3812            Json(serde_json::json!({
3813                "error": "Profile not found",
3814                "message": format!("Network profile '{}' not found", request.profile_name)
3815            })),
3816        )
3817            .into_response()
3818    }
3819}
3820
3821/// Build the management API router with UI Builder support
3822pub fn management_router_with_ui_builder(
3823    state: ManagementState,
3824    server_config: mockforge_core::config::ServerConfig,
3825) -> Router {
3826    use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
3827
3828    // Create the base management router
3829    let management = management_router(state);
3830
3831    // Create UI Builder state and router
3832    let ui_builder_state = UIBuilderState::new(server_config);
3833    let ui_builder = create_ui_builder_router(ui_builder_state);
3834
3835    // Nest UI Builder under /ui-builder
3836    management.nest("/ui-builder", ui_builder)
3837}
3838
3839/// Build management router with spec import API
3840pub fn management_router_with_spec_import(state: ManagementState) -> Router {
3841    use crate::spec_import::{spec_import_router, SpecImportState};
3842
3843    // Create base management router
3844    let management = management_router(state);
3845
3846    // Merge with spec import router
3847    Router::new()
3848        .merge(management)
3849        .merge(spec_import_router(SpecImportState::new()))
3850}
3851
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a MockConfig with only the fields these tests vary; everything
    /// else gets the usual "empty" defaults.
    fn make_mock(
        id: &str,
        name: &str,
        method: &str,
        path: &str,
        body: serde_json::Value,
        enabled: bool,
        status_code: u16,
    ) -> MockConfig {
        MockConfig {
            id: id.to_string(),
            name: name.to_string(),
            method: method.to_string(),
            path: path.to_string(),
            response: MockResponse { body, headers: None },
            enabled,
            latency_ms: None,
            status_code: Some(status_code),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        }
    }

    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        let mock = make_mock(
            "test-1",
            "Test Mock",
            "GET",
            "/test",
            serde_json::json!({"message": "test"}),
            true,
            200,
        );

        // Register the mock.
        state.mocks.write().await.push(mock.clone());

        // It should be retrievable by id with its fields intact.
        let mocks = state.mocks.read().await;
        let found = mocks.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        // One enabled and one disabled mock.
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(make_mock("1", "Mock 1", "GET", "/test1", serde_json::json!({}), true, 200));
            mocks.push(make_mock(
                "2",
                "Mock 2",
                "POST",
                "/test2",
                serde_json::json!({}),
                false,
                201,
            ));
        }

        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }
}