Skip to main content

mockforge_http/
management.rs

1#[cfg(any(feature = "mqtt", feature = "kafka"))]
2use axum::response::sse::{Event, Sse};
3/// Management API for MockForge
4///
5/// Provides REST endpoints for controlling mocks, server configuration,
6/// and integration with developer tools (VS Code extension, CI/CD, etc.)
7use axum::{
8    extract::{Path, Query, State},
9    http::StatusCode,
10    response::{IntoResponse, Json},
11    routing::{delete, get, post, put},
12    Router,
13};
14#[cfg(any(feature = "mqtt", feature = "kafka"))]
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18    BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23#[cfg(any(feature = "mqtt", feature = "kafka"))]
24use std::convert::Infallible;
25use std::sync::Arc;
26use tokio::sync::{broadcast, RwLock};
27use tracing::*;
28
/// Default broadcast channel capacity for message events
#[cfg(any(feature = "mqtt", feature = "kafka"))]
const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;

/// Get the broadcast channel capacity from the environment or use the default.
///
/// Reads `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY` and parses it as a `usize`.
/// The default is returned when the variable is unset, is not a valid
/// number, or lies outside the range accepted by
/// `tokio::sync::broadcast::channel`, which panics for `0` and for values
/// greater than `usize::MAX / 2`.
#[cfg(any(feature = "mqtt", feature = "kafka"))]
fn get_message_broadcast_capacity() -> usize {
    std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        // Reject values that would make the channel constructor panic.
        .filter(|capacity| (1..=usize::MAX / 2).contains(capacity))
        .unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY)
}
41
/// Message event types for real-time monitoring
///
/// Serialized in serde's adjacently tagged form: the lowercased variant name
/// is written under the `protocol` key and the variant payload under `data`.
///
/// NOTE(review): the enum itself is not feature-gated while each variant is,
/// so with neither `mqtt` nor `kafka` enabled it has no variants.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    #[cfg(feature = "mqtt")]
    /// MQTT message event (only with the `mqtt` feature)
    Mqtt(MqttMessageEvent),
    #[cfg(feature = "kafka")]
    /// Kafka message event (only with the `kafka` feature)
    Kafka(KafkaMessageEvent),
}
54
#[cfg(feature = "mqtt")]
/// MQTT message event for real-time monitoring
///
/// Only compiled when the `mqtt` feature is enabled. Carried as the payload
/// of `MessageEvent::Mqtt`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// MQTT topic name
    pub topic: String,
    /// Message payload content
    pub payload: String,
    /// Quality of Service level (0, 1, or 2)
    pub qos: u8,
    /// Whether the message is retained
    pub retain: bool,
    /// RFC3339 formatted timestamp
    pub timestamp: String,
}
70
/// Kafka message event for real-time monitoring
///
/// Only compiled when the `kafka` feature is enabled. Carried as the payload
/// of `MessageEvent::Kafka`.
#[cfg(feature = "kafka")]
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Kafka topic name
    pub topic: String,
    /// Optional message key
    pub key: Option<String>,
    /// Message value
    pub value: String,
    /// Partition number
    pub partition: i32,
    /// Message offset
    pub offset: i64,
    /// Optional message headers as name/value pairs
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// Timestamp string (presumably RFC3339 like the MQTT event; TODO confirm)
    pub timestamp: String,
}
83
84/// Mock configuration representation
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct MockConfig {
87    /// Unique identifier for the mock
88    #[serde(skip_serializing_if = "String::is_empty")]
89    pub id: String,
90    /// Human-readable name for the mock
91    pub name: String,
92    /// HTTP method (GET, POST, etc.)
93    pub method: String,
94    /// API path pattern to match
95    pub path: String,
96    /// Response configuration
97    pub response: MockResponse,
98    /// Whether this mock is currently enabled
99    #[serde(default = "default_true")]
100    pub enabled: bool,
101    /// Optional latency to inject in milliseconds
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub latency_ms: Option<u64>,
104    /// Optional HTTP status code override
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub status_code: Option<u16>,
107    /// Request matching criteria (headers, query params, body patterns)
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub request_match: Option<RequestMatchCriteria>,
110    /// Priority for mock ordering (higher priority mocks are matched first)
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub priority: Option<i32>,
113    /// Scenario name for stateful mocking
114    #[serde(skip_serializing_if = "Option::is_none")]
115    pub scenario: Option<String>,
116    /// Required scenario state for this mock to be active
117    #[serde(skip_serializing_if = "Option::is_none")]
118    pub required_scenario_state: Option<String>,
119    /// New scenario state after this mock is matched
120    #[serde(skip_serializing_if = "Option::is_none")]
121    pub new_scenario_state: Option<String>,
122}
123
/// Serde default helper that returns `true`.
///
/// Referenced by name from `#[serde(default = "default_true")]` so that
/// `MockConfig::enabled` is `true` when the field is absent from the input.
fn default_true() -> bool {
    true
}
127
/// Mock response configuration
///
/// Describes what a matched mock sends back: a JSON body and, optionally,
/// extra response headers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Response body as JSON
    pub body: serde_json::Value,
    /// Optional custom response headers (omitted from output when `None`)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
137
138/// Request matching criteria for advanced request matching
139#[derive(Debug, Clone, Serialize, Deserialize, Default)]
140pub struct RequestMatchCriteria {
141    /// Headers that must be present and match (case-insensitive header names)
142    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
143    pub headers: std::collections::HashMap<String, String>,
144    /// Query parameters that must be present and match
145    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
146    pub query_params: std::collections::HashMap<String, String>,
147    /// Request body pattern (supports exact match or regex)
148    #[serde(skip_serializing_if = "Option::is_none")]
149    pub body_pattern: Option<String>,
150    /// JSONPath expression for JSON body matching
151    #[serde(skip_serializing_if = "Option::is_none")]
152    pub json_path: Option<String>,
153    /// XPath expression for XML body matching
154    #[serde(skip_serializing_if = "Option::is_none")]
155    pub xpath: Option<String>,
156    /// Custom matcher expression (e.g., "headers.content-type == \"application/json\"")
157    #[serde(skip_serializing_if = "Option::is_none")]
158    pub custom_matcher: Option<String>,
159}
160
161/// Check if a request matches the given mock configuration
162///
163/// This function implements comprehensive request matching including:
164/// - Method and path matching
165/// - Header matching (with regex support)
166/// - Query parameter matching
167/// - Body pattern matching (exact, regex, JSONPath, XPath)
168/// - Custom matcher expressions
169pub fn mock_matches_request(
170    mock: &MockConfig,
171    method: &str,
172    path: &str,
173    headers: &std::collections::HashMap<String, String>,
174    query_params: &std::collections::HashMap<String, String>,
175    body: Option<&[u8]>,
176) -> bool {
177    use regex::Regex;
178
179    // Check if mock is enabled
180    if !mock.enabled {
181        return false;
182    }
183
184    // Check method (case-insensitive)
185    if mock.method.to_uppercase() != method.to_uppercase() {
186        return false;
187    }
188
189    // Check path pattern (supports wildcards and path parameters)
190    if !path_matches_pattern(&mock.path, path) {
191        return false;
192    }
193
194    // Check request matching criteria if present
195    if let Some(criteria) = &mock.request_match {
196        // Check headers
197        for (key, expected_value) in &criteria.headers {
198            let header_key_lower = key.to_lowercase();
199            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
200
201            if let Some((_, actual_value)) = found {
202                // Try regex match first, then exact match
203                if let Ok(re) = Regex::new(expected_value) {
204                    if !re.is_match(actual_value) {
205                        return false;
206                    }
207                } else if actual_value != expected_value {
208                    return false;
209                }
210            } else {
211                return false; // Header not found
212            }
213        }
214
215        // Check query parameters
216        for (key, expected_value) in &criteria.query_params {
217            if let Some(actual_value) = query_params.get(key) {
218                if actual_value != expected_value {
219                    return false;
220                }
221            } else {
222                return false; // Query param not found
223            }
224        }
225
226        // Check body pattern
227        if let Some(pattern) = &criteria.body_pattern {
228            if let Some(body_bytes) = body {
229                let body_str = String::from_utf8_lossy(body_bytes);
230                // Try regex first, then exact match
231                if let Ok(re) = Regex::new(pattern) {
232                    if !re.is_match(&body_str) {
233                        return false;
234                    }
235                } else if body_str.as_ref() != pattern {
236                    return false;
237                }
238            } else {
239                return false; // Body required but not present
240            }
241        }
242
243        // Check JSONPath (simplified implementation)
244        if let Some(json_path) = &criteria.json_path {
245            if let Some(body_bytes) = body {
246                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
247                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
248                        // Simple JSONPath check
249                        if !json_path_exists(&json_value, json_path) {
250                            return false;
251                        }
252                    }
253                }
254            }
255        }
256
257        // Check XPath (supports a focused subset)
258        if let Some(xpath) = &criteria.xpath {
259            if let Some(body_bytes) = body {
260                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
261                    if !xml_xpath_exists(body_str, xpath) {
262                        return false;
263                    }
264                } else {
265                    return false;
266                }
267            } else {
268                return false; // Body required but not present
269            }
270        }
271
272        // Check custom matcher
273        if let Some(custom) = &criteria.custom_matcher {
274            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
275                return false;
276            }
277        }
278    }
279
280    true
281}
282
283/// Check if a path matches a pattern (supports wildcards and path parameters)
284fn path_matches_pattern(pattern: &str, path: &str) -> bool {
285    // Exact match
286    if pattern == path {
287        return true;
288    }
289
290    // Wildcard match
291    if pattern == "*" {
292        return true;
293    }
294
295    // Path parameter matching (e.g., /users/{id} matches /users/123)
296    let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
297    let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
298
299    if pattern_parts.len() != path_parts.len() {
300        // Check for wildcard patterns
301        if pattern.contains('*') {
302            return matches_wildcard_pattern(pattern, path);
303        }
304        return false;
305    }
306
307    for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
308        // Check for path parameters {param}
309        if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
310            continue; // Matches any value
311        }
312
313        if pattern_part != path_part {
314            return false;
315        }
316    }
317
318    true
319}
320
321/// Check if path matches a wildcard pattern
322fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
323    use regex::Regex;
324
325    // Convert pattern to regex
326    let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
327
328    if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
329        return re.is_match(path);
330    }
331
332    false
333}
334
335/// Check if a JSONPath exists in a JSON value (simplified implementation)
336fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
337    // Simple implementation - for full JSONPath support, use a library like jsonpath-rs
338    // This handles simple paths like $.field or $.field.subfield
339    if let Some(path) = json_path.strip_prefix("$.") {
340        let parts: Vec<&str> = path.split('.').collect();
341
342        let mut current = json;
343        for part in parts {
344            if let Some(obj) = current.as_object() {
345                if let Some(value) = obj.get(part) {
346                    current = value;
347                } else {
348                    return false;
349                }
350            } else {
351                return false;
352            }
353        }
354        true
355    } else {
356        // For complex JSONPath expressions, would need a proper JSONPath library
357        tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
358        false
359    }
360}
361
/// One step of a parsed XPath expression from the supported subset.
#[derive(Debug, Clone, PartialEq, Eq)]
struct XPathSegment {
    /// Element name the step must match.
    name: String,
    /// Required trimmed text content, from a `[text()="..."]` predicate.
    text_equals: Option<String>,
}

/// Parse a single XPath step such as `item` or `item[text()="value"]`.
///
/// Surrounding whitespace is ignored. The only supported predicate is
/// `text()=` followed by a value wrapped in matching single or double quotes.
///
/// Returns `None` when the step is empty (or whitespace only), has an empty
/// element name, has an unterminated `[`, or uses any other predicate.
fn parse_xpath_segment(segment: &str) -> Option<XPathSegment> {
    // Trim before the emptiness check so whitespace-only steps are rejected
    // instead of producing a segment with an empty name.
    let segment = segment.trim();
    if segment.is_empty() {
        return None;
    }

    let (name, text_equals) = match segment.find('[') {
        None => (segment, None),
        Some(bracket_start) => {
            // The predicate must be closed by a trailing `]`.
            let predicate = segment[bracket_start + 1..].strip_suffix(']')?.trim();
            let raw = predicate.strip_prefix("text()=")?.trim();
            // Accept "value" or 'value'; the quotes must match.
            let text = raw
                .strip_prefix('"')
                .and_then(|inner| inner.strip_suffix('"'))
                .or_else(|| raw.strip_prefix('\'').and_then(|inner| inner.strip_suffix('\'')))?;
            (segment[..bracket_start].trim(), Some(text.to_string()))
        }
    };

    if name.is_empty() {
        return None;
    }

    Some(XPathSegment {
        name: name.to_string(),
        text_equals,
    })
}
408
409fn segment_matches(node: roxmltree::Node<'_, '_>, segment: &XPathSegment) -> bool {
410    if !node.is_element() {
411        return false;
412    }
413    if node.tag_name().name() != segment.name {
414        return false;
415    }
416    match &segment.text_equals {
417        Some(expected) => node.text().map(str::trim).unwrap_or_default() == expected,
418        None => true,
419    }
420}
421
422/// Check if an XPath expression matches an XML body.
423///
424/// Supported subset:
425/// - Absolute paths: `/root/child/item`
426/// - Descendant search: `//item` and `//parent/child`
427/// - Optional text predicate per segment: `item[text()="value"]`
428fn xml_xpath_exists(xml_body: &str, xpath: &str) -> bool {
429    let doc = match roxmltree::Document::parse(xml_body) {
430        Ok(doc) => doc,
431        Err(err) => {
432            tracing::warn!("Failed to parse XML for XPath matching: {}", err);
433            return false;
434        }
435    };
436
437    let expr = xpath.trim();
438    if expr.is_empty() {
439        return false;
440    }
441
442    let (is_descendant, path_str) = if let Some(rest) = expr.strip_prefix("//") {
443        (true, rest)
444    } else if let Some(rest) = expr.strip_prefix('/') {
445        (false, rest)
446    } else {
447        tracing::warn!("Unsupported XPath expression (must start with / or //): {}", expr);
448        return false;
449    };
450
451    let segments: Vec<XPathSegment> = path_str
452        .split('/')
453        .filter(|s| !s.trim().is_empty())
454        .filter_map(parse_xpath_segment)
455        .collect();
456
457    if segments.is_empty() {
458        return false;
459    }
460
461    if is_descendant {
462        let first = &segments[0];
463        for node in doc.descendants().filter(|n| segment_matches(*n, first)) {
464            let mut frontier = vec![node];
465            for segment in &segments[1..] {
466                let mut next_frontier = Vec::new();
467                for parent in &frontier {
468                    for child in parent.children().filter(|n| segment_matches(*n, segment)) {
469                        next_frontier.push(child);
470                    }
471                }
472                if next_frontier.is_empty() {
473                    frontier.clear();
474                    break;
475                }
476                frontier = next_frontier;
477            }
478            if !frontier.is_empty() {
479                return true;
480            }
481        }
482        false
483    } else {
484        let mut frontier = vec![doc.root_element()];
485        for (index, segment) in segments.iter().enumerate() {
486            let mut next_frontier = Vec::new();
487            for parent in &frontier {
488                if index == 0 {
489                    if segment_matches(*parent, segment) {
490                        next_frontier.push(*parent);
491                    }
492                    continue;
493                }
494                for child in parent.children().filter(|n| segment_matches(*n, segment)) {
495                    next_frontier.push(child);
496                }
497            }
498            if next_frontier.is_empty() {
499                return false;
500            }
501            frontier = next_frontier;
502        }
503        !frontier.is_empty()
504    }
505}
506
507/// Evaluate a custom matcher expression
508fn evaluate_custom_matcher(
509    expression: &str,
510    method: &str,
511    path: &str,
512    headers: &std::collections::HashMap<String, String>,
513    query_params: &std::collections::HashMap<String, String>,
514    body: Option<&[u8]>,
515) -> bool {
516    use regex::Regex;
517
518    let expr = expression.trim();
519
520    // Handle equality expressions (field == "value")
521    if expr.contains("==") {
522        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
523        if parts.len() != 2 {
524            return false;
525        }
526
527        let field = parts[0];
528        let expected_value = parts[1].trim_matches('"').trim_matches('\'');
529
530        match field {
531            "method" => method == expected_value,
532            "path" => path == expected_value,
533            _ if field.starts_with("headers.") => {
534                let header_name = &field[8..];
535                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
536            }
537            _ if field.starts_with("query.") => {
538                let param_name = &field[6..];
539                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
540            }
541            _ => false,
542        }
543    }
544    // Handle regex match expressions (field =~ "pattern")
545    else if expr.contains("=~") {
546        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
547        if parts.len() != 2 {
548            return false;
549        }
550
551        let field = parts[0];
552        let pattern = parts[1].trim_matches('"').trim_matches('\'');
553
554        if let Ok(re) = Regex::new(pattern) {
555            match field {
556                "method" => re.is_match(method),
557                "path" => re.is_match(path),
558                _ if field.starts_with("headers.") => {
559                    let header_name = &field[8..];
560                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
561                }
562                _ if field.starts_with("query.") => {
563                    let param_name = &field[6..];
564                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
565                }
566                _ => false,
567            }
568        } else {
569            false
570        }
571    }
572    // Handle contains expressions (field contains "value")
573    else if expr.contains("contains") {
574        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
575        if parts.len() != 2 {
576            return false;
577        }
578
579        let field = parts[0];
580        let search_value = parts[1].trim_matches('"').trim_matches('\'');
581
582        match field {
583            "path" => path.contains(search_value),
584            _ if field.starts_with("headers.") => {
585                let header_name = &field[8..];
586                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
587            }
588            _ if field.starts_with("body") => {
589                if let Some(body_bytes) = body {
590                    let body_str = String::from_utf8_lossy(body_bytes);
591                    body_str.contains(search_value)
592                } else {
593                    false
594                }
595            }
596            _ => false,
597        }
598    } else {
599        // Unknown expression format
600        tracing::warn!("Unknown custom matcher expression format: {}", expr);
601        false
602    }
603}
604
/// Server statistics
///
/// Plain data snapshot of server counters; serializable to and from JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Server uptime in seconds
    pub uptime_seconds: u64,
    /// Total number of requests processed
    pub total_requests: u64,
    /// Number of active mock configurations
    pub active_mocks: usize,
    /// Number of currently enabled mocks
    pub enabled_mocks: usize,
    /// Number of registered API routes
    pub registered_routes: usize,
}
619
/// Server configuration info
///
/// Note: this is the management API's own summary type, distinct from
/// `mockforge_core::config::ServerConfig` used elsewhere in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// MockForge version string
    pub version: String,
    /// Server port number
    pub port: u16,
    /// Whether an OpenAPI spec is loaded
    pub has_openapi_spec: bool,
    /// Optional path to the OpenAPI spec file (omitted from output when `None`)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
