Skip to main content

mockforge_http/
management.rs

1/// Management API for MockForge
2///
3/// Provides REST endpoints for controlling mocks, server configuration,
4/// and integration with developer tools (VS Code extension, CI/CD, etc.)
5use axum::{
6    extract::{Path, Query, State},
7    http::StatusCode,
8    response::{IntoResponse, Json},
9    routing::{delete, get, post, put},
10    Router,
11};
12#[cfg(any(feature = "mqtt", feature = "kafka"))]
13use axum::response::sse::{Event, Sse};
14#[cfg(any(feature = "mqtt", feature = "kafka"))]
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18    BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23#[cfg(any(feature = "mqtt", feature = "kafka"))]
24use std::convert::Infallible;
25use std::sync::Arc;
26use tokio::sync::{broadcast, RwLock};
27use tracing::*;
28
/// Default broadcast channel capacity for message events
const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;

/// Get the broadcast channel capacity from the environment, or use the default.
///
/// Reads `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY` and parses it as a `usize`
/// (surrounding whitespace is ignored). Falls back to
/// [`DEFAULT_MESSAGE_BROADCAST_CAPACITY`] when the variable is unset, is not a
/// number, or is outside `1..=usize::MAX / 2`.
///
/// The range check matters because the result is handed to
/// `tokio::sync::broadcast::channel`, which panics for a capacity of `0` or one
/// larger than `usize::MAX / 2`; a bad environment value must not crash startup.
fn get_message_broadcast_capacity() -> usize {
    std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|capacity| (1..=usize::MAX / 2).contains(capacity))
        .unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY)
}
39
40/// Message event types for real-time monitoring
41#[derive(Debug, Clone, Serialize, Deserialize)]
42#[serde(tag = "protocol", content = "data")]
43#[serde(rename_all = "lowercase")]
44pub enum MessageEvent {
45    #[cfg(feature = "mqtt")]
46    /// MQTT message event
47    Mqtt(MqttMessageEvent),
48    #[cfg(feature = "kafka")]
49    /// Kafka message event
50    Kafka(KafkaMessageEvent),
51}
52
53#[cfg(feature = "mqtt")]
54/// MQTT message event for real-time monitoring
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct MqttMessageEvent {
57    /// MQTT topic name
58    pub topic: String,
59    /// Message payload content
60    pub payload: String,
61    /// Quality of Service level (0, 1, or 2)
62    pub qos: u8,
63    /// Whether the message is retained
64    pub retain: bool,
65    /// RFC3339 formatted timestamp
66    pub timestamp: String,
67}
68
69#[cfg(feature = "kafka")]
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct KafkaMessageEvent {
72    pub topic: String,
73    pub key: Option<String>,
74    pub value: String,
75    pub partition: i32,
76    pub offset: i64,
77    pub headers: Option<std::collections::HashMap<String, String>>,
78    pub timestamp: String,
79}
80
81/// Mock configuration representation
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct MockConfig {
84    /// Unique identifier for the mock
85    #[serde(skip_serializing_if = "String::is_empty")]
86    pub id: String,
87    /// Human-readable name for the mock
88    pub name: String,
89    /// HTTP method (GET, POST, etc.)
90    pub method: String,
91    /// API path pattern to match
92    pub path: String,
93    /// Response configuration
94    pub response: MockResponse,
95    /// Whether this mock is currently enabled
96    #[serde(default = "default_true")]
97    pub enabled: bool,
98    /// Optional latency to inject in milliseconds
99    #[serde(skip_serializing_if = "Option::is_none")]
100    pub latency_ms: Option<u64>,
101    /// Optional HTTP status code override
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub status_code: Option<u16>,
104    /// Request matching criteria (headers, query params, body patterns)
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub request_match: Option<RequestMatchCriteria>,
107    /// Priority for mock ordering (higher priority mocks are matched first)
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub priority: Option<i32>,
110    /// Scenario name for stateful mocking
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub scenario: Option<String>,
113    /// Required scenario state for this mock to be active
114    #[serde(skip_serializing_if = "Option::is_none")]
115    pub required_scenario_state: Option<String>,
116    /// New scenario state after this mock is matched
117    #[serde(skip_serializing_if = "Option::is_none")]
118    pub new_scenario_state: Option<String>,
119}
120
121fn default_true() -> bool {
122    true
123}
124
125/// Mock response configuration
126#[derive(Debug, Clone, Serialize, Deserialize)]
127pub struct MockResponse {
128    /// Response body as JSON
129    pub body: serde_json::Value,
130    /// Optional custom response headers
131    #[serde(skip_serializing_if = "Option::is_none")]
132    pub headers: Option<std::collections::HashMap<String, String>>,
133}
134
135/// Request matching criteria for advanced request matching
136#[derive(Debug, Clone, Serialize, Deserialize, Default)]
137pub struct RequestMatchCriteria {
138    /// Headers that must be present and match (case-insensitive header names)
139    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
140    pub headers: std::collections::HashMap<String, String>,
141    /// Query parameters that must be present and match
142    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
143    pub query_params: std::collections::HashMap<String, String>,
144    /// Request body pattern (supports exact match or regex)
145    #[serde(skip_serializing_if = "Option::is_none")]
146    pub body_pattern: Option<String>,
147    /// JSONPath expression for JSON body matching
148    #[serde(skip_serializing_if = "Option::is_none")]
149    pub json_path: Option<String>,
150    /// XPath expression for XML body matching
151    #[serde(skip_serializing_if = "Option::is_none")]
152    pub xpath: Option<String>,
153    /// Custom matcher expression (e.g., "headers.content-type == \"application/json\"")
154    #[serde(skip_serializing_if = "Option::is_none")]
155    pub custom_matcher: Option<String>,
156}
157
158/// Check if a request matches the given mock configuration
159///
160/// This function implements comprehensive request matching including:
161/// - Method and path matching
162/// - Header matching (with regex support)
163/// - Query parameter matching
164/// - Body pattern matching (exact, regex, JSONPath, XPath)
165/// - Custom matcher expressions
166pub fn mock_matches_request(
167    mock: &MockConfig,
168    method: &str,
169    path: &str,
170    headers: &std::collections::HashMap<String, String>,
171    query_params: &std::collections::HashMap<String, String>,
172    body: Option<&[u8]>,
173) -> bool {
174    use regex::Regex;
175
176    // Check if mock is enabled
177    if !mock.enabled {
178        return false;
179    }
180
181    // Check method (case-insensitive)
182    if mock.method.to_uppercase() != method.to_uppercase() {
183        return false;
184    }
185
186    // Check path pattern (supports wildcards and path parameters)
187    if !path_matches_pattern(&mock.path, path) {
188        return false;
189    }
190
191    // Check request matching criteria if present
192    if let Some(criteria) = &mock.request_match {
193        // Check headers
194        for (key, expected_value) in &criteria.headers {
195            let header_key_lower = key.to_lowercase();
196            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
197
198            if let Some((_, actual_value)) = found {
199                // Try regex match first, then exact match
200                if let Ok(re) = Regex::new(expected_value) {
201                    if !re.is_match(actual_value) {
202                        return false;
203                    }
204                } else if actual_value != expected_value {
205                    return false;
206                }
207            } else {
208                return false; // Header not found
209            }
210        }
211
212        // Check query parameters
213        for (key, expected_value) in &criteria.query_params {
214            if let Some(actual_value) = query_params.get(key) {
215                if actual_value != expected_value {
216                    return false;
217                }
218            } else {
219                return false; // Query param not found
220            }
221        }
222
223        // Check body pattern
224        if let Some(pattern) = &criteria.body_pattern {
225            if let Some(body_bytes) = body {
226                let body_str = String::from_utf8_lossy(body_bytes);
227                // Try regex first, then exact match
228                if let Ok(re) = Regex::new(pattern) {
229                    if !re.is_match(&body_str) {
230                        return false;
231                    }
232                } else if body_str.as_ref() != pattern {
233                    return false;
234                }
235            } else {
236                return false; // Body required but not present
237            }
238        }
239
240        // Check JSONPath (simplified implementation)
241        if let Some(json_path) = &criteria.json_path {
242            if let Some(body_bytes) = body {
243                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
244                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
245                        // Simple JSONPath check
246                        if !json_path_exists(&json_value, json_path) {
247                            return false;
248                        }
249                    }
250                }
251            }
252        }
253
254        // Check XPath (placeholder - requires XML/XPath library for full implementation)
255        if let Some(_xpath) = &criteria.xpath {
256            // XPath matching would require an XML/XPath library
257            // For now, this is a placeholder that warns but doesn't fail
258            tracing::warn!("XPath matching not yet fully implemented");
259        }
260
261        // Check custom matcher
262        if let Some(custom) = &criteria.custom_matcher {
263            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
264                return false;
265            }
266        }
267    }
268
269    true
270}
271
272/// Check if a path matches a pattern (supports wildcards and path parameters)
273fn path_matches_pattern(pattern: &str, path: &str) -> bool {
274    // Exact match
275    if pattern == path {
276        return true;
277    }
278
279    // Wildcard match
280    if pattern == "*" {
281        return true;
282    }
283
284    // Path parameter matching (e.g., /users/{id} matches /users/123)
285    let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
286    let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
287
288    if pattern_parts.len() != path_parts.len() {
289        // Check for wildcard patterns
290        if pattern.contains('*') {
291            return matches_wildcard_pattern(pattern, path);
292        }
293        return false;
294    }
295
296    for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
297        // Check for path parameters {param}
298        if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
299            continue; // Matches any value
300        }
301
302        if pattern_part != path_part {
303            return false;
304        }
305    }
306
307    true
308}
309
310/// Check if path matches a wildcard pattern
311fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
312    use regex::Regex;
313
314    // Convert pattern to regex
315    let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
316
317    if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
318        return re.is_match(path);
319    }
320
321    false
322}
323
324/// Check if a JSONPath exists in a JSON value (simplified implementation)
325fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
326    // Simple implementation - for full JSONPath support, use a library like jsonpath-rs
327    // This handles simple paths like $.field or $.field.subfield
328    if let Some(path) = json_path.strip_prefix("$.") {
329        let parts: Vec<&str> = path.split('.').collect();
330
331        let mut current = json;
332        for part in parts {
333            if let Some(obj) = current.as_object() {
334                if let Some(value) = obj.get(part) {
335                    current = value;
336                } else {
337                    return false;
338                }
339            } else {
340                return false;
341            }
342        }
343        true
344    } else {
345        // For complex JSONPath expressions, would need a proper JSONPath library
346        tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
347        false
348    }
349}
350
351/// Evaluate a custom matcher expression
352fn evaluate_custom_matcher(
353    expression: &str,
354    method: &str,
355    path: &str,
356    headers: &std::collections::HashMap<String, String>,
357    query_params: &std::collections::HashMap<String, String>,
358    body: Option<&[u8]>,
359) -> bool {
360    use regex::Regex;
361
362    let expr = expression.trim();
363
364    // Handle equality expressions (field == "value")
365    if expr.contains("==") {
366        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
367        if parts.len() != 2 {
368            return false;
369        }
370
371        let field = parts[0];
372        let expected_value = parts[1].trim_matches('"').trim_matches('\'');
373
374        match field {
375            "method" => method == expected_value,
376            "path" => path == expected_value,
377            _ if field.starts_with("headers.") => {
378                let header_name = &field[8..];
379                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
380            }
381            _ if field.starts_with("query.") => {
382                let param_name = &field[6..];
383                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
384            }
385            _ => false,
386        }
387    }
388    // Handle regex match expressions (field =~ "pattern")
389    else if expr.contains("=~") {
390        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
391        if parts.len() != 2 {
392            return false;
393        }
394
395        let field = parts[0];
396        let pattern = parts[1].trim_matches('"').trim_matches('\'');
397
398        if let Ok(re) = Regex::new(pattern) {
399            match field {
400                "method" => re.is_match(method),
401                "path" => re.is_match(path),
402                _ if field.starts_with("headers.") => {
403                    let header_name = &field[8..];
404                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
405                }
406                _ if field.starts_with("query.") => {
407                    let param_name = &field[6..];
408                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
409                }
410                _ => false,
411            }
412        } else {
413            false
414        }
415    }
416    // Handle contains expressions (field contains "value")
417    else if expr.contains("contains") {
418        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
419        if parts.len() != 2 {
420            return false;
421        }
422
423        let field = parts[0];
424        let search_value = parts[1].trim_matches('"').trim_matches('\'');
425
426        match field {
427            "path" => path.contains(search_value),
428            _ if field.starts_with("headers.") => {
429                let header_name = &field[8..];
430                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
431            }
432            _ if field.starts_with("body") => {
433                if let Some(body_bytes) = body {
434                    let body_str = String::from_utf8_lossy(body_bytes);
435                    body_str.contains(search_value)
436                } else {
437                    false
438                }
439            }
440            _ => false,
441        }
442    } else {
443        // Unknown expression format
444        tracing::warn!("Unknown custom matcher expression format: {}", expr);
445        false
446    }
447}
448
449/// Server statistics
450#[derive(Debug, Clone, Serialize, Deserialize)]
451pub struct ServerStats {
452    /// Server uptime in seconds
453    pub uptime_seconds: u64,
454    /// Total number of requests processed
455    pub total_requests: u64,
456    /// Number of active mock configurations
457    pub active_mocks: usize,
458    /// Number of currently enabled mocks
459    pub enabled_mocks: usize,
460    /// Number of registered API routes
461    pub registered_routes: usize,
462}
463
464/// Server configuration info
465#[derive(Debug, Clone, Serialize, Deserialize)]
466pub struct ServerConfig {
467    /// MockForge version string
468    pub version: String,
469    /// Server port number
470    pub port: u16,
471    /// Whether an OpenAPI spec is loaded
472    pub has_openapi_spec: bool,
473    /// Optional path to the OpenAPI spec file
474    #[serde(skip_serializing_if = "Option::is_none")]
475    pub spec_path: Option<String>,
476}
477
478/// Shared state for the management API
479#[derive(Clone)]
480pub struct ManagementState {
481    /// Collection of mock configurations
482    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
483    /// Optional OpenAPI specification
484    pub spec: Option<Arc<OpenApiSpec>>,
485    /// Optional path to the OpenAPI spec file
486    pub spec_path: Option<String>,
487    /// Server port number
488    pub port: u16,
489    /// Server start time for uptime calculation
490    pub start_time: std::time::Instant,
491    /// Counter for total requests processed
492    pub request_counter: Arc<RwLock<u64>>,
493    /// Optional proxy configuration for migration pipeline
494    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
495    /// Optional SMTP registry for email mocking
496    #[cfg(feature = "smtp")]
497    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
498    /// Optional MQTT broker for message mocking
499    #[cfg(feature = "mqtt")]
500    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
501    /// Optional Kafka broker for event streaming
502    #[cfg(feature = "kafka")]
503    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
504    /// Broadcast channel for message events (MQTT & Kafka)
505    #[cfg(any(feature = "mqtt", feature = "kafka"))]
506    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
507    /// State machine manager for scenario state machines
508    pub state_machine_manager:
509        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
510    /// Optional WebSocket broadcast channel for real-time updates
511    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
512    /// Lifecycle hook registry for extensibility
513    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
514    /// Rule explanations storage (in-memory for now)
515    pub rule_explanations: Arc<
516        RwLock<
517            std::collections::HashMap<
518                String,
519                mockforge_core::intelligent_behavior::RuleExplanation,
520            >,
521        >,
522    >,
523    /// Optional chaos API state for chaos config management
524    #[cfg(feature = "chaos")]
525    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
526    /// Optional server configuration for profile application
527    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
528}
529
530impl ManagementState {
531    /// Create a new management state
532    ///
533    /// # Arguments
534    /// * `spec` - Optional OpenAPI specification
535    /// * `spec_path` - Optional path to the OpenAPI spec file
536    /// * `port` - Server port number
537    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
538        Self {
539            mocks: Arc::new(RwLock::new(Vec::new())),
540            spec,
541            spec_path,
542            port,
543            start_time: std::time::Instant::now(),
544            request_counter: Arc::new(RwLock::new(0)),
545            proxy_config: None,
546            #[cfg(feature = "smtp")]
547            smtp_registry: None,
548            #[cfg(feature = "mqtt")]
549            mqtt_broker: None,
550            #[cfg(feature = "kafka")]
551            kafka_broker: None,
552            #[cfg(any(feature = "mqtt", feature = "kafka"))]
553            message_events: {
554                let capacity = get_message_broadcast_capacity();
555                let (tx, _) = broadcast::channel(capacity);
556                Arc::new(tx)
557            },
558            state_machine_manager: Arc::new(RwLock::new(
559                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
560            )),
561            ws_broadcast: None,
562            lifecycle_hooks: None,
563            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
564            #[cfg(feature = "chaos")]
565            chaos_api_state: None,
566            server_config: None,
567        }
568    }
569
570    /// Add lifecycle hook registry to management state
571    pub fn with_lifecycle_hooks(
572        mut self,
573        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
574    ) -> Self {
575        self.lifecycle_hooks = Some(hooks);
576        self
577    }
578
579    /// Add WebSocket broadcast channel to management state
580    pub fn with_ws_broadcast(
581        mut self,
582        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
583    ) -> Self {
584        self.ws_broadcast = Some(ws_broadcast);
585        self
586    }
587
588    /// Add proxy configuration to management state
589    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
590        self.proxy_config = Some(proxy_config);
591        self
592    }
593
594    #[cfg(feature = "smtp")]
595    /// Add SMTP registry to management state
596    pub fn with_smtp_registry(
597        mut self,
598        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
599    ) -> Self {
600        self.smtp_registry = Some(smtp_registry);
601        self
602    }
603
604    #[cfg(feature = "mqtt")]
605    /// Add MQTT broker to management state
606    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
607        self.mqtt_broker = Some(mqtt_broker);
608        self
609    }
610
611    #[cfg(feature = "kafka")]
612    /// Add Kafka broker to management state
613    pub fn with_kafka_broker(
614        mut self,
615        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
616    ) -> Self {
617        self.kafka_broker = Some(kafka_broker);
618        self
619    }
620
621    #[cfg(feature = "chaos")]
622    /// Add chaos API state to management state
623    pub fn with_chaos_api_state(
624        mut self,
625        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
626    ) -> Self {
627        self.chaos_api_state = Some(chaos_api_state);
628        self
629    }
630
631    /// Add server configuration to management state
632    pub fn with_server_config(
633        mut self,
634        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
635    ) -> Self {
636        self.server_config = Some(server_config);
637        self
638    }
639}
640
641/// List all mocks
642async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
643    let mocks = state.mocks.read().await;
644    Json(serde_json::json!({
645        "mocks": *mocks,
646        "total": mocks.len(),
647        "enabled": mocks.iter().filter(|m| m.enabled).count()
648    }))
649}
650
651/// Get a specific mock by ID
652async fn get_mock(
653    State(state): State<ManagementState>,
654    Path(id): Path<String>,
655) -> Result<Json<MockConfig>, StatusCode> {
656    let mocks = state.mocks.read().await;
657    mocks
658        .iter()
659        .find(|m| m.id == id)
660        .cloned()
661        .map(Json)
662        .ok_or(StatusCode::NOT_FOUND)
663}
664
665/// Create a new mock
666async fn create_mock(
667    State(state): State<ManagementState>,
668    Json(mut mock): Json<MockConfig>,
669) -> Result<Json<MockConfig>, StatusCode> {
670    let mut mocks = state.mocks.write().await;
671
672    // Generate ID if not provided
673    if mock.id.is_empty() {
674        mock.id = uuid::Uuid::new_v4().to_string();
675    }
676
677    // Check for duplicate ID
678    if mocks.iter().any(|m| m.id == mock.id) {
679        return Err(StatusCode::CONFLICT);
680    }
681
682    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
683
684    // Invoke lifecycle hooks
685    if let Some(hooks) = &state.lifecycle_hooks {
686        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
687            id: mock.id.clone(),
688            name: mock.name.clone(),
689            config: serde_json::to_value(&mock).unwrap_or_default(),
690        };
691        hooks.invoke_mock_created(&event).await;
692    }
693
694    mocks.push(mock.clone());
695
696    // Broadcast WebSocket event
697    if let Some(tx) = &state.ws_broadcast {
698        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
699    }
700
701    Ok(Json(mock))
702}
703
704/// Update an existing mock
705async fn update_mock(
706    State(state): State<ManagementState>,
707    Path(id): Path<String>,
708    Json(updated_mock): Json<MockConfig>,
709) -> Result<Json<MockConfig>, StatusCode> {
710    let mut mocks = state.mocks.write().await;
711
712    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
713
714    // Get old mock for comparison
715    let old_mock = mocks[position].clone();
716
717    info!("Updating mock: {}", id);
718    mocks[position] = updated_mock.clone();
719
720    // Invoke lifecycle hooks
721    if let Some(hooks) = &state.lifecycle_hooks {
722        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
723            id: updated_mock.id.clone(),
724            name: updated_mock.name.clone(),
725            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
726        };
727        hooks.invoke_mock_updated(&event).await;
728
729        // Check if enabled state changed
730        if old_mock.enabled != updated_mock.enabled {
731            let state_event = if updated_mock.enabled {
732                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
733                    id: updated_mock.id.clone(),
734                }
735            } else {
736                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
737                    id: updated_mock.id.clone(),
738                }
739            };
740            hooks.invoke_mock_state_changed(&state_event).await;
741        }
742    }
743
744    // Broadcast WebSocket event
745    if let Some(tx) = &state.ws_broadcast {
746        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
747    }
748
749    Ok(Json(updated_mock))
750}
751
752/// Delete a mock
753async fn delete_mock(
754    State(state): State<ManagementState>,
755    Path(id): Path<String>,
756) -> Result<StatusCode, StatusCode> {
757    let mut mocks = state.mocks.write().await;
758
759    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
760
761    // Get mock info before deletion for lifecycle hooks
762    let deleted_mock = mocks[position].clone();
763
764    info!("Deleting mock: {}", id);
765    mocks.remove(position);
766
767    // Invoke lifecycle hooks
768    if let Some(hooks) = &state.lifecycle_hooks {
769        let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
770            id: deleted_mock.id.clone(),
771            name: deleted_mock.name.clone(),
772        };
773        hooks.invoke_mock_deleted(&event).await;
774    }
775
776    // Broadcast WebSocket event
777    if let Some(tx) = &state.ws_broadcast {
778        let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
779    }
780
781    Ok(StatusCode::NO_CONTENT)
782}
783
784/// Request to validate configuration
785#[derive(Debug, Deserialize)]
786pub struct ValidateConfigRequest {
787    /// Configuration to validate (as JSON)
788    pub config: serde_json::Value,
789    /// Format of the configuration ("json" or "yaml")
790    #[serde(default = "default_format")]
791    pub format: String,
792}
793
794fn default_format() -> String {
795    "json".to_string()
796}
797
798/// Validate configuration without applying it
799async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
800    use mockforge_core::config::ServerConfig;
801
802    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
803        "yaml" | "yml" => {
804            let yaml_str = match serde_json::to_string(&request.config) {
805                Ok(s) => s,
806                Err(e) => {
807                    return (
808                        StatusCode::BAD_REQUEST,
809                        Json(serde_json::json!({
810                            "valid": false,
811                            "error": format!("Failed to convert to string: {}", e),
812                            "message": "Configuration validation failed"
813                        })),
814                    )
815                        .into_response();
816                }
817            };
818            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
819        }
820        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
821    };
822
823    match config_result {
824        Ok(_) => Json(serde_json::json!({
825            "valid": true,
826            "message": "Configuration is valid"
827        }))
828        .into_response(),
829        Err(e) => (
830            StatusCode::BAD_REQUEST,
831            Json(serde_json::json!({
832                "valid": false,
833                "error": format!("Invalid configuration: {}", e),
834                "message": "Configuration validation failed"
835            })),
836        )
837            .into_response(),
838    }
839}
840
841/// Request for bulk configuration update
842#[derive(Debug, Deserialize)]
843pub struct BulkConfigUpdateRequest {
844    /// Partial configuration updates (only specified fields will be updated)
845    pub updates: serde_json::Value,
846}
847
848/// Bulk update configuration
849///
850/// This endpoint allows updating multiple configuration options at once.
851/// Only the specified fields in the updates object will be modified.
852///
853/// Configuration updates are applied to the server configuration if available
854/// in ManagementState. Changes take effect immediately for supported settings.
855async fn bulk_update_config(
856    State(_state): State<ManagementState>,
857    Json(request): Json<BulkConfigUpdateRequest>,
858) -> impl IntoResponse {
859    // Validate the updates structure
860    if !request.updates.is_object() {
861        return (
862            StatusCode::BAD_REQUEST,
863            Json(serde_json::json!({
864                "error": "Invalid request",
865                "message": "Updates must be a JSON object"
866            })),
867        )
868            .into_response();
869    }
870
871    // Try to validate as partial ServerConfig
872    use mockforge_core::config::ServerConfig;
873
874    // Create a minimal valid config and try to merge updates
875    let base_config = ServerConfig::default();
876    let base_json = match serde_json::to_value(&base_config) {
877        Ok(v) => v,
878        Err(e) => {
879            return (
880                StatusCode::INTERNAL_SERVER_ERROR,
881                Json(serde_json::json!({
882                    "error": "Internal error",
883                    "message": format!("Failed to serialize base config: {}", e)
884                })),
885            )
886                .into_response();
887        }
888    };
889
890    // Merge updates into base config (simplified merge)
891    let mut merged = base_json.clone();
892    if let (Some(merged_obj), Some(updates_obj)) =
893        (merged.as_object_mut(), request.updates.as_object())
894    {
895        for (key, value) in updates_obj {
896            merged_obj.insert(key.clone(), value.clone());
897        }
898    }
899
900    // Validate the merged config
901    match serde_json::from_value::<ServerConfig>(merged) {
902        Ok(_) => {
903            // Config is valid
904            // Note: Runtime application of config changes would require:
905            // 1. Storing ServerConfig in ManagementState
906            // 2. Implementing hot-reload mechanism for server configuration
907            // 3. Updating router state and middleware based on new config
908            // For now, this endpoint only validates the configuration structure
909            Json(serde_json::json!({
910                "success": true,
911                "message": "Bulk configuration update validated successfully. Note: Runtime application requires ServerConfig in ManagementState and hot-reload support.",
912                "updates_received": request.updates,
913                "validated": true
914            }))
915            .into_response()
916        }
917        Err(e) => (
918            StatusCode::BAD_REQUEST,
919            Json(serde_json::json!({
920                "error": "Invalid configuration",
921                "message": format!("Configuration validation failed: {}", e),
922                "validated": false
923            })),
924        )
925            .into_response(),
926    }
927}
928
929/// Get server statistics
930async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
931    let mocks = state.mocks.read().await;
932    let request_count = *state.request_counter.read().await;
933
934    Json(ServerStats {
935        uptime_seconds: state.start_time.elapsed().as_secs(),
936        total_requests: request_count,
937        active_mocks: mocks.len(),
938        enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
939        registered_routes: mocks.len(), // This could be enhanced with actual route registry info
940    })
941}
942
943/// Get server configuration
944async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
945    Json(ServerConfig {
946        version: env!("CARGO_PKG_VERSION").to_string(),
947        port: state.port,
948        has_openapi_spec: state.spec.is_some(),
949        spec_path: state.spec_path.clone(),
950    })
951}
952
953/// Health check endpoint
954async fn health_check() -> Json<serde_json::Value> {
955    Json(serde_json::json!({
956        "status": "healthy",
957        "service": "mockforge-management",
958        "timestamp": chrono::Utc::now().to_rfc3339()
959    }))
960}
961
962/// Export format for mock configurations
963#[derive(Debug, Clone, Serialize, Deserialize)]
964#[serde(rename_all = "lowercase")]
965pub enum ExportFormat {
966    /// JSON format
967    Json,
968    /// YAML format
969    Yaml,
970}
971
972/// Export mocks in specified format
973async fn export_mocks(
974    State(state): State<ManagementState>,
975    Query(params): Query<std::collections::HashMap<String, String>>,
976) -> Result<(StatusCode, String), StatusCode> {
977    let mocks = state.mocks.read().await;
978
979    let format = params
980        .get("format")
981        .map(|f| match f.as_str() {
982            "yaml" | "yml" => ExportFormat::Yaml,
983            _ => ExportFormat::Json,
984        })
985        .unwrap_or(ExportFormat::Json);
986
987    match format {
988        ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
989            .map(|json| (StatusCode::OK, json))
990            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
991        ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
992            .map(|yaml| (StatusCode::OK, yaml))
993            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
994    }
995}
996
997/// Import mocks from JSON/YAML
998async fn import_mocks(
999    State(state): State<ManagementState>,
1000    Json(mocks): Json<Vec<MockConfig>>,
1001) -> impl IntoResponse {
1002    let mut current_mocks = state.mocks.write().await;
1003    current_mocks.clear();
1004    current_mocks.extend(mocks);
1005    Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1006}
1007
/// List every email currently held by the SMTP registry
///
/// Answers `501 Not Implemented` when no SMTP registry is attached to the
/// management state and `500 Internal Server Error` when the lookup fails.
#[cfg(feature = "smtp")]
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    let (status, body) = match registry.get_emails() {
        Ok(emails) => (StatusCode::OK, serde_json::json!(emails)),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            serde_json::json!({
                "error": "Failed to retrieve emails",
                "message": err.to_string()
            }),
        ),
    };

    (status, Json(body))
}
1032
/// Fetch a single SMTP email by its identifier
///
/// Answers `404 Not Found` (echoing the requested id) when the registry has
/// no such email, `500 Internal Server Error` when the lookup fails, and
/// `501 Not Implemented` when no SMTP registry is attached.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    let (status, body) = match registry.get_email_by_id(&id) {
        Ok(Some(email)) => (StatusCode::OK, serde_json::json!(email)),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            serde_json::json!({
                "error": "Email not found",
                "id": id
            }),
        ),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            serde_json::json!({
                "error": "Failed to retrieve email",
                "message": err.to_string()
            }),
        ),
    };

    (status, Json(body))
}
1067
/// Remove all emails from the SMTP mailbox
///
/// Answers `500 Internal Server Error` when the registry reports a failure
/// and `501 Not Implemented` when no SMTP registry is attached.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    let (status, body) = match registry.clear_mailbox() {
        Ok(()) => (
            StatusCode::OK,
            serde_json::json!({
                "message": "Mailbox cleared successfully"
            }),
        ),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            serde_json::json!({
                "error": "Failed to clear mailbox",
                "message": err.to_string()
            }),
        ),
    };

    (status, Json(body))
}
1097
/// Export SMTP mailbox
///
/// Not implemented over HTTP: always answers `501 Not Implemented`. The
/// response echoes the requested `format` query parameter, defaulting to
/// `"json"` when the parameter is absent.
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    // Build the default lazily so no string is allocated when `format` is given.
    let format = params.get("format").cloned().unwrap_or_else(|| "json".to_string());
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "SMTP mailbox management not available via HTTP API",
            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
            "requested_format": format
        })),
    )
}
1113
/// Search the SMTP mailbox using filters taken from the query string
///
/// Recognized parameters: `sender`, `recipient`, `subject`, `body`, `since`,
/// `until`, `regex`, and `case_sensitive`. `since`/`until` must be RFC 3339
/// timestamps; a value that fails to parse is dropped rather than rejected.
/// The two flags are enabled only by the exact string `true`.
///
/// Answers `500 Internal Server Error` when the search fails and
/// `501 Not Implemented` when no SMTP registry is attached.
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    // Small accessors over the query map, one per parameter kind.
    let text = |key: &str| params.get(key).cloned();
    let timestamp = |key: &str| {
        params
            .get(key)
            .and_then(|raw| chrono::DateTime::parse_from_rfc3339(raw).ok())
            .map(|parsed| parsed.with_timezone(&chrono::Utc))
    };
    let flag = |key: &str| params.get(key).map_or(false, |raw| raw == "true");

    let filters = EmailSearchFilters {
        sender: text("sender"),
        recipient: text("recipient"),
        subject: text("subject"),
        body: text("body"),
        since: timestamp("since"),
        until: timestamp("until"),
        use_regex: flag("regex"),
        case_sensitive: flag("case_sensitive"),
    };

    let (status, body) = match registry.search_emails(filters) {
        Ok(emails) => (StatusCode::OK, serde_json::json!(emails)),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            serde_json::json!({
                "error": "Failed to search emails",
                "message": err.to_string()
            }),
        ),
    };

    (status, Json(body))
}
1158
/// Snapshot of MQTT broker counters returned by the stats endpoint
#[cfg(feature = "mqtt")]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct MqttBrokerStats {
    /// How many MQTT clients are connected
    pub connected_clients: usize,
    /// How many MQTT topics are active
    pub active_topics: usize,
    /// How many messages are retained
    pub retained_messages: usize,
    /// How many subscriptions exist in total
    pub total_subscriptions: usize,
}
1172
/// Report MQTT broker statistics
///
/// Collects the connected-client count, the active-topic count, and the
/// broker's topic statistics into an [`MqttBrokerStats`]. Answers
/// `503 Service Unavailable` when no broker is attached.
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let broker = match state.mqtt_broker.as_ref() {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let connected_clients = broker.get_connected_clients().await.len();
    let active_topics = broker.get_active_topics().await.len();
    let topic_stats = broker.get_topic_stats().await;

    Json(MqttBrokerStats {
        connected_clients,
        active_topics,
        retained_messages: topic_stats.retained_messages,
        total_subscriptions: topic_stats.total_subscriptions,
    })
    .into_response()
}
1193
/// List the clients currently connected to the MQTT broker
///
/// Answers `503 Service Unavailable` when no broker is attached.
#[cfg(feature = "mqtt")]
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.mqtt_broker.as_ref() {
        Some(broker) => {
            let clients = broker.get_connected_clients().await;
            let body = serde_json::json!({
                "clients": clients
            });
            Json(body).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1206
/// List the topics currently active on the MQTT broker
///
/// Answers `503 Service Unavailable` when no broker is attached.
#[cfg(feature = "mqtt")]
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.mqtt_broker.as_ref() {
        Some(broker) => {
            let topics = broker.get_active_topics().await;
            let body = serde_json::json!({
                "topics": topics
            });
            Json(body).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1219
/// Disconnect the MQTT client named in the request path
///
/// Responds with a plain-text message: `200 OK` on success,
/// `500 Internal Server Error` when the broker reports a failure, and
/// `503 Service Unavailable` when no broker is attached.
#[cfg(feature = "mqtt")]
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let broker = match state.mqtt_broker.as_ref() {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let (status, message) = match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("Failed to disconnect client: {}", err),
        ),
    };

    (status, message).into_response()
}
1239
1240// ========== MQTT Publish Handler ==========
1241
/// Description of one MQTT message to publish
#[cfg(feature = "mqtt")]
#[derive(Deserialize, Debug)]
pub struct MqttPublishRequest {
    /// Destination topic
    pub topic: String,
    /// Message body (plain string or JSON text)
    pub payload: String,
    /// QoS level (0, 1, or 2); filled in by `default_qos` when omitted
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Retain flag for the message; `false` when omitted
    #[serde(default)]
    pub retain: bool,
}
1257
/// Serde default for the `qos` field of an MQTT publish request
#[cfg(feature = "mqtt")]
fn default_qos() -> u8 {
    const DEFAULT_QOS_LEVEL: u8 = 0;
    DEFAULT_QOS_LEVEL
}
1262
/// Publish a message to an MQTT topic (only compiled when mqtt feature is enabled)
///
/// Expects a JSON body with string fields `topic` and `payload`, plus optional
/// `qos` (defaults to 0) and `retain` (defaults to false). On success the
/// message is handed to the broker under the client id
/// `mockforge-management-api` and a [`MessageEvent::Mqtt`] is broadcast for
/// real-time monitoring.
///
/// Responses:
/// - `200 OK` when the broker accepted the message
/// - `400 Bad Request` when `topic`/`payload` are missing or not strings, or
///   when `qos` is greater than 2
/// - `500 Internal Server Error` when the broker rejects the publish
/// - `503 Service Unavailable` when no broker is attached
#[cfg(feature = "mqtt")]
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Extract fields from JSON manually
    let topic = request.get("topic").and_then(|v| v.as_str());
    let payload = request.get("payload").and_then(|v| v.as_str());
    // Keep the QoS at full width until it has been range-checked: narrowing
    // first would wrap values such as 256 or 257 into the valid range.
    let qos_raw = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0);
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    let (topic, payload) = match (topic, payload) {
        (Some(topic), Some(payload)) => (topic.to_string(), payload.to_string()),
        _ => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid request",
                    "message": "Missing required fields: topic and payload"
                })),
            );
        }
    };

    let broker = match &state.mqtt_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "MQTT broker not available",
                    "message": "MQTT broker is not enabled or not available."
                })),
            );
        }
    };

    // Validate QoS
    if qos_raw > 2 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid QoS",
                "message": "QoS must be 0, 1, or 2"
            })),
        );
    }
    // Lossless: the value was just checked to be at most 2.
    let qos = qos_raw as u8;

    // Convert payload to bytes
    let payload_bytes = payload.as_bytes().to_vec();
    let client_id = "mockforge-management-api".to_string();

    let publish_result = broker
        .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
        .await
        .map_err(|e| e.to_string());

    match publish_result {
        Ok(_) => {
            // Emit message event for real-time monitoring
            let event = MessageEvent::Mqtt(MqttMessageEvent {
                topic: topic.clone(),
                payload,
                qos,
                retain,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            // A send error only means nobody is subscribed; that is not a failure.
            let _ = state.message_events.send(event);

            (
                StatusCode::OK,
                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message published to topic '{}'", topic),
                    "topic": topic,
                    "qos": qos,
                    "retain": retain
                })),
            )
        }
        Err(error_msg) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to publish message",
                "message": error_msg
            })),
        ),
    }
}
1350
1351#[cfg(not(feature = "mqtt"))]
1352/// Publish a message to an MQTT topic (stub when mqtt feature is disabled)
1353async fn publish_mqtt_message_handler(
1354    State(_state): State<ManagementState>,
1355    Json(_request): Json<serde_json::Value>,
1356) -> impl IntoResponse {
1357    (
1358        StatusCode::SERVICE_UNAVAILABLE,
1359        Json(serde_json::json!({
1360            "error": "MQTT feature not enabled",
1361            "message": "MQTT support is not compiled into this build"
1362        })),
1363    )
1364}
1365
/// Description of a batch of MQTT messages to publish
#[cfg(feature = "mqtt")]
#[derive(Deserialize, Debug)]
pub struct MqttBatchPublishRequest {
    /// Messages to publish
    pub messages: Vec<MqttPublishRequest>,
    /// Pause between consecutive messages, in milliseconds; filled in by
    /// `default_delay` when omitted
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1376
/// Serde default for the `delay_ms` field of a batch publish request
#[cfg(feature = "mqtt")]
fn default_delay() -> u64 {
    const DEFAULT_DELAY_MS: u64 = 100;
    DEFAULT_DELAY_MS
}
1381
/// Publish multiple messages to MQTT topics (only compiled when mqtt feature is enabled)
///
/// Expects a JSON body with a `messages` array and an optional `delay_ms`
/// (defaults to 100). Each entry needs string fields `topic` and `payload`,
/// plus optional `qos` (defaults to 0) and `retain` (defaults to false).
/// Messages are published in order under the client id
/// `mockforge-management-api`; an invalid or failed entry is recorded in the
/// per-message results and does not stop the batch.
///
/// Responses:
/// - `200 OK` with per-message results and success/failure counts
/// - `400 Bad Request` when `messages` is missing, not an array, or empty
/// - `503 Service Unavailable` when no broker is attached
#[cfg(feature = "mqtt")]
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Extract fields from JSON manually
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);
    let messages_json = match request.get("messages").and_then(|v| v.as_array()) {
        Some(messages) => messages,
        None => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid request",
                    "message": "Missing required field: messages"
                })),
            );
        }
    };

    let broker = match &state.mqtt_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "MQTT broker not available",
                    "message": "MQTT broker is not enabled or not available."
                })),
            );
        }
    };

    if messages_json.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        );
    }

    let total = messages_json.len();
    let mut results = Vec::with_capacity(total);
    let mut success_count: usize = 0;
    let client_id = "mockforge-management-api".to_string();

    for (index, msg_json) in messages_json.iter().enumerate() {
        let topic = msg_json.get("topic").and_then(|v| v.as_str());
        let payload = msg_json.get("payload").and_then(|v| v.as_str());
        // Keep the QoS at full width until it has been range-checked: narrowing
        // first would wrap values such as 256 or 257 into the valid range.
        let qos_raw = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0);
        let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

        let (topic, payload) = match (topic, payload) {
            (Some(topic), Some(payload)) => (topic.to_string(), payload.to_string()),
            _ => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }
        };

        // Validate QoS
        if qos_raw > 2 {
            results.push(serde_json::json!({
                "index": index,
                "success": false,
                "error": "Invalid QoS (must be 0, 1, or 2)"
            }));
            continue;
        }
        // Lossless: the value was just checked to be at most 2.
        let qos = qos_raw as u8;

        // Convert payload to bytes
        let payload_bytes = payload.as_bytes().to_vec();

        let publish_result = broker
            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
            .await
            .map_err(|e| e.to_string());

        match publish_result {
            Ok(_) => {
                // Emit message event
                let event = MessageEvent::Mqtt(MqttMessageEvent {
                    topic: topic.clone(),
                    payload,
                    qos,
                    retain,
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                // A send error only means nobody is subscribed; that is not a failure.
                let _ = state.message_events.send(event);

                success_count += 1;
                results.push(serde_json::json!({
                    "index": index,
                    "success": true,
                    "topic": topic,
                    "qos": qos
                }));
            }
            Err(error_msg) => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": error_msg
                }));
            }
        }

        // Add delay between messages (except for the last one)
        if index + 1 < total && delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
        }
    }

    (
        StatusCode::OK,
        Json(serde_json::json!({
            "success": true,
            "total": total,
            "succeeded": success_count,
            "failed": total - success_count,
            "results": results
        })),
    )
}
1511
1512#[cfg(not(feature = "mqtt"))]
1513/// Publish multiple messages to MQTT topics (stub when mqtt feature is disabled)
1514async fn publish_mqtt_batch_handler(
1515    State(_state): State<ManagementState>,
1516    Json(_request): Json<serde_json::Value>,
1517) -> impl IntoResponse {
1518    (
1519        StatusCode::SERVICE_UNAVAILABLE,
1520        Json(serde_json::json!({
1521            "error": "MQTT feature not enabled",
1522            "message": "MQTT support is not compiled into this build"
1523        })),
1524    )
1525}
1526
1527// Migration pipeline handlers
1528
1529/// Request to set migration mode
1530#[derive(Debug, Deserialize)]
1531struct SetMigrationModeRequest {
1532    mode: String,
1533}
1534
1535/// Get all migration routes
1536async fn get_migration_routes(
1537    State(state): State<ManagementState>,
1538) -> Result<Json<serde_json::Value>, StatusCode> {
1539    let proxy_config = match &state.proxy_config {
1540        Some(config) => config,
1541        None => {
1542            return Ok(Json(serde_json::json!({
1543                "error": "Migration not configured. Proxy config not available."
1544            })));
1545        }
1546    };
1547
1548    let config = proxy_config.read().await;
1549    let routes = config.get_migration_routes();
1550
1551    Ok(Json(serde_json::json!({
1552        "routes": routes
1553    })))
1554}
1555
1556/// Toggle a route's migration mode
1557async fn toggle_route_migration(
1558    State(state): State<ManagementState>,
1559    Path(pattern): Path<String>,
1560) -> Result<Json<serde_json::Value>, StatusCode> {
1561    let proxy_config = match &state.proxy_config {
1562        Some(config) => config,
1563        None => {
1564            return Ok(Json(serde_json::json!({
1565                "error": "Migration not configured. Proxy config not available."
1566            })));
1567        }
1568    };
1569
1570    let mut config = proxy_config.write().await;
1571    let new_mode = match config.toggle_route_migration(&pattern) {
1572        Some(mode) => mode,
1573        None => {
1574            return Ok(Json(serde_json::json!({
1575                "error": format!("Route pattern not found: {}", pattern)
1576            })));
1577        }
1578    };
1579
1580    Ok(Json(serde_json::json!({
1581        "pattern": pattern,
1582        "mode": format!("{:?}", new_mode).to_lowercase()
1583    })))
1584}
1585
1586/// Set a route's migration mode explicitly
1587async fn set_route_migration_mode(
1588    State(state): State<ManagementState>,
1589    Path(pattern): Path<String>,
1590    Json(request): Json<SetMigrationModeRequest>,
1591) -> Result<Json<serde_json::Value>, StatusCode> {
1592    let proxy_config = match &state.proxy_config {
1593        Some(config) => config,
1594        None => {
1595            return Ok(Json(serde_json::json!({
1596                "error": "Migration not configured. Proxy config not available."
1597            })));
1598        }
1599    };
1600
1601    use mockforge_core::proxy::config::MigrationMode;
1602    let mode = match request.mode.to_lowercase().as_str() {
1603        "mock" => MigrationMode::Mock,
1604        "shadow" => MigrationMode::Shadow,
1605        "real" => MigrationMode::Real,
1606        "auto" => MigrationMode::Auto,
1607        _ => {
1608            return Ok(Json(serde_json::json!({
1609                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1610            })));
1611        }
1612    };
1613
1614    let mut config = proxy_config.write().await;
1615    let updated = config.update_rule_migration_mode(&pattern, mode);
1616
1617    if !updated {
1618        return Ok(Json(serde_json::json!({
1619            "error": format!("Route pattern not found: {}", pattern)
1620        })));
1621    }
1622
1623    Ok(Json(serde_json::json!({
1624        "pattern": pattern,
1625        "mode": format!("{:?}", mode).to_lowercase()
1626    })))
1627}
1628
1629/// Toggle a group's migration mode
1630async fn toggle_group_migration(
1631    State(state): State<ManagementState>,
1632    Path(group): Path<String>,
1633) -> Result<Json<serde_json::Value>, StatusCode> {
1634    let proxy_config = match &state.proxy_config {
1635        Some(config) => config,
1636        None => {
1637            return Ok(Json(serde_json::json!({
1638                "error": "Migration not configured. Proxy config not available."
1639            })));
1640        }
1641    };
1642
1643    let mut config = proxy_config.write().await;
1644    let new_mode = config.toggle_group_migration(&group);
1645
1646    Ok(Json(serde_json::json!({
1647        "group": group,
1648        "mode": format!("{:?}", new_mode).to_lowercase()
1649    })))
1650}
1651
1652/// Set a group's migration mode explicitly
1653async fn set_group_migration_mode(
1654    State(state): State<ManagementState>,
1655    Path(group): Path<String>,
1656    Json(request): Json<SetMigrationModeRequest>,
1657) -> Result<Json<serde_json::Value>, StatusCode> {
1658    let proxy_config = match &state.proxy_config {
1659        Some(config) => config,
1660        None => {
1661            return Ok(Json(serde_json::json!({
1662                "error": "Migration not configured. Proxy config not available."
1663            })));
1664        }
1665    };
1666
1667    use mockforge_core::proxy::config::MigrationMode;
1668    let mode = match request.mode.to_lowercase().as_str() {
1669        "mock" => MigrationMode::Mock,
1670        "shadow" => MigrationMode::Shadow,
1671        "real" => MigrationMode::Real,
1672        "auto" => MigrationMode::Auto,
1673        _ => {
1674            return Ok(Json(serde_json::json!({
1675                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1676            })));
1677        }
1678    };
1679
1680    let mut config = proxy_config.write().await;
1681    config.update_group_migration_mode(&group, mode);
1682
1683    Ok(Json(serde_json::json!({
1684        "group": group,
1685        "mode": format!("{:?}", mode).to_lowercase()
1686    })))
1687}
1688
1689/// Get all migration groups
1690async fn get_migration_groups(
1691    State(state): State<ManagementState>,
1692) -> Result<Json<serde_json::Value>, StatusCode> {
1693    let proxy_config = match &state.proxy_config {
1694        Some(config) => config,
1695        None => {
1696            return Ok(Json(serde_json::json!({
1697                "error": "Migration not configured. Proxy config not available."
1698            })));
1699        }
1700    };
1701
1702    let config = proxy_config.read().await;
1703    let groups = config.get_migration_groups();
1704
1705    // Convert to JSON-serializable format
1706    let groups_json: serde_json::Map<String, serde_json::Value> = groups
1707        .into_iter()
1708        .map(|(name, info)| {
1709            (
1710                name,
1711                serde_json::json!({
1712                    "name": info.name,
1713                    "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1714                    "route_count": info.route_count
1715                }),
1716            )
1717        })
1718        .collect();
1719
1720    Ok(Json(serde_json::json!(groups_json)))
1721}
1722
1723/// Get overall migration status
1724async fn get_migration_status(
1725    State(state): State<ManagementState>,
1726) -> Result<Json<serde_json::Value>, StatusCode> {
1727    let proxy_config = match &state.proxy_config {
1728        Some(config) => config,
1729        None => {
1730            return Ok(Json(serde_json::json!({
1731                "error": "Migration not configured. Proxy config not available."
1732            })));
1733        }
1734    };
1735
1736    let config = proxy_config.read().await;
1737    let routes = config.get_migration_routes();
1738    let groups = config.get_migration_groups();
1739
1740    let mut mock_count = 0;
1741    let mut shadow_count = 0;
1742    let mut real_count = 0;
1743    let mut auto_count = 0;
1744
1745    for route in &routes {
1746        match route.migration_mode {
1747            mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1748            mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1749            mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1750            mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1751        }
1752    }
1753
1754    Ok(Json(serde_json::json!({
1755        "total_routes": routes.len(),
1756        "mock_routes": mock_count,
1757        "shadow_routes": shadow_count,
1758        "real_routes": real_count,
1759        "auto_routes": auto_count,
1760        "total_groups": groups.len(),
1761        "migration_enabled": config.migration_enabled
1762    })))
1763}
1764
1765// ========== Proxy Replacement Rules Management ==========
1766
1767/// Request body for creating/updating proxy replacement rules
1768#[derive(Debug, Deserialize, Serialize)]
1769pub struct ProxyRuleRequest {
1770    /// URL pattern to match (supports wildcards like "/api/users/*")
1771    pub pattern: String,
1772    /// Rule type: "request" or "response"
1773    #[serde(rename = "type")]
1774    pub rule_type: String,
1775    /// Optional status code filter for response rules
1776    #[serde(default)]
1777    pub status_codes: Vec<u16>,
1778    /// Body transformations to apply
1779    pub body_transforms: Vec<BodyTransformRequest>,
1780    /// Whether this rule is enabled
1781    #[serde(default = "default_true")]
1782    pub enabled: bool,
1783}
1784
1785/// Request body for individual body transformations
1786#[derive(Debug, Deserialize, Serialize)]
1787pub struct BodyTransformRequest {
1788    /// JSONPath expression to target (e.g., "$.userId", "$.email")
1789    pub path: String,
1790    /// Replacement value (supports template expansion like "{{uuid}}", "{{faker.email}}")
1791    pub replace: String,
1792    /// Operation to perform: "replace", "add", or "remove"
1793    #[serde(default)]
1794    pub operation: String,
1795}
1796
1797/// Response format for proxy rules
1798#[derive(Debug, Serialize)]
1799pub struct ProxyRuleResponse {
1800    /// Rule ID (index in the array)
1801    pub id: usize,
1802    /// URL pattern
1803    pub pattern: String,
1804    /// Rule type
1805    #[serde(rename = "type")]
1806    pub rule_type: String,
1807    /// Status codes (for response rules)
1808    pub status_codes: Vec<u16>,
1809    /// Body transformations
1810    pub body_transforms: Vec<BodyTransformRequest>,
1811    /// Whether enabled
1812    pub enabled: bool,
1813}
1814
1815/// List all proxy replacement rules
1816async fn list_proxy_rules(
1817    State(state): State<ManagementState>,
1818) -> Result<Json<serde_json::Value>, StatusCode> {
1819    let proxy_config = match &state.proxy_config {
1820        Some(config) => config,
1821        None => {
1822            return Ok(Json(serde_json::json!({
1823                "error": "Proxy not configured. Proxy config not available."
1824            })));
1825        }
1826    };
1827
1828    let config = proxy_config.read().await;
1829
1830    let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1831
1832    // Add request replacement rules
1833    for (idx, rule) in config.request_replacements.iter().enumerate() {
1834        rules.push(ProxyRuleResponse {
1835            id: idx,
1836            pattern: rule.pattern.clone(),
1837            rule_type: "request".to_string(),
1838            status_codes: Vec::new(),
1839            body_transforms: rule
1840                .body_transforms
1841                .iter()
1842                .map(|t| BodyTransformRequest {
1843                    path: t.path.clone(),
1844                    replace: t.replace.clone(),
1845                    operation: format!("{:?}", t.operation).to_lowercase(),
1846                })
1847                .collect(),
1848            enabled: rule.enabled,
1849        });
1850    }
1851
1852    // Add response replacement rules
1853    let request_count = config.request_replacements.len();
1854    for (idx, rule) in config.response_replacements.iter().enumerate() {
1855        rules.push(ProxyRuleResponse {
1856            id: request_count + idx,
1857            pattern: rule.pattern.clone(),
1858            rule_type: "response".to_string(),
1859            status_codes: rule.status_codes.clone(),
1860            body_transforms: rule
1861                .body_transforms
1862                .iter()
1863                .map(|t| BodyTransformRequest {
1864                    path: t.path.clone(),
1865                    replace: t.replace.clone(),
1866                    operation: format!("{:?}", t.operation).to_lowercase(),
1867                })
1868                .collect(),
1869            enabled: rule.enabled,
1870        });
1871    }
1872
1873    Ok(Json(serde_json::json!({
1874        "rules": rules
1875    })))
1876}
1877
1878/// Create a new proxy replacement rule
1879async fn create_proxy_rule(
1880    State(state): State<ManagementState>,
1881    Json(request): Json<ProxyRuleRequest>,
1882) -> Result<Json<serde_json::Value>, StatusCode> {
1883    let proxy_config = match &state.proxy_config {
1884        Some(config) => config,
1885        None => {
1886            return Ok(Json(serde_json::json!({
1887                "error": "Proxy not configured. Proxy config not available."
1888            })));
1889        }
1890    };
1891
1892    // Validate request
1893    if request.body_transforms.is_empty() {
1894        return Ok(Json(serde_json::json!({
1895            "error": "At least one body transform is required"
1896        })));
1897    }
1898
1899    let body_transforms: Vec<BodyTransform> = request
1900        .body_transforms
1901        .iter()
1902        .map(|t| {
1903            let op = match t.operation.as_str() {
1904                "replace" => TransformOperation::Replace,
1905                "add" => TransformOperation::Add,
1906                "remove" => TransformOperation::Remove,
1907                _ => TransformOperation::Replace,
1908            };
1909            BodyTransform {
1910                path: t.path.clone(),
1911                replace: t.replace.clone(),
1912                operation: op,
1913            }
1914        })
1915        .collect();
1916
1917    let new_rule = BodyTransformRule {
1918        pattern: request.pattern.clone(),
1919        status_codes: request.status_codes.clone(),
1920        body_transforms,
1921        enabled: request.enabled,
1922    };
1923
1924    let mut config = proxy_config.write().await;
1925
1926    let rule_id = if request.rule_type == "request" {
1927        config.request_replacements.push(new_rule);
1928        config.request_replacements.len() - 1
1929    } else if request.rule_type == "response" {
1930        config.response_replacements.push(new_rule);
1931        config.request_replacements.len() + config.response_replacements.len() - 1
1932    } else {
1933        return Ok(Json(serde_json::json!({
1934            "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
1935        })));
1936    };
1937
1938    Ok(Json(serde_json::json!({
1939        "id": rule_id,
1940        "message": "Rule created successfully"
1941    })))
1942}
1943
1944/// Get a specific proxy replacement rule
1945async fn get_proxy_rule(
1946    State(state): State<ManagementState>,
1947    Path(id): Path<String>,
1948) -> Result<Json<serde_json::Value>, StatusCode> {
1949    let proxy_config = match &state.proxy_config {
1950        Some(config) => config,
1951        None => {
1952            return Ok(Json(serde_json::json!({
1953                "error": "Proxy not configured. Proxy config not available."
1954            })));
1955        }
1956    };
1957
1958    let config = proxy_config.read().await;
1959    let rule_id: usize = match id.parse() {
1960        Ok(id) => id,
1961        Err(_) => {
1962            return Ok(Json(serde_json::json!({
1963                "error": format!("Invalid rule ID: {}", id)
1964            })));
1965        }
1966    };
1967
1968    let request_count = config.request_replacements.len();
1969
1970    if rule_id < request_count {
1971        // Request rule
1972        let rule = &config.request_replacements[rule_id];
1973        Ok(Json(serde_json::json!({
1974            "id": rule_id,
1975            "pattern": rule.pattern,
1976            "type": "request",
1977            "status_codes": [],
1978            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1979                "path": t.path,
1980                "replace": t.replace,
1981                "operation": format!("{:?}", t.operation).to_lowercase()
1982            })).collect::<Vec<_>>(),
1983            "enabled": rule.enabled
1984        })))
1985    } else if rule_id < request_count + config.response_replacements.len() {
1986        // Response rule
1987        let response_idx = rule_id - request_count;
1988        let rule = &config.response_replacements[response_idx];
1989        Ok(Json(serde_json::json!({
1990            "id": rule_id,
1991            "pattern": rule.pattern,
1992            "type": "response",
1993            "status_codes": rule.status_codes,
1994            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1995                "path": t.path,
1996                "replace": t.replace,
1997                "operation": format!("{:?}", t.operation).to_lowercase()
1998            })).collect::<Vec<_>>(),
1999            "enabled": rule.enabled
2000        })))
2001    } else {
2002        Ok(Json(serde_json::json!({
2003            "error": format!("Rule ID {} not found", rule_id)
2004        })))
2005    }
2006}
2007
2008/// Update a proxy replacement rule
2009async fn update_proxy_rule(
2010    State(state): State<ManagementState>,
2011    Path(id): Path<String>,
2012    Json(request): Json<ProxyRuleRequest>,
2013) -> Result<Json<serde_json::Value>, StatusCode> {
2014    let proxy_config = match &state.proxy_config {
2015        Some(config) => config,
2016        None => {
2017            return Ok(Json(serde_json::json!({
2018                "error": "Proxy not configured. Proxy config not available."
2019            })));
2020        }
2021    };
2022
2023    let mut config = proxy_config.write().await;
2024    let rule_id: usize = match id.parse() {
2025        Ok(id) => id,
2026        Err(_) => {
2027            return Ok(Json(serde_json::json!({
2028                "error": format!("Invalid rule ID: {}", id)
2029            })));
2030        }
2031    };
2032
2033    let body_transforms: Vec<BodyTransform> = request
2034        .body_transforms
2035        .iter()
2036        .map(|t| {
2037            let op = match t.operation.as_str() {
2038                "replace" => TransformOperation::Replace,
2039                "add" => TransformOperation::Add,
2040                "remove" => TransformOperation::Remove,
2041                _ => TransformOperation::Replace,
2042            };
2043            BodyTransform {
2044                path: t.path.clone(),
2045                replace: t.replace.clone(),
2046                operation: op,
2047            }
2048        })
2049        .collect();
2050
2051    let updated_rule = BodyTransformRule {
2052        pattern: request.pattern.clone(),
2053        status_codes: request.status_codes.clone(),
2054        body_transforms,
2055        enabled: request.enabled,
2056    };
2057
2058    let request_count = config.request_replacements.len();
2059
2060    if rule_id < request_count {
2061        // Update request rule
2062        config.request_replacements[rule_id] = updated_rule;
2063    } else if rule_id < request_count + config.response_replacements.len() {
2064        // Update response rule
2065        let response_idx = rule_id - request_count;
2066        config.response_replacements[response_idx] = updated_rule;
2067    } else {
2068        return Ok(Json(serde_json::json!({
2069            "error": format!("Rule ID {} not found", rule_id)
2070        })));
2071    }
2072
2073    Ok(Json(serde_json::json!({
2074        "id": rule_id,
2075        "message": "Rule updated successfully"
2076    })))
2077}
2078
2079/// Delete a proxy replacement rule
2080async fn delete_proxy_rule(
2081    State(state): State<ManagementState>,
2082    Path(id): Path<String>,
2083) -> Result<Json<serde_json::Value>, StatusCode> {
2084    let proxy_config = match &state.proxy_config {
2085        Some(config) => config,
2086        None => {
2087            return Ok(Json(serde_json::json!({
2088                "error": "Proxy not configured. Proxy config not available."
2089            })));
2090        }
2091    };
2092
2093    let mut config = proxy_config.write().await;
2094    let rule_id: usize = match id.parse() {
2095        Ok(id) => id,
2096        Err(_) => {
2097            return Ok(Json(serde_json::json!({
2098                "error": format!("Invalid rule ID: {}", id)
2099            })));
2100        }
2101    };
2102
2103    let request_count = config.request_replacements.len();
2104
2105    if rule_id < request_count {
2106        // Delete request rule
2107        config.request_replacements.remove(rule_id);
2108    } else if rule_id < request_count + config.response_replacements.len() {
2109        // Delete response rule
2110        let response_idx = rule_id - request_count;
2111        config.response_replacements.remove(response_idx);
2112    } else {
2113        return Ok(Json(serde_json::json!({
2114            "error": format!("Rule ID {} not found", rule_id)
2115        })));
2116    }
2117
2118    Ok(Json(serde_json::json!({
2119        "id": rule_id,
2120        "message": "Rule deleted successfully"
2121    })))
2122}
2123
2124/// Get recent intercepted requests/responses for inspection
2125/// This is a placeholder - in a full implementation, you'd track intercepted traffic
2126async fn get_proxy_inspect(
2127    State(_state): State<ManagementState>,
2128    Query(params): Query<std::collections::HashMap<String, String>>,
2129) -> Result<Json<serde_json::Value>, StatusCode> {
2130    let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2131
2132    // Note: Request/response inspection would require:
2133    // 1. Storing intercepted requests/responses in ManagementState or a separate store
2134    // 2. Integrating with proxy middleware to capture traffic
2135    // 3. Implementing filtering and pagination for large volumes of traffic
2136    // For now, return an empty response structure indicating the feature is not yet implemented
2137    Ok(Json(serde_json::json!({
2138        "requests": [],
2139        "responses": [],
2140        "limit": limit,
2141        "total": 0,
2142        "message": "Request/response inspection not yet implemented. This endpoint will return intercepted traffic when proxy inspection is fully integrated."
2143    })))
2144}
2145
2146/// Build the management API router
2147pub fn management_router(state: ManagementState) -> Router {
2148    let router = Router::new()
2149        .route("/health", get(health_check))
2150        .route("/stats", get(get_stats))
2151        .route("/config", get(get_config))
2152        .route("/config/validate", post(validate_config))
2153        .route("/config/bulk", post(bulk_update_config))
2154        .route("/mocks", get(list_mocks))
2155        .route("/mocks", post(create_mock))
2156        .route("/mocks/{id}", get(get_mock))
2157        .route("/mocks/{id}", put(update_mock))
2158        .route("/mocks/{id}", delete(delete_mock))
2159        .route("/export", get(export_mocks))
2160        .route("/import", post(import_mocks));
2161
2162    #[cfg(feature = "smtp")]
2163    let router = router
2164        .route("/smtp/mailbox", get(list_smtp_emails))
2165        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
2166        .route("/smtp/mailbox/{id}", get(get_smtp_email))
2167        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
2168        .route("/smtp/mailbox/search", get(search_smtp_emails));
2169
2170    #[cfg(not(feature = "smtp"))]
2171    let router = router;
2172
2173    // MQTT routes
2174    #[cfg(feature = "mqtt")]
2175    let router = router
2176        .route("/mqtt/stats", get(get_mqtt_stats))
2177        .route("/mqtt/clients", get(get_mqtt_clients))
2178        .route("/mqtt/topics", get(get_mqtt_topics))
2179        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
2180        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
2181        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2182        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2183
2184    #[cfg(not(feature = "mqtt"))]
2185    let router = router
2186        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2187        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2188
2189    #[cfg(feature = "kafka")]
2190    let router = router
2191        .route("/kafka/stats", get(get_kafka_stats))
2192        .route("/kafka/topics", get(get_kafka_topics))
2193        .route("/kafka/topics/{topic}", get(get_kafka_topic))
2194        .route("/kafka/groups", get(get_kafka_groups))
2195        .route("/kafka/groups/{group_id}", get(get_kafka_group))
2196        .route("/kafka/produce", post(produce_kafka_message))
2197        .route("/kafka/produce/batch", post(produce_kafka_batch))
2198        .route("/kafka/messages/stream", get(kafka_messages_stream));
2199
2200    #[cfg(not(feature = "kafka"))]
2201    let router = router;
2202
2203    // Migration pipeline routes
2204    let router = router
2205        .route("/migration/routes", get(get_migration_routes))
2206        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
2207        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
2208        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
2209        .route("/migration/groups/{group}", put(set_group_migration_mode))
2210        .route("/migration/groups", get(get_migration_groups))
2211        .route("/migration/status", get(get_migration_status));
2212
2213    // Proxy replacement rules routes
2214    let router = router
2215        .route("/proxy/rules", get(list_proxy_rules))
2216        .route("/proxy/rules", post(create_proxy_rule))
2217        .route("/proxy/rules/{id}", get(get_proxy_rule))
2218        .route("/proxy/rules/{id}", put(update_proxy_rule))
2219        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
2220        .route("/proxy/inspect", get(get_proxy_inspect));
2221
2222    // AI-powered features
2223    let router = router.route("/ai/generate-spec", post(generate_ai_spec));
2224
2225    // Snapshot diff endpoints
2226    let router = router.nest(
2227        "/snapshot-diff",
2228        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
2229    );
2230
2231    #[cfg(feature = "behavioral-cloning")]
2232    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));
2233
2234    let router = router
2235        .route("/mockai/learn", post(learn_from_examples))
2236        .route("/mockai/rules/explanations", get(list_rule_explanations))
2237        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
2238        .route("/chaos/config", get(get_chaos_config))
2239        .route("/chaos/config", post(update_chaos_config))
2240        .route("/network/profiles", get(list_network_profiles))
2241        .route("/network/profile/apply", post(apply_network_profile));
2242
2243    // State machine API routes
2244    let router =
2245        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());
2246
2247    router.with_state(state)
2248}
2249
#[cfg(feature = "kafka")]
/// Aggregate statistics reported for the mock Kafka broker.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// How many topics exist
    pub topics: usize,
    /// Partition count summed over all topics
    pub partitions: usize,
    /// How many consumer groups exist
    pub consumer_groups: usize,
    /// Running total of produced messages
    pub messages_produced: u64,
    /// Running total of consumed messages
    pub messages_consumed: u64,
}
2264
#[cfg(feature = "kafka")]
/// Summary of a single Kafka topic as returned by the topic listing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name
    pub name: String,
    /// Number of partitions in the topic
    pub partitions: usize,
    /// Replication factor taken from the topic's configuration
    pub replication_factor: i32,
}
2272
#[cfg(feature = "kafka")]
/// Summary of a single Kafka consumer group as returned by the group listing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group identifier
    pub group_id: String,
    /// Number of members in the group
    pub members: usize,
    /// Group state label (the listing handler currently always reports "Stable")
    pub state: String,
}
2280
#[cfg(feature = "kafka")]
/// Report aggregate broker statistics as a [`KafkaBrokerStats`] JSON body.
///
/// Responds with 503 Service Unavailable when no Kafka broker is attached to
/// the management state.
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let consumer_groups = broker.consumer_groups.read().await;

    // Message counters come from the broker's metrics snapshot.
    let metrics = broker.metrics().snapshot();

    Json(KafkaBrokerStats {
        topics: topics.len(),
        partitions: topics.values().map(|topic| topic.partitions.len()).sum(),
        consumer_groups: consumer_groups.groups().len(),
        messages_produced: metrics.messages_produced_total,
        messages_consumed: metrics.messages_consumed_total,
    })
    .into_response()
}
2313
#[cfg(feature = "kafka")]
/// List all Kafka topics under a `topics` array of [`KafkaTopicInfo`] entries.
///
/// Responds with 503 Service Unavailable when no Kafka broker is attached to
/// the management state.
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;

    let mut topic_list: Vec<KafkaTopicInfo> = Vec::with_capacity(topics.len());
    for (name, topic) in topics.iter() {
        topic_list.push(KafkaTopicInfo {
            name: name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor as i32,
        });
    }

    Json(serde_json::json!({
        "topics": topic_list
    }))
    .into_response()
}
2343
#[cfg(feature = "kafka")]
/// Describe one Kafka topic, including a per-partition breakdown.
///
/// Every partition is reported with leader `0` and replicas `[0]`, alongside
/// its current message count. Responds with 404 when the topic does not exist
/// and 503 when no Kafka broker is attached to the management state.
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let Some(topic) = topics.get(&topic_name) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Topic not found",
                "topic": topic_name
            })),
        )
            .into_response();
    };

    let partitions_detail: Vec<serde_json::Value> = topic
        .partitions
        .iter()
        .enumerate()
        .map(|(idx, partition)| {
            serde_json::json!({
                "id": idx as i32,
                "leader": 0,
                "replicas": [0],
                "message_count": partition.messages.len()
            })
        })
        .collect();

    Json(serde_json::json!({
        "name": topic_name,
        "partitions": topic.partitions.len(),
        "replication_factor": topic.config.replication_factor,
        "partitions_detail": partitions_detail
    }))
    .into_response()
}
2385
#[cfg(feature = "kafka")]
/// List all Kafka consumer groups under a `groups` array.
///
/// Every group is reported with state "Stable" (a simplification). Responds
/// with 503 Service Unavailable when no Kafka broker is attached to the
/// management state.
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;
    let summaries = consumer_groups
        .groups()
        .iter()
        .map(|(id, group)| KafkaConsumerGroupInfo {
            group_id: id.clone(),
            members: group.members.len(),
            // Simplified - could be more detailed
            state: "Stable".to_string(),
        })
        .collect::<Vec<KafkaConsumerGroupInfo>>();

    Json(serde_json::json!({
        "groups": summaries
    }))
    .into_response()
}
2416
#[cfg(feature = "kafka")]
/// Describe one Kafka consumer group: its members, their assignments, and the
/// group's committed offsets.
///
/// The state is always reported as "Stable". Responds with 404 when the group
/// does not exist and 503 when no Kafka broker is attached to the management
/// state.
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;
    match consumer_groups.groups().get(&group_id) {
        None => (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Consumer group not found",
                "group_id": group_id
            })),
        )
            .into_response(),
        Some(group) => {
            let members_detail: Vec<serde_json::Value> = group
                .members
                .iter()
                .map(|(member_id, member)| {
                    let assignments: Vec<serde_json::Value> = member
                        .assignment
                        .iter()
                        .map(|a| {
                            serde_json::json!({
                                "topic": a.topic,
                                "partitions": a.partitions
                            })
                        })
                        .collect();
                    serde_json::json!({
                        "member_id": member_id,
                        "client_id": member.client_id,
                        "assignments": assignments
                    })
                })
                .collect();

            let offsets: Vec<serde_json::Value> = group
                .offsets
                .iter()
                .map(|((topic, partition), offset)| {
                    serde_json::json!({
                        "topic": topic,
                        "partition": partition,
                        "offset": offset
                    })
                })
                .collect();

            Json(serde_json::json!({
                "group_id": group_id,
                "members": group.members.len(),
                "state": "Stable",
                "members_detail": members_detail,
                "offsets": offsets
            }))
            .into_response()
        }
    }
}
2465
2466// ========== Kafka Produce Handler ==========
2467
#[cfg(feature = "kafka")]
/// Request body for producing one message to a Kafka topic.
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Name of the destination topic
    pub topic: String,
    /// Optional message key
    #[serde(default)]
    pub key: Option<String>,
    /// Message payload (a JSON string or a plain string)
    pub value: String,
    /// Optional target partition; a partition is assigned automatically when omitted
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional message headers as key-value pairs
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2485
#[cfg(feature = "kafka")]
/// Produce a single message to a Kafka topic.
///
/// The topic is created with the default configuration if it does not exist
/// yet. When the request names no partition, one is assigned by the topic from
/// the message key. On success the produced-messages metric is incremented, a
/// [`MessageEvent::Kafka`] is broadcast for real-time monitoring (send
/// failures are ignored), and the topic, partition and offset are returned.
/// The stored message and the broadcast event share a single timestamp.
///
/// Responses:
/// - 400 Bad Request when the partition is negative or does not exist
/// - 500 Internal Server Error when the topic fails to store the message
/// - 503 Service Unavailable when no Kafka broker is attached to the state
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let mut topics = broker.topics.write().await;

    // Get or create the topic
    let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
        mockforge_kafka::topics::Topic::new(
            request.topic.clone(),
            mockforge_kafka::topics::TopicConfig::default(),
        )
    });

    // Determine partition
    let partition_id = match request.partition {
        Some(partition) => partition,
        None => topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes())),
    };

    // Validate partition exists. The sign is checked first so the cast to
    // usize is lossless and the partition count is never truncated to i32.
    let partition_count = topic_entry.partitions.len();
    if partition_id < 0 || (partition_id as usize) >= partition_count {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, partition_count)
            })),
        )
            .into_response();
    }

    // One clock reading serves both the stored message and the broadcast event.
    let now = chrono::Utc::now();

    // Create the message, borrowing from the request so its fields can be
    // moved into the event afterwards without extra clones.
    let message = mockforge_kafka::partitions::KafkaMessage {
        offset: 0, // Will be set by partition.append
        timestamp: now.timestamp_millis(),
        key: request.key.as_ref().map(|k| k.as_bytes().to_vec()),
        value: request.value.as_bytes().to_vec(),
        headers: request
            .headers
            .iter()
            .flatten()
            .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
            .collect(),
    };

    // Produce to partition
    match topic_entry.produce(partition_id, message).await {
        Ok(offset) => {
            // Record metrics for successful message production
            broker.metrics().record_messages_produced(1);

            // Emit message event for real-time monitoring; having no
            // subscribers is not an error, so the send result is ignored.
            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: request.topic.clone(),
                key: request.key,
                value: request.value,
                partition: partition_id,
                offset,
                headers: request.headers,
                timestamp: now.to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", request.topic),
                "topic": request.topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2590
#[cfg(feature = "kafka")]
/// Request body for producing several Kafka messages in a single call
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// Messages to produce
    pub messages: Vec<KafkaProduceRequest>,
    /// Pause inserted between consecutive messages, in milliseconds
    /// (supplied by `default_delay` when the field is omitted)
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2600
#[cfg(feature = "kafka")]
/// Produce multiple messages to Kafka topics
///
/// Messages are handled one at a time in request order. A message that cannot
/// be produced is reported in its own entry of `results` and does not stop the
/// rest of the batch.
///
/// Responses:
/// - `400 Bad Request` when `messages` is empty
/// - `503 Service Unavailable` when no Kafka broker is attached to the state
/// - otherwise a JSON summary with `total`, `succeeded`, `failed` and the
///   per-message `results`
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        if request.messages.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            )
                .into_response();
        }

        let mut results = Vec::with_capacity(request.messages.len());

        for (index, msg_request) in request.messages.iter().enumerate() {
            let mut topics = broker.topics.write().await;

            // Get or create the topic
            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                mockforge_kafka::topics::Topic::new(
                    msg_request.topic.clone(),
                    mockforge_kafka::topics::TopicConfig::default(),
                )
            });

            // Determine partition: an explicit partition wins, otherwise the
            // topic assigns one from the (optional) message key
            let partition_id = if let Some(partition) = msg_request.partition {
                partition
            } else {
                topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
            };

            // Validate partition exists
            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                }));
                continue;
            }

            // Create the message
            let message = mockforge_kafka::partitions::KafkaMessage {
                offset: 0, // placeholder; the real offset is the value returned by `produce`
                timestamp: chrono::Utc::now().timestamp_millis(),
                key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
                value: msg_request.value.as_bytes().to_vec(),
                headers: msg_request
                    .headers
                    .clone()
                    .unwrap_or_default()
                    .into_iter()
                    .map(|(k, v)| (k, v.as_bytes().to_vec()))
                    .collect(),
            };

            // Produce to partition
            match topic_entry.produce(partition_id, message).await {
                Ok(offset) => {
                    // Record metrics for successful message production
                    broker.metrics().record_messages_produced(1);

                    // Emit message event; a send error (no subscribers) is
                    // deliberately ignored
                    let event = MessageEvent::Kafka(KafkaMessageEvent {
                        topic: msg_request.topic.clone(),
                        key: msg_request.key.clone(),
                        value: msg_request.value.clone(),
                        partition: partition_id,
                        offset,
                        headers: msg_request.headers.clone(),
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": msg_request.topic,
                        "partition": partition_id,
                        "offset": offset
                    }));
                }
                Err(e) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": e.to_string()
                    }));
                }
            }

            // Release the topics write lock before pausing so other readers and
            // writers are not blocked for the whole inter-message delay.
            drop(topics);

            // Add delay between messages (except for the last one)
            if index < request.messages.len() - 1 && request.delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        Json(serde_json::json!({
            "success": true,
            "total": request.messages.len(),
            "succeeded": success_count,
            "failed": request.messages.len() - success_count,
            "results": results
        }))
        .into_response()
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2729
2730// ========== Real-time Message Streaming (SSE) ==========
2731
#[cfg(feature = "mqtt")]
/// Server-sent event stream of MQTT messages
///
/// Subscribes to the shared message-event broadcast channel and forwards every
/// MQTT event as an `mqtt_message` SSE event. When a `topic` query parameter is
/// given, only events whose topic contains that text are forwarded. The stream
/// ends when the broadcast channel is closed.
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let receiver = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let events = stream::unfold(receiver, move |mut receiver| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                // Pull the next MQTT event, skipping anything that is not one
                let event = match receiver.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => event,
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => continue,
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                    Err(broadcast::error::RecvError::Closed) => return None,
                };

                // No filter means every topic is wanted
                let wanted =
                    topic_filter.as_ref().map_or(true, |filter| event.topic.contains(filter));
                if !wanted {
                    continue;
                }

                let event_json = serde_json::json!({
                    "protocol": "mqtt",
                    "topic": event.topic,
                    "payload": event.payload,
                    "qos": event.qos,
                    "retain": event.retain,
                    "timestamp": event.timestamp,
                });

                // An event that fails to serialize is dropped and the loop moves on
                if let Ok(event_data) = serde_json::to_string(&event_json) {
                    let sse_event = Event::default().event("mqtt_message").data(event_data);
                    return Some((Ok(sse_event), receiver));
                }
            }
        }
    });

    let keep_alive = axum::response::sse::KeepAlive::new()
        .interval(std::time::Duration::from_secs(15))
        .text("keep-alive-text");

    Sse::new(events).keep_alive(keep_alive)
}
2792
#[cfg(feature = "kafka")]
/// SSE stream for Kafka messages
///
/// Subscribes to the shared message-event broadcast channel and forwards every
/// Kafka event as a `kafka_message` SSE event. When a `topic` query parameter
/// is given, only events whose topic contains that text are forwarded. The
/// stream ends when the broadcast channel is closed.
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // The receiver is only moved into `unfold`; the closure rebinds it mutably
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => {
                        // Skip MQTT events in Kafka stream
                        continue;
                    }
                    Ok(MessageEvent::Kafka(event)) => {
                        // Apply topic filter if specified
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "kafka",
                            "topic": event.topic,
                            "key": event.key,
                            "value": event.value,
                            "partition": event.partition,
                            "offset": event.offset,
                            "headers": event.headers,
                            "timestamp": event.timestamp,
                        });

                        // An event that fails to serialize is dropped and the
                        // loop moves on to the next one
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event =
                                Event::default().event("kafka_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
2856
2857// ========== AI-Powered Features ==========
2858
2859/// Request for AI-powered API specification generation
2860#[derive(Debug, Deserialize)]
2861pub struct GenerateSpecRequest {
2862    /// Natural language description of the API to generate
2863    pub query: String,
2864    /// Type of specification to generate: "openapi", "graphql", or "asyncapi"
2865    pub spec_type: String,
2866    /// Optional API version (e.g., "3.0.0" for OpenAPI)
2867    pub api_version: Option<String>,
2868}
2869
2870/// Request for OpenAPI generation from recorded traffic
2871#[derive(Debug, Deserialize)]
2872pub struct GenerateOpenApiFromTrafficRequest {
2873    /// Path to recorder database (optional, defaults to ./recordings.db)
2874    #[serde(default)]
2875    pub database_path: Option<String>,
2876    /// Start time for filtering (ISO 8601 format, e.g., 2025-01-01T00:00:00Z)
2877    #[serde(default)]
2878    pub since: Option<String>,
2879    /// End time for filtering (ISO 8601 format)
2880    #[serde(default)]
2881    pub until: Option<String>,
2882    /// Path pattern filter (supports wildcards, e.g., /api/*)
2883    #[serde(default)]
2884    pub path_pattern: Option<String>,
2885    /// Minimum confidence score for including paths (0.0 to 1.0)
2886    #[serde(default = "default_min_confidence")]
2887    pub min_confidence: f64,
2888}
2889
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`
fn default_min_confidence() -> f64 {
    const DEFAULT_MIN_CONFIDENCE: f64 = 0.7;
    DEFAULT_MIN_CONFIDENCE
}
2893
/// Generate API specification from natural language using AI
///
/// The LLM connection is configured entirely from environment variables:
/// - `MOCKFORGE_RAG_API_KEY` (or `OPENAI_API_KEY` as a fallback) - required
/// - `MOCKFORGE_RAG_PROVIDER` - "openai" (default), "anthropic", "ollama",
///   "openai-compatible"; unknown values fall back to OpenAI
/// - `MOCKFORGE_RAG_API_ENDPOINT`, `MOCKFORGE_RAG_MODEL` - provider-specific
///   defaults are used when unset
/// - `MOCKFORGE_RAG_MAX_TOKENS`, `MOCKFORGE_RAG_TEMPERATURE`,
///   `MOCKFORGE_RAG_TIMEOUT`, `MOCKFORGE_RAG_CONTEXT_WINDOW` - numeric
///   overrides; unset or unparsable values fall back to the built-in defaults
///
/// Responses:
/// - `503 Service Unavailable` when no API key is configured
/// - `500 Internal Server Error` when the RAG engine cannot be created or the
///   generation call fails
/// - otherwise JSON with `success`, the extracted `spec`, and the echoed
///   `spec_type`
#[cfg(feature = "data-faker")]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::{DocumentStorage, InMemoryStorage},
    };

    /// Read `name` from the environment and parse it as `T`, returning
    /// `default` when the variable is unset or its value does not parse.
    fn env_or<T: std::str::FromStr>(name: &str, default: T) -> T {
        std::env::var(name).ok().and_then(|raw| raw.parse().ok()).unwrap_or(default)
    }

    // Build RAG config from environment variables
    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    // Check if RAG is configured - require API key
    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    }

    // Build RAG configuration
    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        // Unrecognized provider names silently use OpenAI
        _ => LlmProvider::OpenAI,
    };

    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    // Build RagConfig using default() and override fields
    let mut rag_config = RagConfig::default();
    rag_config.provider = provider;
    rag_config.api_endpoint = api_endpoint;
    rag_config.api_key = api_key;
    rag_config.model = model;
    rag_config.max_tokens = env_or("MOCKFORGE_RAG_MAX_TOKENS", 4096);
    // Lower temperature for more structured output
    rag_config.temperature = env_or("MOCKFORGE_RAG_TEMPERATURE", 0.3);
    rag_config.timeout_secs = env_or("MOCKFORGE_RAG_TIMEOUT", 60);
    rag_config.max_context_length = env_or("MOCKFORGE_RAG_CONTEXT_WINDOW", 4000);

    // Build the prompt for spec generation; unknown spec types are treated as OpenAPI
    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let output_format = if request.spec_type == "graphql" {
        "GraphQL SDL"
    } else {
        "YAML"
    };

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        output_format
    );

    // The engine takes an `Arc<dyn DocumentStorage>`, so the in-memory store is
    // constructed directly and coerced to the trait object here
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    // Create RAG engine
    let mut rag_engine = match RagEngine::new(rag_config, storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    // Generate using RAG engine
    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Try to extract just the YAML/JSON/SDL content if LLM added explanation
            let spec = if request.spec_type == "graphql" {
                // For GraphQL, extract SDL
                extract_graphql_schema(&generated_text)
            } else {
                // For OpenAPI/AsyncAPI, extract YAML
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
3064
3065#[cfg(not(feature = "data-faker"))]
3066async fn generate_ai_spec(
3067    State(_state): State<ManagementState>,
3068    Json(_request): Json<GenerateSpecRequest>,
3069) -> impl IntoResponse {
3070    (
3071        StatusCode::NOT_IMPLEMENTED,
3072        Json(serde_json::json!({
3073            "error": "AI features not enabled",
3074            "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3075        })),
3076    )
3077        .into_response()
3078}
3079
/// Generate OpenAPI specification from recorded traffic
///
/// Opens the recorder database (request `database_path`, or `recordings.db` in
/// the current directory), loads at most 1000 HTTP exchanges matching the
/// optional time and path filters, and runs the OpenAPI generator over them.
///
/// Responses:
/// - `400 Bad Request` when the database cannot be opened or a time filter is
///   not valid RFC 3339
/// - `404 Not Found` when no exchanges match the filters
/// - `500 Internal Server Error` when querying, generation, or serialization fails
/// - otherwise JSON with the generated `spec` and generation `metadata`
#[cfg(feature = "behavioral-cloning")]
async fn generate_openapi_from_traffic(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
) -> impl IntoResponse {
    use chrono::{DateTime, Utc};
    use mockforge_core::intelligent_behavior::{
        openapi_generator::{
            HttpExchange as LocalHttpExchange, OpenApiGenerationConfig, OpenApiSpecGenerator,
        },
        IntelligentBehaviorConfig,
    };
    use mockforge_recorder::{
        database::RecorderDatabase,
        openapi_export::{QueryFilters, RecordingsToOpenApi},
    };
    use std::path::PathBuf;

    /// Parse an optional RFC 3339 timestamp filter and convert it to UTC.
    ///
    /// `label` is the filter name shown in the error message. On a parse
    /// failure the finished `400 Bad Request` response is returned as the error.
    fn parse_time_filter(
        label: &str,
        raw: Option<&str>,
    ) -> Result<Option<DateTime<Utc>>, axum::response::Response> {
        raw.map(|text| {
            DateTime::parse_from_rfc3339(text)
                .map(|dt| dt.with_timezone(&Utc))
                .map_err(|e| {
                    (
                        StatusCode::BAD_REQUEST,
                        Json(serde_json::json!({
                            "error": "Invalid date format",
                            "message": format!("Invalid --{} format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", label, e)
                        })),
                    )
                        .into_response()
                })
        })
        .transpose()
    }

    // Determine database path
    // NOTE(review): the path comes straight from the request body; confirm this
    // endpoint is only reachable by trusted callers.
    let db_path = request.database_path.as_deref().map(PathBuf::from).unwrap_or_else(|| {
        std::env::current_dir()
            .unwrap_or_else(|_| PathBuf::from("."))
            .join("recordings.db")
    });

    // Open database
    let db = match RecorderDatabase::new(&db_path).await {
        Ok(db) => db,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Database error",
                    "message": format!("Failed to open recorder database: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Parse time filters
    let since_dt = match parse_time_filter("since", request.since.as_deref()) {
        Ok(dt) => dt,
        Err(response) => return response,
    };
    let until_dt = match parse_time_filter("until", request.until.as_deref()) {
        Ok(dt) => dt,
        Err(response) => return response,
    };

    // Build query filters
    let query_filters = QueryFilters {
        since: since_dt,
        until: until_dt,
        path_pattern: request.path_pattern.clone(),
        min_status_code: None,
        max_requests: Some(1000),
    };

    // Query HTTP exchanges
    // Note: We need to convert from mockforge-recorder's HttpExchange to mockforge-core's HttpExchange
    // to avoid version mismatch issues. The converter returns the version from mockforge-recorder's
    // dependency, so we need to manually convert to the local version.
    let exchanges_from_recorder =
        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
            Ok(exchanges) => exchanges,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Query error",
                        "message": format!("Failed to query HTTP exchanges: {}", e)
                    })),
                )
                    .into_response();
            }
        };

    if exchanges_from_recorder.is_empty() {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "No exchanges found",
                "message": "No HTTP exchanges found matching the specified filters"
            })),
        )
            .into_response();
    }

    // Convert to local HttpExchange type to avoid version mismatch
    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
        .into_iter()
        .map(|e| LocalHttpExchange {
            method: e.method,
            path: e.path,
            query_params: e.query_params,
            headers: e.headers,
            body: e.body,
            body_encoding: e.body_encoding,
            status_code: e.status_code,
            response_headers: e.response_headers,
            response_body: e.response_body,
            response_body_encoding: e.response_body_encoding,
            timestamp: e.timestamp,
        })
        .collect();

    // Create OpenAPI generator config
    let behavior_config = IntelligentBehaviorConfig::default();
    let gen_config = OpenApiGenerationConfig {
        min_confidence: request.min_confidence,
        behavior_model: Some(behavior_config.behavior_model),
    };

    // Generate OpenAPI spec
    let generator = OpenApiSpecGenerator::new(gen_config);
    let result = match generator.generate_from_exchanges(exchanges).await {
        Ok(result) => result,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Generation error",
                    "message": format!("Failed to generate OpenAPI spec: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Prepare response: use the raw document when the generator kept one,
    // otherwise serialize the typed spec
    let spec_json = if let Some(ref raw) = result.spec.raw_document {
        raw.clone()
    } else {
        match serde_json::to_value(&result.spec.spec) {
            Ok(json) => json,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Serialization error",
                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
                    })),
                )
                    .into_response();
            }
        }
    };

    // Build response with metadata
    let response = serde_json::json!({
        "spec": spec_json,
        "metadata": {
            "requests_analyzed": result.metadata.requests_analyzed,
            "paths_inferred": result.metadata.paths_inferred,
            "path_confidence": result.metadata.path_confidence,
            "generated_at": result.metadata.generated_at.to_rfc3339(),
            "duration_ms": result.metadata.duration_ms,
        }
    });

    Json(response).into_response()
}
3272
3273/// List all rule explanations
3274async fn list_rule_explanations(
3275    State(state): State<ManagementState>,
3276    Query(params): Query<std::collections::HashMap<String, String>>,
3277) -> impl IntoResponse {
3278    use mockforge_core::intelligent_behavior::RuleType;
3279
3280    let explanations = state.rule_explanations.read().await;
3281    let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3282
3283    // Filter by rule type if provided
3284    if let Some(rule_type_str) = params.get("rule_type") {
3285        if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3286            explanations_vec.retain(|e| e.rule_type == rule_type);
3287        }
3288    }
3289
3290    // Filter by minimum confidence if provided
3291    if let Some(min_confidence_str) = params.get("min_confidence") {
3292        if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3293            explanations_vec.retain(|e| e.confidence >= min_confidence);
3294        }
3295    }
3296
3297    // Sort by confidence (descending) and then by generated_at (descending)
3298    explanations_vec.sort_by(|a, b| {
3299        b.confidence
3300            .partial_cmp(&a.confidence)
3301            .unwrap_or(std::cmp::Ordering::Equal)
3302            .then_with(|| b.generated_at.cmp(&a.generated_at))
3303    });
3304
3305    Json(serde_json::json!({
3306        "explanations": explanations_vec,
3307        "total": explanations_vec.len(),
3308    }))
3309    .into_response()
3310}
3311
3312/// Get a specific rule explanation by ID
3313async fn get_rule_explanation(
3314    State(state): State<ManagementState>,
3315    Path(rule_id): Path<String>,
3316) -> impl IntoResponse {
3317    let explanations = state.rule_explanations.read().await;
3318
3319    match explanations.get(&rule_id) {
3320        Some(explanation) => Json(serde_json::json!({
3321            "explanation": explanation,
3322        }))
3323        .into_response(),
3324        None => (
3325            StatusCode::NOT_FOUND,
3326            Json(serde_json::json!({
3327                "error": "Rule explanation not found",
3328                "message": format!("No explanation found for rule ID: {}", rule_id)
3329            })),
3330        )
3331            .into_response(),
3332    }
3333}
3334
3335/// Request for learning from examples
3336#[derive(Debug, Deserialize)]
3337pub struct LearnFromExamplesRequest {
3338    /// Example request/response pairs to learn from
3339    pub examples: Vec<ExamplePairRequest>,
3340    /// Optional configuration override
3341    #[serde(default)]
3342    pub config: Option<serde_json::Value>,
3343}
3344
3345/// Example pair request format
3346#[derive(Debug, Deserialize)]
3347pub struct ExamplePairRequest {
3348    /// Request data (method, path, body, etc.)
3349    pub request: serde_json::Value,
3350    /// Response data (status_code, body, etc.)
3351    pub response: serde_json::Value,
3352}
3353
3354/// Learn behavioral rules from example pairs
3355///
3356/// This endpoint accepts example request/response pairs, generates behavioral rules
3357/// with explanations, and stores the explanations for later retrieval.
3358async fn learn_from_examples(
3359    State(state): State<ManagementState>,
3360    Json(request): Json<LearnFromExamplesRequest>,
3361) -> impl IntoResponse {
3362    use mockforge_core::intelligent_behavior::{
3363        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3364        rule_generator::{ExamplePair, RuleGenerator},
3365    };
3366
3367    if request.examples.is_empty() {
3368        return (
3369            StatusCode::BAD_REQUEST,
3370            Json(serde_json::json!({
3371                "error": "No examples provided",
3372                "message": "At least one example pair is required"
3373            })),
3374        )
3375            .into_response();
3376    }
3377
3378    // Convert request examples to ExamplePair format
3379    let example_pairs: Result<Vec<ExamplePair>, String> = request
3380        .examples
3381        .into_iter()
3382        .enumerate()
3383        .map(|(idx, ex)| {
3384            // Parse request JSON to extract method, path, body, etc.
3385            let method = ex
3386                .request
3387                .get("method")
3388                .and_then(|v| v.as_str())
3389                .map(|s| s.to_string())
3390                .unwrap_or_else(|| "GET".to_string());
3391            let path = ex
3392                .request
3393                .get("path")
3394                .and_then(|v| v.as_str())
3395                .map(|s| s.to_string())
3396                .unwrap_or_else(|| "/".to_string());
3397            let request_body = ex.request.get("body").cloned();
3398            let query_params = ex
3399                .request
3400                .get("query_params")
3401                .and_then(|v| v.as_object())
3402                .map(|obj| {
3403                    obj.iter()
3404                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3405                        .collect()
3406                })
3407                .unwrap_or_default();
3408            let headers = ex
3409                .request
3410                .get("headers")
3411                .and_then(|v| v.as_object())
3412                .map(|obj| {
3413                    obj.iter()
3414                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3415                        .collect()
3416                })
3417                .unwrap_or_default();
3418
3419            // Parse response JSON to extract status, body, etc.
3420            let status = ex
3421                .response
3422                .get("status_code")
3423                .or_else(|| ex.response.get("status"))
3424                .and_then(|v| v.as_u64())
3425                .map(|n| n as u16)
3426                .unwrap_or(200);
3427            let response_body = ex.response.get("body").cloned();
3428
3429            Ok(ExamplePair {
3430                method,
3431                path,
3432                request: request_body,
3433                status,
3434                response: response_body,
3435                query_params,
3436                headers,
3437                metadata: {
3438                    let mut meta = std::collections::HashMap::new();
3439                    meta.insert("source".to_string(), "api".to_string());
3440                    meta.insert("example_index".to_string(), idx.to_string());
3441                    meta
3442                },
3443            })
3444        })
3445        .collect();
3446
3447    let example_pairs = match example_pairs {
3448        Ok(pairs) => pairs,
3449        Err(e) => {
3450            return (
3451                StatusCode::BAD_REQUEST,
3452                Json(serde_json::json!({
3453                    "error": "Invalid examples",
3454                    "message": e
3455                })),
3456            )
3457                .into_response();
3458        }
3459    };
3460
3461    // Create behavior config (use provided config or default)
3462    let behavior_config = if let Some(config_json) = request.config {
3463        // Try to deserialize custom config, fall back to default
3464        serde_json::from_value(config_json)
3465            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3466            .behavior_model
3467    } else {
3468        BehaviorModelConfig::default()
3469    };
3470
3471    // Create rule generator
3472    let generator = RuleGenerator::new(behavior_config);
3473
3474    // Generate rules with explanations
3475    let (rules, explanations) =
3476        match generator.generate_rules_with_explanations(example_pairs).await {
3477            Ok(result) => result,
3478            Err(e) => {
3479                return (
3480                    StatusCode::INTERNAL_SERVER_ERROR,
3481                    Json(serde_json::json!({
3482                        "error": "Rule generation failed",
3483                        "message": format!("Failed to generate rules: {}", e)
3484                    })),
3485                )
3486                    .into_response();
3487            }
3488        };
3489
3490    // Store explanations in ManagementState
3491    {
3492        let mut stored_explanations = state.rule_explanations.write().await;
3493        for explanation in &explanations {
3494            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3495        }
3496    }
3497
3498    // Prepare response
3499    let response = serde_json::json!({
3500        "success": true,
3501        "rules_generated": {
3502            "consistency_rules": rules.consistency_rules.len(),
3503            "schemas": rules.schemas.len(),
3504            "state_machines": rules.state_transitions.len(),
3505            "system_prompt": !rules.system_prompt.is_empty(),
3506        },
3507        "explanations": explanations.iter().map(|e| serde_json::json!({
3508            "rule_id": e.rule_id,
3509            "rule_type": e.rule_type,
3510            "confidence": e.confidence,
3511            "reasoning": e.reasoning,
3512        })).collect::<Vec<_>>(),
3513        "total_explanations": explanations.len(),
3514    });
3515
3516    Json(response).into_response()
3517}
3518
3519#[cfg(feature = "data-faker")]
/// Extract an OpenAPI/AsyncAPI YAML document from free-form text.
///
/// Preference order:
/// 1. the contents of the first closed code fence opened with three backticks
///    immediately followed by `yaml`;
/// 2. the contents of the first closed code fence of any kind, minus a
///    language tag (such as `yml` or `json`) sitting on the fence line;
/// 3. the whole input.
///
/// The result is always trimmed of surrounding whitespace.
fn extract_yaml_spec(text: &str) -> String {
    // Raw contents of the first fence that starts with `opener` and is closed
    // by a later triple backtick; `None` when no such closed fence exists.
    fn fence_contents<'a>(text: &'a str, opener: &str) -> Option<&'a str> {
        let (_, after_opener) = text.split_once(opener)?;
        let (contents, _) = after_opener.split_once("```")?;
        Some(contents)
    }

    // A language tag is a single word: a letter followed by letters, digits,
    // `-`, `_` or `+` (e.g. `yml`, `json`, `c++`).
    fn is_language_tag(candidate: &str) -> bool {
        let mut chars = candidate.chars();
        chars.next().map_or(false, |first| first.is_ascii_alphabetic())
            && chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '+'))
    }

    // Drops the fence line when it holds nothing but a language tag, so the
    // tag does not leak into the extracted document. Content that starts on
    // the fence line itself (e.g. `openapi: 3.0.0`) is kept.
    fn without_info_string(contents: &str) -> &str {
        match contents.split_once('\n') {
            Some((fence_line, rest)) if is_language_tag(fence_line.trim()) => rest,
            _ => contents,
        }
    }

    // Explicitly YAML-tagged fences win over any other fence.
    if let Some(contents) = fence_contents(text, "```yaml") {
        return contents.trim().to_string();
    }
    if let Some(contents) = fence_contents(text, "```") {
        return without_info_string(contents).trim().to_string();
    }

    // No closed code fence: treat the whole input as the document.
    text.trim().to_string()
}
3543
3544/// Extract GraphQL schema from text content
3545#[cfg(feature = "data-faker")]
///
/// Preference order:
/// 1. the contents of the first closed code fence opened with three backticks
///    immediately followed by `graphql`;
/// 2. the contents of the first closed code fence of any kind, minus a
///    language tag (such as `gql`) sitting on the fence line;
/// 3. the whole input.
///
/// The result is always trimmed of surrounding whitespace.
fn extract_graphql_schema(text: &str) -> String {
    // Raw contents of the first fence that starts with `opener` and is closed
    // by a later triple backtick; `None` when no such closed fence exists.
    fn fence_contents<'a>(text: &'a str, opener: &str) -> Option<&'a str> {
        let (_, after_opener) = text.split_once(opener)?;
        let (contents, _) = after_opener.split_once("```")?;
        Some(contents)
    }

    // A language tag is a single word: a letter followed by letters, digits,
    // `-`, `_` or `+` (e.g. `gql`, `json`).
    fn is_language_tag(candidate: &str) -> bool {
        let mut chars = candidate.chars();
        chars.next().map_or(false, |first| first.is_ascii_alphabetic())
            && chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '+'))
    }

    // Drops the fence line when it holds nothing but a language tag, so the
    // tag does not leak into the extracted schema. SDL that starts on the
    // fence line itself (e.g. `type Query {`) is kept.
    fn without_info_string(contents: &str) -> &str {
        match contents.split_once('\n') {
            Some((fence_line, rest)) if is_language_tag(fence_line.trim()) => rest,
            _ => contents,
        }
    }

    // Explicitly GraphQL-tagged fences win over any other fence.
    if let Some(contents) = fence_contents(text, "```graphql") {
        return contents.trim().to_string();
    }
    if let Some(contents) = fence_contents(text, "```") {
        return without_info_string(contents).trim().to_string();
    }

    // No closed code fence: treat the whole input as the schema.
    text.trim().to_string()
}
3568
3569// ========== Chaos Engineering Management ==========
3570
3571/// Get current chaos engineering configuration
3572async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
3573    #[cfg(feature = "chaos")]
3574    {
3575        if let Some(chaos_state) = &_state.chaos_api_state {
3576            let config = chaos_state.config.read().await;
3577            // Convert ChaosConfig to JSON response format
3578            Json(serde_json::json!({
3579                "enabled": config.enabled,
3580                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
3581                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
3582                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
3583                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
3584            }))
3585            .into_response()
3586        } else {
3587            // Chaos API not available, return default
3588            Json(serde_json::json!({
3589                "enabled": false,
3590                "latency": null,
3591                "fault_injection": null,
3592                "rate_limit": null,
3593                "traffic_shaping": null,
3594            }))
3595            .into_response()
3596        }
3597    }
3598    #[cfg(not(feature = "chaos"))]
3599    {
3600        // Chaos feature not enabled
3601        Json(serde_json::json!({
3602            "enabled": false,
3603            "latency": null,
3604            "fault_injection": null,
3605            "rate_limit": null,
3606            "traffic_shaping": null,
3607        }))
3608        .into_response()
3609    }
3610}
3611
3612/// Request to update chaos configuration
3613#[derive(Debug, Deserialize)]
3614pub struct ChaosConfigUpdate {
3615    /// Whether to enable chaos engineering
3616    pub enabled: Option<bool>,
3617    /// Latency configuration
3618    pub latency: Option<serde_json::Value>,
3619    /// Fault injection configuration
3620    pub fault_injection: Option<serde_json::Value>,
3621    /// Rate limiting configuration
3622    pub rate_limit: Option<serde_json::Value>,
3623    /// Traffic shaping configuration
3624    pub traffic_shaping: Option<serde_json::Value>,
3625}
3626
3627/// Update chaos engineering configuration
3628async fn update_chaos_config(
3629    State(_state): State<ManagementState>,
3630    Json(_config_update): Json<ChaosConfigUpdate>,
3631) -> impl IntoResponse {
3632    #[cfg(feature = "chaos")]
3633    {
3634        if let Some(chaos_state) = &_state.chaos_api_state {
3635            use mockforge_chaos::config::{
3636                ChaosConfig, FaultInjectionConfig, LatencyConfig, RateLimitConfig,
3637                TrafficShapingConfig,
3638            };
3639
3640            let mut config = chaos_state.config.write().await;
3641
3642            // Update enabled flag if provided
3643            if let Some(enabled) = _config_update.enabled {
3644                config.enabled = enabled;
3645            }
3646
3647            // Update latency config if provided
3648            if let Some(latency_json) = _config_update.latency {
3649                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3650                    config.latency = Some(latency);
3651                }
3652            }
3653
3654            // Update fault injection config if provided
3655            if let Some(fault_json) = _config_update.fault_injection {
3656                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3657                    config.fault_injection = Some(fault);
3658                }
3659            }
3660
3661            // Update rate limit config if provided
3662            if let Some(rate_json) = _config_update.rate_limit {
3663                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3664                    config.rate_limit = Some(rate);
3665                }
3666            }
3667
3668            // Update traffic shaping config if provided
3669            if let Some(traffic_json) = _config_update.traffic_shaping {
3670                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3671                    config.traffic_shaping = Some(traffic);
3672                }
3673            }
3674
3675            // Reinitialize middleware injectors with new config
3676            // The middleware will pick up the changes on the next request
3677            drop(config);
3678
3679            info!("Chaos configuration updated successfully");
3680            Json(serde_json::json!({
3681                "success": true,
3682                "message": "Chaos configuration updated and applied"
3683            }))
3684            .into_response()
3685        } else {
3686            (
3687                StatusCode::SERVICE_UNAVAILABLE,
3688                Json(serde_json::json!({
3689                    "success": false,
3690                    "error": "Chaos API not available",
3691                    "message": "Chaos engineering is not enabled or configured"
3692                })),
3693            )
3694                .into_response()
3695        }
3696    }
3697    #[cfg(not(feature = "chaos"))]
3698    {
3699        (
3700            StatusCode::NOT_IMPLEMENTED,
3701            Json(serde_json::json!({
3702                "success": false,
3703                "error": "Chaos feature not enabled",
3704                "message": "Chaos engineering feature is not compiled into this build"
3705            })),
3706        )
3707            .into_response()
3708    }
3709}
3710
3711// ========== Network Profile Management ==========
3712
3713/// List available network profiles
3714async fn list_network_profiles() -> impl IntoResponse {
3715    use mockforge_core::network_profiles::NetworkProfileCatalog;
3716
3717    let catalog = NetworkProfileCatalog::default();
3718    let profiles: Vec<serde_json::Value> = catalog
3719        .list_profiles_with_description()
3720        .iter()
3721        .map(|(name, description)| {
3722            serde_json::json!({
3723                "name": name,
3724                "description": description,
3725            })
3726        })
3727        .collect();
3728
3729    Json(serde_json::json!({
3730        "profiles": profiles
3731    }))
3732    .into_response()
3733}
3734
3735#[derive(Debug, Deserialize)]
3736/// Request to apply a network profile
3737pub struct ApplyNetworkProfileRequest {
3738    /// Name of the network profile to apply
3739    pub profile_name: String,
3740}
3741
3742/// Apply a network profile
3743async fn apply_network_profile(
3744    State(state): State<ManagementState>,
3745    Json(request): Json<ApplyNetworkProfileRequest>,
3746) -> impl IntoResponse {
3747    use mockforge_core::network_profiles::NetworkProfileCatalog;
3748
3749    let catalog = NetworkProfileCatalog::default();
3750    if let Some(profile) = catalog.get(&request.profile_name) {
3751        // Apply profile to server configuration if available
3752        // NetworkProfile contains latency and traffic_shaping configs
3753        if let Some(server_config) = &state.server_config {
3754            let mut config = server_config.write().await;
3755
3756            // Apply network profile's traffic shaping to core config
3757            use mockforge_core::config::NetworkShapingConfig;
3758
3759            // Convert NetworkProfile's TrafficShapingConfig to NetworkShapingConfig
3760            // NetworkProfile uses mockforge_core::traffic_shaping::TrafficShapingConfig
3761            // which has bandwidth and burst_loss fields
3762            let network_shaping = NetworkShapingConfig {
3763                enabled: profile.traffic_shaping.bandwidth.enabled
3764                    || profile.traffic_shaping.burst_loss.enabled,
3765                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
3766                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3767                max_connections: 1000, // Default value
3768            };
3769
3770            // Update chaos config if it exists, or create it
3771            // Chaos config is in observability.chaos, not core.chaos
3772            if let Some(ref mut chaos) = config.observability.chaos {
3773                chaos.traffic_shaping = Some(network_shaping);
3774            } else {
3775                // Create minimal chaos config with traffic shaping
3776                use mockforge_core::config::ChaosEngConfig;
3777                config.observability.chaos = Some(ChaosEngConfig {
3778                    enabled: true,
3779                    latency: None,
3780                    fault_injection: None,
3781                    rate_limit: None,
3782                    traffic_shaping: Some(network_shaping),
3783                    scenario: None,
3784                });
3785            }
3786
3787            info!("Network profile '{}' applied to server configuration", request.profile_name);
3788        } else {
3789            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
3790        }
3791
3792        // Also update chaos API state if available
3793        #[cfg(feature = "chaos")]
3794        {
3795            if let Some(chaos_state) = &state.chaos_api_state {
3796                use mockforge_chaos::config::TrafficShapingConfig;
3797
3798                let mut chaos_config = chaos_state.config.write().await;
3799                // Apply profile's traffic shaping to chaos API state
3800                let chaos_traffic_shaping = TrafficShapingConfig {
3801                    enabled: profile.traffic_shaping.bandwidth.enabled
3802                        || profile.traffic_shaping.burst_loss.enabled,
3803                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
3804                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3805                    max_connections: 0,
3806                    connection_timeout_ms: 30000,
3807                };
3808                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
3809                chaos_config.enabled = true; // Enable chaos when applying a profile
3810                drop(chaos_config);
3811                info!("Network profile '{}' applied to chaos API state", request.profile_name);
3812            }
3813        }
3814
3815        Json(serde_json::json!({
3816            "success": true,
3817            "message": format!("Network profile '{}' applied", request.profile_name),
3818            "profile": {
3819                "name": profile.name,
3820                "description": profile.description,
3821            }
3822        }))
3823        .into_response()
3824    } else {
3825        (
3826            StatusCode::NOT_FOUND,
3827            Json(serde_json::json!({
3828                "error": "Profile not found",
3829                "message": format!("Network profile '{}' not found", request.profile_name)
3830            })),
3831        )
3832            .into_response()
3833    }
3834}
3835
3836/// Build the management API router with UI Builder support
3837pub fn management_router_with_ui_builder(
3838    state: ManagementState,
3839    server_config: mockforge_core::config::ServerConfig,
3840) -> Router {
3841    use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
3842
3843    // Create the base management router
3844    let management = management_router(state);
3845
3846    // Create UI Builder state and router
3847    let ui_builder_state = UIBuilderState::new(server_config);
3848    let ui_builder = create_ui_builder_router(ui_builder_state);
3849
3850    // Nest UI Builder under /ui-builder
3851    management.nest("/ui-builder", ui_builder)
3852}
3853
3854/// Build management router with spec import API
3855pub fn management_router_with_spec_import(state: ManagementState) -> Router {
3856    use crate::spec_import::{spec_import_router, SpecImportState};
3857
3858    // Create base management router
3859    let management = management_router(state);
3860
3861    // Merge with spec import router
3862    Router::new()
3863        .merge(management)
3864        .merge(spec_import_router(SpecImportState::new()))
3865}
3866
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture: a mock with the given identity, body and enabled flag, status
    /// code 200 and every optional field unset.
    fn sample_mock(
        id: &str,
        name: &str,
        method: &str,
        path: &str,
        body: serde_json::Value,
        enabled: bool,
    ) -> MockConfig {
        MockConfig {
            id: id.to_string(),
            name: name.to_string(),
            method: method.to_string(),
            path: path.to_string(),
            response: MockResponse {
                body,
                headers: None,
            },
            enabled,
            latency_ms: None,
            status_code: Some(200),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        }
    }

    /// A mock pushed into the shared state can be found again by its ID.
    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        // Create mock (write guard is dropped at the end of the statement)
        state.mocks.write().await.push(sample_mock(
            "test-1",
            "Test Mock",
            "GET",
            "/test",
            serde_json::json!({"message": "test"}),
            true,
        ));

        // Get mock
        let stored = state.mocks.read().await;
        let found = stored.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    /// Total and enabled mock counts reflect what was stored.
    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        // Add one enabled and one disabled mock
        {
            let mut stored = state.mocks.write().await;
            stored.push(sample_mock("1", "Mock 1", "GET", "/test1", serde_json::json!({}), true));
            stored.push(MockConfig {
                status_code: Some(201),
                ..sample_mock("2", "Mock 2", "POST", "/test2", serde_json::json!({}), false)
            });
        }

        let stored = state.mocks.read().await;
        assert_eq!(stored.len(), 2);
        assert_eq!(stored.iter().filter(|m| m.enabled).count(), 1);
    }
}