// mockforge_http/management.rs

1/// Management API for MockForge
2///
3/// Provides REST endpoints for controlling mocks, server configuration,
4/// and integration with developer tools (VS Code extension, CI/CD, etc.)
5use axum::{
6    extract::{Path, Query, State},
7    http::StatusCode,
8    response::{
9        sse::{Event, Sse},
10        IntoResponse, Json,
11    },
12    routing::{delete, get, post, put},
13    Router,
14};
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18    BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23use std::convert::Infallible;
24use std::sync::Arc;
25use tokio::sync::{broadcast, RwLock};
26use tracing::*;
27
/// Default broadcast channel capacity for message events
const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;

/// Get the broadcast channel capacity from the
/// `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY` environment variable, or use the default.
///
/// Unset, unparseable, or non-positive values all fall back to
/// [`DEFAULT_MESSAGE_BROADCAST_CAPACITY`]: `tokio::sync::broadcast::channel`
/// panics when given a capacity of 0, so zero must never be returned here.
fn get_message_broadcast_capacity() -> usize {
    std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY")
        .ok()
        .and_then(|s| s.parse().ok())
        // Guard against a configured capacity of 0, which would panic at channel creation.
        .filter(|&capacity| capacity > 0)
        .unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY)
}
38
/// Message event types for real-time monitoring
///
/// Serialized with an adjacent tag, e.g.
/// `{"protocol": "mqtt", "data": { ... }}` — the variant name (lowercased)
/// goes into `protocol` and the payload into `data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    #[cfg(feature = "mqtt")]
    /// MQTT message event
    Mqtt(MqttMessageEvent),
    #[cfg(feature = "kafka")]
    /// Kafka message event
    Kafka(KafkaMessageEvent),
}
51
#[cfg(feature = "mqtt")]
/// MQTT message event for real-time monitoring
///
/// Carried inside [`MessageEvent::Mqtt`] and fanned out over the
/// management state's broadcast channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// MQTT topic name
    pub topic: String,
    /// Message payload content
    pub payload: String,
    /// Quality of Service level (0, 1, or 2)
    pub qos: u8,
    /// Whether the message is retained
    pub retain: bool,
    /// RFC3339 formatted timestamp
    pub timestamp: String,
}
67
#[cfg(feature = "kafka")]
/// Kafka message event for real-time monitoring
///
/// Kafka counterpart of [`MqttMessageEvent`], carried inside
/// [`MessageEvent::Kafka`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Kafka topic name
    pub topic: String,
    /// Optional record key
    pub key: Option<String>,
    /// Record value / payload content
    pub value: String,
    /// Partition the record belongs to
    pub partition: i32,
    /// Offset of the record within its partition
    pub offset: i64,
    /// Optional record headers (name -> value)
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// Timestamp string — presumably RFC3339, matching `MqttMessageEvent`; confirm at the producer
    pub timestamp: String,
}
79
80/// Mock configuration representation
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct MockConfig {
83    /// Unique identifier for the mock
84    #[serde(skip_serializing_if = "String::is_empty")]
85    pub id: String,
86    /// Human-readable name for the mock
87    pub name: String,
88    /// HTTP method (GET, POST, etc.)
89    pub method: String,
90    /// API path pattern to match
91    pub path: String,
92    /// Response configuration
93    pub response: MockResponse,
94    /// Whether this mock is currently enabled
95    #[serde(default = "default_true")]
96    pub enabled: bool,
97    /// Optional latency to inject in milliseconds
98    #[serde(skip_serializing_if = "Option::is_none")]
99    pub latency_ms: Option<u64>,
100    /// Optional HTTP status code override
101    #[serde(skip_serializing_if = "Option::is_none")]
102    pub status_code: Option<u16>,
103    /// Request matching criteria (headers, query params, body patterns)
104    #[serde(skip_serializing_if = "Option::is_none")]
105    pub request_match: Option<RequestMatchCriteria>,
106    /// Priority for mock ordering (higher priority mocks are matched first)
107    #[serde(skip_serializing_if = "Option::is_none")]
108    pub priority: Option<i32>,
109    /// Scenario name for stateful mocking
110    #[serde(skip_serializing_if = "Option::is_none")]
111    pub scenario: Option<String>,
112    /// Required scenario state for this mock to be active
113    #[serde(skip_serializing_if = "Option::is_none")]
114    pub required_scenario_state: Option<String>,
115    /// New scenario state after this mock is matched
116    #[serde(skip_serializing_if = "Option::is_none")]
117    pub new_scenario_state: Option<String>,
118}
119
/// Serde default helper: mocks are enabled unless explicitly disabled.
fn default_true() -> bool {
    true
}
123
/// Mock response configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Response body as JSON
    pub body: serde_json::Value,
    /// Optional custom response headers (header name -> value);
    /// omitted from serialized output when `None`
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
133
134/// Request matching criteria for advanced request matching
135#[derive(Debug, Clone, Serialize, Deserialize, Default)]
136pub struct RequestMatchCriteria {
137    /// Headers that must be present and match (case-insensitive header names)
138    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
139    pub headers: std::collections::HashMap<String, String>,
140    /// Query parameters that must be present and match
141    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
142    pub query_params: std::collections::HashMap<String, String>,
143    /// Request body pattern (supports exact match or regex)
144    #[serde(skip_serializing_if = "Option::is_none")]
145    pub body_pattern: Option<String>,
146    /// JSONPath expression for JSON body matching
147    #[serde(skip_serializing_if = "Option::is_none")]
148    pub json_path: Option<String>,
149    /// XPath expression for XML body matching
150    #[serde(skip_serializing_if = "Option::is_none")]
151    pub xpath: Option<String>,
152    /// Custom matcher expression (e.g., "headers.content-type == \"application/json\"")
153    #[serde(skip_serializing_if = "Option::is_none")]
154    pub custom_matcher: Option<String>,
155}
156
157/// Check if a request matches the given mock configuration
158///
159/// This function implements comprehensive request matching including:
160/// - Method and path matching
161/// - Header matching (with regex support)
162/// - Query parameter matching
163/// - Body pattern matching (exact, regex, JSONPath, XPath)
164/// - Custom matcher expressions
165pub fn mock_matches_request(
166    mock: &MockConfig,
167    method: &str,
168    path: &str,
169    headers: &std::collections::HashMap<String, String>,
170    query_params: &std::collections::HashMap<String, String>,
171    body: Option<&[u8]>,
172) -> bool {
173    use regex::Regex;
174
175    // Check if mock is enabled
176    if !mock.enabled {
177        return false;
178    }
179
180    // Check method (case-insensitive)
181    if mock.method.to_uppercase() != method.to_uppercase() {
182        return false;
183    }
184
185    // Check path pattern (supports wildcards and path parameters)
186    if !path_matches_pattern(&mock.path, path) {
187        return false;
188    }
189
190    // Check request matching criteria if present
191    if let Some(criteria) = &mock.request_match {
192        // Check headers
193        for (key, expected_value) in &criteria.headers {
194            let header_key_lower = key.to_lowercase();
195            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
196
197            if let Some((_, actual_value)) = found {
198                // Try regex match first, then exact match
199                if let Ok(re) = Regex::new(expected_value) {
200                    if !re.is_match(actual_value) {
201                        return false;
202                    }
203                } else if actual_value != expected_value {
204                    return false;
205                }
206            } else {
207                return false; // Header not found
208            }
209        }
210
211        // Check query parameters
212        for (key, expected_value) in &criteria.query_params {
213            if let Some(actual_value) = query_params.get(key) {
214                if actual_value != expected_value {
215                    return false;
216                }
217            } else {
218                return false; // Query param not found
219            }
220        }
221
222        // Check body pattern
223        if let Some(pattern) = &criteria.body_pattern {
224            if let Some(body_bytes) = body {
225                let body_str = String::from_utf8_lossy(body_bytes);
226                // Try regex first, then exact match
227                if let Ok(re) = Regex::new(pattern) {
228                    if !re.is_match(&body_str) {
229                        return false;
230                    }
231                } else if body_str.as_ref() != pattern {
232                    return false;
233                }
234            } else {
235                return false; // Body required but not present
236            }
237        }
238
239        // Check JSONPath (simplified implementation)
240        if let Some(json_path) = &criteria.json_path {
241            if let Some(body_bytes) = body {
242                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
243                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
244                        // Simple JSONPath check
245                        if !json_path_exists(&json_value, json_path) {
246                            return false;
247                        }
248                    }
249                }
250            }
251        }
252
253        // Check XPath (placeholder - requires XML/XPath library for full implementation)
254        if let Some(_xpath) = &criteria.xpath {
255            // XPath matching would require an XML/XPath library
256            // For now, this is a placeholder that warns but doesn't fail
257            tracing::warn!("XPath matching not yet fully implemented");
258        }
259
260        // Check custom matcher
261        if let Some(custom) = &criteria.custom_matcher {
262            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
263                return false;
264            }
265        }
266    }
267
268    true
269}
270
271/// Check if a path matches a pattern (supports wildcards and path parameters)
272fn path_matches_pattern(pattern: &str, path: &str) -> bool {
273    // Exact match
274    if pattern == path {
275        return true;
276    }
277
278    // Wildcard match
279    if pattern == "*" {
280        return true;
281    }
282
283    // Path parameter matching (e.g., /users/{id} matches /users/123)
284    let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
285    let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
286
287    if pattern_parts.len() != path_parts.len() {
288        // Check for wildcard patterns
289        if pattern.contains('*') {
290            return matches_wildcard_pattern(pattern, path);
291        }
292        return false;
293    }
294
295    for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
296        // Check for path parameters {param}
297        if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
298            continue; // Matches any value
299        }
300
301        if pattern_part != path_part {
302            return false;
303        }
304    }
305
306    true
307}
308
309/// Check if path matches a wildcard pattern
310fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
311    use regex::Regex;
312
313    // Convert pattern to regex
314    let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
315
316    if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
317        return re.is_match(path);
318    }
319
320    false
321}
322
323/// Check if a JSONPath exists in a JSON value (simplified implementation)
324fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
325    // Simple implementation - for full JSONPath support, use a library like jsonpath-rs
326    // This handles simple paths like $.field or $.field.subfield
327    if let Some(path) = json_path.strip_prefix("$.") {
328        let parts: Vec<&str> = path.split('.').collect();
329
330        let mut current = json;
331        for part in parts {
332            if let Some(obj) = current.as_object() {
333                if let Some(value) = obj.get(part) {
334                    current = value;
335                } else {
336                    return false;
337                }
338            } else {
339                return false;
340            }
341        }
342        true
343    } else {
344        // For complex JSONPath expressions, would need a proper JSONPath library
345        tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
346        false
347    }
348}
349
350/// Evaluate a custom matcher expression
351fn evaluate_custom_matcher(
352    expression: &str,
353    method: &str,
354    path: &str,
355    headers: &std::collections::HashMap<String, String>,
356    query_params: &std::collections::HashMap<String, String>,
357    body: Option<&[u8]>,
358) -> bool {
359    use regex::Regex;
360
361    let expr = expression.trim();
362
363    // Handle equality expressions (field == "value")
364    if expr.contains("==") {
365        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
366        if parts.len() != 2 {
367            return false;
368        }
369
370        let field = parts[0];
371        let expected_value = parts[1].trim_matches('"').trim_matches('\'');
372
373        match field {
374            "method" => method == expected_value,
375            "path" => path == expected_value,
376            _ if field.starts_with("headers.") => {
377                let header_name = &field[8..];
378                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
379            }
380            _ if field.starts_with("query.") => {
381                let param_name = &field[6..];
382                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
383            }
384            _ => false,
385        }
386    }
387    // Handle regex match expressions (field =~ "pattern")
388    else if expr.contains("=~") {
389        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
390        if parts.len() != 2 {
391            return false;
392        }
393
394        let field = parts[0];
395        let pattern = parts[1].trim_matches('"').trim_matches('\'');
396
397        if let Ok(re) = Regex::new(pattern) {
398            match field {
399                "method" => re.is_match(method),
400                "path" => re.is_match(path),
401                _ if field.starts_with("headers.") => {
402                    let header_name = &field[8..];
403                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
404                }
405                _ if field.starts_with("query.") => {
406                    let param_name = &field[6..];
407                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
408                }
409                _ => false,
410            }
411        } else {
412            false
413        }
414    }
415    // Handle contains expressions (field contains "value")
416    else if expr.contains("contains") {
417        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
418        if parts.len() != 2 {
419            return false;
420        }
421
422        let field = parts[0];
423        let search_value = parts[1].trim_matches('"').trim_matches('\'');
424
425        match field {
426            "path" => path.contains(search_value),
427            _ if field.starts_with("headers.") => {
428                let header_name = &field[8..];
429                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
430            }
431            _ if field.starts_with("body") => {
432                if let Some(body_bytes) = body {
433                    let body_str = String::from_utf8_lossy(body_bytes);
434                    body_str.contains(search_value)
435                } else {
436                    false
437                }
438            }
439            _ => false,
440        }
441    } else {
442        // Unknown expression format
443        tracing::warn!("Unknown custom matcher expression format: {}", expr);
444        false
445    }
446}
447
/// Server statistics
///
/// Point-in-time snapshot of runtime counters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Server uptime in seconds
    pub uptime_seconds: u64,
    /// Total number of requests processed
    pub total_requests: u64,
    /// Number of active mock configurations (enabled or not)
    pub active_mocks: usize,
    /// Number of currently enabled mocks
    pub enabled_mocks: usize,
    /// Number of registered API routes
    pub registered_routes: usize,
}
462
/// Server configuration info
///
/// Note: distinct from `mockforge_core::config::ServerConfig`, which is the
/// full server configuration used elsewhere in this module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// MockForge version string
    pub version: String,
    /// Server port number
    pub port: u16,
    /// Whether an OpenAPI spec is loaded
    pub has_openapi_spec: bool,
    /// Optional path to the OpenAPI spec file; omitted from output when `None`
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
476
/// Shared state for the management API
///
/// Cheap to clone: every field is either `Copy`, an `Option`, or behind an
/// `Arc`, so handlers can take it by value via axum's `State` extractor.
#[derive(Clone)]
pub struct ManagementState {
    /// Collection of mock configurations
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// Optional OpenAPI specification
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Optional path to the OpenAPI spec file
    pub spec_path: Option<String>,
    /// Server port number
    pub port: u16,
    /// Server start time for uptime calculation
    pub start_time: std::time::Instant,
    /// Counter for total requests processed
    pub request_counter: Arc<RwLock<u64>>,
    /// Optional proxy configuration for migration pipeline
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// Optional SMTP registry for email mocking
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// Optional MQTT broker for message mocking
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Optional Kafka broker for event streaming
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel for message events (MQTT & Kafka);
    /// capacity comes from `get_message_broadcast_capacity()`
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// State machine manager for scenario state machines
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// Optional WebSocket broadcast channel for real-time updates
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Lifecycle hook registry for extensibility
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Rule explanations storage (in-memory for now), keyed by rule ID string
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
    /// Optional chaos API state for chaos config management
    #[cfg(feature = "chaos")]
    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
    /// Optional server configuration for profile application
    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
}
528
impl ManagementState {
    /// Create a new management state
    ///
    /// All optional integrations (proxy, SMTP, MQTT, Kafka, chaos, lifecycle
    /// hooks, WebSocket broadcast, server config) start unset; attach them
    /// with the builder-style `with_*` methods below.
    ///
    /// # Arguments
    /// * `spec` - Optional OpenAPI specification
    /// * `spec_path` - Optional path to the OpenAPI spec file
    /// * `port` - Server port number
    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
        Self {
            mocks: Arc::new(RwLock::new(Vec::new())),
            spec,
            spec_path,
            port,
            start_time: std::time::Instant::now(),
            request_counter: Arc::new(RwLock::new(0)),
            proxy_config: None,
            #[cfg(feature = "smtp")]
            smtp_registry: None,
            #[cfg(feature = "mqtt")]
            mqtt_broker: None,
            #[cfg(feature = "kafka")]
            kafka_broker: None,
            #[cfg(any(feature = "mqtt", feature = "kafka"))]
            message_events: {
                // Capacity is configurable via environment; the receiver half
                // is dropped here — subscribers attach to the Sender later.
                let capacity = get_message_broadcast_capacity();
                let (tx, _) = broadcast::channel(capacity);
                Arc::new(tx)
            },
            state_machine_manager: Arc::new(RwLock::new(
                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
            )),
            ws_broadcast: None,
            lifecycle_hooks: None,
            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
            #[cfg(feature = "chaos")]
            chaos_api_state: None,
            server_config: None,
        }
    }

    /// Add lifecycle hook registry to management state (builder-style; consumes and returns `self`)
    pub fn with_lifecycle_hooks(
        mut self,
        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
    ) -> Self {
        self.lifecycle_hooks = Some(hooks);
        self
    }

    /// Add WebSocket broadcast channel to management state (builder-style)
    pub fn with_ws_broadcast(
        mut self,
        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
    ) -> Self {
        self.ws_broadcast = Some(ws_broadcast);
        self
    }

    /// Add proxy configuration to management state (builder-style)
    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
        self.proxy_config = Some(proxy_config);
        self
    }

    #[cfg(feature = "smtp")]
    /// Add SMTP registry to management state (builder-style)
    pub fn with_smtp_registry(
        mut self,
        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
    ) -> Self {
        self.smtp_registry = Some(smtp_registry);
        self
    }

    #[cfg(feature = "mqtt")]
    /// Add MQTT broker to management state (builder-style)
    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
        self.mqtt_broker = Some(mqtt_broker);
        self
    }

    #[cfg(feature = "kafka")]
    /// Add Kafka broker to management state (builder-style)
    pub fn with_kafka_broker(
        mut self,
        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
    ) -> Self {
        self.kafka_broker = Some(kafka_broker);
        self
    }

    #[cfg(feature = "chaos")]
    /// Add chaos API state to management state (builder-style)
    pub fn with_chaos_api_state(
        mut self,
        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
    ) -> Self {
        self.chaos_api_state = Some(chaos_api_state);
        self
    }

    /// Add server configuration to management state (builder-style)
    pub fn with_server_config(
        mut self,
        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
    ) -> Self {
        self.server_config = Some(server_config);
        self
    }
}
639
640/// List all mocks
641async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
642    let mocks = state.mocks.read().await;
643    Json(serde_json::json!({
644        "mocks": *mocks,
645        "total": mocks.len(),
646        "enabled": mocks.iter().filter(|m| m.enabled).count()
647    }))
648}
649
650/// Get a specific mock by ID
651async fn get_mock(
652    State(state): State<ManagementState>,
653    Path(id): Path<String>,
654) -> Result<Json<MockConfig>, StatusCode> {
655    let mocks = state.mocks.read().await;
656    mocks
657        .iter()
658        .find(|m| m.id == id)
659        .cloned()
660        .map(Json)
661        .ok_or(StatusCode::NOT_FOUND)
662}
663
664/// Create a new mock
665async fn create_mock(
666    State(state): State<ManagementState>,
667    Json(mut mock): Json<MockConfig>,
668) -> Result<Json<MockConfig>, StatusCode> {
669    let mut mocks = state.mocks.write().await;
670
671    // Generate ID if not provided
672    if mock.id.is_empty() {
673        mock.id = uuid::Uuid::new_v4().to_string();
674    }
675
676    // Check for duplicate ID
677    if mocks.iter().any(|m| m.id == mock.id) {
678        return Err(StatusCode::CONFLICT);
679    }
680
681    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
682
683    // Invoke lifecycle hooks
684    if let Some(hooks) = &state.lifecycle_hooks {
685        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
686            id: mock.id.clone(),
687            name: mock.name.clone(),
688            config: serde_json::to_value(&mock).unwrap_or_default(),
689        };
690        hooks.invoke_mock_created(&event).await;
691    }
692
693    mocks.push(mock.clone());
694
695    // Broadcast WebSocket event
696    if let Some(tx) = &state.ws_broadcast {
697        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
698    }
699
700    Ok(Json(mock))
701}
702
/// Update an existing mock
///
/// Replaces the stored mock with `id` wholesale (404 when absent), fires
/// lifecycle hooks — including dedicated enable/disable transition events —
/// and broadcasts a WebSocket update event.
///
/// NOTE(review): `updated_mock.id` is not forced to equal the path `id`, so a
/// payload carrying a different id effectively renames the mock in place —
/// confirm this is intended.
async fn update_mock(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
    Json(updated_mock): Json<MockConfig>,
) -> Result<Json<MockConfig>, StatusCode> {
    let mut mocks = state.mocks.write().await;

    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;

    // Get old mock for comparison
    let old_mock = mocks[position].clone();

    info!("Updating mock: {}", id);
    mocks[position] = updated_mock.clone();

    // Invoke lifecycle hooks
    if let Some(hooks) = &state.lifecycle_hooks {
        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
            id: updated_mock.id.clone(),
            name: updated_mock.name.clone(),
            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
        };
        hooks.invoke_mock_updated(&event).await;

        // Check if enabled state changed; emit a dedicated Enabled/Disabled
        // event so listeners don't have to diff configs themselves.
        if old_mock.enabled != updated_mock.enabled {
            let state_event = if updated_mock.enabled {
                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
                    id: updated_mock.id.clone(),
                }
            } else {
                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
                    id: updated_mock.id.clone(),
                }
            };
            hooks.invoke_mock_state_changed(&state_event).await;
        }
    }

    // Broadcast WebSocket event (best effort; send errors are ignored)
    if let Some(tx) = &state.ws_broadcast {
        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
    }

    Ok(Json(updated_mock))
}
750
751/// Delete a mock
752async fn delete_mock(
753    State(state): State<ManagementState>,
754    Path(id): Path<String>,
755) -> Result<StatusCode, StatusCode> {
756    let mut mocks = state.mocks.write().await;
757
758    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
759
760    // Get mock info before deletion for lifecycle hooks
761    let deleted_mock = mocks[position].clone();
762
763    info!("Deleting mock: {}", id);
764    mocks.remove(position);
765
766    // Invoke lifecycle hooks
767    if let Some(hooks) = &state.lifecycle_hooks {
768        let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
769            id: deleted_mock.id.clone(),
770            name: deleted_mock.name.clone(),
771        };
772        hooks.invoke_mock_deleted(&event).await;
773    }
774
775    // Broadcast WebSocket event
776    if let Some(tx) = &state.ws_broadcast {
777        let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
778    }
779
780    Ok(StatusCode::NO_CONTENT)
781}
782
/// Request to validate configuration
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// Configuration to validate (as JSON)
    pub config: serde_json::Value,
    /// Format of the configuration ("json" or "yaml"); defaults to "json"
    /// when omitted — any unrecognized value is treated as JSON
    #[serde(default = "default_format")]
    pub format: String,
}
792
/// Serde default helper: configurations are treated as JSON unless stated otherwise.
fn default_format() -> String {
    String::from("json")
}
796
/// Validate configuration without applying it
///
/// Accepts `{ "config": ..., "format": "json" | "yaml" }` and reports whether
/// the payload deserializes into a `ServerConfig`: 200 with `valid: true` on
/// success, 400 with the parse error otherwise. No state is modified.
async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
    use mockforge_core::config::ServerConfig;

    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
        "yaml" | "yml" => {
            // NOTE(review): `request.config` has already been parsed from the
            // JSON request body, so this branch re-serializes it to a string
            // and parses that as YAML (valid because JSON is a YAML subset).
            // Raw YAML text can never reach this handler through the Json
            // extractor — confirm the intended contract for `format: yaml`.
            let yaml_str = match serde_json::to_string(&request.config) {
                Ok(s) => s,
                Err(e) => {
                    return (
                        StatusCode::BAD_REQUEST,
                        Json(serde_json::json!({
                            "valid": false,
                            "error": format!("Failed to convert to string: {}", e),
                            "message": "Configuration validation failed"
                        })),
                    )
                        .into_response();
                }
            };
            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
        }
        // Any other format value (including the default "json") is parsed as JSON.
        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
    };

    match config_result {
        Ok(_) => Json(serde_json::json!({
            "valid": true,
            "message": "Configuration is valid"
        }))
        .into_response(),
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "valid": false,
                "error": format!("Invalid configuration: {}", e),
                "message": "Configuration validation failed"
            })),
        )
            .into_response(),
    }
}
839
/// Request for bulk configuration update
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    /// Partial configuration updates (only specified fields will be updated);
    /// must be a JSON object — non-object payloads are rejected with 400
    pub updates: serde_json::Value,
}
846
847/// Bulk update configuration
848///
849/// This endpoint allows updating multiple configuration options at once.
850/// Only the specified fields in the updates object will be modified.
851///
852/// Configuration updates are applied to the server configuration if available
853/// in ManagementState. Changes take effect immediately for supported settings.
854async fn bulk_update_config(
855    State(state): State<ManagementState>,
856    Json(request): Json<BulkConfigUpdateRequest>,
857) -> impl IntoResponse {
858    // Validate the updates structure
859    if !request.updates.is_object() {
860        return (
861            StatusCode::BAD_REQUEST,
862            Json(serde_json::json!({
863                "error": "Invalid request",
864                "message": "Updates must be a JSON object"
865            })),
866        )
867            .into_response();
868    }
869
870    // Try to validate as partial ServerConfig
871    use mockforge_core::config::ServerConfig;
872
873    // Create a minimal valid config and try to merge updates
874    let base_config = ServerConfig::default();
875    let base_json = match serde_json::to_value(&base_config) {
876        Ok(v) => v,
877        Err(e) => {
878            return (
879                StatusCode::INTERNAL_SERVER_ERROR,
880                Json(serde_json::json!({
881                    "error": "Internal error",
882                    "message": format!("Failed to serialize base config: {}", e)
883                })),
884            )
885                .into_response();
886        }
887    };
888
889    // Merge updates into base config (simplified merge)
890    let mut merged = base_json.clone();
891    if let (Some(merged_obj), Some(updates_obj)) =
892        (merged.as_object_mut(), request.updates.as_object())
893    {
894        for (key, value) in updates_obj {
895            merged_obj.insert(key.clone(), value.clone());
896        }
897    }
898
899    // Validate the merged config
900    match serde_json::from_value::<ServerConfig>(merged) {
901        Ok(_) => {
902            // Config is valid
903            // Note: Runtime application of config changes would require:
904            // 1. Storing ServerConfig in ManagementState
905            // 2. Implementing hot-reload mechanism for server configuration
906            // 3. Updating router state and middleware based on new config
907            // For now, this endpoint only validates the configuration structure
908            Json(serde_json::json!({
909                "success": true,
910                "message": "Bulk configuration update validated successfully. Note: Runtime application requires ServerConfig in ManagementState and hot-reload support.",
911                "updates_received": request.updates,
912                "validated": true
913            }))
914            .into_response()
915        }
916        Err(e) => (
917            StatusCode::BAD_REQUEST,
918            Json(serde_json::json!({
919                "error": "Invalid configuration",
920                "message": format!("Configuration validation failed: {}", e),
921                "validated": false
922            })),
923        )
924            .into_response(),
925    }
926}
927
928/// Get server statistics
929async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
930    let mocks = state.mocks.read().await;
931    let request_count = *state.request_counter.read().await;
932
933    Json(ServerStats {
934        uptime_seconds: state.start_time.elapsed().as_secs(),
935        total_requests: request_count,
936        active_mocks: mocks.len(),
937        enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
938        registered_routes: mocks.len(), // This could be enhanced with actual route registry info
939    })
940}
941
942/// Get server configuration
943async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
944    Json(ServerConfig {
945        version: env!("CARGO_PKG_VERSION").to_string(),
946        port: state.port,
947        has_openapi_spec: state.spec.is_some(),
948        spec_path: state.spec_path.clone(),
949    })
950}
951
952/// Health check endpoint
953async fn health_check() -> Json<serde_json::Value> {
954    Json(serde_json::json!({
955        "status": "healthy",
956        "service": "mockforge-management",
957        "timestamp": chrono::Utc::now().to_rfc3339()
958    }))
959}
960
/// Export format for mock configurations
///
/// Selected by the `format` query parameter of the export endpoint;
/// serialized in lowercase (`json` / `yaml`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// JSON format
    Json,
    /// YAML format
    Yaml,
}
970
971/// Export mocks in specified format
972async fn export_mocks(
973    State(state): State<ManagementState>,
974    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
975) -> Result<(StatusCode, String), StatusCode> {
976    let mocks = state.mocks.read().await;
977
978    let format = params
979        .get("format")
980        .map(|f| match f.as_str() {
981            "yaml" | "yml" => ExportFormat::Yaml,
982            _ => ExportFormat::Json,
983        })
984        .unwrap_or(ExportFormat::Json);
985
986    match format {
987        ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
988            .map(|json| (StatusCode::OK, json))
989            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
990        ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
991            .map(|yaml| (StatusCode::OK, yaml))
992            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
993    }
994}
995
996/// Import mocks from JSON/YAML
997async fn import_mocks(
998    State(state): State<ManagementState>,
999    Json(mocks): Json<Vec<MockConfig>>,
1000) -> impl IntoResponse {
1001    let mut current_mocks = state.mocks.write().await;
1002    current_mocks.clear();
1003    current_mocks.extend(mocks);
1004    Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1005}
1006
#[cfg(feature = "smtp")]
/// List SMTP emails in mailbox
///
/// Returns every captured email, or an error payload when the registry
/// lookup fails or no SMTP registry is configured at runtime.
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.smtp_registry {
        Some(registry) => match registry.get_emails() {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve emails",
                    "message": e.to_string()
                })),
            ),
        },
        None => (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        ),
    }
}
1031
/// Get specific SMTP email
///
/// Looks up a single captured email by its identifier.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    match &state.smtp_registry {
        Some(registry) => match registry.get_email_by_id(&id) {
            Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
            Ok(None) => (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Email not found",
                    "id": id
                })),
            ),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve email",
                    "message": e.to_string()
                })),
            ),
        },
        None => (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        ),
    }
}
1066
/// Clear SMTP mailbox
///
/// Deletes every captured email from the mailbox.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match &state.smtp_registry {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    match registry.clear_mailbox() {
        Ok(()) => (
            StatusCode::OK,
            Json(serde_json::json!({
                "message": "Mailbox cleared successfully"
            })),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to clear mailbox",
                "message": e.to_string()
            })),
        ),
    }
}
1096
/// Export SMTP mailbox
///
/// Not implemented over HTTP: the SMTP server runs separately from the HTTP
/// server, so this always answers 501 while echoing the requested format.
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    // Borrow the format instead of eagerly allocating a default String
    // (the old `unwrap_or(&"json".to_string()).clone()` allocated even when
    // the parameter was present).
    let format = params.get("format").map(String::as_str).unwrap_or("json");
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "SMTP mailbox management not available via HTTP API",
            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
            "requested_format": format
        })),
    )
}
1112
/// Search SMTP emails
///
/// Builds `EmailSearchFilters` from query parameters (`sender`, `recipient`,
/// `subject`, `body`, RFC 3339 `since`/`until`, boolean `regex` and
/// `case_sensitive`) and runs the search against the registry.
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let registry = match &state.smtp_registry {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    // Parse an RFC 3339 timestamp parameter into UTC; bad input is ignored.
    let parse_timestamp = |key: &str| {
        params
            .get(key)
            .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
            .map(|dt| dt.with_timezone(&chrono::Utc))
    };
    // Boolean flags are only true for the literal string "true".
    let parse_flag = |key: &str| params.get(key).map(|s| s == "true").unwrap_or(false);

    let filters = EmailSearchFilters {
        sender: params.get("sender").cloned(),
        recipient: params.get("recipient").cloned(),
        subject: params.get("subject").cloned(),
        body: params.get("body").cloned(),
        since: parse_timestamp("since"),
        until: parse_timestamp("until"),
        use_regex: parse_flag("regex"),
        case_sensitive: parse_flag("case_sensitive"),
    };

    match registry.search_emails(filters) {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to search emails",
                "message": e.to_string()
            })),
        ),
    }
}
1157
/// MQTT broker statistics
///
/// Serialized as the response body of the MQTT stats management handler.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of connected MQTT clients
    pub connected_clients: usize,
    /// Number of active MQTT topics
    pub active_topics: usize,
    /// Number of retained messages
    pub retained_messages: usize,
    /// Total number of subscriptions
    pub total_subscriptions: usize,
}
1171
/// MQTT management handlers
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    // Without a broker there is nothing to report.
    let Some(broker) = &state.mqtt_broker else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    let connected_clients = broker.get_connected_clients().await.len();
    let active_topics = broker.get_active_topics().await.len();
    let topic_stats = broker.get_topic_stats().await;

    Json(MqttBrokerStats {
        connected_clients,
        active_topics,
        retained_messages: topic_stats.retained_messages,
        total_subscriptions: topic_stats.total_subscriptions,
    })
    .into_response()
}
1192
#[cfg(feature = "mqtt")]
/// List the client IDs currently connected to the embedded MQTT broker.
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.mqtt_broker else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    let client_list = broker.get_connected_clients().await;
    Json(serde_json::json!({ "clients": client_list })).into_response()
}
1205
#[cfg(feature = "mqtt")]
/// List the topics currently active on the embedded MQTT broker.
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.mqtt_broker else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    let topic_list = broker.get_active_topics().await;
    Json(serde_json::json!({ "topics": topic_list })).into_response()
}
1218
#[cfg(feature = "mqtt")]
/// Forcefully disconnect an MQTT client identified by `client_id`.
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.mqtt_broker else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response(),
        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
            .into_response(),
    }
}
1238
1239// ========== MQTT Publish Handler ==========
1240
#[cfg(feature = "mqtt")]
/// Request to publish a single MQTT message
///
/// Also the element shape of batch publishes. NOTE(review): the publish
/// handlers currently parse raw `serde_json::Value` bodies instead of
/// deserializing this struct directly.
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Topic to publish to
    pub topic: String,
    /// Message payload (string or JSON)
    pub payload: String,
    /// QoS level (0, 1, or 2); defaults to 0
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Whether to retain the message; defaults to false
    #[serde(default)]
    pub retain: bool,
}
1256
#[cfg(feature = "mqtt")]
/// Serde default for [`MqttPublishRequest::qos`].
fn default_qos() -> u8 {
    // QoS 0: fire-and-forget delivery.
    0
}
1261
#[cfg(feature = "mqtt")]
/// Publish a message to an MQTT topic (only compiled when mqtt feature is enabled)
///
/// Accepts a free-form JSON body with `topic` and `payload` (required),
/// plus optional `qos` (default 0) and `retain` (default false). Successful
/// publishes are also mirrored onto the real-time monitoring event channel.
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Pull the fields out of the untyped JSON body by hand.
    let topic = request.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
    let payload = request.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    let (Some(topic), Some(payload)) = (topic, payload) else {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required fields: topic and payload"
            })),
        );
    };

    let Some(broker) = &state.mqtt_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        );
    };

    // Only QoS 0, 1, and 2 exist in MQTT.
    if qos > 2 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid QoS",
                "message": "QoS must be 0, 1, or 2"
            })),
        );
    }

    let client_id = "mockforge-management-api".to_string();
    let result = broker
        .handle_publish(&client_id, &topic, payload.as_bytes().to_vec(), qos, retain)
        .await
        .map_err(|e| format!("{}", e));

    match result {
        Ok(_) => {
            // Mirror the publish onto the real-time monitoring stream;
            // send errors (no subscribers) are deliberately ignored.
            let _ = state.message_events.send(MessageEvent::Mqtt(MqttMessageEvent {
                topic: topic.clone(),
                payload: payload.clone(),
                qos,
                retain,
                timestamp: chrono::Utc::now().to_rfc3339(),
            }));

            (
                StatusCode::OK,
                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message published to topic '{}'", topic),
                    "topic": topic,
                    "qos": qos,
                    "retain": retain
                })),
            )
        }
        Err(error_msg) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to publish message",
                "message": error_msg
            })),
        ),
    }
}
1349
1350#[cfg(not(feature = "mqtt"))]
1351/// Publish a message to an MQTT topic (stub when mqtt feature is disabled)
1352async fn publish_mqtt_message_handler(
1353    State(_state): State<ManagementState>,
1354    Json(_request): Json<serde_json::Value>,
1355) -> impl IntoResponse {
1356    (
1357        StatusCode::SERVICE_UNAVAILABLE,
1358        Json(serde_json::json!({
1359            "error": "MQTT feature not enabled",
1360            "message": "MQTT support is not compiled into this build"
1361        })),
1362    )
1363}
1364
#[cfg(feature = "mqtt")]
/// Request to publish multiple MQTT messages
///
/// NOTE(review): the batch handler parses raw JSON bodies, so this struct
/// documents the expected request shape rather than being deserialized directly.
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// List of messages to publish, in order
    pub messages: Vec<MqttPublishRequest>,
    /// Delay between messages in milliseconds; defaults to 100
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1375
#[cfg(feature = "mqtt")]
/// Serde default for [`MqttBatchPublishRequest::delay_ms`] (milliseconds).
fn default_delay() -> u64 {
    // 100 ms between consecutive batch messages.
    100
}
1380
#[cfg(feature = "mqtt")]
/// Publish multiple messages to MQTT topics (only compiled when mqtt feature is enabled)
///
/// Expects a JSON body with a required `messages` array (each element with
/// `topic`, `payload`, optional `qos`/`retain`) and optional `delay_ms`
/// (default 100). Messages are published one at a time in input order;
/// per-message failures are recorded in the `results` array without aborting
/// the batch, and the batch itself answers 200 OK with success/failure totals.
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Extract fields from JSON manually
    let messages_json = request.get("messages").and_then(|v| v.as_array());
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    if messages_json.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required field: messages"
            })),
        );
    }

    let messages_json = messages_json.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        if messages_json.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            );
        }

        // One result entry per input message, in input order.
        let mut results = Vec::new();
        let client_id = "mockforge-management-api".to_string();

        for (index, msg_json) in messages_json.iter().enumerate() {
            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

            // Malformed entries are recorded as failures; the batch continues.
            if topic.is_none() || payload.is_none() {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }

            let topic = topic.unwrap();
            let payload = payload.unwrap();

            // Validate QoS
            if qos > 2 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Invalid QoS (must be 0, 1, or 2)"
                }));
                continue;
            }

            // Convert payload to bytes
            let payload_bytes = payload.as_bytes().to_vec();

            let publish_result = broker
                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
                .await
                .map_err(|e| format!("{}", e));

            match publish_result {
                Ok(_) => {
                    // Emit message event
                    let event = MessageEvent::Mqtt(MqttMessageEvent {
                        topic: topic.clone(),
                        payload: payload.clone(),
                        qos,
                        retain,
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    // Send errors (no active subscribers) are ignored on purpose.
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": topic,
                        "qos": qos
                    }));
                }
                Err(error_msg) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": error_msg
                    }));
                }
            }

            // Add delay between messages (except for the last one)
            if index < messages_json.len() - 1 && delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        (
            StatusCode::OK,
            Json(serde_json::json!({
                "success": true,
                "total": messages_json.len(),
                "succeeded": success_count,
                "failed": messages_json.len() - success_count,
                "results": results
            })),
        )
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1510
1511#[cfg(not(feature = "mqtt"))]
1512/// Publish multiple messages to MQTT topics (stub when mqtt feature is disabled)
1513async fn publish_mqtt_batch_handler(
1514    State(_state): State<ManagementState>,
1515    Json(_request): Json<serde_json::Value>,
1516) -> impl IntoResponse {
1517    (
1518        StatusCode::SERVICE_UNAVAILABLE,
1519        Json(serde_json::json!({
1520            "error": "MQTT feature not enabled",
1521            "message": "MQTT support is not compiled into this build"
1522        })),
1523    )
1524}
1525
1526// Migration pipeline handlers
1527
/// Request to set migration mode
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    // Accepted values (case-insensitive): "mock", "shadow", "real", "auto".
    mode: String,
}
1533
1534/// Get all migration routes
1535async fn get_migration_routes(
1536    State(state): State<ManagementState>,
1537) -> Result<Json<serde_json::Value>, StatusCode> {
1538    let proxy_config = match &state.proxy_config {
1539        Some(config) => config,
1540        None => {
1541            return Ok(Json(serde_json::json!({
1542                "error": "Migration not configured. Proxy config not available."
1543            })));
1544        }
1545    };
1546
1547    let config = proxy_config.read().await;
1548    let routes = config.get_migration_routes();
1549
1550    Ok(Json(serde_json::json!({
1551        "routes": routes
1552    })))
1553}
1554
1555/// Toggle a route's migration mode
1556async fn toggle_route_migration(
1557    State(state): State<ManagementState>,
1558    Path(pattern): Path<String>,
1559) -> Result<Json<serde_json::Value>, StatusCode> {
1560    let proxy_config = match &state.proxy_config {
1561        Some(config) => config,
1562        None => {
1563            return Ok(Json(serde_json::json!({
1564                "error": "Migration not configured. Proxy config not available."
1565            })));
1566        }
1567    };
1568
1569    let mut config = proxy_config.write().await;
1570    let new_mode = match config.toggle_route_migration(&pattern) {
1571        Some(mode) => mode,
1572        None => {
1573            return Ok(Json(serde_json::json!({
1574                "error": format!("Route pattern not found: {}", pattern)
1575            })));
1576        }
1577    };
1578
1579    Ok(Json(serde_json::json!({
1580        "pattern": pattern,
1581        "mode": format!("{:?}", new_mode).to_lowercase()
1582    })))
1583}
1584
1585/// Set a route's migration mode explicitly
1586async fn set_route_migration_mode(
1587    State(state): State<ManagementState>,
1588    Path(pattern): Path<String>,
1589    Json(request): Json<SetMigrationModeRequest>,
1590) -> Result<Json<serde_json::Value>, StatusCode> {
1591    let proxy_config = match &state.proxy_config {
1592        Some(config) => config,
1593        None => {
1594            return Ok(Json(serde_json::json!({
1595                "error": "Migration not configured. Proxy config not available."
1596            })));
1597        }
1598    };
1599
1600    use mockforge_core::proxy::config::MigrationMode;
1601    let mode = match request.mode.to_lowercase().as_str() {
1602        "mock" => MigrationMode::Mock,
1603        "shadow" => MigrationMode::Shadow,
1604        "real" => MigrationMode::Real,
1605        "auto" => MigrationMode::Auto,
1606        _ => {
1607            return Ok(Json(serde_json::json!({
1608                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1609            })));
1610        }
1611    };
1612
1613    let mut config = proxy_config.write().await;
1614    let updated = config.update_rule_migration_mode(&pattern, mode);
1615
1616    if !updated {
1617        return Ok(Json(serde_json::json!({
1618            "error": format!("Route pattern not found: {}", pattern)
1619        })));
1620    }
1621
1622    Ok(Json(serde_json::json!({
1623        "pattern": pattern,
1624        "mode": format!("{:?}", mode).to_lowercase()
1625    })))
1626}
1627
1628/// Toggle a group's migration mode
1629async fn toggle_group_migration(
1630    State(state): State<ManagementState>,
1631    Path(group): Path<String>,
1632) -> Result<Json<serde_json::Value>, StatusCode> {
1633    let proxy_config = match &state.proxy_config {
1634        Some(config) => config,
1635        None => {
1636            return Ok(Json(serde_json::json!({
1637                "error": "Migration not configured. Proxy config not available."
1638            })));
1639        }
1640    };
1641
1642    let mut config = proxy_config.write().await;
1643    let new_mode = config.toggle_group_migration(&group);
1644
1645    Ok(Json(serde_json::json!({
1646        "group": group,
1647        "mode": format!("{:?}", new_mode).to_lowercase()
1648    })))
1649}
1650
1651/// Set a group's migration mode explicitly
1652async fn set_group_migration_mode(
1653    State(state): State<ManagementState>,
1654    Path(group): Path<String>,
1655    Json(request): Json<SetMigrationModeRequest>,
1656) -> Result<Json<serde_json::Value>, StatusCode> {
1657    let proxy_config = match &state.proxy_config {
1658        Some(config) => config,
1659        None => {
1660            return Ok(Json(serde_json::json!({
1661                "error": "Migration not configured. Proxy config not available."
1662            })));
1663        }
1664    };
1665
1666    use mockforge_core::proxy::config::MigrationMode;
1667    let mode = match request.mode.to_lowercase().as_str() {
1668        "mock" => MigrationMode::Mock,
1669        "shadow" => MigrationMode::Shadow,
1670        "real" => MigrationMode::Real,
1671        "auto" => MigrationMode::Auto,
1672        _ => {
1673            return Ok(Json(serde_json::json!({
1674                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1675            })));
1676        }
1677    };
1678
1679    let mut config = proxy_config.write().await;
1680    config.update_group_migration_mode(&group, mode);
1681
1682    Ok(Json(serde_json::json!({
1683        "group": group,
1684        "mode": format!("{:?}", mode).to_lowercase()
1685    })))
1686}
1687
1688/// Get all migration groups
1689async fn get_migration_groups(
1690    State(state): State<ManagementState>,
1691) -> Result<Json<serde_json::Value>, StatusCode> {
1692    let proxy_config = match &state.proxy_config {
1693        Some(config) => config,
1694        None => {
1695            return Ok(Json(serde_json::json!({
1696                "error": "Migration not configured. Proxy config not available."
1697            })));
1698        }
1699    };
1700
1701    let config = proxy_config.read().await;
1702    let groups = config.get_migration_groups();
1703
1704    // Convert to JSON-serializable format
1705    let groups_json: serde_json::Map<String, serde_json::Value> = groups
1706        .into_iter()
1707        .map(|(name, info)| {
1708            (
1709                name,
1710                serde_json::json!({
1711                    "name": info.name,
1712                    "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1713                    "route_count": info.route_count
1714                }),
1715            )
1716        })
1717        .collect();
1718
1719    Ok(Json(serde_json::json!(groups_json)))
1720}
1721
1722/// Get overall migration status
1723async fn get_migration_status(
1724    State(state): State<ManagementState>,
1725) -> Result<Json<serde_json::Value>, StatusCode> {
1726    let proxy_config = match &state.proxy_config {
1727        Some(config) => config,
1728        None => {
1729            return Ok(Json(serde_json::json!({
1730                "error": "Migration not configured. Proxy config not available."
1731            })));
1732        }
1733    };
1734
1735    let config = proxy_config.read().await;
1736    let routes = config.get_migration_routes();
1737    let groups = config.get_migration_groups();
1738
1739    let mut mock_count = 0;
1740    let mut shadow_count = 0;
1741    let mut real_count = 0;
1742    let mut auto_count = 0;
1743
1744    for route in &routes {
1745        match route.migration_mode {
1746            mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1747            mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1748            mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1749            mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1750        }
1751    }
1752
1753    Ok(Json(serde_json::json!({
1754        "total_routes": routes.len(),
1755        "mock_routes": mock_count,
1756        "shadow_routes": shadow_count,
1757        "real_routes": real_count,
1758        "auto_routes": auto_count,
1759        "total_groups": groups.len(),
1760        "migration_enabled": config.migration_enabled
1761    })))
1762}
1763
1764// ========== Proxy Replacement Rules Management ==========
1765
/// Request body for creating/updating proxy replacement rules
///
/// Used by the `/proxy/rules` POST and PUT endpoints. A rule matches
/// traffic by URL pattern and applies one or more JSONPath body
/// transformations to the request or response body.
#[derive(Debug, Deserialize, Serialize)]
pub struct ProxyRuleRequest {
    /// URL pattern to match (supports wildcards like "/api/users/*")
    pub pattern: String,
    /// Rule type: "request" or "response"
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Optional status code filter for response rules (not used for request rules)
    #[serde(default)]
    pub status_codes: Vec<u16>,
    /// Body transformations to apply; create requires at least one entry
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether this rule is enabled (defaults to true when omitted)
    #[serde(default = "default_true")]
    pub enabled: bool,
}
1783
/// Request body for individual body transformations
///
/// API-level mirror of the core `BodyTransform` type; also reused in
/// responses to report a rule's configured transforms.
#[derive(Debug, Deserialize, Serialize)]
pub struct BodyTransformRequest {
    /// JSONPath expression to target (e.g., "$.userId", "$.email")
    pub path: String,
    /// Replacement value (supports template expansion like "{{uuid}}", "{{faker.email}}")
    pub replace: String,
    /// Operation to perform: "replace", "add", or "remove"
    /// (any unrecognized value falls back to "replace" when the rule is stored)
    #[serde(default)]
    pub operation: String,
}
1795
/// Response format for proxy rules
///
/// Rule IDs are positional: request rules occupy `0..request_count`,
/// response rules follow at `request_count..`. Deleting a rule shifts
/// the IDs of all rules after it.
#[derive(Debug, Serialize)]
pub struct ProxyRuleResponse {
    /// Rule ID (index in the combined request+response rule list)
    pub id: usize,
    /// URL pattern
    pub pattern: String,
    /// Rule type ("request" or "response")
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status codes (for response rules; always empty for request rules)
    pub status_codes: Vec<u16>,
    /// Body transformations
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether enabled
    pub enabled: bool,
}
1813
1814/// List all proxy replacement rules
1815async fn list_proxy_rules(
1816    State(state): State<ManagementState>,
1817) -> Result<Json<serde_json::Value>, StatusCode> {
1818    let proxy_config = match &state.proxy_config {
1819        Some(config) => config,
1820        None => {
1821            return Ok(Json(serde_json::json!({
1822                "error": "Proxy not configured. Proxy config not available."
1823            })));
1824        }
1825    };
1826
1827    let config = proxy_config.read().await;
1828
1829    let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1830
1831    // Add request replacement rules
1832    for (idx, rule) in config.request_replacements.iter().enumerate() {
1833        rules.push(ProxyRuleResponse {
1834            id: idx,
1835            pattern: rule.pattern.clone(),
1836            rule_type: "request".to_string(),
1837            status_codes: Vec::new(),
1838            body_transforms: rule
1839                .body_transforms
1840                .iter()
1841                .map(|t| BodyTransformRequest {
1842                    path: t.path.clone(),
1843                    replace: t.replace.clone(),
1844                    operation: format!("{:?}", t.operation).to_lowercase(),
1845                })
1846                .collect(),
1847            enabled: rule.enabled,
1848        });
1849    }
1850
1851    // Add response replacement rules
1852    let request_count = config.request_replacements.len();
1853    for (idx, rule) in config.response_replacements.iter().enumerate() {
1854        rules.push(ProxyRuleResponse {
1855            id: request_count + idx,
1856            pattern: rule.pattern.clone(),
1857            rule_type: "response".to_string(),
1858            status_codes: rule.status_codes.clone(),
1859            body_transforms: rule
1860                .body_transforms
1861                .iter()
1862                .map(|t| BodyTransformRequest {
1863                    path: t.path.clone(),
1864                    replace: t.replace.clone(),
1865                    operation: format!("{:?}", t.operation).to_lowercase(),
1866                })
1867                .collect(),
1868            enabled: rule.enabled,
1869        });
1870    }
1871
1872    Ok(Json(serde_json::json!({
1873        "rules": rules
1874    })))
1875}
1876
1877/// Create a new proxy replacement rule
1878async fn create_proxy_rule(
1879    State(state): State<ManagementState>,
1880    Json(request): Json<ProxyRuleRequest>,
1881) -> Result<Json<serde_json::Value>, StatusCode> {
1882    let proxy_config = match &state.proxy_config {
1883        Some(config) => config,
1884        None => {
1885            return Ok(Json(serde_json::json!({
1886                "error": "Proxy not configured. Proxy config not available."
1887            })));
1888        }
1889    };
1890
1891    // Validate request
1892    if request.body_transforms.is_empty() {
1893        return Ok(Json(serde_json::json!({
1894            "error": "At least one body transform is required"
1895        })));
1896    }
1897
1898    let body_transforms: Vec<BodyTransform> = request
1899        .body_transforms
1900        .iter()
1901        .map(|t| {
1902            let op = match t.operation.as_str() {
1903                "replace" => TransformOperation::Replace,
1904                "add" => TransformOperation::Add,
1905                "remove" => TransformOperation::Remove,
1906                _ => TransformOperation::Replace,
1907            };
1908            BodyTransform {
1909                path: t.path.clone(),
1910                replace: t.replace.clone(),
1911                operation: op,
1912            }
1913        })
1914        .collect();
1915
1916    let new_rule = BodyTransformRule {
1917        pattern: request.pattern.clone(),
1918        status_codes: request.status_codes.clone(),
1919        body_transforms,
1920        enabled: request.enabled,
1921    };
1922
1923    let mut config = proxy_config.write().await;
1924
1925    let rule_id = if request.rule_type == "request" {
1926        config.request_replacements.push(new_rule);
1927        config.request_replacements.len() - 1
1928    } else if request.rule_type == "response" {
1929        config.response_replacements.push(new_rule);
1930        config.request_replacements.len() + config.response_replacements.len() - 1
1931    } else {
1932        return Ok(Json(serde_json::json!({
1933            "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
1934        })));
1935    };
1936
1937    Ok(Json(serde_json::json!({
1938        "id": rule_id,
1939        "message": "Rule created successfully"
1940    })))
1941}
1942
1943/// Get a specific proxy replacement rule
1944async fn get_proxy_rule(
1945    State(state): State<ManagementState>,
1946    Path(id): Path<String>,
1947) -> Result<Json<serde_json::Value>, StatusCode> {
1948    let proxy_config = match &state.proxy_config {
1949        Some(config) => config,
1950        None => {
1951            return Ok(Json(serde_json::json!({
1952                "error": "Proxy not configured. Proxy config not available."
1953            })));
1954        }
1955    };
1956
1957    let config = proxy_config.read().await;
1958    let rule_id: usize = match id.parse() {
1959        Ok(id) => id,
1960        Err(_) => {
1961            return Ok(Json(serde_json::json!({
1962                "error": format!("Invalid rule ID: {}", id)
1963            })));
1964        }
1965    };
1966
1967    let request_count = config.request_replacements.len();
1968
1969    if rule_id < request_count {
1970        // Request rule
1971        let rule = &config.request_replacements[rule_id];
1972        Ok(Json(serde_json::json!({
1973            "id": rule_id,
1974            "pattern": rule.pattern,
1975            "type": "request",
1976            "status_codes": [],
1977            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1978                "path": t.path,
1979                "replace": t.replace,
1980                "operation": format!("{:?}", t.operation).to_lowercase()
1981            })).collect::<Vec<_>>(),
1982            "enabled": rule.enabled
1983        })))
1984    } else if rule_id < request_count + config.response_replacements.len() {
1985        // Response rule
1986        let response_idx = rule_id - request_count;
1987        let rule = &config.response_replacements[response_idx];
1988        Ok(Json(serde_json::json!({
1989            "id": rule_id,
1990            "pattern": rule.pattern,
1991            "type": "response",
1992            "status_codes": rule.status_codes,
1993            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1994                "path": t.path,
1995                "replace": t.replace,
1996                "operation": format!("{:?}", t.operation).to_lowercase()
1997            })).collect::<Vec<_>>(),
1998            "enabled": rule.enabled
1999        })))
2000    } else {
2001        Ok(Json(serde_json::json!({
2002            "error": format!("Rule ID {} not found", rule_id)
2003        })))
2004    }
2005}
2006
2007/// Update a proxy replacement rule
2008async fn update_proxy_rule(
2009    State(state): State<ManagementState>,
2010    Path(id): Path<String>,
2011    Json(request): Json<ProxyRuleRequest>,
2012) -> Result<Json<serde_json::Value>, StatusCode> {
2013    let proxy_config = match &state.proxy_config {
2014        Some(config) => config,
2015        None => {
2016            return Ok(Json(serde_json::json!({
2017                "error": "Proxy not configured. Proxy config not available."
2018            })));
2019        }
2020    };
2021
2022    let mut config = proxy_config.write().await;
2023    let rule_id: usize = match id.parse() {
2024        Ok(id) => id,
2025        Err(_) => {
2026            return Ok(Json(serde_json::json!({
2027                "error": format!("Invalid rule ID: {}", id)
2028            })));
2029        }
2030    };
2031
2032    let body_transforms: Vec<BodyTransform> = request
2033        .body_transforms
2034        .iter()
2035        .map(|t| {
2036            let op = match t.operation.as_str() {
2037                "replace" => TransformOperation::Replace,
2038                "add" => TransformOperation::Add,
2039                "remove" => TransformOperation::Remove,
2040                _ => TransformOperation::Replace,
2041            };
2042            BodyTransform {
2043                path: t.path.clone(),
2044                replace: t.replace.clone(),
2045                operation: op,
2046            }
2047        })
2048        .collect();
2049
2050    let updated_rule = BodyTransformRule {
2051        pattern: request.pattern.clone(),
2052        status_codes: request.status_codes.clone(),
2053        body_transforms,
2054        enabled: request.enabled,
2055    };
2056
2057    let request_count = config.request_replacements.len();
2058
2059    if rule_id < request_count {
2060        // Update request rule
2061        config.request_replacements[rule_id] = updated_rule;
2062    } else if rule_id < request_count + config.response_replacements.len() {
2063        // Update response rule
2064        let response_idx = rule_id - request_count;
2065        config.response_replacements[response_idx] = updated_rule;
2066    } else {
2067        return Ok(Json(serde_json::json!({
2068            "error": format!("Rule ID {} not found", rule_id)
2069        })));
2070    }
2071
2072    Ok(Json(serde_json::json!({
2073        "id": rule_id,
2074        "message": "Rule updated successfully"
2075    })))
2076}
2077
2078/// Delete a proxy replacement rule
2079async fn delete_proxy_rule(
2080    State(state): State<ManagementState>,
2081    Path(id): Path<String>,
2082) -> Result<Json<serde_json::Value>, StatusCode> {
2083    let proxy_config = match &state.proxy_config {
2084        Some(config) => config,
2085        None => {
2086            return Ok(Json(serde_json::json!({
2087                "error": "Proxy not configured. Proxy config not available."
2088            })));
2089        }
2090    };
2091
2092    let mut config = proxy_config.write().await;
2093    let rule_id: usize = match id.parse() {
2094        Ok(id) => id,
2095        Err(_) => {
2096            return Ok(Json(serde_json::json!({
2097                "error": format!("Invalid rule ID: {}", id)
2098            })));
2099        }
2100    };
2101
2102    let request_count = config.request_replacements.len();
2103
2104    if rule_id < request_count {
2105        // Delete request rule
2106        config.request_replacements.remove(rule_id);
2107    } else if rule_id < request_count + config.response_replacements.len() {
2108        // Delete response rule
2109        let response_idx = rule_id - request_count;
2110        config.response_replacements.remove(response_idx);
2111    } else {
2112        return Ok(Json(serde_json::json!({
2113            "error": format!("Rule ID {} not found", rule_id)
2114        })));
2115    }
2116
2117    Ok(Json(serde_json::json!({
2118        "id": rule_id,
2119        "message": "Rule deleted successfully"
2120    })))
2121}
2122
2123/// Get recent intercepted requests/responses for inspection
2124/// This is a placeholder - in a full implementation, you'd track intercepted traffic
2125async fn get_proxy_inspect(
2126    State(_state): State<ManagementState>,
2127    Query(params): Query<std::collections::HashMap<String, String>>,
2128) -> Result<Json<serde_json::Value>, StatusCode> {
2129    let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2130
2131    // Note: Request/response inspection would require:
2132    // 1. Storing intercepted requests/responses in ManagementState or a separate store
2133    // 2. Integrating with proxy middleware to capture traffic
2134    // 3. Implementing filtering and pagination for large volumes of traffic
2135    // For now, return an empty response structure indicating the feature is not yet implemented
2136    Ok(Json(serde_json::json!({
2137        "requests": [],
2138        "responses": [],
2139        "limit": limit,
2140        "total": 0,
2141        "message": "Request/response inspection not yet implemented. This endpoint will return intercepted traffic when proxy inspection is fully integrated."
2142    })))
2143}
2144
/// Build the management API router
///
/// Assembles every management endpoint into a single `Router`:
/// core health/stats/config and mock CRUD, optional protocol routes
/// (SMTP/MQTT/Kafka, gated by cargo features), migration pipeline
/// controls, proxy replacement rules, AI helpers, snapshot diffs, and
/// the state-machine API. The feature-gated sections use `let router =`
/// shadowing so each `#[cfg]` branch produces a router of the right
/// type; the `#[cfg(not(...))]` arms keep the binding defined when a
/// feature is disabled. `state` is attached last via `with_state`.
pub fn management_router(state: ManagementState) -> Router {
    // Core endpoints: health, stats, config, mock CRUD, import/export.
    let router = Router::new()
        .route("/health", get(health_check))
        .route("/stats", get(get_stats))
        .route("/config", get(get_config))
        .route("/config/validate", post(validate_config))
        .route("/config/bulk", post(bulk_update_config))
        .route("/mocks", get(list_mocks))
        .route("/mocks", post(create_mock))
        .route("/mocks/{id}", get(get_mock))
        .route("/mocks/{id}", put(update_mock))
        .route("/mocks/{id}", delete(delete_mock))
        .route("/export", get(export_mocks))
        .route("/import", post(import_mocks));

    // SMTP mailbox endpoints (only when the smtp feature is enabled).
    #[cfg(feature = "smtp")]
    let router = router
        .route("/smtp/mailbox", get(list_smtp_emails))
        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
        .route("/smtp/mailbox/{id}", get(get_smtp_email))
        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
        .route("/smtp/mailbox/search", get(search_smtp_emails));

    // Without smtp, re-bind unchanged so the shadowed name stays defined.
    #[cfg(not(feature = "smtp"))]
    let router = router;

    // MQTT routes
    #[cfg(feature = "mqtt")]
    let router = router
        .route("/mqtt/stats", get(get_mqtt_stats))
        .route("/mqtt/clients", get(get_mqtt_clients))
        .route("/mqtt/topics", get(get_mqtt_topics))
        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // NOTE(review): the publish routes are registered even without the
    // mqtt feature — presumably the handlers return an error/stub in
    // that build; confirm against the handler definitions.
    #[cfg(not(feature = "mqtt"))]
    let router = router
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Kafka broker introspection and produce endpoints.
    #[cfg(feature = "kafka")]
    let router = router
        .route("/kafka/stats", get(get_kafka_stats))
        .route("/kafka/topics", get(get_kafka_topics))
        .route("/kafka/topics/{topic}", get(get_kafka_topic))
        .route("/kafka/groups", get(get_kafka_groups))
        .route("/kafka/groups/{group_id}", get(get_kafka_group))
        .route("/kafka/produce", post(produce_kafka_message))
        .route("/kafka/produce/batch", post(produce_kafka_batch))
        .route("/kafka/messages/stream", get(kafka_messages_stream));

    #[cfg(not(feature = "kafka"))]
    let router = router;

    // Migration pipeline routes
    let router = router
        .route("/migration/routes", get(get_migration_routes))
        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
        .route("/migration/groups/{group}", put(set_group_migration_mode))
        .route("/migration/groups", get(get_migration_groups))
        .route("/migration/status", get(get_migration_status));

    // Proxy replacement rules routes
    let router = router
        .route("/proxy/rules", get(list_proxy_rules))
        .route("/proxy/rules", post(create_proxy_rule))
        .route("/proxy/rules/{id}", get(get_proxy_rule))
        .route("/proxy/rules/{id}", put(update_proxy_rule))
        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
        .route("/proxy/inspect", get(get_proxy_inspect));

    // AI-powered features
    let router = router.route("/ai/generate-spec", post(generate_ai_spec));

    // Snapshot diff endpoints (nested sub-router shares this state).
    let router = router.nest(
        "/snapshot-diff",
        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
    );

    #[cfg(feature = "behavioral-cloning")]
    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));

    // MockAI learning/explanations, chaos config, and network profiles.
    let router = router
        .route("/mockai/learn", post(learn_from_examples))
        .route("/mockai/rules/explanations", get(list_rule_explanations))
        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
        .route("/chaos/config", get(get_chaos_config))
        .route("/chaos/config", post(update_chaos_config))
        .route("/network/profiles", get(list_network_profiles))
        .route("/network/profile/apply", post(apply_network_profile));

    // State machine API routes
    let router =
        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());

    router.with_state(state)
}
2248
#[cfg(feature = "kafka")]
/// Aggregate statistics for the embedded Kafka broker, as returned by
/// the `/kafka/stats` management endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// Number of topics
    pub topics: usize,
    /// Total number of partitions
    pub partitions: usize,
    /// Number of consumer groups
    pub consumer_groups: usize,
    /// Total messages produced
    pub messages_produced: u64,
    /// Total messages consumed
    pub messages_consumed: u64,
}
2263
#[cfg(feature = "kafka")]
/// Summary of a Kafka topic, as returned by the `/kafka/topics` endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name
    pub name: String,
    /// Number of partitions in the topic
    pub partitions: usize,
    /// Configured replication factor
    pub replication_factor: i32,
}
2271
#[cfg(feature = "kafka")]
/// Summary of a Kafka consumer group, as returned by the
/// `/kafka/groups` endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group identifier
    pub group_id: String,
    /// Number of members currently in the group
    pub members: usize,
    /// Group state; always reported as "Stable" by this mock broker
    pub state: String,
}
2279
#[cfg(feature = "kafka")]
/// Get Kafka broker statistics
///
/// Returns topic/partition/consumer-group counts plus produced and
/// consumed message totals, or 503 when no broker is running.
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let consumer_groups = broker.consumer_groups.read().await;

    // Message totals come from the broker's metrics snapshot.
    let metrics_snapshot = broker.metrics().snapshot();

    Json(KafkaBrokerStats {
        topics: topics.len(),
        partitions: topics.values().map(|t| t.partitions.len()).sum(),
        consumer_groups: consumer_groups.groups().len(),
        messages_produced: metrics_snapshot.messages_produced_total,
        messages_consumed: metrics_snapshot.messages_consumed_total,
    })
    .into_response()
}
2312
#[cfg(feature = "kafka")]
/// List Kafka topics
///
/// Returns name, partition count, and replication factor for every
/// topic known to the broker, or 503 when no broker is running.
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let mut topic_list = Vec::with_capacity(topics.len());
    for (name, topic) in topics.iter() {
        topic_list.push(KafkaTopicInfo {
            name: name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor as i32,
        });
    }

    Json(serde_json::json!({ "topics": topic_list })).into_response()
}
2342
#[cfg(feature = "kafka")]
/// Get Kafka topic details
///
/// Returns partition-level detail for one topic; 404 when the topic is
/// unknown, 503 when no broker is running.
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let Some(topic) = topics.get(&topic_name) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Topic not found",
                "topic": topic_name
            })),
        )
            .into_response();
    };

    // Single-node mock broker: leader/replicas are reported as fixed values.
    let partitions_detail: Vec<serde_json::Value> = topic
        .partitions
        .iter()
        .enumerate()
        .map(|(idx, partition)| {
            serde_json::json!({
                "id": idx as i32,
                "leader": 0,
                "replicas": vec![0],
                "message_count": partition.messages.len()
            })
        })
        .collect();

    Json(serde_json::json!({
        "name": topic_name,
        "partitions": topic.partitions.len(),
        "replication_factor": topic.config.replication_factor,
        "partitions_detail": partitions_detail
    }))
    .into_response()
}
2384
#[cfg(feature = "kafka")]
/// List Kafka consumer groups
///
/// Returns a summary of each registered consumer group, or 503 when no
/// broker is running.
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;
    let mut groups = Vec::new();
    for (group_id, group) in consumer_groups.groups().iter() {
        groups.push(KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            // Simplified - the mock broker does not track rebalance states.
            state: "Stable".to_string(),
        });
    }

    Json(serde_json::json!({ "groups": groups })).into_response()
}
2415
#[cfg(feature = "kafka")]
/// Get Kafka consumer group details
///
/// Returns member assignments and committed offsets for one consumer
/// group; 404 when the group is unknown, 503 when no broker is running.
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;
    let Some(group) = consumer_groups.groups().get(&group_id) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Consumer group not found",
                "group_id": group_id
            })),
        )
            .into_response();
    };

    // Per-member detail including topic/partition assignments.
    let members_detail: Vec<serde_json::Value> = group
        .members
        .iter()
        .map(|(member_id, member)| {
            serde_json::json!({
                "member_id": member_id,
                "client_id": member.client_id,
                "assignments": member.assignment.iter().map(|a| serde_json::json!({
                    "topic": a.topic,
                    "partitions": a.partitions
                })).collect::<Vec<_>>()
            })
        })
        .collect();

    // Committed offsets keyed by (topic, partition).
    let offsets: Vec<serde_json::Value> = group
        .offsets
        .iter()
        .map(|((topic, partition), offset)| {
            serde_json::json!({
                "topic": topic,
                "partition": partition,
                "offset": offset
            })
        })
        .collect();

    Json(serde_json::json!({
        "group_id": group_id,
        "members": group.members.len(),
        "state": "Stable",
        "members_detail": members_detail,
        "offsets": offsets
    }))
    .into_response()
}
2464
2465// ========== Kafka Produce Handler ==========
2466
#[cfg(feature = "kafka")]
/// Request body for producing a single message via `/kafka/produce`.
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Topic to produce to (created on demand if it does not exist)
    pub topic: String,
    /// Message key (optional; used for partition assignment when no
    /// explicit partition is given)
    #[serde(default)]
    pub key: Option<String>,
    /// Message value (JSON string or plain string)
    pub value: String,
    /// Partition ID (optional, auto-assigned if not provided)
    #[serde(default)]
    pub partition: Option<i32>,
    /// Message headers (optional, key-value pairs)
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2484
#[cfg(feature = "kafka")]
/// Produce a message to a Kafka topic
///
/// Creates the topic on demand, resolves a partition (explicit or
/// key-assigned), appends the message, records metrics, and broadcasts a
/// `MessageEvent` for real-time monitoring subscribers.
///
/// Cleanups over the previous implementation: the redundant re-check of
/// `state.kafka_broker` inside the branch where `broker` was already
/// bound is gone, the redundant nested `#[cfg(feature = "kafka")]` block
/// inside this already-gated function is removed, and the extra
/// `key_clone.clone()` / `headers_clone.clone()` copies are avoided.
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let mut topics = broker.topics.write().await;

    // Get or create the topic with default configuration.
    let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
        mockforge_kafka::topics::Topic::new(
            request.topic.clone(),
            mockforge_kafka::topics::TopicConfig::default(),
        )
    });

    // Use the caller-supplied partition, or assign one from the key.
    let partition_id = match request.partition {
        Some(partition) => partition,
        None => topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes())),
    };

    // Validate the partition exists before building the message.
    if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            })),
        )
            .into_response();
    }

    // Build the broker-side message; borrow from `request` instead of
    // cloning intermediates twice.
    let message = mockforge_kafka::partitions::KafkaMessage {
        offset: 0, // Will be set by partition.append
        timestamp: chrono::Utc::now().timestamp_millis(),
        key: request.key.as_ref().map(|k| k.as_bytes().to_vec()),
        value: request.value.as_bytes().to_vec(),
        headers: request
            .headers
            .clone()
            .unwrap_or_default()
            .into_iter()
            .map(|(k, v)| (k, v.into_bytes()))
            .collect(),
    };

    // Produce to partition
    match topic_entry.produce(partition_id, message).await {
        Ok(offset) => {
            // Record metrics for the successfully produced message.
            broker.metrics().record_messages_produced(1);

            // Emit a message event for real-time monitoring subscribers; a
            // send error only means no subscriber is listening, so ignore it.
            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: request.topic.clone(),
                key: request.key.clone(),
                value: request.value.clone(),
                partition: partition_id,
                offset,
                headers: request.headers.clone(),
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", request.topic),
                "topic": request.topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2589
#[cfg(feature = "kafka")]
/// Request body for batch-producing Kafka messages.
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// List of messages to produce, in order
    pub messages: Vec<KafkaProduceRequest>,
    /// Delay between consecutive messages in milliseconds (no delay is applied after the last message)
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2599
#[cfg(feature = "kafka")]
/// Produce multiple messages to Kafka topics in one request.
///
/// Messages are produced in order. A failure for one message is recorded in
/// the per-message `results` array and does not abort the rest of the batch.
/// When `delay_ms` is non-zero, the handler sleeps between consecutive
/// messages (but not after the last one).
///
/// Returns 400 for an empty batch and 503 when no Kafka broker is available.
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        if request.messages.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            )
                .into_response();
        }

        let mut results = Vec::with_capacity(request.messages.len());

        for (index, msg_request) in request.messages.iter().enumerate() {
            // Acquire the topics lock per message so it can be released before
            // any pacing delay below.
            let mut topics = broker.topics.write().await;

            // Get or create the topic
            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                mockforge_kafka::topics::Topic::new(
                    msg_request.topic.clone(),
                    mockforge_kafka::topics::TopicConfig::default(),
                )
            });

            // Use the explicit partition when given, otherwise derive one from the key
            let partition_id = if let Some(partition) = msg_request.partition {
                partition
            } else {
                topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
            };

            // Validate partition exists
            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                }));
                continue;
            }

            // Create the message; the real offset is assigned by the partition on append
            let message = mockforge_kafka::partitions::KafkaMessage {
                offset: 0,
                timestamp: chrono::Utc::now().timestamp_millis(),
                key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
                value: msg_request.value.as_bytes().to_vec(),
                headers: msg_request
                    .headers
                    .clone()
                    .unwrap_or_default()
                    .into_iter()
                    .map(|(k, v)| (k, v.as_bytes().to_vec()))
                    .collect(),
            };

            // Produce to partition
            match topic_entry.produce(partition_id, message).await {
                Ok(offset) => {
                    // `broker` is already bound by the enclosing `if let`; no
                    // need to re-check `state.kafka_broker` here.
                    broker.metrics().record_messages_produced(1);

                    // Emit message event for real-time monitoring (SSE streams)
                    let event = MessageEvent::Kafka(KafkaMessageEvent {
                        topic: msg_request.topic.clone(),
                        key: msg_request.key.clone(),
                        value: msg_request.value.clone(),
                        partition: partition_id,
                        offset,
                        headers: msg_request.headers.clone(),
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    // A send error only means no subscriber is listening; ignore it.
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": msg_request.topic,
                        "partition": partition_id,
                        "offset": offset
                    }));
                }
                Err(e) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": e.to_string()
                    }));
                }
            }

            // Release the write lock before sleeping so other tasks can access
            // the topics map during the pacing delay.
            drop(topics);

            // Add delay between messages (except for the last one)
            if index < request.messages.len() - 1 && request.delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        Json(serde_json::json!({
            "success": true,
            "total": request.messages.len(),
            "succeeded": success_count,
            "failed": request.messages.len() - success_count,
            "results": results
        }))
        .into_response()
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2728
2729// ========== Real-time Message Streaming (SSE) ==========
2730
#[cfg(feature = "mqtt")]
/// SSE stream for MQTT messages
///
/// Forwards each published MQTT message as an `mqtt_message` SSE event. An
/// optional `?topic=<substring>` query parameter restricts the stream to
/// topics containing that substring.
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let receiver = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let stream = stream::unfold(receiver, move |mut receiver| {
        let topic_filter = topic_filter.clone();

        async move {
            // Keep pulling events until one is suitable for this stream, then
            // yield it together with the receiver for the next unfold step.
            loop {
                match receiver.recv().await {
                    Ok(MessageEvent::Mqtt(msg)) => {
                        // Honor the optional substring topic filter.
                        if topic_filter.as_ref().map_or(false, |f| !msg.topic.contains(f)) {
                            continue;
                        }

                        let payload = serde_json::json!({
                            "protocol": "mqtt",
                            "topic": msg.topic,
                            "payload": msg.payload,
                            "qos": msg.qos,
                            "retain": msg.retain,
                            "timestamp": msg.timestamp,
                        });

                        // Skip (rather than fail) events that cannot be serialized.
                        if let Ok(data) = serde_json::to_string(&payload) {
                            let sse_event = Event::default().event("mqtt_message").data(data);
                            return Some((Ok(sse_event), receiver));
                        }
                    }
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => {
                        // Kafka events are delivered by their own stream.
                        continue;
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        // All senders dropped: terminate the SSE stream.
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Periodic keep-alive comments prevent idle proxies from closing the connection.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
2791
#[cfg(feature = "kafka")]
/// SSE stream for Kafka messages
///
/// Forwards each produced Kafka message as a `kafka_message` SSE event. An
/// optional `?topic=<substring>` query parameter restricts the stream to
/// topics containing that substring.
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // No `mut` needed here: the receiver is moved into the unfold state and
    // rebound mutably there (the previous `let mut rx` triggered unused_mut).
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => {
                        // Skip MQTT events in Kafka stream
                        continue;
                    }
                    Ok(MessageEvent::Kafka(event)) => {
                        // Apply topic filter if specified
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "kafka",
                            "topic": event.topic,
                            "key": event.key,
                            "value": event.value,
                            "partition": event.partition,
                            "offset": event.offset,
                            "headers": event.headers,
                            "timestamp": event.timestamp,
                        });

                        // Skip (rather than fail) events that cannot be serialized.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event =
                                Event::default().event("kafka_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        // All senders dropped: terminate the SSE stream.
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Periodic keep-alive comments prevent idle proxies from closing the connection.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
2855
2856// ========== AI-Powered Features ==========
2857
/// Request for AI-powered API specification generation
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Natural language description of the API to generate
    pub query: String,
    /// Type of specification to generate: "openapi", "graphql", or "asyncapi"
    /// (unrecognized values are treated as "openapi")
    pub spec_type: String,
    /// Optional API version (e.g., "3.0.0" for OpenAPI); defaults to "3.0.0" when omitted
    pub api_version: Option<String>,
}
2868
/// Request for OpenAPI generation from recorded traffic
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to recorder database (optional, defaults to ./recordings.db in the current directory)
    #[serde(default)]
    pub database_path: Option<String>,
    /// Start time for filtering (ISO 8601 format, e.g., 2025-01-01T00:00:00Z)
    #[serde(default)]
    pub since: Option<String>,
    /// End time for filtering (ISO 8601 format)
    #[serde(default)]
    pub until: Option<String>,
    /// Path pattern filter (supports wildcards, e.g., /api/*)
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence score for including paths (0.0 to 1.0; defaults to 0.7)
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
2888
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`.
fn default_min_confidence() -> f64 {
    0.7
}
2892
/// Generate API specification from natural language using AI
///
/// Builds a one-shot RAG/LLM request from environment-configured provider
/// settings, prompts the model to emit an OpenAPI/GraphQL/AsyncAPI document,
/// and returns the extracted specification text. Returns 503 when no API key
/// is configured and 500 on engine or generation failures.
#[cfg(feature = "data-faker")]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::{DocumentStorage, InMemoryStorage},
    };

    // Parse a numeric environment variable, falling back to `default` when the
    // variable is unset or not parsable.
    fn env_parse<T: std::str::FromStr>(name: &str, default: T) -> T {
        std::env::var(name).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
    }

    // Resolve the API key; the MockForge-specific variable takes precedence
    // over the generic OpenAI one.
    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    // Check if RAG is configured - require API key
    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    }

    // Select the LLM provider; unrecognized values fall back to OpenAI.
    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    // Endpoint and model default per provider, each overridable via env.
    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    // Build RagConfig from defaults plus the environment overrides above.
    let mut rag_config = RagConfig::default();
    rag_config.provider = provider;
    rag_config.api_endpoint = api_endpoint;
    rag_config.api_key = api_key;
    rag_config.model = model;
    rag_config.max_tokens = env_parse("MOCKFORGE_RAG_MAX_TOKENS", 4096);
    // Lower temperature for more structured output
    rag_config.temperature = env_parse("MOCKFORGE_RAG_TEMPERATURE", 0.3);
    rag_config.timeout_secs = env_parse("MOCKFORGE_RAG_TIMEOUT", 60);
    rag_config.max_context_length = env_parse("MOCKFORGE_RAG_CONTEXT_WINDOW", 4000);

    // Build the prompt for spec generation
    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        if request.spec_type == "graphql" {
            "GraphQL SDL"
        } else {
            "YAML"
        }
    );

    // A throwaway in-memory document store is sufficient for this one-shot
    // generation; nothing needs to be persisted or retrieved later.
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    // Create RAG engine (config is moved in; it is not used afterwards)
    let mut rag_engine = match RagEngine::new(rag_config, storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    // Generate using RAG engine
    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Strip any surrounding LLM chatter, keeping only the spec body
            let spec = if request.spec_type == "graphql" {
                // For GraphQL, extract SDL
                extract_graphql_schema(&generated_text)
            } else {
                // For OpenAPI/AsyncAPI, extract YAML
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
3063
3064#[cfg(not(feature = "data-faker"))]
3065async fn generate_ai_spec(
3066    State(_state): State<ManagementState>,
3067    Json(_request): Json<GenerateSpecRequest>,
3068) -> impl IntoResponse {
3069    (
3070        StatusCode::NOT_IMPLEMENTED,
3071        Json(serde_json::json!({
3072            "error": "AI features not enabled",
3073            "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3074        })),
3075    )
3076        .into_response()
3077}
3078
/// Generate OpenAPI specification from recorded traffic
///
/// Reads recorded HTTP exchanges from the recorder database, optionally
/// filtered by a time window and path pattern, infers an OpenAPI spec from
/// them, and returns the spec together with generation metadata.
#[cfg(feature = "behavioral-cloning")]
async fn generate_openapi_from_traffic(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
) -> impl IntoResponse {
    use chrono::{DateTime, Utc};
    use mockforge_core::intelligent_behavior::{
        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
        IntelligentBehaviorConfig,
    };
    use mockforge_recorder::{
        database::RecorderDatabase,
        openapi_export::{QueryFilters, RecordingsToOpenApi},
    };
    use std::path::PathBuf;

    // Determine database path (defaults to ./recordings.db in the current directory)
    let db_path = if let Some(ref path) = request.database_path {
        PathBuf::from(path)
    } else {
        std::env::current_dir()
            .unwrap_or_else(|_| PathBuf::from("."))
            .join("recordings.db")
    };

    // Open database
    let db = match RecorderDatabase::new(&db_path).await {
        Ok(db) => db,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Database error",
                    "message": format!("Failed to open recorder database: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Both time filters share the same RFC 3339 parsing and error reporting.
    // Note: errors now name the JSON request field ("since"/"until") instead of
    // the CLI flags ("--since"/"--until") this code was adapted from.
    let parse_time_filter = |field: &str,
                             value: &Option<String>|
     -> Result<Option<DateTime<Utc>>, axum::response::Response> {
        match value {
            None => Ok(None),
            Some(raw) => DateTime::parse_from_rfc3339(raw)
                .map(|dt| Some(dt.with_timezone(&Utc)))
                .map_err(|e| {
                    (
                        StatusCode::BAD_REQUEST,
                        Json(serde_json::json!({
                            "error": "Invalid date format",
                            "message": format!("Invalid {} format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", field, e)
                        })),
                    )
                        .into_response()
                }),
        }
    };

    let since_dt = match parse_time_filter("since", &request.since) {
        Ok(dt) => dt,
        Err(response) => return response,
    };
    let until_dt = match parse_time_filter("until", &request.until) {
        Ok(dt) => dt,
        Err(response) => return response,
    };

    // Build query filters
    let query_filters = QueryFilters {
        since: since_dt,
        until: until_dt,
        path_pattern: request.path_pattern.clone(),
        min_status_code: None,
        max_requests: Some(1000),
    };

    // Query HTTP exchanges
    // Note: We need to convert from mockforge-recorder's HttpExchange to mockforge-core's HttpExchange
    // to avoid version mismatch issues. The converter returns the version from mockforge-recorder's
    // dependency, so we need to manually convert to the local version.
    let exchanges_from_recorder =
        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
            Ok(exchanges) => exchanges,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Query error",
                        "message": format!("Failed to query HTTP exchanges: {}", e)
                    })),
                )
                    .into_response();
            }
        };

    if exchanges_from_recorder.is_empty() {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "No exchanges found",
                "message": "No HTTP exchanges found matching the specified filters"
            })),
        )
            .into_response();
    }

    // Convert to local HttpExchange type to avoid version mismatch
    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
        .into_iter()
        .map(|e| LocalHttpExchange {
            method: e.method,
            path: e.path,
            query_params: e.query_params,
            headers: e.headers,
            body: e.body,
            body_encoding: e.body_encoding,
            status_code: e.status_code,
            response_headers: e.response_headers,
            response_body: e.response_body,
            response_body_encoding: e.response_body_encoding,
            timestamp: e.timestamp,
        })
        .collect();

    // Create OpenAPI generator config
    let behavior_config = IntelligentBehaviorConfig::default();
    let gen_config = OpenApiGenerationConfig {
        min_confidence: request.min_confidence,
        behavior_model: Some(behavior_config.behavior_model),
    };

    // Generate OpenAPI spec
    let generator = OpenApiSpecGenerator::new(gen_config);
    let result = match generator.generate_from_exchanges(exchanges).await {
        Ok(result) => result,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Generation error",
                    "message": format!("Failed to generate OpenAPI spec: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Prefer the raw document when the generator preserved one; otherwise
    // serialize the structured spec.
    let spec_json = if let Some(ref raw) = result.spec.raw_document {
        raw.clone()
    } else {
        match serde_json::to_value(&result.spec.spec) {
            Ok(json) => json,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Serialization error",
                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
                    })),
                )
                    .into_response();
            }
        }
    };

    // Build response with metadata
    let response = serde_json::json!({
        "spec": spec_json,
        "metadata": {
            "requests_analyzed": result.metadata.requests_analyzed,
            "paths_inferred": result.metadata.paths_inferred,
            "path_confidence": result.metadata.path_confidence,
            "generated_at": result.metadata.generated_at.to_rfc3339(),
            "duration_ms": result.metadata.duration_ms,
        }
    });

    Json(response).into_response()
}
3271
3272/// List all rule explanations
3273async fn list_rule_explanations(
3274    State(state): State<ManagementState>,
3275    Query(params): Query<std::collections::HashMap<String, String>>,
3276) -> impl IntoResponse {
3277    use mockforge_core::intelligent_behavior::RuleType;
3278
3279    let explanations = state.rule_explanations.read().await;
3280    let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3281
3282    // Filter by rule type if provided
3283    if let Some(rule_type_str) = params.get("rule_type") {
3284        if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3285            explanations_vec.retain(|e| e.rule_type == rule_type);
3286        }
3287    }
3288
3289    // Filter by minimum confidence if provided
3290    if let Some(min_confidence_str) = params.get("min_confidence") {
3291        if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3292            explanations_vec.retain(|e| e.confidence >= min_confidence);
3293        }
3294    }
3295
3296    // Sort by confidence (descending) and then by generated_at (descending)
3297    explanations_vec.sort_by(|a, b| {
3298        b.confidence
3299            .partial_cmp(&a.confidence)
3300            .unwrap_or(std::cmp::Ordering::Equal)
3301            .then_with(|| b.generated_at.cmp(&a.generated_at))
3302    });
3303
3304    Json(serde_json::json!({
3305        "explanations": explanations_vec,
3306        "total": explanations_vec.len(),
3307    }))
3308    .into_response()
3309}
3310
3311/// Get a specific rule explanation by ID
3312async fn get_rule_explanation(
3313    State(state): State<ManagementState>,
3314    Path(rule_id): Path<String>,
3315) -> impl IntoResponse {
3316    let explanations = state.rule_explanations.read().await;
3317
3318    match explanations.get(&rule_id) {
3319        Some(explanation) => Json(serde_json::json!({
3320            "explanation": explanation,
3321        }))
3322        .into_response(),
3323        None => (
3324            StatusCode::NOT_FOUND,
3325            Json(serde_json::json!({
3326                "error": "Rule explanation not found",
3327                "message": format!("No explanation found for rule ID: {}", rule_id)
3328            })),
3329        )
3330            .into_response(),
3331    }
3332}
3333
/// Request for learning from examples
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example request/response pairs to learn from (must be non-empty)
    pub examples: Vec<ExamplePairRequest>,
    /// Optional configuration override
    // NOTE(review): the consumer of this override lives in learn_from_examples,
    // which extends beyond this view — confirm it is actually applied.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3343
/// Example pair request format
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request data as free-form JSON; recognized keys include "method",
    /// "path", "body", and "query_params" (missing method/path default to
    /// "GET" and "/")
    pub request: serde_json::Value,
    /// Response data (status_code, body, etc.)
    pub response: serde_json::Value,
}
3352
3353/// Learn behavioral rules from example pairs
3354///
3355/// This endpoint accepts example request/response pairs, generates behavioral rules
3356/// with explanations, and stores the explanations for later retrieval.
3357async fn learn_from_examples(
3358    State(state): State<ManagementState>,
3359    Json(request): Json<LearnFromExamplesRequest>,
3360) -> impl IntoResponse {
3361    use mockforge_core::intelligent_behavior::{
3362        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3363        rule_generator::{ExamplePair, RuleGenerator},
3364    };
3365
3366    if request.examples.is_empty() {
3367        return (
3368            StatusCode::BAD_REQUEST,
3369            Json(serde_json::json!({
3370                "error": "No examples provided",
3371                "message": "At least one example pair is required"
3372            })),
3373        )
3374            .into_response();
3375    }
3376
3377    // Convert request examples to ExamplePair format
3378    let example_pairs: Result<Vec<ExamplePair>, String> = request
3379        .examples
3380        .into_iter()
3381        .enumerate()
3382        .map(|(idx, ex)| {
3383            // Parse request JSON to extract method, path, body, etc.
3384            let method = ex
3385                .request
3386                .get("method")
3387                .and_then(|v| v.as_str())
3388                .map(|s| s.to_string())
3389                .unwrap_or_else(|| "GET".to_string());
3390            let path = ex
3391                .request
3392                .get("path")
3393                .and_then(|v| v.as_str())
3394                .map(|s| s.to_string())
3395                .unwrap_or_else(|| "/".to_string());
3396            let request_body = ex.request.get("body").cloned();
3397            let query_params = ex
3398                .request
3399                .get("query_params")
3400                .and_then(|v| v.as_object())
3401                .map(|obj| {
3402                    obj.iter()
3403                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3404                        .collect()
3405                })
3406                .unwrap_or_default();
3407            let headers = ex
3408                .request
3409                .get("headers")
3410                .and_then(|v| v.as_object())
3411                .map(|obj| {
3412                    obj.iter()
3413                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3414                        .collect()
3415                })
3416                .unwrap_or_default();
3417
3418            // Parse response JSON to extract status, body, etc.
3419            let status = ex
3420                .response
3421                .get("status_code")
3422                .or_else(|| ex.response.get("status"))
3423                .and_then(|v| v.as_u64())
3424                .map(|n| n as u16)
3425                .unwrap_or(200);
3426            let response_body = ex.response.get("body").cloned();
3427
3428            Ok(ExamplePair {
3429                method,
3430                path,
3431                request: request_body,
3432                status,
3433                response: response_body,
3434                query_params,
3435                headers,
3436                metadata: {
3437                    let mut meta = std::collections::HashMap::new();
3438                    meta.insert("source".to_string(), "api".to_string());
3439                    meta.insert("example_index".to_string(), idx.to_string());
3440                    meta
3441                },
3442            })
3443        })
3444        .collect();
3445
3446    let example_pairs = match example_pairs {
3447        Ok(pairs) => pairs,
3448        Err(e) => {
3449            return (
3450                StatusCode::BAD_REQUEST,
3451                Json(serde_json::json!({
3452                    "error": "Invalid examples",
3453                    "message": e
3454                })),
3455            )
3456                .into_response();
3457        }
3458    };
3459
3460    // Create behavior config (use provided config or default)
3461    let behavior_config = if let Some(config_json) = request.config {
3462        // Try to deserialize custom config, fall back to default
3463        serde_json::from_value(config_json)
3464            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3465            .behavior_model
3466    } else {
3467        BehaviorModelConfig::default()
3468    };
3469
3470    // Create rule generator
3471    let generator = RuleGenerator::new(behavior_config);
3472
3473    // Generate rules with explanations
3474    let (rules, explanations) =
3475        match generator.generate_rules_with_explanations(example_pairs).await {
3476            Ok(result) => result,
3477            Err(e) => {
3478                return (
3479                    StatusCode::INTERNAL_SERVER_ERROR,
3480                    Json(serde_json::json!({
3481                        "error": "Rule generation failed",
3482                        "message": format!("Failed to generate rules: {}", e)
3483                    })),
3484                )
3485                    .into_response();
3486            }
3487        };
3488
3489    // Store explanations in ManagementState
3490    {
3491        let mut stored_explanations = state.rule_explanations.write().await;
3492        for explanation in &explanations {
3493            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3494        }
3495    }
3496
3497    // Prepare response
3498    let response = serde_json::json!({
3499        "success": true,
3500        "rules_generated": {
3501            "consistency_rules": rules.consistency_rules.len(),
3502            "schemas": rules.schemas.len(),
3503            "state_machines": rules.state_transitions.len(),
3504            "system_prompt": !rules.system_prompt.is_empty(),
3505        },
3506        "explanations": explanations.iter().map(|e| serde_json::json!({
3507            "rule_id": e.rule_id,
3508            "rule_type": e.rule_type,
3509            "confidence": e.confidence,
3510            "reasoning": e.reasoning,
3511        })).collect::<Vec<_>>(),
3512        "total_explanations": explanations.len(),
3513    });
3514
3515    Json(response).into_response()
3516}
3517
/// Extract a YAML specification from free-form text.
///
/// Looks for a fenced ```yaml code block first, then any generic ``` fence;
/// when neither yields a complete (opened *and* closed) block, the trimmed
/// input is returned as-is.
fn extract_yaml_spec(text: &str) -> String {
    // Pull the contents out of a fenced block that opens with `fence`.
    // Returns None when the opening marker or the closing ``` is missing.
    let fenced = |fence: &str| -> Option<String> {
        let open = text.find(fence)?;
        let body = text[open + fence.len()..].trim_start();
        let close = body.find("```")?;
        Some(body[..close].trim().to_string())
    };

    if let Some(spec) = fenced("```yaml") {
        return spec;
    }
    if let Some(spec) = fenced("```") {
        return spec;
    }

    // Bare spec document (starts with a spec marker) needs no extraction.
    let head = text.trim_start();
    if head.starts_with("openapi:") || head.starts_with("asyncapi:") {
        return text.trim().to_string();
    }

    // No code blocks found: return the trimmed text unchanged.
    text.trim().to_string()
}
3541
/// Extract GraphQL schema from text content
///
/// Prefers a fenced ```graphql block, then any generic ``` fence; when no
/// complete fenced block exists, the trimmed input is returned unchanged.
fn extract_graphql_schema(text: &str) -> String {
    // Pull the contents out of a fenced block opening with `fence`;
    // None when either the opening marker or the closing ``` is absent.
    let fenced = |fence: &str| -> Option<String> {
        let open = text.find(fence)?;
        let body = text[open + fence.len()..].trim_start();
        let close = body.find("```")?;
        Some(body[..close].trim().to_string())
    };

    if let Some(schema) = fenced("```graphql") {
        return schema;
    }
    if let Some(schema) = fenced("```") {
        return schema;
    }

    // Raw SDL (starts with a type or schema definition) needs no extraction.
    let head = text.trim_start();
    if head.starts_with("type ") || head.starts_with("schema ") {
        return text.trim().to_string();
    }

    text.trim().to_string()
}
3565
3566// ========== Chaos Engineering Management ==========
3567
3568/// Get current chaos engineering configuration
3569async fn get_chaos_config(State(state): State<ManagementState>) -> impl IntoResponse {
3570    #[cfg(feature = "chaos")]
3571    {
3572        if let Some(chaos_state) = &state.chaos_api_state {
3573            let config = chaos_state.config.read().await;
3574            // Convert ChaosConfig to JSON response format
3575            Json(serde_json::json!({
3576                "enabled": config.enabled,
3577                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
3578                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
3579                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
3580                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
3581            }))
3582            .into_response()
3583        } else {
3584            // Chaos API not available, return default
3585            Json(serde_json::json!({
3586                "enabled": false,
3587                "latency": null,
3588                "fault_injection": null,
3589                "rate_limit": null,
3590                "traffic_shaping": null,
3591            }))
3592            .into_response()
3593        }
3594    }
3595    #[cfg(not(feature = "chaos"))]
3596    {
3597        // Chaos feature not enabled
3598        Json(serde_json::json!({
3599            "enabled": false,
3600            "latency": null,
3601            "fault_injection": null,
3602            "rate_limit": null,
3603            "traffic_shaping": null,
3604        }))
3605        .into_response()
3606    }
3607}
3608
/// Request to update chaos configuration
///
/// All fields are optional: only the sections present in the request are
/// applied; everything else in the live configuration is left untouched.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Whether to enable chaos engineering
    pub enabled: Option<bool>,
    /// Latency configuration (deserialized into `LatencyConfig`)
    pub latency: Option<serde_json::Value>,
    /// Fault injection configuration (deserialized into `FaultInjectionConfig`)
    pub fault_injection: Option<serde_json::Value>,
    /// Rate limiting configuration (deserialized into `RateLimitConfig`)
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic shaping configuration (deserialized into `TrafficShapingConfig`)
    pub traffic_shaping: Option<serde_json::Value>,
}
3623
3624/// Update chaos engineering configuration
3625async fn update_chaos_config(
3626    State(state): State<ManagementState>,
3627    Json(config_update): Json<ChaosConfigUpdate>,
3628) -> impl IntoResponse {
3629    #[cfg(feature = "chaos")]
3630    {
3631        if let Some(chaos_state) = &state.chaos_api_state {
3632            use mockforge_chaos::config::{
3633                ChaosConfig, FaultInjectionConfig, LatencyConfig, RateLimitConfig,
3634                TrafficShapingConfig,
3635            };
3636
3637            let mut config = chaos_state.config.write().await;
3638
3639            // Update enabled flag if provided
3640            if let Some(enabled) = config_update.enabled {
3641                config.enabled = enabled;
3642            }
3643
3644            // Update latency config if provided
3645            if let Some(latency_json) = config_update.latency {
3646                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3647                    config.latency = Some(latency);
3648                }
3649            }
3650
3651            // Update fault injection config if provided
3652            if let Some(fault_json) = config_update.fault_injection {
3653                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3654                    config.fault_injection = Some(fault);
3655                }
3656            }
3657
3658            // Update rate limit config if provided
3659            if let Some(rate_json) = config_update.rate_limit {
3660                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3661                    config.rate_limit = Some(rate);
3662                }
3663            }
3664
3665            // Update traffic shaping config if provided
3666            if let Some(traffic_json) = config_update.traffic_shaping {
3667                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3668                    config.traffic_shaping = Some(traffic);
3669                }
3670            }
3671
3672            // Reinitialize middleware injectors with new config
3673            // The middleware will pick up the changes on the next request
3674            drop(config);
3675
3676            info!("Chaos configuration updated successfully");
3677            Json(serde_json::json!({
3678                "success": true,
3679                "message": "Chaos configuration updated and applied"
3680            }))
3681            .into_response()
3682        } else {
3683            (
3684                StatusCode::SERVICE_UNAVAILABLE,
3685                Json(serde_json::json!({
3686                    "success": false,
3687                    "error": "Chaos API not available",
3688                    "message": "Chaos engineering is not enabled or configured"
3689                })),
3690            )
3691                .into_response()
3692        }
3693    }
3694    #[cfg(not(feature = "chaos"))]
3695    {
3696        (
3697            StatusCode::NOT_IMPLEMENTED,
3698            Json(serde_json::json!({
3699                "success": false,
3700                "error": "Chaos feature not enabled",
3701                "message": "Chaos engineering feature is not compiled into this build"
3702            })),
3703        )
3704            .into_response()
3705    }
3706}
3707
3708// ========== Network Profile Management ==========
3709
3710/// List available network profiles
3711async fn list_network_profiles() -> impl IntoResponse {
3712    use mockforge_core::network_profiles::NetworkProfileCatalog;
3713
3714    let catalog = NetworkProfileCatalog::default();
3715    let profiles: Vec<serde_json::Value> = catalog
3716        .list_profiles_with_description()
3717        .iter()
3718        .map(|(name, description)| {
3719            serde_json::json!({
3720                "name": name,
3721                "description": description,
3722            })
3723        })
3724        .collect();
3725
3726    Json(serde_json::json!({
3727        "profiles": profiles
3728    }))
3729    .into_response()
3730}
3731
3732#[derive(Debug, Deserialize)]
3733/// Request to apply a network profile
3734pub struct ApplyNetworkProfileRequest {
3735    /// Name of the network profile to apply
3736    pub profile_name: String,
3737}
3738
/// Apply a network profile
///
/// Looks up `profile_name` in the default catalog and, when found, applies the
/// profile's traffic-shaping settings in two places: the persistent server
/// configuration (when `state.server_config` is present) and the live chaos
/// API state (when the `chaos` feature is compiled in and wired up).
/// Returns 404 when the profile does not exist in the catalog.
async fn apply_network_profile(
    State(state): State<ManagementState>,
    Json(request): Json<ApplyNetworkProfileRequest>,
) -> impl IntoResponse {
    use mockforge_core::network_profiles::NetworkProfileCatalog;

    let catalog = NetworkProfileCatalog::default();
    if let Some(profile) = catalog.get(&request.profile_name) {
        // Apply profile to server configuration if available
        // NetworkProfile contains latency and traffic_shaping configs
        if let Some(server_config) = &state.server_config {
            let mut config = server_config.write().await;

            // Apply network profile's traffic shaping to core config
            use mockforge_core::config::NetworkShapingConfig;

            // Convert NetworkProfile's TrafficShapingConfig to NetworkShapingConfig
            // NetworkProfile uses mockforge_core::traffic_shaping::TrafficShapingConfig
            // which has bandwidth and burst_loss fields
            let network_shaping = NetworkShapingConfig {
                // Enabled if either shaping mechanism is active in the profile.
                enabled: profile.traffic_shaping.bandwidth.enabled
                    || profile.traffic_shaping.burst_loss.enabled,
                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                // NOTE(review): defaults to 1000 here but 0 in the chaos API
                // path below — confirm which default is intended.
                max_connections: 1000, // Default value
            };

            // Update chaos config if it exists, or create it
            // Chaos config is in observability.chaos, not core.chaos
            if let Some(ref mut chaos) = config.observability.chaos {
                chaos.traffic_shaping = Some(network_shaping);
            } else {
                // Create minimal chaos config with traffic shaping
                use mockforge_core::config::ChaosEngConfig;
                config.observability.chaos = Some(ChaosEngConfig {
                    enabled: true,
                    latency: None,
                    fault_injection: None,
                    rate_limit: None,
                    traffic_shaping: Some(network_shaping),
                    scenario: None,
                });
            }

            info!("Network profile '{}' applied to server configuration", request.profile_name);
        } else {
            // Best-effort: without a server config handle the profile can only
            // affect the live chaos state below, not persisted configuration.
            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
        }

        // Also update chaos API state if available
        #[cfg(feature = "chaos")]
        {
            if let Some(chaos_state) = &state.chaos_api_state {
                use mockforge_chaos::config::TrafficShapingConfig;

                let mut chaos_config = chaos_state.config.write().await;
                // Apply profile's traffic shaping to chaos API state
                let chaos_traffic_shaping = TrafficShapingConfig {
                    enabled: profile.traffic_shaping.bandwidth.enabled
                        || profile.traffic_shaping.burst_loss.enabled,
                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                    max_connections: 0,
                    connection_timeout_ms: 30000,
                };
                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
                chaos_config.enabled = true; // Enable chaos when applying a profile
                drop(chaos_config);
                info!("Network profile '{}' applied to chaos API state", request.profile_name);
            }
        }

        Json(serde_json::json!({
            "success": true,
            "message": format!("Network profile '{}' applied", request.profile_name),
            "profile": {
                "name": profile.name,
                "description": profile.description,
            }
        }))
        .into_response()
    } else {
        // Unknown profile name: structured 404.
        (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Profile not found",
                "message": format!("Network profile '{}' not found", request.profile_name)
            })),
        )
            .into_response()
    }
}
3832
3833/// Build the management API router with UI Builder support
3834pub fn management_router_with_ui_builder(
3835    state: ManagementState,
3836    server_config: mockforge_core::config::ServerConfig,
3837) -> Router {
3838    use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
3839
3840    // Create the base management router
3841    let management = management_router(state);
3842
3843    // Create UI Builder state and router
3844    let ui_builder_state = UIBuilderState::new(server_config);
3845    let ui_builder = create_ui_builder_router(ui_builder_state);
3846
3847    // Nest UI Builder under /ui-builder
3848    management.nest("/ui-builder", ui_builder)
3849}
3850
3851/// Build management router with spec import API
3852pub fn management_router_with_spec_import(state: ManagementState) -> Router {
3853    use crate::spec_import::{spec_import_router, SpecImportState};
3854
3855    // Create base management router
3856    let management = management_router(state);
3857
3858    // Merge with spec import router
3859    Router::new()
3860        .merge(management)
3861        .merge(spec_import_router(SpecImportState::new()))
3862}
3863
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a MockConfig with the given identity fields; all optional
    /// fields take the defaults these tests rely on. Extracted because both
    /// tests previously duplicated the full 14-field struct literal.
    fn make_mock(
        id: &str,
        name: &str,
        method: &str,
        path: &str,
        enabled: bool,
        status: u16,
        body: serde_json::Value,
    ) -> MockConfig {
        MockConfig {
            id: id.to_string(),
            name: name.to_string(),
            method: method.to_string(),
            path: path.to_string(),
            response: MockResponse { body, headers: None },
            enabled,
            latency_ms: None,
            status_code: Some(status),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        }
    }

    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        let mock = make_mock(
            "test-1",
            "Test Mock",
            "GET",
            "/test",
            true,
            200,
            serde_json::json!({"message": "test"}),
        );

        // Create mock
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(mock.clone());
        }

        // Get mock back by ID and verify its fields survived the round trip.
        let mocks = state.mocks.read().await;
        let found = mocks.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        // Add one enabled and one disabled mock.
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(make_mock("1", "Mock 1", "GET", "/test1", true, 200, serde_json::json!({})));
            mocks.push(make_mock("2", "Mock 2", "POST", "/test2", false, 201, serde_json::json!({})));
        }

        // Total count includes disabled mocks; the enabled filter does not.
        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }
}