633
/// Shared state for the management API
///
/// The type is `Clone`; mutable members are wrapped in `Arc` (and `RwLock`
/// where they are written), so clones refer to the same underlying data.
/// Protocol-specific members only exist when the matching cargo feature
/// (`smtp`, `mqtt`, `kafka`, `chaos`) is enabled.
#[derive(Clone)]
pub struct ManagementState {
    /// Collection of mock configurations
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// Optional OpenAPI specification
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Optional path to the OpenAPI spec file
    pub spec_path: Option<String>,
    /// Server port number
    pub port: u16,
    /// Server start time for uptime calculation
    pub start_time: std::time::Instant,
    /// Counter for total requests processed
    pub request_counter: Arc<RwLock<u64>>,
    /// Optional proxy configuration for migration pipeline
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// Optional SMTP registry for email mocking
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// Optional MQTT broker for message mocking
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Optional Kafka broker for event streaming
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel for message events (MQTT & Kafka)
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// State machine manager for scenario state machines
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// Optional WebSocket broadcast channel for real-time updates
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Lifecycle hook registry for extensibility
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Rule explanations storage (in-memory for now), keyed by string
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
    /// Optional chaos API state for chaos config management
    #[cfg(feature = "chaos")]
    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
    /// Optional server configuration for profile application
    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
}
685
686impl ManagementState {
687    /// Create a new management state
688    ///
689    /// # Arguments
690    /// * `spec` - Optional OpenAPI specification
691    /// * `spec_path` - Optional path to the OpenAPI spec file
692    /// * `port` - Server port number
693    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
694        Self {
695            mocks: Arc::new(RwLock::new(Vec::new())),
696            spec,
697            spec_path,
698            port,
699            start_time: std::time::Instant::now(),
700            request_counter: Arc::new(RwLock::new(0)),
701            proxy_config: None,
702            #[cfg(feature = "smtp")]
703            smtp_registry: None,
704            #[cfg(feature = "mqtt")]
705            mqtt_broker: None,
706            #[cfg(feature = "kafka")]
707            kafka_broker: None,
708            #[cfg(any(feature = "mqtt", feature = "kafka"))]
709            message_events: {
710                let capacity = get_message_broadcast_capacity();
711                let (tx, _) = broadcast::channel(capacity);
712                Arc::new(tx)
713            },
714            state_machine_manager: Arc::new(RwLock::new(
715                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
716            )),
717            ws_broadcast: None,
718            lifecycle_hooks: None,
719            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
720            #[cfg(feature = "chaos")]
721            chaos_api_state: None,
722            server_config: None,
723        }
724    }
725
726    /// Add lifecycle hook registry to management state
727    pub fn with_lifecycle_hooks(
728        mut self,
729        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
730    ) -> Self {
731        self.lifecycle_hooks = Some(hooks);
732        self
733    }
734
735    /// Add WebSocket broadcast channel to management state
736    pub fn with_ws_broadcast(
737        mut self,
738        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
739    ) -> Self {
740        self.ws_broadcast = Some(ws_broadcast);
741        self
742    }
743
744    /// Add proxy configuration to management state
745    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
746        self.proxy_config = Some(proxy_config);
747        self
748    }
749
750    #[cfg(feature = "smtp")]
751    /// Add SMTP registry to management state
752    pub fn with_smtp_registry(
753        mut self,
754        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
755    ) -> Self {
756        self.smtp_registry = Some(smtp_registry);
757        self
758    }
759
760    #[cfg(feature = "mqtt")]
761    /// Add MQTT broker to management state
762    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
763        self.mqtt_broker = Some(mqtt_broker);
764        self
765    }
766
767    #[cfg(feature = "kafka")]
768    /// Add Kafka broker to management state
769    pub fn with_kafka_broker(
770        mut self,
771        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
772    ) -> Self {
773        self.kafka_broker = Some(kafka_broker);
774        self
775    }
776
777    #[cfg(feature = "chaos")]
778    /// Add chaos API state to management state
779    pub fn with_chaos_api_state(
780        mut self,
781        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
782    ) -> Self {
783        self.chaos_api_state = Some(chaos_api_state);
784        self
785    }
786
787    /// Add server configuration to management state
788    pub fn with_server_config(
789        mut self,
790        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
791    ) -> Self {
792        self.server_config = Some(server_config);
793        self
794    }
795}
796
797/// List all mocks
798async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
799    let mocks = state.mocks.read().await;
800    Json(serde_json::json!({
801        "mocks": *mocks,
802        "total": mocks.len(),
803        "enabled": mocks.iter().filter(|m| m.enabled).count()
804    }))
805}
806
807/// Get a specific mock by ID
808async fn get_mock(
809    State(state): State<ManagementState>,
810    Path(id): Path<String>,
811) -> Result<Json<MockConfig>, StatusCode> {
812    let mocks = state.mocks.read().await;
813    mocks
814        .iter()
815        .find(|m| m.id == id)
816        .cloned()
817        .map(Json)
818        .ok_or(StatusCode::NOT_FOUND)
819}
820
821/// Create a new mock
822async fn create_mock(
823    State(state): State<ManagementState>,
824    Json(mut mock): Json<MockConfig>,
825) -> Result<Json<MockConfig>, StatusCode> {
826    let mut mocks = state.mocks.write().await;
827
828    // Generate ID if not provided
829    if mock.id.is_empty() {
830        mock.id = uuid::Uuid::new_v4().to_string();
831    }
832
833    // Check for duplicate ID
834    if mocks.iter().any(|m| m.id == mock.id) {
835        return Err(StatusCode::CONFLICT);
836    }
837
838    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
839
840    // Invoke lifecycle hooks
841    if let Some(hooks) = &state.lifecycle_hooks {
842        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
843            id: mock.id.clone(),
844            name: mock.name.clone(),
845            config: serde_json::to_value(&mock).unwrap_or_default(),
846        };
847        hooks.invoke_mock_created(&event).await;
848    }
849
850    mocks.push(mock.clone());
851
852    // Broadcast WebSocket event
853    if let Some(tx) = &state.ws_broadcast {
854        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
855    }
856
857    Ok(Json(mock))
858}
859
860/// Update an existing mock
861async fn update_mock(
862    State(state): State<ManagementState>,
863    Path(id): Path<String>,
864    Json(updated_mock): Json<MockConfig>,
865) -> Result<Json<MockConfig>, StatusCode> {
866    let mut mocks = state.mocks.write().await;
867
868    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
869
870    // Get old mock for comparison
871    let old_mock = mocks[position].clone();
872
873    info!("Updating mock: {}", id);
874    mocks[position] = updated_mock.clone();
875
876    // Invoke lifecycle hooks
877    if let Some(hooks) = &state.lifecycle_hooks {
878        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
879            id: updated_mock.id.clone(),
880            name: updated_mock.name.clone(),
881            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
882        };
883        hooks.invoke_mock_updated(&event).await;
884
885        // Check if enabled state changed
886        if old_mock.enabled != updated_mock.enabled {
887            let state_event = if updated_mock.enabled {
888                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
889                    id: updated_mock.id.clone(),
890                }
891            } else {
892                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
893                    id: updated_mock.id.clone(),
894                }
895            };
896            hooks.invoke_mock_state_changed(&state_event).await;
897        }
898    }
899
900    // Broadcast WebSocket event
901    if let Some(tx) = &state.ws_broadcast {
902        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
903    }
904
905    Ok(Json(updated_mock))
906}
907
908/// Delete a mock
909async fn delete_mock(
910    State(state): State<ManagementState>,
911    Path(id): Path<String>,
912) -> Result<StatusCode, StatusCode> {
913    let mut mocks = state.mocks.write().await;
914
915    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
916
917    // Get mock info before deletion for lifecycle hooks
918    let deleted_mock = mocks[position].clone();
919
920    info!("Deleting mock: {}", id);
921    mocks.remove(position);
922
923    // Invoke lifecycle hooks
924    if let Some(hooks) = &state.lifecycle_hooks {
925        let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
926            id: deleted_mock.id.clone(),
927            name: deleted_mock.name.clone(),
928        };
929        hooks.invoke_mock_deleted(&event).await;
930    }
931
932    // Broadcast WebSocket event
933    if let Some(tx) = &state.ws_broadcast {
934        let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
935    }
936
937    Ok(StatusCode::NO_CONTENT)
938}
939
/// Request body for the configuration validation endpoint.
///
/// Only `config` is required; `format` falls back to `"json"` when the field
/// is absent from the request.
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// Configuration to validate, carried as a JSON value.
    pub config: serde_json::Value,
    /// Declared format of the configuration (`"json"` or `"yaml"`).
    /// Defaults to `"json"` via `default_format`.
    #[serde(default = "default_format")]
    pub format: String,
}
949
/// Serde default for `ValidateConfigRequest::format`: the string `"json"`.
fn default_format() -> String {
    String::from("json")
}
953
954/// Validate configuration without applying it
955async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
956    use mockforge_core::config::ServerConfig;
957
958    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
959        "yaml" | "yml" => {
960            let yaml_str = match serde_json::to_string(&request.config) {
961                Ok(s) => s,
962                Err(e) => {
963                    return (
964                        StatusCode::BAD_REQUEST,
965                        Json(serde_json::json!({
966                            "valid": false,
967                            "error": format!("Failed to convert to string: {}", e),
968                            "message": "Configuration validation failed"
969                        })),
970                    )
971                        .into_response();
972                }
973            };
974            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
975        }
976        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
977    };
978
979    match config_result {
980        Ok(_) => Json(serde_json::json!({
981            "valid": true,
982            "message": "Configuration is valid"
983        }))
984        .into_response(),
985        Err(e) => (
986            StatusCode::BAD_REQUEST,
987            Json(serde_json::json!({
988                "valid": false,
989                "error": format!("Invalid configuration: {}", e),
990                "message": "Configuration validation failed"
991            })),
992        )
993            .into_response(),
994    }
995}
996
/// Request body for the bulk configuration update endpoint.
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    /// Partial configuration updates. The handler requires this to be a JSON
    /// object; each top-level key overrides the matching configuration field.
    pub updates: serde_json::Value,
}
1003
1004/// Bulk update configuration
1005///
1006/// This endpoint allows updating multiple configuration options at once.
1007/// Only the specified fields in the updates object will be modified.
1008///
1009/// Configuration updates are applied to the server configuration if available
1010/// in ManagementState. Changes take effect immediately for supported settings.
1011async fn bulk_update_config(
1012    State(state): State<ManagementState>,
1013    Json(request): Json<BulkConfigUpdateRequest>,
1014) -> impl IntoResponse {
1015    // Validate the updates structure
1016    if !request.updates.is_object() {
1017        return (
1018            StatusCode::BAD_REQUEST,
1019            Json(serde_json::json!({
1020                "error": "Invalid request",
1021                "message": "Updates must be a JSON object"
1022            })),
1023        )
1024            .into_response();
1025    }
1026
1027    // Try to validate as partial ServerConfig
1028    use mockforge_core::config::ServerConfig;
1029
1030    // Create a minimal valid config and try to merge updates
1031    let base_config = ServerConfig::default();
1032    let base_json = match serde_json::to_value(&base_config) {
1033        Ok(v) => v,
1034        Err(e) => {
1035            return (
1036                StatusCode::INTERNAL_SERVER_ERROR,
1037                Json(serde_json::json!({
1038                    "error": "Internal error",
1039                    "message": format!("Failed to serialize base config: {}", e)
1040                })),
1041            )
1042                .into_response();
1043        }
1044    };
1045
1046    // Merge updates into base config (simplified merge)
1047    let mut merged = base_json.clone();
1048    if let (Some(merged_obj), Some(updates_obj)) =
1049        (merged.as_object_mut(), request.updates.as_object())
1050    {
1051        for (key, value) in updates_obj {
1052            merged_obj.insert(key.clone(), value.clone());
1053        }
1054    }
1055
1056    // Validate the merged config
1057    match serde_json::from_value::<ServerConfig>(merged) {
1058        Ok(validated_config) => {
1059            // Apply config if server_config is available in ManagementState
1060            if let Some(ref config_lock) = state.server_config {
1061                let mut config = config_lock.write().await;
1062                *config = validated_config;
1063                Json(serde_json::json!({
1064                    "success": true,
1065                    "message": "Bulk configuration update applied successfully",
1066                    "updates_received": request.updates,
1067                    "validated": true,
1068                    "applied": true
1069                }))
1070                .into_response()
1071            } else {
1072                Json(serde_json::json!({
1073                    "success": true,
1074                    "message": "Bulk configuration update validated but not applied (no server config in state). Use .with_server_config() when building ManagementState.",
1075                    "updates_received": request.updates,
1076                    "validated": true,
1077                    "applied": false
1078                }))
1079                .into_response()
1080            }
1081        }
1082        Err(e) => (
1083            StatusCode::BAD_REQUEST,
1084            Json(serde_json::json!({
1085                "error": "Invalid configuration",
1086                "message": format!("Configuration validation failed: {}", e),
1087                "validated": false
1088            })),
1089        )
1090            .into_response(),
1091    }
1092}
1093
1094/// Get server statistics
1095async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
1096    let mocks = state.mocks.read().await;
1097    let request_count = *state.request_counter.read().await;
1098
1099    Json(ServerStats {
1100        uptime_seconds: state.start_time.elapsed().as_secs(),
1101        total_requests: request_count,
1102        active_mocks: mocks.len(),
1103        enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
1104        registered_routes: mocks.len(), // This could be enhanced with actual route registry info
1105    })
1106}
1107
1108/// Get server configuration
1109async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
1110    Json(ServerConfig {
1111        version: env!("CARGO_PKG_VERSION").to_string(),
1112        port: state.port,
1113        has_openapi_spec: state.spec.is_some(),
1114        spec_path: state.spec_path.clone(),
1115    })
1116}
1117
1118/// Health check endpoint
1119async fn health_check() -> Json<serde_json::Value> {
1120    Json(serde_json::json!({
1121        "status": "healthy",
1122        "service": "mockforge-management",
1123        "timestamp": chrono::Utc::now().to_rfc3339()
1124    }))
1125}
1126
/// Export format for mock configurations
///
/// Variant names are (de)serialized in lowercase (`"json"`, `"yaml"`) because
/// of `rename_all = "lowercase"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// JSON format
    Json,
    /// YAML format
    Yaml,
}
1136
1137/// Export mocks in specified format
1138async fn export_mocks(
1139    State(state): State<ManagementState>,
1140    Query(params): Query<std::collections::HashMap<String, String>>,
1141) -> Result<(StatusCode, String), StatusCode> {
1142    let mocks = state.mocks.read().await;
1143
1144    let format = params
1145        .get("format")
1146        .map(|f| match f.as_str() {
1147            "yaml" | "yml" => ExportFormat::Yaml,
1148            _ => ExportFormat::Json,
1149        })
1150        .unwrap_or(ExportFormat::Json);
1151
1152    match format {
1153        ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
1154            .map(|json| (StatusCode::OK, json))
1155            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1156        ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
1157            .map(|yaml| (StatusCode::OK, yaml))
1158            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1159    }
1160}
1161
1162/// Import mocks from JSON/YAML
1163async fn import_mocks(
1164    State(state): State<ManagementState>,
1165    Json(mocks): Json<Vec<MockConfig>>,
1166) -> impl IntoResponse {
1167    let mut current_mocks = state.mocks.write().await;
1168    current_mocks.clear();
1169    current_mocks.extend(mocks);
1170    Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1171}
1172
/// List SMTP emails in mailbox
///
/// Responds `200 OK` with the emails returned by the SMTP registry,
/// `500 Internal Server Error` when the registry reports an error, and
/// `501 Not Implemented` when no SMTP registry is present in state.
#[cfg(feature = "smtp")]
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match &state.smtp_registry {
        Some(registry) => registry,
        None => {
            let body = serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            });
            return (StatusCode::NOT_IMPLEMENTED, Json(body));
        }
    };

    match registry.get_emails() {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(e) => {
            let body = serde_json::json!({
                "error": "Failed to retrieve emails",
                "message": e.to_string()
            });
            (StatusCode::INTERNAL_SERVER_ERROR, Json(body))
        }
    }
}
1197
/// Get specific SMTP email
///
/// Looks the email up by the `id` path segment. Responds `200 OK` with the
/// email, `404 Not Found` when the registry has no such email,
/// `500 Internal Server Error` when the lookup fails, and
/// `501 Not Implemented` when no SMTP registry is present in state.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let registry = match &state.smtp_registry {
        Some(registry) => registry,
        None => {
            let body = serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            });
            return (StatusCode::NOT_IMPLEMENTED, Json(body));
        }
    };

    let (status, body) = match registry.get_email_by_id(&id) {
        Ok(Some(email)) => (StatusCode::OK, serde_json::json!(email)),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            serde_json::json!({
                "error": "Email not found",
                "id": id
            }),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            serde_json::json!({
                "error": "Failed to retrieve email",
                "message": e.to_string()
            }),
        ),
    };

    (status, Json(body))
}
1232
/// Clear SMTP mailbox
///
/// Asks the SMTP registry to clear its mailbox. Responds `200 OK` on
/// success, `500 Internal Server Error` when the registry reports an error,
/// and `501 Not Implemented` when no SMTP registry is present in state.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match &state.smtp_registry {
        Some(registry) => registry,
        None => {
            let body = serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            });
            return (StatusCode::NOT_IMPLEMENTED, Json(body));
        }
    };

    let (status, body) = match registry.clear_mailbox() {
        Ok(()) => (
            StatusCode::OK,
            serde_json::json!({
                "message": "Mailbox cleared successfully"
            }),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            serde_json::json!({
                "error": "Failed to clear mailbox",
                "message": e.to_string()
            }),
        ),
    };

    (status, Json(body))
}
1262
/// Export SMTP mailbox
///
/// Not supported over HTTP: this handler always answers
/// `501 Not Implemented`. The `format` query parameter (default `"json"`) is
/// only echoed back in the response as `requested_format`.
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    // Build the default lazily so no string is allocated when the caller
    // supplied a format.
    let format = params.get("format").cloned().unwrap_or_else(|| "json".to_string());
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "SMTP mailbox management not available via HTTP API",
            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
            "requested_format": format
        })),
    )
}
1278
/// Search SMTP emails
///
/// Builds `EmailSearchFilters` from the query string and runs the search on
/// the SMTP registry.
///
/// Recognised parameters: `sender`, `recipient`, `subject`, `body` (text
/// filters), `since` / `until` (RFC 3339 timestamps, converted to UTC; a
/// value that fails to parse is ignored), and `regex` / `case_sensitive`
/// (enabled only when the value is exactly `true`).
///
/// Responds `200 OK` with the matching emails, `500 Internal Server Error`
/// when the search fails, and `501 Not Implemented` when no SMTP registry is
/// present in state.
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let registry = match &state.smtp_registry {
        Some(registry) => registry,
        None => {
            let body = serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            });
            return (StatusCode::NOT_IMPLEMENTED, Json(body));
        }
    };

    // Small readers over the query map, one per kind of filter value.
    let text = |key: &str| params.get(key).cloned();
    let instant = |key: &str| {
        params
            .get(key)
            .and_then(|raw| chrono::DateTime::parse_from_rfc3339(raw).ok())
            .map(|parsed| parsed.with_timezone(&chrono::Utc))
    };
    let flag = |key: &str| matches!(params.get(key).map(String::as_str), Some("true"));

    let filters = EmailSearchFilters {
        sender: text("sender"),
        recipient: text("recipient"),
        subject: text("subject"),
        body: text("body"),
        since: instant("since"),
        until: instant("until"),
        use_regex: flag("regex"),
        case_sensitive: flag("case_sensitive"),
    };

    match registry.search_emails(filters) {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(e) => {
            let body = serde_json::json!({
                "error": "Failed to search emails",
                "message": e.to_string()
            });
            (StatusCode::INTERNAL_SERVER_ERROR, Json(body))
        }
    }
}
1323
/// MQTT broker statistics
///
/// Response payload of the MQTT statistics handler (`get_mqtt_stats`).
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of connected MQTT clients
    pub connected_clients: usize,
    /// Number of active MQTT topics
    pub active_topics: usize,
    /// Number of retained messages
    pub retained_messages: usize,
    /// Total number of subscriptions
    pub total_subscriptions: usize,
}
1337
/// Get MQTT broker statistics
///
/// Combines the number of connected clients, the number of active topics and
/// the broker's topic statistics into an `MqttBrokerStats` JSON response.
/// Responds `503 Service Unavailable` when no MQTT broker is present in
/// state.
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let broker = match &state.mqtt_broker {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let client_count = broker.get_connected_clients().await.len();
    let topic_count = broker.get_active_topics().await.len();
    let topic_stats = broker.get_topic_stats().await;

    Json(MqttBrokerStats {
        connected_clients: client_count,
        active_topics: topic_count,
        retained_messages: topic_stats.retained_messages,
        total_subscriptions: topic_stats.total_subscriptions,
    })
    .into_response()
}
1358
/// List connected MQTT clients
///
/// Responds with `{"clients": [...]}` as reported by the broker, or
/// `503 Service Unavailable` when no MQTT broker is present in state.
#[cfg(feature = "mqtt")]
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let clients = broker.get_connected_clients().await;
            let body = serde_json::json!({
                "clients": clients
            });
            Json(body).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1371
/// List active MQTT topics
///
/// Responds with `{"topics": [...]}` as reported by the broker, or
/// `503 Service Unavailable` when no MQTT broker is present in state.
#[cfg(feature = "mqtt")]
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let topics = broker.get_active_topics().await;
            let body = serde_json::json!({
                "topics": topics
            });
            Json(body).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1384
/// Disconnect an MQTT client
///
/// Asks the broker to disconnect the client named by the `client_id` path
/// segment. Responds `200 OK` with a plain-text confirmation,
/// `500 Internal Server Error` with the broker's error text when the
/// disconnect fails, and `503 Service Unavailable` when no MQTT broker is
/// present in state.
#[cfg(feature = "mqtt")]
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let broker = match &state.mqtt_broker {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let outcome = match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)),
        Err(e) => {
            (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
        }
    };

    outcome.into_response()
}
1404
1405// ========== MQTT Publish Handler ==========
1406
#[cfg(feature = "mqtt")]
/// Request to publish a single MQTT message
///
/// Also used as the element type of `MqttBatchPublishRequest::messages`.
/// Only `topic` and `payload` are required when deserializing.
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Topic to publish to
    pub topic: String,
    /// Message payload, carried as a string
    pub payload: String,
    /// QoS level (0, 1, or 2); defaults to 0 via `default_qos`
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Whether to retain the message; defaults to `false`
    #[serde(default)]
    pub retain: bool,
}
1422
/// Serde default for `MqttPublishRequest::qos`: QoS level 0.
#[cfg(feature = "mqtt")]
fn default_qos() -> u8 {
    const DEFAULT_QOS: u8 = 0;
    DEFAULT_QOS
}
1427
#[cfg(feature = "mqtt")]
/// Publish a message to an MQTT topic (only compiled when mqtt feature is enabled)
///
/// The JSON body is read field by field:
///
/// * `topic` (string, required)
/// * `payload` (string, required)
/// * `qos` (unsigned integer, optional; treated as 0 when absent or not an
///   unsigned integer; must be 0, 1 or 2)
/// * `retain` (boolean, optional; defaults to `false`)
///
/// The message is handed to the broker under the client id
/// `mockforge-management-api`. On success a `MessageEvent::Mqtt` is sent on
/// the message-event channel (a failed send is ignored).
///
/// Responses:
/// * `200 OK` when the broker accepts the message;
/// * `400 Bad Request` when `topic` or `payload` is missing or not a string,
///   or when `qos` is greater than 2;
/// * `500 Internal Server Error` when the broker rejects the publish;
/// * `503 Service Unavailable` when no MQTT broker is present in state.
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Extract fields from JSON manually
    let topic = request.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
    let payload = request.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
    // Keep the full-width value until it has been range-checked: narrowing
    // first would wrap e.g. 256 to 0 and let it pass validation.
    let qos_raw = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0);
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    let (topic, payload) = match (topic, payload) {
        (Some(topic), Some(payload)) => (topic, payload),
        _ => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid request",
                    "message": "Missing required fields: topic and payload"
                })),
            );
        }
    };

    let broker = match &state.mqtt_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "MQTT broker not available",
                    "message": "MQTT broker is not enabled or not available."
                })),
            );
        }
    };

    // Validate QoS
    if qos_raw > 2 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid QoS",
                "message": "QoS must be 0, 1, or 2"
            })),
        );
    }
    // Lossless: the value was just checked to be at most 2.
    let qos = qos_raw as u8;

    // Convert payload to bytes
    let payload_bytes = payload.as_bytes().to_vec();
    let client_id = "mockforge-management-api".to_string();

    let publish_result = broker
        .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
        .await
        .map_err(|e| e.to_string());

    match publish_result {
        Ok(_) => {
            // Emit message event for real-time monitoring
            let event = MessageEvent::Mqtt(MqttMessageEvent {
                topic: topic.clone(),
                payload,
                qos,
                retain,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            (
                StatusCode::OK,
                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message published to topic '{}'", topic),
                    "topic": topic,
                    "qos": qos,
                    "retain": retain
                })),
            )
        }
        Err(error_msg) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to publish message",
                "message": error_msg
            })),
        ),
    }
}
1515
1516#[cfg(not(feature = "mqtt"))]
1517/// Publish a message to an MQTT topic (stub when mqtt feature is disabled)
1518async fn publish_mqtt_message_handler(
1519    State(_state): State<ManagementState>,
1520    Json(_request): Json<serde_json::Value>,
1521) -> impl IntoResponse {
1522    (
1523        StatusCode::SERVICE_UNAVAILABLE,
1524        Json(serde_json::json!({
1525            "error": "MQTT feature not enabled",
1526            "message": "MQTT support is not compiled into this build"
1527        })),
1528    )
1529}
1530
#[cfg(feature = "mqtt")]
/// Request to publish multiple MQTT messages
///
/// Only `messages` is required when deserializing.
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// List of messages to publish
    pub messages: Vec<MqttPublishRequest>,
    /// Delay between messages in milliseconds; defaults to 100 via `default_delay`
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1541
/// Serde default for `MqttBatchPublishRequest::delay_ms`: 100 milliseconds.
#[cfg(feature = "mqtt")]
fn default_delay() -> u64 {
    const DEFAULT_DELAY_MS: u64 = 100;
    DEFAULT_DELAY_MS
}
1546
#[cfg(feature = "mqtt")]
/// Publish multiple messages to MQTT topics (only compiled when mqtt feature is enabled)
///
/// The JSON body is read field by field:
///
/// * `messages` (array, required, non-empty): each element carries `topic`
///   (string, required), `payload` (string, required), `qos` (unsigned
///   integer, optional; treated as 0 when absent or not an unsigned integer;
///   must be 0, 1 or 2) and `retain` (boolean, optional; defaults to `false`).
/// * `delay_ms` (unsigned integer, optional, default 100): pause inserted
///   after each publish attempt except the one for the last message.
///
/// Messages are published one after another under the client id
/// `mockforge-management-api`. A message that is malformed or rejected by
/// the broker is recorded as a failed entry in `results` and does not abort
/// the batch; malformed messages are skipped without the delay. Each
/// successful publish also sends a `MessageEvent::Mqtt` on the message-event
/// channel (a failed send is ignored).
///
/// Responses:
/// * `200 OK` with `total`, `succeeded`, `failed` and per-message `results`;
/// * `400 Bad Request` when `messages` is missing, not an array, or empty;
/// * `503 Service Unavailable` when no MQTT broker is present in state.
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Extract fields from JSON manually
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    let messages_json = match request.get("messages").and_then(|v| v.as_array()) {
        Some(messages) => messages,
        None => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid request",
                    "message": "Missing required field: messages"
                })),
            );
        }
    };

    let broker = match &state.mqtt_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "MQTT broker not available",
                    "message": "MQTT broker is not enabled or not available."
                })),
            );
        }
    };

    if messages_json.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        );
    }

    let mut results = Vec::with_capacity(messages_json.len());
    let client_id = "mockforge-management-api".to_string();

    for (index, msg_json) in messages_json.iter().enumerate() {
        let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
        let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
        // Keep the full-width value until it has been range-checked:
        // narrowing first would wrap e.g. 256 to 0 and let it pass validation.
        let qos_raw = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0);
        let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

        let (topic, payload) = match (topic, payload) {
            (Some(topic), Some(payload)) => (topic, payload),
            _ => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }
        };

        // Validate QoS
        if qos_raw > 2 {
            results.push(serde_json::json!({
                "index": index,
                "success": false,
                "error": "Invalid QoS (must be 0, 1, or 2)"
            }));
            continue;
        }
        // Lossless: the value was just checked to be at most 2.
        let qos = qos_raw as u8;

        // Convert payload to bytes
        let payload_bytes = payload.as_bytes().to_vec();

        let publish_result = broker
            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
            .await
            .map_err(|e| e.to_string());

        match publish_result {
            Ok(_) => {
                // Emit message event
                let event = MessageEvent::Mqtt(MqttMessageEvent {
                    topic: topic.clone(),
                    payload,
                    qos,
                    retain,
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                results.push(serde_json::json!({
                    "index": index,
                    "success": true,
                    "topic": topic,
                    "qos": qos
                }));
            }
            Err(error_msg) => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": error_msg
                }));
            }
        }

        // Add delay between messages (except for the last one)
        if index + 1 < messages_json.len() && delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
        }
    }

    let success_count =
        results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

    (
        StatusCode::OK,
        Json(serde_json::json!({
            "success": true,
            "total": messages_json.len(),
            "succeeded": success_count,
            "failed": messages_json.len() - success_count,
            "results": results
        })),
    )
}
1676
1677#[cfg(not(feature = "mqtt"))]
1678/// Publish multiple messages to MQTT topics (stub when mqtt feature is disabled)
1679async fn publish_mqtt_batch_handler(
1680    State(_state): State<ManagementState>,
1681    Json(_request): Json<serde_json::Value>,
1682) -> impl IntoResponse {
1683    (
1684        StatusCode::SERVICE_UNAVAILABLE,
1685        Json(serde_json::json!({
1686            "error": "MQTT feature not enabled",
1687            "message": "MQTT support is not compiled into this build"
1688        })),
1689    )
1690}
1691
1692// Migration pipeline handlers
1693
/// Request to set migration mode
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    /// Requested mode name; `set_route_migration_mode` matches it
    /// case-insensitively against `mock`, `shadow`, `real` and `auto`.
    mode: String,
}
1699
1700/// Get all migration routes
1701async fn get_migration_routes(
1702    State(state): State<ManagementState>,
1703) -> Result<Json<serde_json::Value>, StatusCode> {
1704    let proxy_config = match &state.proxy_config {
1705        Some(config) => config,
1706        None => {
1707            return Ok(Json(serde_json::json!({
1708                "error": "Migration not configured. Proxy config not available."
1709            })));
1710        }
1711    };
1712
1713    let config = proxy_config.read().await;
1714    let routes = config.get_migration_routes();
1715
1716    Ok(Json(serde_json::json!({
1717        "routes": routes
1718    })))
1719}
1720
1721/// Toggle a route's migration mode
1722async fn toggle_route_migration(
1723    State(state): State<ManagementState>,
1724    Path(pattern): Path<String>,
1725) -> Result<Json<serde_json::Value>, StatusCode> {
1726    let proxy_config = match &state.proxy_config {
1727        Some(config) => config,
1728        None => {
1729            return Ok(Json(serde_json::json!({
1730                "error": "Migration not configured. Proxy config not available."
1731            })));
1732        }
1733    };
1734
1735    let mut config = proxy_config.write().await;
1736    let new_mode = match config.toggle_route_migration(&pattern) {
1737        Some(mode) => mode,
1738        None => {
1739            return Ok(Json(serde_json::json!({
1740                "error": format!("Route pattern not found: {}", pattern)
1741            })));
1742        }
1743    };
1744
1745    Ok(Json(serde_json::json!({
1746        "pattern": pattern,
1747        "mode": format!("{:?}", new_mode).to_lowercase()
1748    })))
1749}
1750
1751/// Set a route's migration mode explicitly
1752async fn set_route_migration_mode(
1753    State(state): State<ManagementState>,
1754    Path(pattern): Path<String>,
1755    Json(request): Json<SetMigrationModeRequest>,
1756) -> Result<Json<serde_json::Value>, StatusCode> {
1757    let proxy_config = match &state.proxy_config {
1758        Some(config) => config,
1759        None => {
1760            return Ok(Json(serde_json::json!({
1761                "error": "Migration not configured. Proxy config not available."
1762            })));
1763        }
1764    };
1765
1766    use mockforge_core::proxy::config::MigrationMode;
1767    let mode = match request.mode.to_lowercase().as_str() {
1768        "mock" => MigrationMode::Mock,
1769        "shadow" => MigrationMode::Shadow,
1770        "real" => MigrationMode::Real,
1771        "auto" => MigrationMode::Auto,
1772        _ => {
1773            return Ok(Json(serde_json::json!({
1774                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1775            })));
1776        }
1777    };
1778
1779    let mut config = proxy_config.write().await;
1780    let updated = config.update_rule_migration_mode(&pattern, mode);
1781
1782    if !updated {
1783        return Ok(Json(serde_json::json!({
1784            "error": format!("Route pattern not found: {}", pattern)
1785        })));
1786    }
1787
1788    Ok(Json(serde_json::json!({
1789        "pattern": pattern,
1790        "mode": format!("{:?}", mode).to_lowercase()
1791    })))
1792}
1793
1794/// Toggle a group's migration mode
1795async fn toggle_group_migration(
1796    State(state): State<ManagementState>,
1797    Path(group): Path<String>,
1798) -> Result<Json<serde_json::Value>, StatusCode> {
1799    let proxy_config = match &state.proxy_config {
1800        Some(config) => config,
1801        None => {
1802            return Ok(Json(serde_json::json!({
1803                "error": "Migration not configured. Proxy config not available."
1804            })));
1805        }
1806    };
1807
1808    let mut config = proxy_config.write().await;
1809    let new_mode = config.toggle_group_migration(&group);
1810
1811    Ok(Json(serde_json::json!({
1812        "group": group,
1813        "mode": format!("{:?}", new_mode).to_lowercase()
1814    })))
1815}
1816
1817/// Set a group's migration mode explicitly
1818async fn set_group_migration_mode(
1819    State(state): State<ManagementState>,
1820    Path(group): Path<String>,
1821    Json(request): Json<SetMigrationModeRequest>,
1822) -> Result<Json<serde_json::Value>, StatusCode> {
1823    let proxy_config = match &state.proxy_config {
1824        Some(config) => config,
1825        None => {
1826            return Ok(Json(serde_json::json!({
1827                "error": "Migration not configured. Proxy config not available."
1828            })));
1829        }
1830    };
1831
1832    use mockforge_core::proxy::config::MigrationMode;
1833    let mode = match request.mode.to_lowercase().as_str() {
1834        "mock" => MigrationMode::Mock,
1835        "shadow" => MigrationMode::Shadow,
1836        "real" => MigrationMode::Real,
1837        "auto" => MigrationMode::Auto,
1838        _ => {
1839            return Ok(Json(serde_json::json!({
1840                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1841            })));
1842        }
1843    };
1844
1845    let mut config = proxy_config.write().await;
1846    config.update_group_migration_mode(&group, mode);
1847
1848    Ok(Json(serde_json::json!({
1849        "group": group,
1850        "mode": format!("{:?}", mode).to_lowercase()
1851    })))
1852}
1853
1854/// Get all migration groups
1855async fn get_migration_groups(
1856    State(state): State<ManagementState>,
1857) -> Result<Json<serde_json::Value>, StatusCode> {
1858    let proxy_config = match &state.proxy_config {
1859        Some(config) => config,
1860        None => {
1861            return Ok(Json(serde_json::json!({
1862                "error": "Migration not configured. Proxy config not available."
1863            })));
1864        }
1865    };
1866
1867    let config = proxy_config.read().await;
1868    let groups = config.get_migration_groups();
1869
1870    // Convert to JSON-serializable format
1871    let groups_json: serde_json::Map<String, serde_json::Value> = groups
1872        .into_iter()
1873        .map(|(name, info)| {
1874            (
1875                name,
1876                serde_json::json!({
1877                    "name": info.name,
1878                    "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1879                    "route_count": info.route_count
1880                }),
1881            )
1882        })
1883        .collect();
1884
1885    Ok(Json(serde_json::json!(groups_json)))
1886}
1887
1888/// Get overall migration status
1889async fn get_migration_status(
1890    State(state): State<ManagementState>,
1891) -> Result<Json<serde_json::Value>, StatusCode> {
1892    let proxy_config = match &state.proxy_config {
1893        Some(config) => config,
1894        None => {
1895            return Ok(Json(serde_json::json!({
1896                "error": "Migration not configured. Proxy config not available."
1897            })));
1898        }
1899    };
1900
1901    let config = proxy_config.read().await;
1902    let routes = config.get_migration_routes();
1903    let groups = config.get_migration_groups();
1904
1905    let mut mock_count = 0;
1906    let mut shadow_count = 0;
1907    let mut real_count = 0;
1908    let mut auto_count = 0;
1909
1910    for route in &routes {
1911        match route.migration_mode {
1912            mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1913            mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1914            mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1915            mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1916        }
1917    }
1918
1919    Ok(Json(serde_json::json!({
1920        "total_routes": routes.len(),
1921        "mock_routes": mock_count,
1922        "shadow_routes": shadow_count,
1923        "real_routes": real_count,
1924        "auto_routes": auto_count,
1925        "total_groups": groups.len(),
1926        "migration_enabled": config.migration_enabled
1927    })))
1928}
1929
1930// ========== Proxy Replacement Rules Management ==========
1931
/// Request body for creating/updating proxy replacement rules
///
/// Deserialized from the JSON payload of the create and update rule handlers.
#[derive(Debug, Deserialize, Serialize)]
pub struct ProxyRuleRequest {
    /// URL pattern to match (supports wildcards like "/api/users/*")
    pub pattern: String,
    /// Rule type: "request" or "response". Carried in JSON under the key `type`.
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Optional status code filter for response rules; an empty list when omitted
    #[serde(default)]
    pub status_codes: Vec<u16>,
    /// Body transformations to apply
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether this rule is enabled. When omitted, the value is supplied by
    /// the `default_true` helper.
    #[serde(default = "default_true")]
    pub enabled: bool,
}
1949
/// Request body for individual body transformations
///
/// Also reused as the serialized form of a transform when rules are listed.
#[derive(Debug, Deserialize, Serialize)]
pub struct BodyTransformRequest {
    /// JSONPath expression to target (e.g., "$.userId", "$.email")
    pub path: String,
    /// Replacement value (supports template expansion like "{{uuid}}", "{{faker.email}}")
    pub replace: String,
    /// Operation to perform: "replace", "add", or "remove".
    ///
    /// Defaults to an empty string when omitted; the create and update handlers
    /// treat any value other than "add" or "remove" as a replace.
    #[serde(default)]
    pub operation: String,
}
1961
/// Response format for proxy rules
///
/// One entry of the `rules` array produced by the rule-listing handler.
#[derive(Debug, Serialize)]
pub struct ProxyRuleResponse {
    /// Rule ID: a positional index into the combined rule list. Request rules
    /// come first (their own index); response rules follow, offset by the
    /// number of request rules.
    pub id: usize,
    /// URL pattern
    pub pattern: String,
    /// Rule type ("request" or "response"), serialized under the key `type`
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status codes (for response rules); the listing handler leaves this
    /// empty for request rules
    pub status_codes: Vec<u16>,
    /// Body transformations
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether enabled
    pub enabled: bool,
}
1979
1980/// List all proxy replacement rules
1981async fn list_proxy_rules(
1982    State(state): State<ManagementState>,
1983) -> Result<Json<serde_json::Value>, StatusCode> {
1984    let proxy_config = match &state.proxy_config {
1985        Some(config) => config,
1986        None => {
1987            return Ok(Json(serde_json::json!({
1988                "error": "Proxy not configured. Proxy config not available."
1989            })));
1990        }
1991    };
1992
1993    let config = proxy_config.read().await;
1994
1995    let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1996
1997    // Add request replacement rules
1998    for (idx, rule) in config.request_replacements.iter().enumerate() {
1999        rules.push(ProxyRuleResponse {
2000            id: idx,
2001            pattern: rule.pattern.clone(),
2002            rule_type: "request".to_string(),
2003            status_codes: Vec::new(),
2004            body_transforms: rule
2005                .body_transforms
2006                .iter()
2007                .map(|t| BodyTransformRequest {
2008                    path: t.path.clone(),
2009                    replace: t.replace.clone(),
2010                    operation: format!("{:?}", t.operation).to_lowercase(),
2011                })
2012                .collect(),
2013            enabled: rule.enabled,
2014        });
2015    }
2016
2017    // Add response replacement rules
2018    let request_count = config.request_replacements.len();
2019    for (idx, rule) in config.response_replacements.iter().enumerate() {
2020        rules.push(ProxyRuleResponse {
2021            id: request_count + idx,
2022            pattern: rule.pattern.clone(),
2023            rule_type: "response".to_string(),
2024            status_codes: rule.status_codes.clone(),
2025            body_transforms: rule
2026                .body_transforms
2027                .iter()
2028                .map(|t| BodyTransformRequest {
2029                    path: t.path.clone(),
2030                    replace: t.replace.clone(),
2031                    operation: format!("{:?}", t.operation).to_lowercase(),
2032                })
2033                .collect(),
2034            enabled: rule.enabled,
2035        });
2036    }
2037
2038    Ok(Json(serde_json::json!({
2039        "rules": rules
2040    })))
2041}
2042
2043/// Create a new proxy replacement rule
2044async fn create_proxy_rule(
2045    State(state): State<ManagementState>,
2046    Json(request): Json<ProxyRuleRequest>,
2047) -> Result<Json<serde_json::Value>, StatusCode> {
2048    let proxy_config = match &state.proxy_config {
2049        Some(config) => config,
2050        None => {
2051            return Ok(Json(serde_json::json!({
2052                "error": "Proxy not configured. Proxy config not available."
2053            })));
2054        }
2055    };
2056
2057    // Validate request
2058    if request.body_transforms.is_empty() {
2059        return Ok(Json(serde_json::json!({
2060            "error": "At least one body transform is required"
2061        })));
2062    }
2063
2064    let body_transforms: Vec<BodyTransform> = request
2065        .body_transforms
2066        .iter()
2067        .map(|t| {
2068            let op = match t.operation.as_str() {
2069                "replace" => TransformOperation::Replace,
2070                "add" => TransformOperation::Add,
2071                "remove" => TransformOperation::Remove,
2072                _ => TransformOperation::Replace,
2073            };
2074            BodyTransform {
2075                path: t.path.clone(),
2076                replace: t.replace.clone(),
2077                operation: op,
2078            }
2079        })
2080        .collect();
2081
2082    let new_rule = BodyTransformRule {
2083        pattern: request.pattern.clone(),
2084        status_codes: request.status_codes.clone(),
2085        body_transforms,
2086        enabled: request.enabled,
2087    };
2088
2089    let mut config = proxy_config.write().await;
2090
2091    let rule_id = if request.rule_type == "request" {
2092        config.request_replacements.push(new_rule);
2093        config.request_replacements.len() - 1
2094    } else if request.rule_type == "response" {
2095        config.response_replacements.push(new_rule);
2096        config.request_replacements.len() + config.response_replacements.len() - 1
2097    } else {
2098        return Ok(Json(serde_json::json!({
2099            "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
2100        })));
2101    };
2102
2103    Ok(Json(serde_json::json!({
2104        "id": rule_id,
2105        "message": "Rule created successfully"
2106    })))
2107}
2108
2109/// Get a specific proxy replacement rule
2110async fn get_proxy_rule(
2111    State(state): State<ManagementState>,
2112    Path(id): Path<String>,
2113) -> Result<Json<serde_json::Value>, StatusCode> {
2114    let proxy_config = match &state.proxy_config {
2115        Some(config) => config,
2116        None => {
2117            return Ok(Json(serde_json::json!({
2118                "error": "Proxy not configured. Proxy config not available."
2119            })));
2120        }
2121    };
2122
2123    let config = proxy_config.read().await;
2124    let rule_id: usize = match id.parse() {
2125        Ok(id) => id,
2126        Err(_) => {
2127            return Ok(Json(serde_json::json!({
2128                "error": format!("Invalid rule ID: {}", id)
2129            })));
2130        }
2131    };
2132
2133    let request_count = config.request_replacements.len();
2134
2135    if rule_id < request_count {
2136        // Request rule
2137        let rule = &config.request_replacements[rule_id];
2138        Ok(Json(serde_json::json!({
2139            "id": rule_id,
2140            "pattern": rule.pattern,
2141            "type": "request",
2142            "status_codes": [],
2143            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2144                "path": t.path,
2145                "replace": t.replace,
2146                "operation": format!("{:?}", t.operation).to_lowercase()
2147            })).collect::<Vec<_>>(),
2148            "enabled": rule.enabled
2149        })))
2150    } else if rule_id < request_count + config.response_replacements.len() {
2151        // Response rule
2152        let response_idx = rule_id - request_count;
2153        let rule = &config.response_replacements[response_idx];
2154        Ok(Json(serde_json::json!({
2155            "id": rule_id,
2156            "pattern": rule.pattern,
2157            "type": "response",
2158            "status_codes": rule.status_codes,
2159            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2160                "path": t.path,
2161                "replace": t.replace,
2162                "operation": format!("{:?}", t.operation).to_lowercase()
2163            })).collect::<Vec<_>>(),
2164            "enabled": rule.enabled
2165        })))
2166    } else {
2167        Ok(Json(serde_json::json!({
2168            "error": format!("Rule ID {} not found", rule_id)
2169        })))
2170    }
2171}
2172
2173/// Update a proxy replacement rule
2174async fn update_proxy_rule(
2175    State(state): State<ManagementState>,
2176    Path(id): Path<String>,
2177    Json(request): Json<ProxyRuleRequest>,
2178) -> Result<Json<serde_json::Value>, StatusCode> {
2179    let proxy_config = match &state.proxy_config {
2180        Some(config) => config,
2181        None => {
2182            return Ok(Json(serde_json::json!({
2183                "error": "Proxy not configured. Proxy config not available."
2184            })));
2185        }
2186    };
2187
2188    let mut config = proxy_config.write().await;
2189    let rule_id: usize = match id.parse() {
2190        Ok(id) => id,
2191        Err(_) => {
2192            return Ok(Json(serde_json::json!({
2193                "error": format!("Invalid rule ID: {}", id)
2194            })));
2195        }
2196    };
2197
2198    let body_transforms: Vec<BodyTransform> = request
2199        .body_transforms
2200        .iter()
2201        .map(|t| {
2202            let op = match t.operation.as_str() {
2203                "replace" => TransformOperation::Replace,
2204                "add" => TransformOperation::Add,
2205                "remove" => TransformOperation::Remove,
2206                _ => TransformOperation::Replace,
2207            };
2208            BodyTransform {
2209                path: t.path.clone(),
2210                replace: t.replace.clone(),
2211                operation: op,
2212            }
2213        })
2214        .collect();
2215
2216    let updated_rule = BodyTransformRule {
2217        pattern: request.pattern.clone(),
2218        status_codes: request.status_codes.clone(),
2219        body_transforms,
2220        enabled: request.enabled,
2221    };
2222
2223    let request_count = config.request_replacements.len();
2224
2225    if rule_id < request_count {
2226        // Update request rule
2227        config.request_replacements[rule_id] = updated_rule;
2228    } else if rule_id < request_count + config.response_replacements.len() {
2229        // Update response rule
2230        let response_idx = rule_id - request_count;
2231        config.response_replacements[response_idx] = updated_rule;
2232    } else {
2233        return Ok(Json(serde_json::json!({
2234            "error": format!("Rule ID {} not found", rule_id)
2235        })));
2236    }
2237
2238    Ok(Json(serde_json::json!({
2239        "id": rule_id,
2240        "message": "Rule updated successfully"
2241    })))
2242}
2243
2244/// Delete a proxy replacement rule
2245async fn delete_proxy_rule(
2246    State(state): State<ManagementState>,
2247    Path(id): Path<String>,
2248) -> Result<Json<serde_json::Value>, StatusCode> {
2249    let proxy_config = match &state.proxy_config {
2250        Some(config) => config,
2251        None => {
2252            return Ok(Json(serde_json::json!({
2253                "error": "Proxy not configured. Proxy config not available."
2254            })));
2255        }
2256    };
2257
2258    let mut config = proxy_config.write().await;
2259    let rule_id: usize = match id.parse() {
2260        Ok(id) => id,
2261        Err(_) => {
2262            return Ok(Json(serde_json::json!({
2263                "error": format!("Invalid rule ID: {}", id)
2264            })));
2265        }
2266    };
2267
2268    let request_count = config.request_replacements.len();
2269
2270    if rule_id < request_count {
2271        // Delete request rule
2272        config.request_replacements.remove(rule_id);
2273    } else if rule_id < request_count + config.response_replacements.len() {
2274        // Delete response rule
2275        let response_idx = rule_id - request_count;
2276        config.response_replacements.remove(response_idx);
2277    } else {
2278        return Ok(Json(serde_json::json!({
2279            "error": format!("Rule ID {} not found", rule_id)
2280        })));
2281    }
2282
2283    Ok(Json(serde_json::json!({
2284        "id": rule_id,
2285        "message": "Rule deleted successfully"
2286    })))
2287}
2288
2289/// Get proxy rules and transformation configuration for inspection
2290async fn get_proxy_inspect(
2291    State(state): State<ManagementState>,
2292    Query(params): Query<std::collections::HashMap<String, String>>,
2293) -> Result<Json<serde_json::Value>, StatusCode> {
2294    let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2295    let offset: usize = params.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);
2296
2297    let proxy_config = match &state.proxy_config {
2298        Some(config) => config.read().await,
2299        None => {
2300            return Ok(Json(serde_json::json!({
2301                "error": "Proxy not configured. Proxy config not available."
2302            })));
2303        }
2304    };
2305
2306    let mut rules = Vec::new();
2307    for (idx, rule) in proxy_config.request_replacements.iter().enumerate() {
2308        rules.push(serde_json::json!({
2309            "id": idx,
2310            "kind": "request",
2311            "pattern": rule.pattern,
2312            "enabled": rule.enabled,
2313            "status_codes": rule.status_codes,
2314            "transform_count": rule.body_transforms.len(),
2315            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2316                "path": t.path,
2317                "operation": t.operation,
2318                "replace": t.replace
2319            })).collect::<Vec<_>>()
2320        }));
2321    }
2322    let request_rule_count = rules.len();
2323    for (idx, rule) in proxy_config.response_replacements.iter().enumerate() {
2324        rules.push(serde_json::json!({
2325            "id": request_rule_count + idx,
2326            "kind": "response",
2327            "pattern": rule.pattern,
2328            "enabled": rule.enabled,
2329            "status_codes": rule.status_codes,
2330            "transform_count": rule.body_transforms.len(),
2331            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2332                "path": t.path,
2333                "operation": t.operation,
2334                "replace": t.replace
2335            })).collect::<Vec<_>>()
2336        }));
2337    }
2338
2339    let total = rules.len();
2340    let paged_rules: Vec<_> = rules.into_iter().skip(offset).take(limit).collect();
2341
2342    Ok(Json(serde_json::json!({
2343        "enabled": proxy_config.enabled,
2344        "target_url": proxy_config.target_url,
2345        "prefix": proxy_config.prefix,
2346        "timeout_seconds": proxy_config.timeout_seconds,
2347        "follow_redirects": proxy_config.follow_redirects,
2348        "passthrough_by_default": proxy_config.passthrough_by_default,
2349        "rules": paged_rules,
2350        "request_rule_count": request_rule_count,
2351        "response_rule_count": total.saturating_sub(request_rule_count),
2352        "limit": limit,
2353        "offset": offset,
2354        "total": total
2355    })))
2356}
2357
2358/// Build the management API router
2359pub fn management_router(state: ManagementState) -> Router {
2360    let router = Router::new()
2361        .route("/health", get(health_check))
2362        .route("/stats", get(get_stats))
2363        .route("/config", get(get_config))
2364        .route("/config/validate", post(validate_config))
2365        .route("/config/bulk", post(bulk_update_config))
2366        .route("/mocks", get(list_mocks))
2367        .route("/mocks", post(create_mock))
2368        .route("/mocks/{id}", get(get_mock))
2369        .route("/mocks/{id}", put(update_mock))
2370        .route("/mocks/{id}", delete(delete_mock))
2371        .route("/export", get(export_mocks))
2372        .route("/import", post(import_mocks));
2373
2374    #[cfg(feature = "smtp")]
2375    let router = router
2376        .route("/smtp/mailbox", get(list_smtp_emails))
2377        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
2378        .route("/smtp/mailbox/{id}", get(get_smtp_email))
2379        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
2380        .route("/smtp/mailbox/search", get(search_smtp_emails));
2381
2382    #[cfg(not(feature = "smtp"))]
2383    let router = router;
2384
2385    // MQTT routes
2386    #[cfg(feature = "mqtt")]
2387    let router = router
2388        .route("/mqtt/stats", get(get_mqtt_stats))
2389        .route("/mqtt/clients", get(get_mqtt_clients))
2390        .route("/mqtt/topics", get(get_mqtt_topics))
2391        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
2392        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
2393        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2394        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2395
2396    #[cfg(not(feature = "mqtt"))]
2397    let router = router
2398        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2399        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2400
2401    #[cfg(feature = "kafka")]
2402    let router = router
2403        .route("/kafka/stats", get(get_kafka_stats))
2404        .route("/kafka/topics", get(get_kafka_topics))
2405        .route("/kafka/topics/{topic}", get(get_kafka_topic))
2406        .route("/kafka/groups", get(get_kafka_groups))
2407        .route("/kafka/groups/{group_id}", get(get_kafka_group))
2408        .route("/kafka/produce", post(produce_kafka_message))
2409        .route("/kafka/produce/batch", post(produce_kafka_batch))
2410        .route("/kafka/messages/stream", get(kafka_messages_stream));
2411
2412    #[cfg(not(feature = "kafka"))]
2413    let router = router;
2414
2415    // Migration pipeline routes
2416    let router = router
2417        .route("/migration/routes", get(get_migration_routes))
2418        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
2419        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
2420        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
2421        .route("/migration/groups/{group}", put(set_group_migration_mode))
2422        .route("/migration/groups", get(get_migration_groups))
2423        .route("/migration/status", get(get_migration_status));
2424
2425    // Proxy replacement rules routes
2426    let router = router
2427        .route("/proxy/rules", get(list_proxy_rules))
2428        .route("/proxy/rules", post(create_proxy_rule))
2429        .route("/proxy/rules/{id}", get(get_proxy_rule))
2430        .route("/proxy/rules/{id}", put(update_proxy_rule))
2431        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
2432        .route("/proxy/inspect", get(get_proxy_inspect));
2433
2434    // AI-powered features
2435    let router = router.route("/ai/generate-spec", post(generate_ai_spec));
2436
2437    // Snapshot diff endpoints
2438    let router = router.nest(
2439        "/snapshot-diff",
2440        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
2441    );
2442
2443    #[cfg(feature = "behavioral-cloning")]
2444    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));
2445
2446    let router = router
2447        .route("/mockai/learn", post(learn_from_examples))
2448        .route("/mockai/rules/explanations", get(list_rule_explanations))
2449        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
2450        .route("/chaos/config", get(get_chaos_config))
2451        .route("/chaos/config", post(update_chaos_config))
2452        .route("/network/profiles", get(list_network_profiles))
2453        .route("/network/profile/apply", post(apply_network_profile));
2454
2455    // State machine API routes
2456    let router =
2457        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());
2458
2459    router.with_state(state)
2460}
2461
#[cfg(feature = "kafka")]
/// Kafka broker statistics
///
/// Serialized as the JSON body of the Kafka stats handler.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// Number of topics
    pub topics: usize,
    /// Total number of partitions, summed over all topics
    pub partitions: usize,
    /// Number of consumer groups
    pub consumer_groups: usize,
    /// Total messages produced, as reported by the broker's metrics snapshot
    pub messages_produced: u64,
    /// Total messages consumed, as reported by the broker's metrics snapshot
    pub messages_consumed: u64,
}
2477
#[cfg(feature = "kafka")]
/// Summary of a single Kafka topic, as returned by the topic-listing handler.
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    // Topic name (the key under which the broker stores the topic).
    pub name: String,
    // Number of partitions the topic currently has.
    pub partitions: usize,
    // Replication factor taken from the topic's config.
    pub replication_factor: i32,
}
2486
#[cfg(feature = "kafka")]
/// Summary of a single Kafka consumer group, as returned by the group-listing handler.
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    // Consumer group identifier.
    pub group_id: String,
    // Number of members currently in the group.
    pub members: usize,
    // Group state label; the listing handler currently always reports "Stable".
    pub state: String,
}
2495
#[cfg(feature = "kafka")]
/// Report aggregate Kafka broker statistics.
///
/// Responds with a serialized [`KafkaBrokerStats`], or with
/// `503 Service Unavailable` and an error payload when no broker is attached
/// to the management state.
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let consumer_groups = broker.consumer_groups.read().await;

    let partition_total: usize = topics.values().map(|topic| topic.partitions.len()).sum();

    // Message counters come from the broker's metrics snapshot.
    let snapshot = broker.metrics().snapshot();

    Json(KafkaBrokerStats {
        topics: topics.len(),
        partitions: partition_total,
        consumer_groups: consumer_groups.groups().len(),
        messages_produced: snapshot.messages_produced_total,
        messages_consumed: snapshot.messages_consumed_total,
    })
    .into_response()
}
2528
#[cfg(feature = "kafka")]
/// List all Kafka topics as `{ "topics": [...] }`.
///
/// Each entry is a [`KafkaTopicInfo`]. Responds with `503 Service Unavailable`
/// and an error payload when no broker is attached to the management state.
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;

    let mut topic_list: Vec<KafkaTopicInfo> = Vec::with_capacity(topics.len());
    for (name, topic) in topics.iter() {
        topic_list.push(KafkaTopicInfo {
            name: name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor as i32,
        });
    }

    Json(serde_json::json!({
        "topics": topic_list
    }))
    .into_response()
}
2558
#[cfg(feature = "kafka")]
/// Describe a single Kafka topic, including a per-partition breakdown.
///
/// Responds with `404 Not Found` when the topic does not exist and with
/// `503 Service Unavailable` when no broker is attached to the management
/// state. Every partition is reported with leader `0` and replicas `[0]`.
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let Some(topic) = topics.get(&topic_name) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Topic not found",
                "topic": topic_name
            })),
        )
            .into_response();
    };

    let partitions_detail = topic
        .partitions
        .iter()
        .enumerate()
        .map(|(idx, partition)| {
            serde_json::json!({
                "id": idx as i32,
                "leader": 0,
                "replicas": vec![0],
                "message_count": partition.messages.len()
            })
        })
        .collect::<Vec<_>>();

    Json(serde_json::json!({
        "name": topic_name,
        "partitions": topic.partitions.len(),
        "replication_factor": topic.config.replication_factor,
        "partitions_detail": partitions_detail
    }))
    .into_response()
}
2600
/// Lists the consumer groups registered with the mock Kafka broker.
///
/// Responds with `{"groups": [...]}` where each entry is a
/// `KafkaConsumerGroupInfo` (group id, member count, state). The state is always
/// reported as `"Stable"`. Responds with `503 Service Unavailable` when the
/// management state holds no Kafka broker.
#[cfg(feature = "kafka")]
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let broker = match &state.kafka_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "Kafka broker not available",
                    "message": "Kafka broker is not enabled or not available."
                })),
            )
                .into_response();
        }
    };

    let consumer_groups = broker.consumer_groups.read().await;

    let mut groups: Vec<KafkaConsumerGroupInfo> = Vec::new();
    for (group_id, group) in consumer_groups.groups().iter() {
        groups.push(KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            // Simplified: the real group state is not inspected here.
            state: "Stable".to_string(),
        });
    }

    let body = serde_json::json!({
        "groups": groups
    });
    Json(body).into_response()
}
2631
/// Returns the details of a single Kafka consumer group, selected by the path parameter.
///
/// The response carries the group id, member count, a fixed `"Stable"` state, a
/// `members_detail` array (member id, client id and topic/partition assignments)
/// and an `offsets` array (topic, partition, offset).
///
/// Responds with `404 Not Found` when the group does not exist and with
/// `503 Service Unavailable` when the management state holds no Kafka broker.
#[cfg(feature = "kafka")]
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    let broker = match &state.kafka_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "Kafka broker not available",
                    "message": "Kafka broker is not enabled or not available."
                })),
            )
                .into_response();
        }
    };

    let consumer_groups = broker.consumer_groups.read().await;
    match consumer_groups.groups().get(&group_id) {
        Some(group) => {
            // Per-member view: identity plus the partitions assigned to it.
            let members_detail = group
                .members
                .iter()
                .map(|(member_id, member)| {
                    let assignments = member
                        .assignment
                        .iter()
                        .map(|a| {
                            serde_json::json!({
                                "topic": a.topic,
                                "partitions": a.partitions
                            })
                        })
                        .collect::<Vec<_>>();

                    serde_json::json!({
                        "member_id": member_id,
                        "client_id": member.client_id,
                        "assignments": assignments
                    })
                })
                .collect::<Vec<_>>();

            // Offsets recorded for the group, keyed by (topic, partition).
            let offsets = group
                .offsets
                .iter()
                .map(|((topic, partition), offset)| {
                    serde_json::json!({
                        "topic": topic,
                        "partition": partition,
                        "offset": offset
                    })
                })
                .collect::<Vec<_>>();

            Json(serde_json::json!({
                "group_id": group_id,
                "members": group.members.len(),
                "state": "Stable",
                "members_detail": members_detail,
                "offsets": offsets
            }))
            .into_response()
        }
        None => (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Consumer group not found",
                "group_id": group_id
            })),
        )
            .into_response(),
    }
}
2680
2681// ========== Kafka Produce Handler ==========
2682
/// JSON body describing one message to produce to the mock Kafka broker.
///
/// Used on its own by the single-message produce handler and as the element
/// type of `KafkaBatchProduceRequest::messages`.
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Name of the topic the message is produced to
    pub topic: String,
    /// Optional message key; may be omitted from the JSON body
    #[serde(default)]
    pub key: Option<String>,
    /// Message value as a string (JSON text or plain text)
    pub value: String,
    /// Optional target partition; when omitted the handlers ask the topic to assign one
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional message headers as string key-value pairs
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2701
/// Produces a single message to a Kafka topic on the mock broker.
///
/// Steps, all performed while holding the write lock on the broker's topic map:
/// 1. Look up the topic, creating it with the default `TopicConfig` if missing.
///    This happens before partition validation, so a request that names an
///    invalid partition can still create the topic.
/// 2. Use the requested partition, or ask the topic to assign one from the key.
/// 3. Reject partitions outside `0..partition_count` with `400 Bad Request`.
/// 4. Append the message, bump the produced-messages metric and broadcast a
///    `MessageEvent::Kafka` for real-time monitoring.
///
/// Responds with the topic, partition and offset on success, with
/// `500 Internal Server Error` when the append fails, and with
/// `503 Service Unavailable` when the management state holds no Kafka broker.
#[cfg(feature = "kafka")]
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    let broker = match &state.kafka_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "Kafka broker not available",
                    "message": "Kafka broker is not enabled or not available."
                })),
            )
                .into_response();
        }
    };

    let mut topics = broker.topics.write().await;

    // Get or create the topic
    let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
        mockforge_kafka::topics::Topic::new(
            request.topic.clone(),
            mockforge_kafka::topics::TopicConfig::default(),
        )
    });

    // An explicit partition wins; otherwise the topic picks one based on the key
    let partition_id = match request.partition {
        Some(partition) => partition,
        None => topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes())),
    };

    // Validate partition exists
    if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            })),
        )
            .into_response();
    }

    // Build the stored message straight from borrowed request data; the request
    // itself is still needed for the monitoring event and the response below.
    let message = mockforge_kafka::partitions::KafkaMessage {
        offset: 0, // Will be set by partition.append
        timestamp: chrono::Utc::now().timestamp_millis(),
        key: request.key.as_ref().map(|k| k.as_bytes().to_vec()),
        value: request.value.as_bytes().to_vec(),
        headers: request
            .headers
            .iter()
            .flatten()
            .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
            .collect(),
    };

    // Produce to partition
    match topic_entry.produce(partition_id, message).await {
        Ok(offset) => {
            // Record metrics for successful message production
            broker.metrics().record_messages_produced(1);

            // Emit message event for real-time monitoring. The send result is
            // deliberately ignored: monitoring is best-effort.
            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: request.topic.clone(),
                key: request.key.clone(),
                value: request.value.clone(),
                partition: partition_id,
                offset,
                headers: request.headers.clone(),
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", request.topic),
                "topic": request.topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2806
/// JSON body for producing several Kafka messages in one request.
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// Messages to produce, processed in the order given
    pub messages: Vec<KafkaProduceRequest>,
    /// Pause between consecutive messages, in milliseconds
    /// (falls back to `default_delay()` when omitted)
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2817
/// Produces a batch of messages to Kafka topics on the mock broker, in request order.
///
/// Each message is handled independently: its topic is created on demand, its
/// partition is taken from the request or assigned by the topic, and its outcome
/// is recorded as one entry of the `results` array (`index`, `success`, and either
/// `topic`/`partition`/`offset` or `error`). A failing message does not abort the
/// batch.
///
/// When `delay_ms > 0` the handler sleeps between messages. The write lock on the
/// broker's topic map is released before sleeping so other readers and writers are
/// not blocked for the duration of the delay. A message rejected for an invalid
/// partition moves straight on to the next message without the delay.
///
/// Responds with `400 Bad Request` for an empty batch and with
/// `503 Service Unavailable` when the management state holds no Kafka broker.
/// Otherwise the top-level `success` is `true` and the per-message outcome is
/// reported through `succeeded`, `failed` and `results`.
#[cfg(feature = "kafka")]
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    let broker = match &state.kafka_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "Kafka broker not available",
                    "message": "Kafka broker is not enabled or not available."
                })),
            )
                .into_response();
        }
    };

    if request.messages.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        )
            .into_response();
    }

    let total = request.messages.len();
    let mut results = Vec::with_capacity(total);

    for (index, msg_request) in request.messages.iter().enumerate() {
        let mut topics = broker.topics.write().await;

        // Get or create the topic
        let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
            mockforge_kafka::topics::Topic::new(
                msg_request.topic.clone(),
                mockforge_kafka::topics::TopicConfig::default(),
            )
        });

        // An explicit partition wins; otherwise the topic picks one based on the key
        let partition_id = match msg_request.partition {
            Some(partition) => partition,
            None => topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes())),
        };

        // Validate partition exists
        if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
            results.push(serde_json::json!({
                "index": index,
                "success": false,
                "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            }));
            continue;
        }

        // Create the message
        let message = mockforge_kafka::partitions::KafkaMessage {
            offset: 0, // Will be set when the message is appended
            timestamp: chrono::Utc::now().timestamp_millis(),
            key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
            value: msg_request.value.as_bytes().to_vec(),
            headers: msg_request
                .headers
                .clone()
                .unwrap_or_default()
                .into_iter()
                .map(|(k, v)| (k, v.as_bytes().to_vec()))
                .collect(),
        };

        // Produce to partition
        match topic_entry.produce(partition_id, message).await {
            Ok(offset) => {
                // Record metrics for successful message production
                broker.metrics().record_messages_produced(1);

                // Emit message event; the send result is deliberately ignored
                // because monitoring is best-effort.
                let event = MessageEvent::Kafka(KafkaMessageEvent {
                    topic: msg_request.topic.clone(),
                    key: msg_request.key.clone(),
                    value: msg_request.value.clone(),
                    partition: partition_id,
                    offset,
                    headers: msg_request.headers.clone(),
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                results.push(serde_json::json!({
                    "index": index,
                    "success": true,
                    "topic": msg_request.topic,
                    "partition": partition_id,
                    "offset": offset
                }));
            }
            Err(e) => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": e.to_string()
                }));
            }
        }

        // Release the topic-map write lock before the inter-message delay.
        // Without this the guard would live until the end of the iteration and
        // block every other topic reader/writer while this task sleeps.
        drop(topics);

        // Add delay between messages (except for the last one)
        if index + 1 < total && request.delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
        }
    }

    let success_count = results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

    Json(serde_json::json!({
        "success": true,
        "total": total,
        "succeeded": success_count,
        "failed": total - success_count,
        "results": results
    }))
    .into_response()
}
2946
2947// ========== Real-time Message Streaming (SSE) ==========
2948
/// Streams MQTT message events to the client as Server-Sent Events.
///
/// Subscribes to the shared message-event channel and forwards every MQTT event
/// as an SSE event named `mqtt_message`, whose data is a JSON object with
/// `protocol`, `topic`, `payload`, `qos`, `retain` and `timestamp`.
///
/// The optional `topic` query parameter keeps only events whose topic contains
/// the given text (plain substring match). Events of other protocols are skipped,
/// a lagging receiver logs a warning and keeps going, and the stream ends once the
/// channel is closed. A keep-alive is sent every 15 seconds.
#[cfg(feature = "mqtt")]
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let receiver = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let events = stream::unfold(receiver, move |mut receiver| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                // Wait for the next MQTT event; everything else loops or ends the stream.
                let event = match receiver.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => event,
                    // Skip Kafka events in MQTT stream
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => continue,
                    Err(broadcast::error::RecvError::Closed) => return None,
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                };

                // Apply topic filter if specified
                let topic_matches = match &topic_filter {
                    Some(filter) => event.topic.contains(filter.as_str()),
                    None => true,
                };
                if !topic_matches {
                    continue;
                }

                let payload = serde_json::json!({
                    "protocol": "mqtt",
                    "topic": event.topic,
                    "payload": event.payload,
                    "qos": event.qos,
                    "retain": event.retain,
                    "timestamp": event.timestamp,
                });

                // An event that cannot be serialized is dropped and the loop moves on.
                if let Ok(data) = serde_json::to_string(&payload) {
                    let sse_event = Event::default().event("mqtt_message").data(data);
                    return Some((Ok(sse_event), receiver));
                }
            }
        }
    });

    let keep_alive = axum::response::sse::KeepAlive::new()
        .interval(std::time::Duration::from_secs(15))
        .text("keep-alive-text");

    Sse::new(events).keep_alive(keep_alive)
}
3009
/// Streams Kafka message events to the client as Server-Sent Events.
///
/// Subscribes to the shared message-event channel and forwards every Kafka event
/// as an SSE event named `kafka_message`, whose data is a JSON object with
/// `protocol`, `topic`, `key`, `value`, `partition`, `offset`, `headers` and
/// `timestamp`.
///
/// The optional `topic` query parameter keeps only events whose topic contains
/// the given text (plain substring match). Events of other protocols are skipped,
/// a lagging receiver logs a warning and keeps going, and the stream ends once the
/// channel is closed. A keep-alive is sent every 15 seconds.
#[cfg(feature = "kafka")]
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let receiver = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let events = stream::unfold(receiver, move |mut receiver| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                // Wait for the next Kafka event; everything else loops or ends the stream.
                let event = match receiver.recv().await {
                    // Skip MQTT events in Kafka stream
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => continue,
                    Ok(MessageEvent::Kafka(event)) => event,
                    Err(broadcast::error::RecvError::Closed) => return None,
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                };

                // Apply topic filter if specified
                let topic_matches = match &topic_filter {
                    Some(filter) => event.topic.contains(filter.as_str()),
                    None => true,
                };
                if !topic_matches {
                    continue;
                }

                let payload = serde_json::json!({
                    "protocol": "kafka",
                    "topic": event.topic,
                    "key": event.key,
                    "value": event.value,
                    "partition": event.partition,
                    "offset": event.offset,
                    "headers": event.headers,
                    "timestamp": event.timestamp,
                });

                // An event that cannot be serialized is dropped and the loop moves on.
                if let Ok(data) = serde_json::to_string(&payload) {
                    let sse_event = Event::default().event("kafka_message").data(data);
                    return Some((Ok(sse_event), receiver));
                }
            }
        }
    });

    let keep_alive = axum::response::sse::KeepAlive::new()
        .interval(std::time::Duration::from_secs(15))
        .text("keep-alive-text");

    Sse::new(events).keep_alive(keep_alive)
}
3073
3074// ========== AI-Powered Features ==========
3075
3076/// Request for AI-powered API specification generation
3077#[derive(Debug, Deserialize)]
3078pub struct GenerateSpecRequest {
3079    /// Natural language description of the API to generate
3080    pub query: String,
3081    /// Type of specification to generate: "openapi", "graphql", or "asyncapi"
3082    pub spec_type: String,
3083    /// Optional API version (e.g., "3.0.0" for OpenAPI)
3084    pub api_version: Option<String>,
3085}
3086
3087/// Request for OpenAPI generation from recorded traffic
3088#[derive(Debug, Deserialize)]
3089pub struct GenerateOpenApiFromTrafficRequest {
3090    /// Path to recorder database (optional, defaults to ./recordings.db)
3091    #[serde(default)]
3092    pub database_path: Option<String>,
3093    /// Start time for filtering (ISO 8601 format, e.g., 2025-01-01T00:00:00Z)
3094    #[serde(default)]
3095    pub since: Option<String>,
3096    /// End time for filtering (ISO 8601 format)
3097    #[serde(default)]
3098    pub until: Option<String>,
3099    /// Path pattern filter (supports wildcards, e.g., /api/*)
3100    #[serde(default)]
3101    pub path_pattern: Option<String>,
3102    /// Minimum confidence score for including paths (0.0 to 1.0)
3103    #[serde(default = "default_min_confidence")]
3104    pub min_confidence: f64,
3105}
3106
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`.
fn default_min_confidence() -> f64 {
    const DEFAULT_MIN_CONFIDENCE: f64 = 0.7;
    DEFAULT_MIN_CONFIDENCE
}
3110
/// Generate API specification from natural language using AI
///
/// Builds a RAG engine from environment configuration, sends a prompt assembled
/// from the request, and returns the generated specification as
/// `{"success": true, "spec": ..., "spec_type": ...}`.
///
/// # Environment
/// - `MOCKFORGE_RAG_API_KEY` (falls back to `OPENAI_API_KEY`): required; without
///   it the handler responds with `503 Service Unavailable`.
/// - `MOCKFORGE_RAG_PROVIDER`: `openai` (default), `anthropic`, `ollama`,
///   `openai-compatible` / `openai_compatible`; unrecognized values are treated
///   as `openai`.
/// - `MOCKFORGE_RAG_API_ENDPOINT`, `MOCKFORGE_RAG_MODEL`: default per provider.
/// - `MOCKFORGE_RAG_MAX_TOKENS` (4096), `MOCKFORGE_RAG_TEMPERATURE` (0.3),
///   `MOCKFORGE_RAG_TIMEOUT` (60), `MOCKFORGE_RAG_CONTEXT_WINDOW` (4000): a
///   missing or unparseable value falls back to the default in parentheses.
///
/// # Request handling
/// A `spec_type` of `graphql` asks for GraphQL SDL; `asyncapi` and `openapi`
/// ask for YAML. Any other `spec_type` is prompted as OpenAPI 3.0, while the
/// response still echoes the value the caller sent. `api_version` defaults to
/// `3.0.0`.
///
/// Responds with `500 Internal Server Error` when the engine cannot be created
/// or generation fails.
#[cfg(feature = "data-faker")]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::{DocumentStorage, InMemoryStorage},
    };

    // Reads `name` from the environment and parses it, falling back to `default`
    // when the variable is unset or its value does not parse.
    fn env_parsed_or<T: std::str::FromStr>(name: &str, default: T) -> T {
        std::env::var(name).ok().and_then(|raw| raw.parse().ok()).unwrap_or(default)
    }

    // Build RAG config from environment variables
    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    // Check if RAG is configured - require API key
    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    }

    // Resolve the provider; "openai" and any unrecognized value select OpenAI
    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_str.as_str() {
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    // Build RagConfig using struct literal with defaults
    let rag_config = RagConfig {
        provider,
        api_endpoint,
        api_key,
        model,
        max_tokens: env_parsed_or("MOCKFORGE_RAG_MAX_TOKENS", 4096),
        // Lower temperature for more structured output
        temperature: env_parsed_or("MOCKFORGE_RAG_TEMPERATURE", 0.3),
        timeout_secs: env_parsed_or("MOCKFORGE_RAG_TIMEOUT", 60),
        max_context_length: env_parsed_or("MOCKFORGE_RAG_CONTEXT_WINDOW", 4000),
        ..Default::default()
    };

    // Build the prompt for spec generation
    let is_graphql = request.spec_type == "graphql";
    let spec_type_label = match request.spec_type.as_str() {
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        // "openapi" and anything unrecognized
        _ => "OpenAPI 3.0",
    };
    let output_format = if is_graphql { "GraphQL SDL" } else { "YAML" };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        output_format
    );

    // The engine takes an `Arc<dyn DocumentStorage>`; a fresh in-memory store
    // is used for this request.
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    // Create RAG engine (the config is not needed afterwards, so it is moved in)
    let mut rag_engine = match RagEngine::new(rag_config, storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    // Generate using RAG engine
    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Try to extract just the YAML/JSON/SDL content if LLM added explanation
            let spec = if is_graphql {
                // For GraphQL, extract SDL
                extract_graphql_schema(&generated_text)
            } else {
                // For OpenAPI/AsyncAPI, extract YAML
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
3283
3284#[cfg(not(feature = "data-faker"))]
3285async fn generate_ai_spec(
3286    State(_state): State<ManagementState>,
3287    Json(_request): Json<GenerateSpecRequest>,
3288) -> impl IntoResponse {
3289    (
3290        StatusCode::NOT_IMPLEMENTED,
3291        Json(serde_json::json!({
3292            "error": "AI features not enabled",
3293            "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3294        })),
3295    )
3296        .into_response()
3297}
3298
3299/// Generate OpenAPI specification from recorded traffic
3300#[cfg(feature = "behavioral-cloning")]
3301async fn generate_openapi_from_traffic(
3302    State(_state): State<ManagementState>,
3303    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
3304) -> impl IntoResponse {
3305    use chrono::{DateTime, Utc};
3306    use mockforge_core::intelligent_behavior::{
3307        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
3308        IntelligentBehaviorConfig,
3309    };
3310    use mockforge_recorder::{
3311        database::RecorderDatabase,
3312        openapi_export::{QueryFilters, RecordingsToOpenApi},
3313    };
3314    use std::path::PathBuf;
3315
3316    // Determine database path
3317    let db_path = if let Some(ref path) = request.database_path {
3318        PathBuf::from(path)
3319    } else {
3320        std::env::current_dir()
3321            .unwrap_or_else(|_| PathBuf::from("."))
3322            .join("recordings.db")
3323    };
3324
3325    // Open database
3326    let db = match RecorderDatabase::new(&db_path).await {
3327        Ok(db) => db,
3328        Err(e) => {
3329            return (
3330                StatusCode::BAD_REQUEST,
3331                Json(serde_json::json!({
3332                    "error": "Database error",
3333                    "message": format!("Failed to open recorder database: {}", e)
3334                })),
3335            )
3336                .into_response();
3337        }
3338    };
3339
3340    // Parse time filters
3341    let since_dt = if let Some(ref since_str) = request.since {
3342        match DateTime::parse_from_rfc3339(since_str) {
3343            Ok(dt) => Some(dt.with_timezone(&Utc)),
3344            Err(e) => {
3345                return (
3346                    StatusCode::BAD_REQUEST,
3347                    Json(serde_json::json!({
3348                        "error": "Invalid date format",
3349                        "message": format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3350                    })),
3351                )
3352                    .into_response();
3353            }
3354        }
3355    } else {
3356        None
3357    };
3358
3359    let until_dt = if let Some(ref until_str) = request.until {
3360        match DateTime::parse_from_rfc3339(until_str) {
3361            Ok(dt) => Some(dt.with_timezone(&Utc)),
3362            Err(e) => {
3363                return (
3364                    StatusCode::BAD_REQUEST,
3365                    Json(serde_json::json!({
3366                        "error": "Invalid date format",
3367                        "message": format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3368                    })),
3369                )
3370                    .into_response();
3371            }
3372        }
3373    } else {
3374        None
3375    };
3376
3377    // Build query filters
3378    let query_filters = QueryFilters {
3379        since: since_dt,
3380        until: until_dt,
3381        path_pattern: request.path_pattern.clone(),
3382        min_status_code: None,
3383        max_requests: Some(1000),
3384    };
3385
3386    // Query HTTP exchanges
3387    // Note: We need to convert from mockforge-recorder's HttpExchange to mockforge-core's HttpExchange
3388    // to avoid version mismatch issues. The converter returns the version from mockforge-recorder's
3389    // dependency, so we need to manually convert to the local version.
3390    let exchanges_from_recorder =
3391        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
3392            Ok(exchanges) => exchanges,
3393            Err(e) => {
3394                return (
3395                    StatusCode::INTERNAL_SERVER_ERROR,
3396                    Json(serde_json::json!({
3397                        "error": "Query error",
3398                        "message": format!("Failed to query HTTP exchanges: {}", e)
3399                    })),
3400                )
3401                    .into_response();
3402            }
3403        };
3404
3405    if exchanges_from_recorder.is_empty() {
3406        return (
3407            StatusCode::NOT_FOUND,
3408            Json(serde_json::json!({
3409                "error": "No exchanges found",
3410                "message": "No HTTP exchanges found matching the specified filters"
3411            })),
3412        )
3413            .into_response();
3414    }
3415
3416    // Convert to local HttpExchange type to avoid version mismatch
3417    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
3418    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
3419        .into_iter()
3420        .map(|e| LocalHttpExchange {
3421            method: e.method,
3422            path: e.path,
3423            query_params: e.query_params,
3424            headers: e.headers,
3425            body: e.body,
3426            body_encoding: e.body_encoding,
3427            status_code: e.status_code,
3428            response_headers: e.response_headers,
3429            response_body: e.response_body,
3430            response_body_encoding: e.response_body_encoding,
3431            timestamp: e.timestamp,
3432        })
3433        .collect();
3434
3435    // Create OpenAPI generator config
3436    let behavior_config = IntelligentBehaviorConfig::default();
3437    let gen_config = OpenApiGenerationConfig {
3438        min_confidence: request.min_confidence,
3439        behavior_model: Some(behavior_config.behavior_model),
3440    };
3441
3442    // Generate OpenAPI spec
3443    let generator = OpenApiSpecGenerator::new(gen_config);
3444    let result = match generator.generate_from_exchanges(exchanges).await {
3445        Ok(result) => result,
3446        Err(e) => {
3447            return (
3448                StatusCode::INTERNAL_SERVER_ERROR,
3449                Json(serde_json::json!({
3450                    "error": "Generation error",
3451                    "message": format!("Failed to generate OpenAPI spec: {}", e)
3452                })),
3453            )
3454                .into_response();
3455        }
3456    };
3457
3458    // Prepare response
3459    let spec_json = if let Some(ref raw) = result.spec.raw_document {
3460        raw.clone()
3461    } else {
3462        match serde_json::to_value(&result.spec.spec) {
3463            Ok(json) => json,
3464            Err(e) => {
3465                return (
3466                    StatusCode::INTERNAL_SERVER_ERROR,
3467                    Json(serde_json::json!({
3468                        "error": "Serialization error",
3469                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
3470                    })),
3471                )
3472                    .into_response();
3473            }
3474        }
3475    };
3476
3477    // Build response with metadata
3478    let response = serde_json::json!({
3479        "spec": spec_json,
3480        "metadata": {
3481            "requests_analyzed": result.metadata.requests_analyzed,
3482            "paths_inferred": result.metadata.paths_inferred,
3483            "path_confidence": result.metadata.path_confidence,
3484            "generated_at": result.metadata.generated_at.to_rfc3339(),
3485            "duration_ms": result.metadata.duration_ms,
3486        }
3487    });
3488
3489    Json(response).into_response()
3490}
3491
3492/// List all rule explanations
3493async fn list_rule_explanations(
3494    State(state): State<ManagementState>,
3495    Query(params): Query<std::collections::HashMap<String, String>>,
3496) -> impl IntoResponse {
3497    use mockforge_core::intelligent_behavior::RuleType;
3498
3499    let explanations = state.rule_explanations.read().await;
3500    let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3501
3502    // Filter by rule type if provided
3503    if let Some(rule_type_str) = params.get("rule_type") {
3504        if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3505            explanations_vec.retain(|e| e.rule_type == rule_type);
3506        }
3507    }
3508
3509    // Filter by minimum confidence if provided
3510    if let Some(min_confidence_str) = params.get("min_confidence") {
3511        if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3512            explanations_vec.retain(|e| e.confidence >= min_confidence);
3513        }
3514    }
3515
3516    // Sort by confidence (descending) and then by generated_at (descending)
3517    explanations_vec.sort_by(|a, b| {
3518        b.confidence
3519            .partial_cmp(&a.confidence)
3520            .unwrap_or(std::cmp::Ordering::Equal)
3521            .then_with(|| b.generated_at.cmp(&a.generated_at))
3522    });
3523
3524    Json(serde_json::json!({
3525        "explanations": explanations_vec,
3526        "total": explanations_vec.len(),
3527    }))
3528    .into_response()
3529}
3530
3531/// Get a specific rule explanation by ID
3532async fn get_rule_explanation(
3533    State(state): State<ManagementState>,
3534    Path(rule_id): Path<String>,
3535) -> impl IntoResponse {
3536    let explanations = state.rule_explanations.read().await;
3537
3538    match explanations.get(&rule_id) {
3539        Some(explanation) => Json(serde_json::json!({
3540            "explanation": explanation,
3541        }))
3542        .into_response(),
3543        None => (
3544            StatusCode::NOT_FOUND,
3545            Json(serde_json::json!({
3546                "error": "Rule explanation not found",
3547                "message": format!("No explanation found for rule ID: {}", rule_id)
3548            })),
3549        )
3550            .into_response(),
3551    }
3552}
3553
/// Request body for learning behavioral rules from examples.
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example request/response pairs to learn from.
    pub examples: Vec<ExamplePairRequest>,
    /// Optional configuration override, kept as raw JSON.
    ///
    /// May be omitted from the payload, in which case it is `None`. The
    /// `learn_from_examples` handler deserializes it as an
    /// `IntelligentBehaviorConfig` and uses only its `behavior_model`; a value
    /// that fails to deserialize falls back to the default configuration.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3563
/// One example request/response pair, both sides kept as raw JSON.
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request data. The `learn_from_examples` handler reads the keys
    /// `method`, `path`, `body`, `query_params` and `headers` from it.
    pub request: serde_json::Value,
    /// Response data. The `learn_from_examples` handler reads the status from
    /// `status_code` (or `status` when `status_code` is absent) and the payload
    /// from `body`.
    pub response: serde_json::Value,
}
3572
3573/// Learn behavioral rules from example pairs
3574///
3575/// This endpoint accepts example request/response pairs, generates behavioral rules
3576/// with explanations, and stores the explanations for later retrieval.
3577async fn learn_from_examples(
3578    State(state): State<ManagementState>,
3579    Json(request): Json<LearnFromExamplesRequest>,
3580) -> impl IntoResponse {
3581    use mockforge_core::intelligent_behavior::{
3582        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3583        rule_generator::{ExamplePair, RuleGenerator},
3584    };
3585
3586    if request.examples.is_empty() {
3587        return (
3588            StatusCode::BAD_REQUEST,
3589            Json(serde_json::json!({
3590                "error": "No examples provided",
3591                "message": "At least one example pair is required"
3592            })),
3593        )
3594            .into_response();
3595    }
3596
3597    // Convert request examples to ExamplePair format
3598    let example_pairs: Result<Vec<ExamplePair>, String> = request
3599        .examples
3600        .into_iter()
3601        .enumerate()
3602        .map(|(idx, ex)| {
3603            // Parse request JSON to extract method, path, body, etc.
3604            let method = ex
3605                .request
3606                .get("method")
3607                .and_then(|v| v.as_str())
3608                .map(|s| s.to_string())
3609                .unwrap_or_else(|| "GET".to_string());
3610            let path = ex
3611                .request
3612                .get("path")
3613                .and_then(|v| v.as_str())
3614                .map(|s| s.to_string())
3615                .unwrap_or_else(|| "/".to_string());
3616            let request_body = ex.request.get("body").cloned();
3617            let query_params = ex
3618                .request
3619                .get("query_params")
3620                .and_then(|v| v.as_object())
3621                .map(|obj| {
3622                    obj.iter()
3623                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3624                        .collect()
3625                })
3626                .unwrap_or_default();
3627            let headers = ex
3628                .request
3629                .get("headers")
3630                .and_then(|v| v.as_object())
3631                .map(|obj| {
3632                    obj.iter()
3633                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3634                        .collect()
3635                })
3636                .unwrap_or_default();
3637
3638            // Parse response JSON to extract status, body, etc.
3639            let status = ex
3640                .response
3641                .get("status_code")
3642                .or_else(|| ex.response.get("status"))
3643                .and_then(|v| v.as_u64())
3644                .map(|n| n as u16)
3645                .unwrap_or(200);
3646            let response_body = ex.response.get("body").cloned();
3647
3648            Ok(ExamplePair {
3649                method,
3650                path,
3651                request: request_body,
3652                status,
3653                response: response_body,
3654                query_params,
3655                headers,
3656                metadata: {
3657                    let mut meta = std::collections::HashMap::new();
3658                    meta.insert("source".to_string(), "api".to_string());
3659                    meta.insert("example_index".to_string(), idx.to_string());
3660                    meta
3661                },
3662            })
3663        })
3664        .collect();
3665
3666    let example_pairs = match example_pairs {
3667        Ok(pairs) => pairs,
3668        Err(e) => {
3669            return (
3670                StatusCode::BAD_REQUEST,
3671                Json(serde_json::json!({
3672                    "error": "Invalid examples",
3673                    "message": e
3674                })),
3675            )
3676                .into_response();
3677        }
3678    };
3679
3680    // Create behavior config (use provided config or default)
3681    let behavior_config = if let Some(config_json) = request.config {
3682        // Try to deserialize custom config, fall back to default
3683        serde_json::from_value(config_json)
3684            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3685            .behavior_model
3686    } else {
3687        BehaviorModelConfig::default()
3688    };
3689
3690    // Create rule generator
3691    let generator = RuleGenerator::new(behavior_config);
3692
3693    // Generate rules with explanations
3694    let (rules, explanations) =
3695        match generator.generate_rules_with_explanations(example_pairs).await {
3696            Ok(result) => result,
3697            Err(e) => {
3698                return (
3699                    StatusCode::INTERNAL_SERVER_ERROR,
3700                    Json(serde_json::json!({
3701                        "error": "Rule generation failed",
3702                        "message": format!("Failed to generate rules: {}", e)
3703                    })),
3704                )
3705                    .into_response();
3706            }
3707        };
3708
3709    // Store explanations in ManagementState
3710    {
3711        let mut stored_explanations = state.rule_explanations.write().await;
3712        for explanation in &explanations {
3713            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3714        }
3715    }
3716
3717    // Prepare response
3718    let response = serde_json::json!({
3719        "success": true,
3720        "rules_generated": {
3721            "consistency_rules": rules.consistency_rules.len(),
3722            "schemas": rules.schemas.len(),
3723            "state_machines": rules.state_transitions.len(),
3724            "system_prompt": !rules.system_prompt.is_empty(),
3725        },
3726        "explanations": explanations.iter().map(|e| serde_json::json!({
3727            "rule_id": e.rule_id,
3728            "rule_type": e.rule_type,
3729            "confidence": e.confidence,
3730            "reasoning": e.reasoning,
3731        })).collect::<Vec<_>>(),
3732        "total_explanations": explanations.len(),
3733    });
3734
3735    Json(response).into_response()
3736}
3737
/// Returns the trimmed contents of the most relevant Markdown code fence in `text`.
///
/// Selection rules:
/// 1. The earliest fence opened with one of `languages` (e.g. ```` ```yaml ````) wins,
///    even when an unrelated fence appears before it.
/// 2. Otherwise the first fence of any kind is used. A language tag on the
///    opening line (e.g. `json`) is not part of the returned content.
/// 3. An unterminated fence (typical for truncated output) yields everything
///    after the opening line, provided that remainder is not empty.
///
/// Returns `None` when no usable fence is found, so the caller can decide how to
/// treat unfenced text.
// Pure string processing: compiled in every configuration so it can be
// unit-tested without `data-faker`; its only callers are gated on that feature.
#[cfg_attr(not(feature = "data-faker"), allow(dead_code))]
fn extract_fenced_block<'a>(text: &'a str, languages: &[&str]) -> Option<&'a str> {
    const FENCE: &str = "```";

    /// A fence info string such as `yaml` or `c++`: a single token without
    /// whitespace, so real content on the opening line is never mistaken for a tag.
    fn is_language_tag(info: &str) -> bool {
        !info.is_empty()
            && info
                .chars()
                .all(|c| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '+' | '.' | '#'))
    }

    /// Trimmed text up to the closing fence; for an unterminated fence, the
    /// trimmed remainder when it is not empty.
    fn fence_body(after_opener: &str) -> Option<&str> {
        match after_opener.find(FENCE) {
            Some(end) => Some(after_opener[..end].trim()),
            None => Some(after_opener.trim()).filter(|rest| !rest.is_empty()),
        }
    }

    // Earliest fence carrying one of the preferred language tags.
    let tagged_body_start = languages
        .iter()
        .filter_map(|lang| {
            let opener = format!("{}{}", FENCE, lang);
            text.find(opener.as_str()).map(|at| (at, at + opener.len()))
        })
        .min()
        .map(|(_, body_start)| body_start);
    if let Some(body) = tagged_body_start.and_then(|start| fence_body(&text[start..])) {
        return Some(body);
    }

    // Fall back to the first fence of any kind, skipping its language tag.
    let open = text.find(FENCE)?;
    let after_opener = &text[open + FENCE.len()..];
    let (info, rest) = after_opener.split_once('\n').unwrap_or((after_opener, ""));
    let body_source = if is_language_tag(info.trim()) {
        rest
    } else {
        after_opener
    };
    fence_body(body_source)
}

/// Extract a YAML spec (OpenAPI/AsyncAPI) from text content
///
/// Prefers a ```` ```yaml ```` / ```` ```yml ```` code fence, then any other code
/// fence; text without a usable fence is returned trimmed, as-is. Language tags
/// and fence markers are never part of the result.
#[cfg(feature = "data-faker")]
fn extract_yaml_spec(text: &str) -> String {
    extract_fenced_block(text, &["yaml", "yml"])
        .unwrap_or_else(|| text.trim())
        .to_string()
}

/// Extract GraphQL schema from text content
///
/// Prefers a ```` ```graphql ```` / ```` ```gql ```` code fence, then any other
/// code fence; text without a usable fence is returned trimmed, as-is. Language
/// tags and fence markers are never part of the result.
#[cfg(feature = "data-faker")]
fn extract_graphql_schema(text: &str) -> String {
    extract_fenced_block(text, &["graphql", "gql"])
        .unwrap_or_else(|| text.trim())
        .to_string()
}
3787
3788// ========== Chaos Engineering Management ==========
3789
3790/// Get current chaos engineering configuration
3791async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
3792    #[cfg(feature = "chaos")]
3793    {
3794        if let Some(chaos_state) = &_state.chaos_api_state {
3795            let config = chaos_state.config.read().await;
3796            // Convert ChaosConfig to JSON response format
3797            Json(serde_json::json!({
3798                "enabled": config.enabled,
3799                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
3800                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
3801                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
3802                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
3803            }))
3804            .into_response()
3805        } else {
3806            // Chaos API not available, return default
3807            Json(serde_json::json!({
3808                "enabled": false,
3809                "latency": null,
3810                "fault_injection": null,
3811                "rate_limit": null,
3812                "traffic_shaping": null,
3813            }))
3814            .into_response()
3815        }
3816    }
3817    #[cfg(not(feature = "chaos"))]
3818    {
3819        // Chaos feature not enabled
3820        Json(serde_json::json!({
3821            "enabled": false,
3822            "latency": null,
3823            "fault_injection": null,
3824            "rate_limit": null,
3825            "traffic_shaping": null,
3826        }))
3827        .into_response()
3828    }
3829}
3830
/// Request to update chaos configuration
///
/// Every field is optional; the update handler only touches the settings for
/// which a value is supplied. The configuration sections are carried as raw
/// JSON and deserialized into the chaos config types by the handler.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Whether to enable chaos engineering
    pub enabled: Option<bool>,
    /// Latency configuration (raw JSON)
    pub latency: Option<serde_json::Value>,
    /// Fault injection configuration (raw JSON)
    pub fault_injection: Option<serde_json::Value>,
    /// Rate limiting configuration (raw JSON)
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic shaping configuration (raw JSON)
    pub traffic_shaping: Option<serde_json::Value>,
}
3845
3846/// Update chaos engineering configuration
3847async fn update_chaos_config(
3848    State(_state): State<ManagementState>,
3849    Json(_config_update): Json<ChaosConfigUpdate>,
3850) -> impl IntoResponse {
3851    #[cfg(feature = "chaos")]
3852    {
3853        if let Some(chaos_state) = &_state.chaos_api_state {
3854            use mockforge_chaos::config::{
3855                FaultInjectionConfig, LatencyConfig, RateLimitConfig, TrafficShapingConfig,
3856            };
3857
3858            let mut config = chaos_state.config.write().await;
3859
3860            // Update enabled flag if provided
3861            if let Some(enabled) = _config_update.enabled {
3862                config.enabled = enabled;
3863            }
3864
3865            // Update latency config if provided
3866            if let Some(latency_json) = _config_update.latency {
3867                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3868                    config.latency = Some(latency);
3869                }
3870            }
3871
3872            // Update fault injection config if provided
3873            if let Some(fault_json) = _config_update.fault_injection {
3874                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3875                    config.fault_injection = Some(fault);
3876                }
3877            }
3878
3879            // Update rate limit config if provided
3880            if let Some(rate_json) = _config_update.rate_limit {
3881                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3882                    config.rate_limit = Some(rate);
3883                }
3884            }
3885
3886            // Update traffic shaping config if provided
3887            if let Some(traffic_json) = _config_update.traffic_shaping {
3888                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3889                    config.traffic_shaping = Some(traffic);
3890                }
3891            }
3892
3893            // Reinitialize middleware injectors with new config
3894            // The middleware will pick up the changes on the next request
3895            drop(config);
3896
3897            info!("Chaos configuration updated successfully");
3898            Json(serde_json::json!({
3899                "success": true,
3900                "message": "Chaos configuration updated and applied"
3901            }))
3902            .into_response()
3903        } else {
3904            (
3905                StatusCode::SERVICE_UNAVAILABLE,
3906                Json(serde_json::json!({
3907                    "success": false,
3908                    "error": "Chaos API not available",
3909                    "message": "Chaos engineering is not enabled or configured"
3910                })),
3911            )
3912                .into_response()
3913        }
3914    }
3915    #[cfg(not(feature = "chaos"))]
3916    {
3917        (
3918            StatusCode::NOT_IMPLEMENTED,
3919            Json(serde_json::json!({
3920                "success": false,
3921                "error": "Chaos feature not enabled",
3922                "message": "Chaos engineering feature is not compiled into this build"
3923            })),
3924        )
3925            .into_response()
3926    }
3927}
3928
3929// ========== Network Profile Management ==========
3930
3931/// List available network profiles
3932async fn list_network_profiles() -> impl IntoResponse {
3933    use mockforge_core::network_profiles::NetworkProfileCatalog;
3934
3935    let catalog = NetworkProfileCatalog::default();
3936    let profiles: Vec<serde_json::Value> = catalog
3937        .list_profiles_with_description()
3938        .iter()
3939        .map(|(name, description)| {
3940            serde_json::json!({
3941                "name": name,
3942                "description": description,
3943            })
3944        })
3945        .collect();
3946
3947    Json(serde_json::json!({
3948        "profiles": profiles
3949    }))
3950    .into_response()
3951}
3952
#[derive(Debug, Deserialize)]
/// Request body for applying a network profile.
pub struct ApplyNetworkProfileRequest {
    /// Name of the network profile to apply; it is looked up in the default
    /// `NetworkProfileCatalog` by the `apply_network_profile` handler.
    pub profile_name: String,
}
3959
3960/// Apply a network profile
3961async fn apply_network_profile(
3962    State(state): State<ManagementState>,
3963    Json(request): Json<ApplyNetworkProfileRequest>,
3964) -> impl IntoResponse {
3965    use mockforge_core::network_profiles::NetworkProfileCatalog;
3966
3967    let catalog = NetworkProfileCatalog::default();
3968    if let Some(profile) = catalog.get(&request.profile_name) {
3969        // Apply profile to server configuration if available
3970        // NetworkProfile contains latency and traffic_shaping configs
3971        if let Some(server_config) = &state.server_config {
3972            let mut config = server_config.write().await;
3973
3974            // Apply network profile's traffic shaping to core config
3975            use mockforge_core::config::NetworkShapingConfig;
3976
3977            // Convert NetworkProfile's TrafficShapingConfig to NetworkShapingConfig
3978            // NetworkProfile uses mockforge_core::traffic_shaping::TrafficShapingConfig
3979            // which has bandwidth and burst_loss fields
3980            let network_shaping = NetworkShapingConfig {
3981                enabled: profile.traffic_shaping.bandwidth.enabled
3982                    || profile.traffic_shaping.burst_loss.enabled,
3983                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
3984                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3985                max_connections: 1000, // Default value
3986            };
3987
3988            // Update chaos config if it exists, or create it
3989            // Chaos config is in observability.chaos, not core.chaos
3990            if let Some(ref mut chaos) = config.observability.chaos {
3991                chaos.traffic_shaping = Some(network_shaping);
3992            } else {
3993                // Create minimal chaos config with traffic shaping
3994                use mockforge_core::config::ChaosEngConfig;
3995                config.observability.chaos = Some(ChaosEngConfig {
3996                    enabled: true,
3997                    latency: None,
3998                    fault_injection: None,
3999                    rate_limit: None,
4000                    traffic_shaping: Some(network_shaping),
4001                    scenario: None,
4002                });
4003            }
4004
4005            info!("Network profile '{}' applied to server configuration", request.profile_name);
4006        } else {
4007            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
4008        }
4009
4010        // Also update chaos API state if available
4011        #[cfg(feature = "chaos")]
4012        {
4013            if let Some(chaos_state) = &state.chaos_api_state {
4014                use mockforge_chaos::config::TrafficShapingConfig;
4015
4016                let mut chaos_config = chaos_state.config.write().await;
4017                // Apply profile's traffic shaping to chaos API state
4018                let chaos_traffic_shaping = TrafficShapingConfig {
4019                    enabled: profile.traffic_shaping.bandwidth.enabled
4020                        || profile.traffic_shaping.burst_loss.enabled,
4021                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
4022                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
4023                    max_connections: 0,
4024                    connection_timeout_ms: 30000,
4025                };
4026                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
4027                chaos_config.enabled = true; // Enable chaos when applying a profile
4028                drop(chaos_config);
4029                info!("Network profile '{}' applied to chaos API state", request.profile_name);
4030            }
4031        }
4032
4033        Json(serde_json::json!({
4034            "success": true,
4035            "message": format!("Network profile '{}' applied", request.profile_name),
4036            "profile": {
4037                "name": profile.name,
4038                "description": profile.description,
4039            }
4040        }))
4041        .into_response()
4042    } else {
4043        (
4044            StatusCode::NOT_FOUND,
4045            Json(serde_json::json!({
4046                "error": "Profile not found",
4047                "message": format!("Network profile '{}' not found", request.profile_name)
4048            })),
4049        )
4050            .into_response()
4051    }
4052}
4053
4054/// Build the management API router with UI Builder support
4055pub fn management_router_with_ui_builder(
4056    state: ManagementState,
4057    server_config: mockforge_core::config::ServerConfig,
4058) -> Router {
4059    use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
4060
4061    // Create the base management router
4062    let management = management_router(state);
4063
4064    // Create UI Builder state and router
4065    let ui_builder_state = UIBuilderState::new(server_config);
4066    let ui_builder = create_ui_builder_router(ui_builder_state);
4067
4068    // Nest UI Builder under /ui-builder
4069    management.nest("/ui-builder", ui_builder)
4070}
4071
4072/// Build management router with spec import API
4073pub fn management_router_with_spec_import(state: ManagementState) -> Router {
4074    use crate::spec_import::{spec_import_router, SpecImportState};
4075
4076    // Create base management router
4077    let management = management_router(state);
4078
4079    // Merge with spec import router
4080    Router::new()
4081        .merge(management)
4082        .merge(spec_import_router(SpecImportState::new()))
4083}
4084
4085#[cfg(test)]
4086mod tests {
4087    use super::*;
4088
4089    #[tokio::test]
4090    async fn test_create_and_get_mock() {
4091        let state = ManagementState::new(None, None, 3000);
4092
4093        let mock = MockConfig {
4094            id: "test-1".to_string(),
4095            name: "Test Mock".to_string(),
4096            method: "GET".to_string(),
4097            path: "/test".to_string(),
4098            response: MockResponse {
4099                body: serde_json::json!({"message": "test"}),
4100                headers: None,
4101            },
4102            enabled: true,
4103            latency_ms: None,
4104            status_code: Some(200),
4105            request_match: None,
4106            priority: None,
4107            scenario: None,
4108            required_scenario_state: None,
4109            new_scenario_state: None,
4110        };
4111
4112        // Create mock
4113        {
4114            let mut mocks = state.mocks.write().await;
4115            mocks.push(mock.clone());
4116        }
4117
4118        // Get mock
4119        let mocks = state.mocks.read().await;
4120        let found = mocks.iter().find(|m| m.id == "test-1");
4121        assert!(found.is_some());
4122        assert_eq!(found.unwrap().name, "Test Mock");
4123    }
4124
4125    #[tokio::test]
4126    async fn test_server_stats() {
4127        let state = ManagementState::new(None, None, 3000);
4128
4129        // Add some mocks
4130        {
4131            let mut mocks = state.mocks.write().await;
4132            mocks.push(MockConfig {
4133                id: "1".to_string(),
4134                name: "Mock 1".to_string(),
4135                method: "GET".to_string(),
4136                path: "/test1".to_string(),
4137                response: MockResponse {
4138                    body: serde_json::json!({}),
4139                    headers: None,
4140                },
4141                enabled: true,
4142                latency_ms: None,
4143                status_code: Some(200),
4144                request_match: None,
4145                priority: None,
4146                scenario: None,
4147                required_scenario_state: None,
4148                new_scenario_state: None,
4149            });
4150            mocks.push(MockConfig {
4151                id: "2".to_string(),
4152                name: "Mock 2".to_string(),
4153                method: "POST".to_string(),
4154                path: "/test2".to_string(),
4155                response: MockResponse {
4156                    body: serde_json::json!({}),
4157                    headers: None,
4158                },
4159                enabled: false,
4160                latency_ms: None,
4161                status_code: Some(201),
4162                request_match: None,
4163                priority: None,
4164                scenario: None,
4165                required_scenario_state: None,
4166                new_scenario_state: None,
4167            });
4168        }
4169
4170        let mocks = state.mocks.read().await;
4171        assert_eq!(mocks.len(), 2);
4172        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
4173    }
4174
4175    #[test]
4176    fn test_mock_matches_request_with_xpath_absolute_path() {
4177        let mock = MockConfig {
4178            id: "xpath-1".to_string(),
4179            name: "XPath Match".to_string(),
4180            method: "POST".to_string(),
4181            path: "/xml".to_string(),
4182            response: MockResponse {
4183                body: serde_json::json!({"ok": true}),
4184                headers: None,
4185            },
4186            enabled: true,
4187            latency_ms: None,
4188            status_code: Some(200),
4189            request_match: Some(RequestMatchCriteria {
4190                xpath: Some("/root/order/id".to_string()),
4191                ..Default::default()
4192            }),
4193            priority: None,
4194            scenario: None,
4195            required_scenario_state: None,
4196            new_scenario_state: None,
4197        };
4198
4199        let body = br#"<root><order><id>123</id></order></root>"#;
4200        let headers = std::collections::HashMap::new();
4201        let query = std::collections::HashMap::new();
4202
4203        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4204    }
4205
4206    #[test]
4207    fn test_mock_matches_request_with_xpath_text_predicate() {
4208        let mock = MockConfig {
4209            id: "xpath-2".to_string(),
4210            name: "XPath Predicate Match".to_string(),
4211            method: "POST".to_string(),
4212            path: "/xml".to_string(),
4213            response: MockResponse {
4214                body: serde_json::json!({"ok": true}),
4215                headers: None,
4216            },
4217            enabled: true,
4218            latency_ms: None,
4219            status_code: Some(200),
4220            request_match: Some(RequestMatchCriteria {
4221                xpath: Some("//order/id[text()='123']".to_string()),
4222                ..Default::default()
4223            }),
4224            priority: None,
4225            scenario: None,
4226            required_scenario_state: None,
4227            new_scenario_state: None,
4228        };
4229
4230        let body = br#"<root><order><id>123</id></order></root>"#;
4231        let headers = std::collections::HashMap::new();
4232        let query = std::collections::HashMap::new();
4233
4234        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4235    }
4236
4237    #[test]
4238    fn test_mock_matches_request_with_xpath_no_match() {
4239        let mock = MockConfig {
4240            id: "xpath-3".to_string(),
4241            name: "XPath No Match".to_string(),
4242            method: "POST".to_string(),
4243            path: "/xml".to_string(),
4244            response: MockResponse {
4245                body: serde_json::json!({"ok": true}),
4246                headers: None,
4247            },
4248            enabled: true,
4249            latency_ms: None,
4250            status_code: Some(200),
4251            request_match: Some(RequestMatchCriteria {
4252                xpath: Some("//order/id[text()='456']".to_string()),
4253                ..Default::default()
4254            }),
4255            priority: None,
4256            scenario: None,
4257            required_scenario_state: None,
4258            new_scenario_state: None,
4259        };
4260
4261        let body = br#"<root><order><id>123</id></order></root>"#;
4262        let headers = std::collections::HashMap::new();
4263        let query = std::collections::HashMap::new();
4264
4265        assert!(!mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4266    }
4267}