Skip to main content

mockforge_http/
management.rs

1#[cfg(any(feature = "mqtt", feature = "kafka"))]
2use axum::response::sse::{Event, Sse};
3/// Management API for MockForge
4///
5/// Provides REST endpoints for controlling mocks, server configuration,
6/// and integration with developer tools (VS Code extension, CI/CD, etc.)
7use axum::{
8    extract::{Path, Query, State},
9    http::StatusCode,
10    response::{IntoResponse, Json},
11    routing::{delete, get, post, put},
12    Router,
13};
14#[cfg(any(feature = "mqtt", feature = "kafka"))]
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18    BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23#[cfg(any(feature = "mqtt", feature = "kafka"))]
24use std::convert::Infallible;
25use std::sync::Arc;
26use tokio::sync::{broadcast, RwLock};
27use tracing::*;
28
29/// Default broadcast channel capacity for message events
30const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;
31
32/// Get the broadcast channel capacity from environment or use default
33fn get_message_broadcast_capacity() -> usize {
34    std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY")
35        .ok()
36        .and_then(|s| s.parse().ok())
37        .unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY)
38}
39
40/// Message event types for real-time monitoring
41#[derive(Debug, Clone, Serialize, Deserialize)]
42#[serde(tag = "protocol", content = "data")]
43#[serde(rename_all = "lowercase")]
44pub enum MessageEvent {
45    #[cfg(feature = "mqtt")]
46    /// MQTT message event
47    Mqtt(MqttMessageEvent),
48    #[cfg(feature = "kafka")]
49    /// Kafka message event
50    Kafka(KafkaMessageEvent),
51}
52
53#[cfg(feature = "mqtt")]
54/// MQTT message event for real-time monitoring
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct MqttMessageEvent {
57    /// MQTT topic name
58    pub topic: String,
59    /// Message payload content
60    pub payload: String,
61    /// Quality of Service level (0, 1, or 2)
62    pub qos: u8,
63    /// Whether the message is retained
64    pub retain: bool,
65    /// RFC3339 formatted timestamp
66    pub timestamp: String,
67}
68
69#[cfg(feature = "kafka")]
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct KafkaMessageEvent {
72    pub topic: String,
73    pub key: Option<String>,
74    pub value: String,
75    pub partition: i32,
76    pub offset: i64,
77    pub headers: Option<std::collections::HashMap<String, String>>,
78    pub timestamp: String,
79}
80
81/// Mock configuration representation
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct MockConfig {
84    /// Unique identifier for the mock
85    #[serde(skip_serializing_if = "String::is_empty")]
86    pub id: String,
87    /// Human-readable name for the mock
88    pub name: String,
89    /// HTTP method (GET, POST, etc.)
90    pub method: String,
91    /// API path pattern to match
92    pub path: String,
93    /// Response configuration
94    pub response: MockResponse,
95    /// Whether this mock is currently enabled
96    #[serde(default = "default_true")]
97    pub enabled: bool,
98    /// Optional latency to inject in milliseconds
99    #[serde(skip_serializing_if = "Option::is_none")]
100    pub latency_ms: Option<u64>,
101    /// Optional HTTP status code override
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub status_code: Option<u16>,
104    /// Request matching criteria (headers, query params, body patterns)
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub request_match: Option<RequestMatchCriteria>,
107    /// Priority for mock ordering (higher priority mocks are matched first)
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub priority: Option<i32>,
110    /// Scenario name for stateful mocking
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub scenario: Option<String>,
113    /// Required scenario state for this mock to be active
114    #[serde(skip_serializing_if = "Option::is_none")]
115    pub required_scenario_state: Option<String>,
116    /// New scenario state after this mock is matched
117    #[serde(skip_serializing_if = "Option::is_none")]
118    pub new_scenario_state: Option<String>,
119}
120
121fn default_true() -> bool {
122    true
123}
124
125/// Mock response configuration
126#[derive(Debug, Clone, Serialize, Deserialize)]
127pub struct MockResponse {
128    /// Response body as JSON
129    pub body: serde_json::Value,
130    /// Optional custom response headers
131    #[serde(skip_serializing_if = "Option::is_none")]
132    pub headers: Option<std::collections::HashMap<String, String>>,
133}
134
135/// Request matching criteria for advanced request matching
136#[derive(Debug, Clone, Serialize, Deserialize, Default)]
137pub struct RequestMatchCriteria {
138    /// Headers that must be present and match (case-insensitive header names)
139    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
140    pub headers: std::collections::HashMap<String, String>,
141    /// Query parameters that must be present and match
142    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
143    pub query_params: std::collections::HashMap<String, String>,
144    /// Request body pattern (supports exact match or regex)
145    #[serde(skip_serializing_if = "Option::is_none")]
146    pub body_pattern: Option<String>,
147    /// JSONPath expression for JSON body matching
148    #[serde(skip_serializing_if = "Option::is_none")]
149    pub json_path: Option<String>,
150    /// XPath expression for XML body matching
151    #[serde(skip_serializing_if = "Option::is_none")]
152    pub xpath: Option<String>,
153    /// Custom matcher expression (e.g., "headers.content-type == \"application/json\"")
154    #[serde(skip_serializing_if = "Option::is_none")]
155    pub custom_matcher: Option<String>,
156}
157
158/// Check if a request matches the given mock configuration
159///
160/// This function implements comprehensive request matching including:
161/// - Method and path matching
162/// - Header matching (with regex support)
163/// - Query parameter matching
164/// - Body pattern matching (exact, regex, JSONPath, XPath)
165/// - Custom matcher expressions
166pub fn mock_matches_request(
167    mock: &MockConfig,
168    method: &str,
169    path: &str,
170    headers: &std::collections::HashMap<String, String>,
171    query_params: &std::collections::HashMap<String, String>,
172    body: Option<&[u8]>,
173) -> bool {
174    use regex::Regex;
175
176    // Check if mock is enabled
177    if !mock.enabled {
178        return false;
179    }
180
181    // Check method (case-insensitive)
182    if mock.method.to_uppercase() != method.to_uppercase() {
183        return false;
184    }
185
186    // Check path pattern (supports wildcards and path parameters)
187    if !path_matches_pattern(&mock.path, path) {
188        return false;
189    }
190
191    // Check request matching criteria if present
192    if let Some(criteria) = &mock.request_match {
193        // Check headers
194        for (key, expected_value) in &criteria.headers {
195            let header_key_lower = key.to_lowercase();
196            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
197
198            if let Some((_, actual_value)) = found {
199                // Try regex match first, then exact match
200                if let Ok(re) = Regex::new(expected_value) {
201                    if !re.is_match(actual_value) {
202                        return false;
203                    }
204                } else if actual_value != expected_value {
205                    return false;
206                }
207            } else {
208                return false; // Header not found
209            }
210        }
211
212        // Check query parameters
213        for (key, expected_value) in &criteria.query_params {
214            if let Some(actual_value) = query_params.get(key) {
215                if actual_value != expected_value {
216                    return false;
217                }
218            } else {
219                return false; // Query param not found
220            }
221        }
222
223        // Check body pattern
224        if let Some(pattern) = &criteria.body_pattern {
225            if let Some(body_bytes) = body {
226                let body_str = String::from_utf8_lossy(body_bytes);
227                // Try regex first, then exact match
228                if let Ok(re) = Regex::new(pattern) {
229                    if !re.is_match(&body_str) {
230                        return false;
231                    }
232                } else if body_str.as_ref() != pattern {
233                    return false;
234                }
235            } else {
236                return false; // Body required but not present
237            }
238        }
239
240        // Check JSONPath (simplified implementation)
241        if let Some(json_path) = &criteria.json_path {
242            if let Some(body_bytes) = body {
243                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
244                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
245                        // Simple JSONPath check
246                        if !json_path_exists(&json_value, json_path) {
247                            return false;
248                        }
249                    }
250                }
251            }
252        }
253
254        // Check XPath (supports a focused subset)
255        if let Some(xpath) = &criteria.xpath {
256            if let Some(body_bytes) = body {
257                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
258                    if !xml_xpath_exists(body_str, xpath) {
259                        return false;
260                    }
261                } else {
262                    return false;
263                }
264            } else {
265                return false; // Body required but not present
266            }
267        }
268
269        // Check custom matcher
270        if let Some(custom) = &criteria.custom_matcher {
271            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
272                return false;
273            }
274        }
275    }
276
277    true
278}
279
280/// Check if a path matches a pattern (supports wildcards and path parameters)
281fn path_matches_pattern(pattern: &str, path: &str) -> bool {
282    // Exact match
283    if pattern == path {
284        return true;
285    }
286
287    // Wildcard match
288    if pattern == "*" {
289        return true;
290    }
291
292    // Path parameter matching (e.g., /users/{id} matches /users/123)
293    let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
294    let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
295
296    if pattern_parts.len() != path_parts.len() {
297        // Check for wildcard patterns
298        if pattern.contains('*') {
299            return matches_wildcard_pattern(pattern, path);
300        }
301        return false;
302    }
303
304    for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
305        // Check for path parameters {param}
306        if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
307            continue; // Matches any value
308        }
309
310        if pattern_part != path_part {
311            return false;
312        }
313    }
314
315    true
316}
317
318/// Check if path matches a wildcard pattern
319fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
320    use regex::Regex;
321
322    // Convert pattern to regex
323    let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
324
325    if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
326        return re.is_match(path);
327    }
328
329    false
330}
331
332/// Check if a JSONPath exists in a JSON value (simplified implementation)
333fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
334    // Simple implementation - for full JSONPath support, use a library like jsonpath-rs
335    // This handles simple paths like $.field or $.field.subfield
336    if let Some(path) = json_path.strip_prefix("$.") {
337        let parts: Vec<&str> = path.split('.').collect();
338
339        let mut current = json;
340        for part in parts {
341            if let Some(obj) = current.as_object() {
342                if let Some(value) = obj.get(part) {
343                    current = value;
344                } else {
345                    return false;
346                }
347            } else {
348                return false;
349            }
350        }
351        true
352    } else {
353        // For complex JSONPath expressions, would need a proper JSONPath library
354        tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
355        false
356    }
357}
358
359#[derive(Debug, Clone, PartialEq, Eq)]
360struct XPathSegment {
361    name: String,
362    text_equals: Option<String>,
363}
364
365fn parse_xpath_segment(segment: &str) -> Option<XPathSegment> {
366    if segment.is_empty() {
367        return None;
368    }
369
370    let trimmed = segment.trim();
371    if let Some(bracket_start) = trimmed.find('[') {
372        if !trimmed.ends_with(']') {
373            return None;
374        }
375
376        let name = trimmed[..bracket_start].trim();
377        let predicate = &trimmed[bracket_start + 1..trimmed.len() - 1];
378        let predicate = predicate.trim();
379
380        // Support simple predicate: [text()="value"] or [text()='value']
381        if let Some(raw) = predicate.strip_prefix("text()=") {
382            let raw = raw.trim();
383            if raw.len() >= 2
384                && ((raw.starts_with('"') && raw.ends_with('"'))
385                    || (raw.starts_with('\'') && raw.ends_with('\'')))
386            {
387                let text = raw[1..raw.len() - 1].to_string();
388                if !name.is_empty() {
389                    return Some(XPathSegment {
390                        name: name.to_string(),
391                        text_equals: Some(text),
392                    });
393                }
394            }
395        }
396
397        None
398    } else {
399        Some(XPathSegment {
400            name: trimmed.to_string(),
401            text_equals: None,
402        })
403    }
404}
405
406fn segment_matches(node: roxmltree::Node<'_, '_>, segment: &XPathSegment) -> bool {
407    if !node.is_element() {
408        return false;
409    }
410    if node.tag_name().name() != segment.name {
411        return false;
412    }
413    match &segment.text_equals {
414        Some(expected) => node.text().map(str::trim).unwrap_or_default() == expected,
415        None => true,
416    }
417}
418
419/// Check if an XPath expression matches an XML body.
420///
421/// Supported subset:
422/// - Absolute paths: `/root/child/item`
423/// - Descendant search: `//item` and `//parent/child`
424/// - Optional text predicate per segment: `item[text()="value"]`
425fn xml_xpath_exists(xml_body: &str, xpath: &str) -> bool {
426    let doc = match roxmltree::Document::parse(xml_body) {
427        Ok(doc) => doc,
428        Err(err) => {
429            tracing::warn!("Failed to parse XML for XPath matching: {}", err);
430            return false;
431        }
432    };
433
434    let expr = xpath.trim();
435    if expr.is_empty() {
436        return false;
437    }
438
439    let (is_descendant, path_str) = if let Some(rest) = expr.strip_prefix("//") {
440        (true, rest)
441    } else if let Some(rest) = expr.strip_prefix('/') {
442        (false, rest)
443    } else {
444        tracing::warn!("Unsupported XPath expression (must start with / or //): {}", expr);
445        return false;
446    };
447
448    let segments: Vec<XPathSegment> = path_str
449        .split('/')
450        .filter(|s| !s.trim().is_empty())
451        .filter_map(parse_xpath_segment)
452        .collect();
453
454    if segments.is_empty() {
455        return false;
456    }
457
458    if is_descendant {
459        let first = &segments[0];
460        for node in doc.descendants().filter(|n| segment_matches(*n, first)) {
461            let mut frontier = vec![node];
462            for segment in &segments[1..] {
463                let mut next_frontier = Vec::new();
464                for parent in &frontier {
465                    for child in parent.children().filter(|n| segment_matches(*n, segment)) {
466                        next_frontier.push(child);
467                    }
468                }
469                if next_frontier.is_empty() {
470                    frontier.clear();
471                    break;
472                }
473                frontier = next_frontier;
474            }
475            if !frontier.is_empty() {
476                return true;
477            }
478        }
479        false
480    } else {
481        let mut frontier = vec![doc.root_element()];
482        for (index, segment) in segments.iter().enumerate() {
483            let mut next_frontier = Vec::new();
484            for parent in &frontier {
485                if index == 0 {
486                    if segment_matches(*parent, segment) {
487                        next_frontier.push(*parent);
488                    }
489                    continue;
490                }
491                for child in parent.children().filter(|n| segment_matches(*n, segment)) {
492                    next_frontier.push(child);
493                }
494            }
495            if next_frontier.is_empty() {
496                return false;
497            }
498            frontier = next_frontier;
499        }
500        !frontier.is_empty()
501    }
502}
503
504/// Evaluate a custom matcher expression
505fn evaluate_custom_matcher(
506    expression: &str,
507    method: &str,
508    path: &str,
509    headers: &std::collections::HashMap<String, String>,
510    query_params: &std::collections::HashMap<String, String>,
511    body: Option<&[u8]>,
512) -> bool {
513    use regex::Regex;
514
515    let expr = expression.trim();
516
517    // Handle equality expressions (field == "value")
518    if expr.contains("==") {
519        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
520        if parts.len() != 2 {
521            return false;
522        }
523
524        let field = parts[0];
525        let expected_value = parts[1].trim_matches('"').trim_matches('\'');
526
527        match field {
528            "method" => method == expected_value,
529            "path" => path == expected_value,
530            _ if field.starts_with("headers.") => {
531                let header_name = &field[8..];
532                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
533            }
534            _ if field.starts_with("query.") => {
535                let param_name = &field[6..];
536                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
537            }
538            _ => false,
539        }
540    }
541    // Handle regex match expressions (field =~ "pattern")
542    else if expr.contains("=~") {
543        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
544        if parts.len() != 2 {
545            return false;
546        }
547
548        let field = parts[0];
549        let pattern = parts[1].trim_matches('"').trim_matches('\'');
550
551        if let Ok(re) = Regex::new(pattern) {
552            match field {
553                "method" => re.is_match(method),
554                "path" => re.is_match(path),
555                _ if field.starts_with("headers.") => {
556                    let header_name = &field[8..];
557                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
558                }
559                _ if field.starts_with("query.") => {
560                    let param_name = &field[6..];
561                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
562                }
563                _ => false,
564            }
565        } else {
566            false
567        }
568    }
569    // Handle contains expressions (field contains "value")
570    else if expr.contains("contains") {
571        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
572        if parts.len() != 2 {
573            return false;
574        }
575
576        let field = parts[0];
577        let search_value = parts[1].trim_matches('"').trim_matches('\'');
578
579        match field {
580            "path" => path.contains(search_value),
581            _ if field.starts_with("headers.") => {
582                let header_name = &field[8..];
583                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
584            }
585            _ if field.starts_with("body") => {
586                if let Some(body_bytes) = body {
587                    let body_str = String::from_utf8_lossy(body_bytes);
588                    body_str.contains(search_value)
589                } else {
590                    false
591                }
592            }
593            _ => false,
594        }
595    } else {
596        // Unknown expression format
597        tracing::warn!("Unknown custom matcher expression format: {}", expr);
598        false
599    }
600}
601
602/// Server statistics
603#[derive(Debug, Clone, Serialize, Deserialize)]
604pub struct ServerStats {
605    /// Server uptime in seconds
606    pub uptime_seconds: u64,
607    /// Total number of requests processed
608    pub total_requests: u64,
609    /// Number of active mock configurations
610    pub active_mocks: usize,
611    /// Number of currently enabled mocks
612    pub enabled_mocks: usize,
613    /// Number of registered API routes
614    pub registered_routes: usize,
615}
616
617/// Server configuration info
618#[derive(Debug, Clone, Serialize, Deserialize)]
619pub struct ServerConfig {
620    /// MockForge version string
621    pub version: String,
622    /// Server port number
623    pub port: u16,
624    /// Whether an OpenAPI spec is loaded
625    pub has_openapi_spec: bool,
626    /// Optional path to the OpenAPI spec file
627    #[serde(skip_serializing_if = "Option::is_none")]
628    pub spec_path: Option<String>,
629}
630
631/// Shared state for the management API
632#[derive(Clone)]
633pub struct ManagementState {
634    /// Collection of mock configurations
635    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
636    /// Optional OpenAPI specification
637    pub spec: Option<Arc<OpenApiSpec>>,
638    /// Optional path to the OpenAPI spec file
639    pub spec_path: Option<String>,
640    /// Server port number
641    pub port: u16,
642    /// Server start time for uptime calculation
643    pub start_time: std::time::Instant,
644    /// Counter for total requests processed
645    pub request_counter: Arc<RwLock<u64>>,
646    /// Optional proxy configuration for migration pipeline
647    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
648    /// Optional SMTP registry for email mocking
649    #[cfg(feature = "smtp")]
650    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
651    /// Optional MQTT broker for message mocking
652    #[cfg(feature = "mqtt")]
653    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
654    /// Optional Kafka broker for event streaming
655    #[cfg(feature = "kafka")]
656    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
657    /// Broadcast channel for message events (MQTT & Kafka)
658    #[cfg(any(feature = "mqtt", feature = "kafka"))]
659    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
660    /// State machine manager for scenario state machines
661    pub state_machine_manager:
662        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
663    /// Optional WebSocket broadcast channel for real-time updates
664    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
665    /// Lifecycle hook registry for extensibility
666    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
667    /// Rule explanations storage (in-memory for now)
668    pub rule_explanations: Arc<
669        RwLock<
670            std::collections::HashMap<
671                String,
672                mockforge_core::intelligent_behavior::RuleExplanation,
673            >,
674        >,
675    >,
676    /// Optional chaos API state for chaos config management
677    #[cfg(feature = "chaos")]
678    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
679    /// Optional server configuration for profile application
680    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
681}
682
683impl ManagementState {
684    /// Create a new management state
685    ///
686    /// # Arguments
687    /// * `spec` - Optional OpenAPI specification
688    /// * `spec_path` - Optional path to the OpenAPI spec file
689    /// * `port` - Server port number
690    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
691        Self {
692            mocks: Arc::new(RwLock::new(Vec::new())),
693            spec,
694            spec_path,
695            port,
696            start_time: std::time::Instant::now(),
697            request_counter: Arc::new(RwLock::new(0)),
698            proxy_config: None,
699            #[cfg(feature = "smtp")]
700            smtp_registry: None,
701            #[cfg(feature = "mqtt")]
702            mqtt_broker: None,
703            #[cfg(feature = "kafka")]
704            kafka_broker: None,
705            #[cfg(any(feature = "mqtt", feature = "kafka"))]
706            message_events: {
707                let capacity = get_message_broadcast_capacity();
708                let (tx, _) = broadcast::channel(capacity);
709                Arc::new(tx)
710            },
711            state_machine_manager: Arc::new(RwLock::new(
712                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
713            )),
714            ws_broadcast: None,
715            lifecycle_hooks: None,
716            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
717            #[cfg(feature = "chaos")]
718            chaos_api_state: None,
719            server_config: None,
720        }
721    }
722
723    /// Add lifecycle hook registry to management state
724    pub fn with_lifecycle_hooks(
725        mut self,
726        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
727    ) -> Self {
728        self.lifecycle_hooks = Some(hooks);
729        self
730    }
731
732    /// Add WebSocket broadcast channel to management state
733    pub fn with_ws_broadcast(
734        mut self,
735        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
736    ) -> Self {
737        self.ws_broadcast = Some(ws_broadcast);
738        self
739    }
740
741    /// Add proxy configuration to management state
742    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
743        self.proxy_config = Some(proxy_config);
744        self
745    }
746
747    #[cfg(feature = "smtp")]
748    /// Add SMTP registry to management state
749    pub fn with_smtp_registry(
750        mut self,
751        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
752    ) -> Self {
753        self.smtp_registry = Some(smtp_registry);
754        self
755    }
756
757    #[cfg(feature = "mqtt")]
758    /// Add MQTT broker to management state
759    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
760        self.mqtt_broker = Some(mqtt_broker);
761        self
762    }
763
764    #[cfg(feature = "kafka")]
765    /// Add Kafka broker to management state
766    pub fn with_kafka_broker(
767        mut self,
768        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
769    ) -> Self {
770        self.kafka_broker = Some(kafka_broker);
771        self
772    }
773
774    #[cfg(feature = "chaos")]
775    /// Add chaos API state to management state
776    pub fn with_chaos_api_state(
777        mut self,
778        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
779    ) -> Self {
780        self.chaos_api_state = Some(chaos_api_state);
781        self
782    }
783
784    /// Add server configuration to management state
785    pub fn with_server_config(
786        mut self,
787        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
788    ) -> Self {
789        self.server_config = Some(server_config);
790        self
791    }
792}
793
794/// List all mocks
795async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
796    let mocks = state.mocks.read().await;
797    Json(serde_json::json!({
798        "mocks": *mocks,
799        "total": mocks.len(),
800        "enabled": mocks.iter().filter(|m| m.enabled).count()
801    }))
802}
803
804/// Get a specific mock by ID
805async fn get_mock(
806    State(state): State<ManagementState>,
807    Path(id): Path<String>,
808) -> Result<Json<MockConfig>, StatusCode> {
809    let mocks = state.mocks.read().await;
810    mocks
811        .iter()
812        .find(|m| m.id == id)
813        .cloned()
814        .map(Json)
815        .ok_or(StatusCode::NOT_FOUND)
816}
817
818/// Create a new mock
819async fn create_mock(
820    State(state): State<ManagementState>,
821    Json(mut mock): Json<MockConfig>,
822) -> Result<Json<MockConfig>, StatusCode> {
823    let mut mocks = state.mocks.write().await;
824
825    // Generate ID if not provided
826    if mock.id.is_empty() {
827        mock.id = uuid::Uuid::new_v4().to_string();
828    }
829
830    // Check for duplicate ID
831    if mocks.iter().any(|m| m.id == mock.id) {
832        return Err(StatusCode::CONFLICT);
833    }
834
835    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
836
837    // Invoke lifecycle hooks
838    if let Some(hooks) = &state.lifecycle_hooks {
839        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
840            id: mock.id.clone(),
841            name: mock.name.clone(),
842            config: serde_json::to_value(&mock).unwrap_or_default(),
843        };
844        hooks.invoke_mock_created(&event).await;
845    }
846
847    mocks.push(mock.clone());
848
849    // Broadcast WebSocket event
850    if let Some(tx) = &state.ws_broadcast {
851        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
852    }
853
854    Ok(Json(mock))
855}
856
857/// Update an existing mock
858async fn update_mock(
859    State(state): State<ManagementState>,
860    Path(id): Path<String>,
861    Json(updated_mock): Json<MockConfig>,
862) -> Result<Json<MockConfig>, StatusCode> {
863    let mut mocks = state.mocks.write().await;
864
865    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
866
867    // Get old mock for comparison
868    let old_mock = mocks[position].clone();
869
870    info!("Updating mock: {}", id);
871    mocks[position] = updated_mock.clone();
872
873    // Invoke lifecycle hooks
874    if let Some(hooks) = &state.lifecycle_hooks {
875        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
876            id: updated_mock.id.clone(),
877            name: updated_mock.name.clone(),
878            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
879        };
880        hooks.invoke_mock_updated(&event).await;
881
882        // Check if enabled state changed
883        if old_mock.enabled != updated_mock.enabled {
884            let state_event = if updated_mock.enabled {
885                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
886                    id: updated_mock.id.clone(),
887                }
888            } else {
889                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
890                    id: updated_mock.id.clone(),
891                }
892            };
893            hooks.invoke_mock_state_changed(&state_event).await;
894        }
895    }
896
897    // Broadcast WebSocket event
898    if let Some(tx) = &state.ws_broadcast {
899        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
900    }
901
902    Ok(Json(updated_mock))
903}
904
905/// Delete a mock
906async fn delete_mock(
907    State(state): State<ManagementState>,
908    Path(id): Path<String>,
909) -> Result<StatusCode, StatusCode> {
910    let mut mocks = state.mocks.write().await;
911
912    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
913
914    // Get mock info before deletion for lifecycle hooks
915    let deleted_mock = mocks[position].clone();
916
917    info!("Deleting mock: {}", id);
918    mocks.remove(position);
919
920    // Invoke lifecycle hooks
921    if let Some(hooks) = &state.lifecycle_hooks {
922        let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
923            id: deleted_mock.id.clone(),
924            name: deleted_mock.name.clone(),
925        };
926        hooks.invoke_mock_deleted(&event).await;
927    }
928
929    // Broadcast WebSocket event
930    if let Some(tx) = &state.ws_broadcast {
931        let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
932    }
933
934    Ok(StatusCode::NO_CONTENT)
935}
936
937/// Request to validate configuration
938#[derive(Debug, Deserialize)]
939pub struct ValidateConfigRequest {
940    /// Configuration to validate (as JSON)
941    pub config: serde_json::Value,
942    /// Format of the configuration ("json" or "yaml")
943    #[serde(default = "default_format")]
944    pub format: String,
945}
946
947fn default_format() -> String {
948    "json".to_string()
949}
950
951/// Validate configuration without applying it
952async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
953    use mockforge_core::config::ServerConfig;
954
955    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
956        "yaml" | "yml" => {
957            let yaml_str = match serde_json::to_string(&request.config) {
958                Ok(s) => s,
959                Err(e) => {
960                    return (
961                        StatusCode::BAD_REQUEST,
962                        Json(serde_json::json!({
963                            "valid": false,
964                            "error": format!("Failed to convert to string: {}", e),
965                            "message": "Configuration validation failed"
966                        })),
967                    )
968                        .into_response();
969                }
970            };
971            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
972        }
973        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
974    };
975
976    match config_result {
977        Ok(_) => Json(serde_json::json!({
978            "valid": true,
979            "message": "Configuration is valid"
980        }))
981        .into_response(),
982        Err(e) => (
983            StatusCode::BAD_REQUEST,
984            Json(serde_json::json!({
985                "valid": false,
986                "error": format!("Invalid configuration: {}", e),
987                "message": "Configuration validation failed"
988            })),
989        )
990            .into_response(),
991    }
992}
993
994/// Request for bulk configuration update
995#[derive(Debug, Deserialize)]
996pub struct BulkConfigUpdateRequest {
997    /// Partial configuration updates (only specified fields will be updated)
998    pub updates: serde_json::Value,
999}
1000
1001/// Bulk update configuration
1002///
1003/// This endpoint allows updating multiple configuration options at once.
1004/// Only the specified fields in the updates object will be modified.
1005///
1006/// Configuration updates are applied to the server configuration if available
1007/// in ManagementState. Changes take effect immediately for supported settings.
1008async fn bulk_update_config(
1009    State(state): State<ManagementState>,
1010    Json(request): Json<BulkConfigUpdateRequest>,
1011) -> impl IntoResponse {
1012    // Validate the updates structure
1013    if !request.updates.is_object() {
1014        return (
1015            StatusCode::BAD_REQUEST,
1016            Json(serde_json::json!({
1017                "error": "Invalid request",
1018                "message": "Updates must be a JSON object"
1019            })),
1020        )
1021            .into_response();
1022    }
1023
1024    // Try to validate as partial ServerConfig
1025    use mockforge_core::config::ServerConfig;
1026
1027    // Create a minimal valid config and try to merge updates
1028    let base_config = ServerConfig::default();
1029    let base_json = match serde_json::to_value(&base_config) {
1030        Ok(v) => v,
1031        Err(e) => {
1032            return (
1033                StatusCode::INTERNAL_SERVER_ERROR,
1034                Json(serde_json::json!({
1035                    "error": "Internal error",
1036                    "message": format!("Failed to serialize base config: {}", e)
1037                })),
1038            )
1039                .into_response();
1040        }
1041    };
1042
1043    // Merge updates into base config (simplified merge)
1044    let mut merged = base_json.clone();
1045    if let (Some(merged_obj), Some(updates_obj)) =
1046        (merged.as_object_mut(), request.updates.as_object())
1047    {
1048        for (key, value) in updates_obj {
1049            merged_obj.insert(key.clone(), value.clone());
1050        }
1051    }
1052
1053    // Validate the merged config
1054    match serde_json::from_value::<ServerConfig>(merged) {
1055        Ok(validated_config) => {
1056            // Apply config if server_config is available in ManagementState
1057            if let Some(ref config_lock) = state.server_config {
1058                let mut config = config_lock.write().await;
1059                *config = validated_config;
1060                Json(serde_json::json!({
1061                    "success": true,
1062                    "message": "Bulk configuration update applied successfully",
1063                    "updates_received": request.updates,
1064                    "validated": true,
1065                    "applied": true
1066                }))
1067                .into_response()
1068            } else {
1069                Json(serde_json::json!({
1070                    "success": true,
1071                    "message": "Bulk configuration update validated but not applied (no server config in state). Use .with_server_config() when building ManagementState.",
1072                    "updates_received": request.updates,
1073                    "validated": true,
1074                    "applied": false
1075                }))
1076                .into_response()
1077            }
1078        }
1079        Err(e) => (
1080            StatusCode::BAD_REQUEST,
1081            Json(serde_json::json!({
1082                "error": "Invalid configuration",
1083                "message": format!("Configuration validation failed: {}", e),
1084                "validated": false
1085            })),
1086        )
1087            .into_response(),
1088    }
1089}
1090
1091/// Get server statistics
1092async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
1093    let mocks = state.mocks.read().await;
1094    let request_count = *state.request_counter.read().await;
1095
1096    Json(ServerStats {
1097        uptime_seconds: state.start_time.elapsed().as_secs(),
1098        total_requests: request_count,
1099        active_mocks: mocks.len(),
1100        enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
1101        registered_routes: mocks.len(), // This could be enhanced with actual route registry info
1102    })
1103}
1104
1105/// Get server configuration
1106async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
1107    Json(ServerConfig {
1108        version: env!("CARGO_PKG_VERSION").to_string(),
1109        port: state.port,
1110        has_openapi_spec: state.spec.is_some(),
1111        spec_path: state.spec_path.clone(),
1112    })
1113}
1114
1115/// Health check endpoint
1116async fn health_check() -> Json<serde_json::Value> {
1117    Json(serde_json::json!({
1118        "status": "healthy",
1119        "service": "mockforge-management",
1120        "timestamp": chrono::Utc::now().to_rfc3339()
1121    }))
1122}
1123
1124/// Export format for mock configurations
1125#[derive(Debug, Clone, Serialize, Deserialize)]
1126#[serde(rename_all = "lowercase")]
1127pub enum ExportFormat {
1128    /// JSON format
1129    Json,
1130    /// YAML format
1131    Yaml,
1132}
1133
1134/// Export mocks in specified format
1135async fn export_mocks(
1136    State(state): State<ManagementState>,
1137    Query(params): Query<std::collections::HashMap<String, String>>,
1138) -> Result<(StatusCode, String), StatusCode> {
1139    let mocks = state.mocks.read().await;
1140
1141    let format = params
1142        .get("format")
1143        .map(|f| match f.as_str() {
1144            "yaml" | "yml" => ExportFormat::Yaml,
1145            _ => ExportFormat::Json,
1146        })
1147        .unwrap_or(ExportFormat::Json);
1148
1149    match format {
1150        ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
1151            .map(|json| (StatusCode::OK, json))
1152            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1153        ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
1154            .map(|yaml| (StatusCode::OK, yaml))
1155            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1156    }
1157}
1158
1159/// Import mocks from JSON/YAML
1160async fn import_mocks(
1161    State(state): State<ManagementState>,
1162    Json(mocks): Json<Vec<MockConfig>>,
1163) -> impl IntoResponse {
1164    let mut current_mocks = state.mocks.write().await;
1165    current_mocks.clear();
1166    current_mocks.extend(mocks);
1167    Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1168}
1169
1170#[cfg(feature = "smtp")]
1171/// List SMTP emails in mailbox
1172async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
1173    if let Some(ref smtp_registry) = state.smtp_registry {
1174        match smtp_registry.get_emails() {
1175            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
1176            Err(e) => (
1177                StatusCode::INTERNAL_SERVER_ERROR,
1178                Json(serde_json::json!({
1179                    "error": "Failed to retrieve emails",
1180                    "message": e.to_string()
1181                })),
1182            ),
1183        }
1184    } else {
1185        (
1186            StatusCode::NOT_IMPLEMENTED,
1187            Json(serde_json::json!({
1188                "error": "SMTP mailbox management not available",
1189                "message": "SMTP server is not enabled or registry not available."
1190            })),
1191        )
1192    }
1193}
1194
1195/// Get specific SMTP email
1196#[cfg(feature = "smtp")]
1197async fn get_smtp_email(
1198    State(state): State<ManagementState>,
1199    Path(id): Path<String>,
1200) -> impl IntoResponse {
1201    if let Some(ref smtp_registry) = state.smtp_registry {
1202        match smtp_registry.get_email_by_id(&id) {
1203            Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
1204            Ok(None) => (
1205                StatusCode::NOT_FOUND,
1206                Json(serde_json::json!({
1207                    "error": "Email not found",
1208                    "id": id
1209                })),
1210            ),
1211            Err(e) => (
1212                StatusCode::INTERNAL_SERVER_ERROR,
1213                Json(serde_json::json!({
1214                    "error": "Failed to retrieve email",
1215                    "message": e.to_string()
1216                })),
1217            ),
1218        }
1219    } else {
1220        (
1221            StatusCode::NOT_IMPLEMENTED,
1222            Json(serde_json::json!({
1223                "error": "SMTP mailbox management not available",
1224                "message": "SMTP server is not enabled or registry not available."
1225            })),
1226        )
1227    }
1228}
1229
1230/// Clear SMTP mailbox
1231#[cfg(feature = "smtp")]
1232async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
1233    if let Some(ref smtp_registry) = state.smtp_registry {
1234        match smtp_registry.clear_mailbox() {
1235            Ok(()) => (
1236                StatusCode::OK,
1237                Json(serde_json::json!({
1238                    "message": "Mailbox cleared successfully"
1239                })),
1240            ),
1241            Err(e) => (
1242                StatusCode::INTERNAL_SERVER_ERROR,
1243                Json(serde_json::json!({
1244                    "error": "Failed to clear mailbox",
1245                    "message": e.to_string()
1246                })),
1247            ),
1248        }
1249    } else {
1250        (
1251            StatusCode::NOT_IMPLEMENTED,
1252            Json(serde_json::json!({
1253                "error": "SMTP mailbox management not available",
1254                "message": "SMTP server is not enabled or registry not available."
1255            })),
1256        )
1257    }
1258}
1259
1260/// Export SMTP mailbox
1261#[cfg(feature = "smtp")]
1262async fn export_smtp_mailbox(
1263    Query(params): Query<std::collections::HashMap<String, String>>,
1264) -> impl IntoResponse {
1265    let format = params.get("format").unwrap_or(&"json".to_string()).clone();
1266    (
1267        StatusCode::NOT_IMPLEMENTED,
1268        Json(serde_json::json!({
1269            "error": "SMTP mailbox management not available via HTTP API",
1270            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
1271            "requested_format": format
1272        })),
1273    )
1274}
1275
1276/// Search SMTP emails
1277#[cfg(feature = "smtp")]
1278async fn search_smtp_emails(
1279    State(state): State<ManagementState>,
1280    Query(params): Query<std::collections::HashMap<String, String>>,
1281) -> impl IntoResponse {
1282    if let Some(ref smtp_registry) = state.smtp_registry {
1283        let filters = EmailSearchFilters {
1284            sender: params.get("sender").cloned(),
1285            recipient: params.get("recipient").cloned(),
1286            subject: params.get("subject").cloned(),
1287            body: params.get("body").cloned(),
1288            since: params
1289                .get("since")
1290                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
1291                .map(|dt| dt.with_timezone(&chrono::Utc)),
1292            until: params
1293                .get("until")
1294                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
1295                .map(|dt| dt.with_timezone(&chrono::Utc)),
1296            use_regex: params.get("regex").map(|s| s == "true").unwrap_or(false),
1297            case_sensitive: params.get("case_sensitive").map(|s| s == "true").unwrap_or(false),
1298        };
1299
1300        match smtp_registry.search_emails(filters) {
1301            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
1302            Err(e) => (
1303                StatusCode::INTERNAL_SERVER_ERROR,
1304                Json(serde_json::json!({
1305                    "error": "Failed to search emails",
1306                    "message": e.to_string()
1307                })),
1308            ),
1309        }
1310    } else {
1311        (
1312            StatusCode::NOT_IMPLEMENTED,
1313            Json(serde_json::json!({
1314                "error": "SMTP mailbox management not available",
1315                "message": "SMTP server is not enabled or registry not available."
1316            })),
1317        )
1318    }
1319}
1320
1321/// MQTT broker statistics
1322#[cfg(feature = "mqtt")]
1323#[derive(Debug, Clone, Serialize, Deserialize)]
1324pub struct MqttBrokerStats {
1325    /// Number of connected MQTT clients
1326    pub connected_clients: usize,
1327    /// Number of active MQTT topics
1328    pub active_topics: usize,
1329    /// Number of retained messages
1330    pub retained_messages: usize,
1331    /// Total number of subscriptions
1332    pub total_subscriptions: usize,
1333}
1334
1335/// MQTT management handlers
1336#[cfg(feature = "mqtt")]
1337async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
1338    if let Some(broker) = &state.mqtt_broker {
1339        let connected_clients = broker.get_connected_clients().await.len();
1340        let active_topics = broker.get_active_topics().await.len();
1341        let stats = broker.get_topic_stats().await;
1342
1343        let broker_stats = MqttBrokerStats {
1344            connected_clients,
1345            active_topics,
1346            retained_messages: stats.retained_messages,
1347            total_subscriptions: stats.total_subscriptions,
1348        };
1349
1350        Json(broker_stats).into_response()
1351    } else {
1352        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1353    }
1354}
1355
1356#[cfg(feature = "mqtt")]
1357async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
1358    if let Some(broker) = &state.mqtt_broker {
1359        let clients = broker.get_connected_clients().await;
1360        Json(serde_json::json!({
1361            "clients": clients
1362        }))
1363        .into_response()
1364    } else {
1365        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1366    }
1367}
1368
1369#[cfg(feature = "mqtt")]
1370async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
1371    if let Some(broker) = &state.mqtt_broker {
1372        let topics = broker.get_active_topics().await;
1373        Json(serde_json::json!({
1374            "topics": topics
1375        }))
1376        .into_response()
1377    } else {
1378        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1379    }
1380}
1381
1382#[cfg(feature = "mqtt")]
1383async fn disconnect_mqtt_client(
1384    State(state): State<ManagementState>,
1385    Path(client_id): Path<String>,
1386) -> impl IntoResponse {
1387    if let Some(broker) = &state.mqtt_broker {
1388        match broker.disconnect_client(&client_id).await {
1389            Ok(_) => {
1390                (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response()
1391            }
1392            Err(e) => {
1393                (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
1394                    .into_response()
1395            }
1396        }
1397    } else {
1398        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1399    }
1400}
1401
1402// ========== MQTT Publish Handler ==========
1403
1404#[cfg(feature = "mqtt")]
1405/// Request to publish a single MQTT message
1406#[derive(Debug, Deserialize)]
1407pub struct MqttPublishRequest {
1408    /// Topic to publish to
1409    pub topic: String,
1410    /// Message payload (string or JSON)
1411    pub payload: String,
1412    /// QoS level (0, 1, or 2)
1413    #[serde(default = "default_qos")]
1414    pub qos: u8,
1415    /// Whether to retain the message
1416    #[serde(default)]
1417    pub retain: bool,
1418}
1419
1420#[cfg(feature = "mqtt")]
1421fn default_qos() -> u8 {
1422    0
1423}
1424
1425#[cfg(feature = "mqtt")]
1426/// Publish a message to an MQTT topic (only compiled when mqtt feature is enabled)
1427async fn publish_mqtt_message_handler(
1428    State(state): State<ManagementState>,
1429    Json(request): Json<serde_json::Value>,
1430) -> impl IntoResponse {
1431    // Extract fields from JSON manually
1432    let topic = request.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
1433    let payload = request.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
1434    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
1435    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);
1436
1437    if topic.is_none() || payload.is_none() {
1438        return (
1439            StatusCode::BAD_REQUEST,
1440            Json(serde_json::json!({
1441                "error": "Invalid request",
1442                "message": "Missing required fields: topic and payload"
1443            })),
1444        );
1445    }
1446
1447    let topic = topic.unwrap();
1448    let payload = payload.unwrap();
1449
1450    if let Some(broker) = &state.mqtt_broker {
1451        // Validate QoS
1452        if qos > 2 {
1453            return (
1454                StatusCode::BAD_REQUEST,
1455                Json(serde_json::json!({
1456                    "error": "Invalid QoS",
1457                    "message": "QoS must be 0, 1, or 2"
1458                })),
1459            );
1460        }
1461
1462        // Convert payload to bytes
1463        let payload_bytes = payload.as_bytes().to_vec();
1464        let client_id = "mockforge-management-api".to_string();
1465
1466        let publish_result = broker
1467            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
1468            .await
1469            .map_err(|e| format!("{}", e));
1470
1471        match publish_result {
1472            Ok(_) => {
1473                // Emit message event for real-time monitoring
1474                let event = MessageEvent::Mqtt(MqttMessageEvent {
1475                    topic: topic.clone(),
1476                    payload: payload.clone(),
1477                    qos,
1478                    retain,
1479                    timestamp: chrono::Utc::now().to_rfc3339(),
1480                });
1481                let _ = state.message_events.send(event);
1482
1483                (
1484                    StatusCode::OK,
1485                    Json(serde_json::json!({
1486                        "success": true,
1487                        "message": format!("Message published to topic '{}'", topic),
1488                        "topic": topic,
1489                        "qos": qos,
1490                        "retain": retain
1491                    })),
1492                )
1493            }
1494            Err(error_msg) => (
1495                StatusCode::INTERNAL_SERVER_ERROR,
1496                Json(serde_json::json!({
1497                    "error": "Failed to publish message",
1498                    "message": error_msg
1499                })),
1500            ),
1501        }
1502    } else {
1503        (
1504            StatusCode::SERVICE_UNAVAILABLE,
1505            Json(serde_json::json!({
1506                "error": "MQTT broker not available",
1507                "message": "MQTT broker is not enabled or not available."
1508            })),
1509        )
1510    }
1511}
1512
1513#[cfg(not(feature = "mqtt"))]
1514/// Publish a message to an MQTT topic (stub when mqtt feature is disabled)
1515async fn publish_mqtt_message_handler(
1516    State(_state): State<ManagementState>,
1517    Json(_request): Json<serde_json::Value>,
1518) -> impl IntoResponse {
1519    (
1520        StatusCode::SERVICE_UNAVAILABLE,
1521        Json(serde_json::json!({
1522            "error": "MQTT feature not enabled",
1523            "message": "MQTT support is not compiled into this build"
1524        })),
1525    )
1526}
1527
1528#[cfg(feature = "mqtt")]
1529/// Request to publish multiple MQTT messages
1530#[derive(Debug, Deserialize)]
1531pub struct MqttBatchPublishRequest {
1532    /// List of messages to publish
1533    pub messages: Vec<MqttPublishRequest>,
1534    /// Delay between messages in milliseconds
1535    #[serde(default = "default_delay")]
1536    pub delay_ms: u64,
1537}
1538
1539#[cfg(feature = "mqtt")]
1540fn default_delay() -> u64 {
1541    100
1542}
1543
1544#[cfg(feature = "mqtt")]
1545/// Publish multiple messages to MQTT topics (only compiled when mqtt feature is enabled)
1546async fn publish_mqtt_batch_handler(
1547    State(state): State<ManagementState>,
1548    Json(request): Json<serde_json::Value>,
1549) -> impl IntoResponse {
1550    // Extract fields from JSON manually
1551    let messages_json = request.get("messages").and_then(|v| v.as_array());
1552    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);
1553
1554    if messages_json.is_none() {
1555        return (
1556            StatusCode::BAD_REQUEST,
1557            Json(serde_json::json!({
1558                "error": "Invalid request",
1559                "message": "Missing required field: messages"
1560            })),
1561        );
1562    }
1563
1564    let messages_json = messages_json.unwrap();
1565
1566    if let Some(broker) = &state.mqtt_broker {
1567        if messages_json.is_empty() {
1568            return (
1569                StatusCode::BAD_REQUEST,
1570                Json(serde_json::json!({
1571                    "error": "Empty batch",
1572                    "message": "At least one message is required"
1573                })),
1574            );
1575        }
1576
1577        let mut results = Vec::new();
1578        let client_id = "mockforge-management-api".to_string();
1579
1580        for (index, msg_json) in messages_json.iter().enumerate() {
1581            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
1582            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
1583            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
1584            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);
1585
1586            if topic.is_none() || payload.is_none() {
1587                results.push(serde_json::json!({
1588                    "index": index,
1589                    "success": false,
1590                    "error": "Missing required fields: topic and payload"
1591                }));
1592                continue;
1593            }
1594
1595            let topic = topic.unwrap();
1596            let payload = payload.unwrap();
1597
1598            // Validate QoS
1599            if qos > 2 {
1600                results.push(serde_json::json!({
1601                    "index": index,
1602                    "success": false,
1603                    "error": "Invalid QoS (must be 0, 1, or 2)"
1604                }));
1605                continue;
1606            }
1607
1608            // Convert payload to bytes
1609            let payload_bytes = payload.as_bytes().to_vec();
1610
1611            let publish_result = broker
1612                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
1613                .await
1614                .map_err(|e| format!("{}", e));
1615
1616            match publish_result {
1617                Ok(_) => {
1618                    // Emit message event
1619                    let event = MessageEvent::Mqtt(MqttMessageEvent {
1620                        topic: topic.clone(),
1621                        payload: payload.clone(),
1622                        qos,
1623                        retain,
1624                        timestamp: chrono::Utc::now().to_rfc3339(),
1625                    });
1626                    let _ = state.message_events.send(event);
1627
1628                    results.push(serde_json::json!({
1629                        "index": index,
1630                        "success": true,
1631                        "topic": topic,
1632                        "qos": qos
1633                    }));
1634                }
1635                Err(error_msg) => {
1636                    results.push(serde_json::json!({
1637                        "index": index,
1638                        "success": false,
1639                        "error": error_msg
1640                    }));
1641                }
1642            }
1643
1644            // Add delay between messages (except for the last one)
1645            if index < messages_json.len() - 1 && delay_ms > 0 {
1646                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1647            }
1648        }
1649
1650        let success_count =
1651            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();
1652
1653        (
1654            StatusCode::OK,
1655            Json(serde_json::json!({
1656                "success": true,
1657                "total": messages_json.len(),
1658                "succeeded": success_count,
1659                "failed": messages_json.len() - success_count,
1660                "results": results
1661            })),
1662        )
1663    } else {
1664        (
1665            StatusCode::SERVICE_UNAVAILABLE,
1666            Json(serde_json::json!({
1667                "error": "MQTT broker not available",
1668                "message": "MQTT broker is not enabled or not available."
1669            })),
1670        )
1671    }
1672}
1673
1674#[cfg(not(feature = "mqtt"))]
1675/// Publish multiple messages to MQTT topics (stub when mqtt feature is disabled)
1676async fn publish_mqtt_batch_handler(
1677    State(_state): State<ManagementState>,
1678    Json(_request): Json<serde_json::Value>,
1679) -> impl IntoResponse {
1680    (
1681        StatusCode::SERVICE_UNAVAILABLE,
1682        Json(serde_json::json!({
1683            "error": "MQTT feature not enabled",
1684            "message": "MQTT support is not compiled into this build"
1685        })),
1686    )
1687}
1688
1689// Migration pipeline handlers
1690
1691/// Request to set migration mode
1692#[derive(Debug, Deserialize)]
1693struct SetMigrationModeRequest {
1694    mode: String,
1695}
1696
1697/// Get all migration routes
1698async fn get_migration_routes(
1699    State(state): State<ManagementState>,
1700) -> Result<Json<serde_json::Value>, StatusCode> {
1701    let proxy_config = match &state.proxy_config {
1702        Some(config) => config,
1703        None => {
1704            return Ok(Json(serde_json::json!({
1705                "error": "Migration not configured. Proxy config not available."
1706            })));
1707        }
1708    };
1709
1710    let config = proxy_config.read().await;
1711    let routes = config.get_migration_routes();
1712
1713    Ok(Json(serde_json::json!({
1714        "routes": routes
1715    })))
1716}
1717
1718/// Toggle a route's migration mode
1719async fn toggle_route_migration(
1720    State(state): State<ManagementState>,
1721    Path(pattern): Path<String>,
1722) -> Result<Json<serde_json::Value>, StatusCode> {
1723    let proxy_config = match &state.proxy_config {
1724        Some(config) => config,
1725        None => {
1726            return Ok(Json(serde_json::json!({
1727                "error": "Migration not configured. Proxy config not available."
1728            })));
1729        }
1730    };
1731
1732    let mut config = proxy_config.write().await;
1733    let new_mode = match config.toggle_route_migration(&pattern) {
1734        Some(mode) => mode,
1735        None => {
1736            return Ok(Json(serde_json::json!({
1737                "error": format!("Route pattern not found: {}", pattern)
1738            })));
1739        }
1740    };
1741
1742    Ok(Json(serde_json::json!({
1743        "pattern": pattern,
1744        "mode": format!("{:?}", new_mode).to_lowercase()
1745    })))
1746}
1747
1748/// Set a route's migration mode explicitly
1749async fn set_route_migration_mode(
1750    State(state): State<ManagementState>,
1751    Path(pattern): Path<String>,
1752    Json(request): Json<SetMigrationModeRequest>,
1753) -> Result<Json<serde_json::Value>, StatusCode> {
1754    let proxy_config = match &state.proxy_config {
1755        Some(config) => config,
1756        None => {
1757            return Ok(Json(serde_json::json!({
1758                "error": "Migration not configured. Proxy config not available."
1759            })));
1760        }
1761    };
1762
1763    use mockforge_core::proxy::config::MigrationMode;
1764    let mode = match request.mode.to_lowercase().as_str() {
1765        "mock" => MigrationMode::Mock,
1766        "shadow" => MigrationMode::Shadow,
1767        "real" => MigrationMode::Real,
1768        "auto" => MigrationMode::Auto,
1769        _ => {
1770            return Ok(Json(serde_json::json!({
1771                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1772            })));
1773        }
1774    };
1775
1776    let mut config = proxy_config.write().await;
1777    let updated = config.update_rule_migration_mode(&pattern, mode);
1778
1779    if !updated {
1780        return Ok(Json(serde_json::json!({
1781            "error": format!("Route pattern not found: {}", pattern)
1782        })));
1783    }
1784
1785    Ok(Json(serde_json::json!({
1786        "pattern": pattern,
1787        "mode": format!("{:?}", mode).to_lowercase()
1788    })))
1789}
1790
1791/// Toggle a group's migration mode
1792async fn toggle_group_migration(
1793    State(state): State<ManagementState>,
1794    Path(group): Path<String>,
1795) -> Result<Json<serde_json::Value>, StatusCode> {
1796    let proxy_config = match &state.proxy_config {
1797        Some(config) => config,
1798        None => {
1799            return Ok(Json(serde_json::json!({
1800                "error": "Migration not configured. Proxy config not available."
1801            })));
1802        }
1803    };
1804
1805    let mut config = proxy_config.write().await;
1806    let new_mode = config.toggle_group_migration(&group);
1807
1808    Ok(Json(serde_json::json!({
1809        "group": group,
1810        "mode": format!("{:?}", new_mode).to_lowercase()
1811    })))
1812}
1813
1814/// Set a group's migration mode explicitly
1815async fn set_group_migration_mode(
1816    State(state): State<ManagementState>,
1817    Path(group): Path<String>,
1818    Json(request): Json<SetMigrationModeRequest>,
1819) -> Result<Json<serde_json::Value>, StatusCode> {
1820    let proxy_config = match &state.proxy_config {
1821        Some(config) => config,
1822        None => {
1823            return Ok(Json(serde_json::json!({
1824                "error": "Migration not configured. Proxy config not available."
1825            })));
1826        }
1827    };
1828
1829    use mockforge_core::proxy::config::MigrationMode;
1830    let mode = match request.mode.to_lowercase().as_str() {
1831        "mock" => MigrationMode::Mock,
1832        "shadow" => MigrationMode::Shadow,
1833        "real" => MigrationMode::Real,
1834        "auto" => MigrationMode::Auto,
1835        _ => {
1836            return Ok(Json(serde_json::json!({
1837                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1838            })));
1839        }
1840    };
1841
1842    let mut config = proxy_config.write().await;
1843    config.update_group_migration_mode(&group, mode);
1844
1845    Ok(Json(serde_json::json!({
1846        "group": group,
1847        "mode": format!("{:?}", mode).to_lowercase()
1848    })))
1849}
1850
1851/// Get all migration groups
1852async fn get_migration_groups(
1853    State(state): State<ManagementState>,
1854) -> Result<Json<serde_json::Value>, StatusCode> {
1855    let proxy_config = match &state.proxy_config {
1856        Some(config) => config,
1857        None => {
1858            return Ok(Json(serde_json::json!({
1859                "error": "Migration not configured. Proxy config not available."
1860            })));
1861        }
1862    };
1863
1864    let config = proxy_config.read().await;
1865    let groups = config.get_migration_groups();
1866
1867    // Convert to JSON-serializable format
1868    let groups_json: serde_json::Map<String, serde_json::Value> = groups
1869        .into_iter()
1870        .map(|(name, info)| {
1871            (
1872                name,
1873                serde_json::json!({
1874                    "name": info.name,
1875                    "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1876                    "route_count": info.route_count
1877                }),
1878            )
1879        })
1880        .collect();
1881
1882    Ok(Json(serde_json::json!(groups_json)))
1883}
1884
1885/// Get overall migration status
1886async fn get_migration_status(
1887    State(state): State<ManagementState>,
1888) -> Result<Json<serde_json::Value>, StatusCode> {
1889    let proxy_config = match &state.proxy_config {
1890        Some(config) => config,
1891        None => {
1892            return Ok(Json(serde_json::json!({
1893                "error": "Migration not configured. Proxy config not available."
1894            })));
1895        }
1896    };
1897
1898    let config = proxy_config.read().await;
1899    let routes = config.get_migration_routes();
1900    let groups = config.get_migration_groups();
1901
1902    let mut mock_count = 0;
1903    let mut shadow_count = 0;
1904    let mut real_count = 0;
1905    let mut auto_count = 0;
1906
1907    for route in &routes {
1908        match route.migration_mode {
1909            mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1910            mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1911            mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1912            mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1913        }
1914    }
1915
1916    Ok(Json(serde_json::json!({
1917        "total_routes": routes.len(),
1918        "mock_routes": mock_count,
1919        "shadow_routes": shadow_count,
1920        "real_routes": real_count,
1921        "auto_routes": auto_count,
1922        "total_groups": groups.len(),
1923        "migration_enabled": config.migration_enabled
1924    })))
1925}
1926
1927// ========== Proxy Replacement Rules Management ==========
1928
1929/// Request body for creating/updating proxy replacement rules
1930#[derive(Debug, Deserialize, Serialize)]
1931pub struct ProxyRuleRequest {
1932    /// URL pattern to match (supports wildcards like "/api/users/*")
1933    pub pattern: String,
1934    /// Rule type: "request" or "response"
1935    #[serde(rename = "type")]
1936    pub rule_type: String,
1937    /// Optional status code filter for response rules
1938    #[serde(default)]
1939    pub status_codes: Vec<u16>,
1940    /// Body transformations to apply
1941    pub body_transforms: Vec<BodyTransformRequest>,
1942    /// Whether this rule is enabled
1943    #[serde(default = "default_true")]
1944    pub enabled: bool,
1945}
1946
1947/// Request body for individual body transformations
1948#[derive(Debug, Deserialize, Serialize)]
1949pub struct BodyTransformRequest {
1950    /// JSONPath expression to target (e.g., "$.userId", "$.email")
1951    pub path: String,
1952    /// Replacement value (supports template expansion like "{{uuid}}", "{{faker.email}}")
1953    pub replace: String,
1954    /// Operation to perform: "replace", "add", or "remove"
1955    #[serde(default)]
1956    pub operation: String,
1957}
1958
1959/// Response format for proxy rules
1960#[derive(Debug, Serialize)]
1961pub struct ProxyRuleResponse {
1962    /// Rule ID (index in the array)
1963    pub id: usize,
1964    /// URL pattern
1965    pub pattern: String,
1966    /// Rule type
1967    #[serde(rename = "type")]
1968    pub rule_type: String,
1969    /// Status codes (for response rules)
1970    pub status_codes: Vec<u16>,
1971    /// Body transformations
1972    pub body_transforms: Vec<BodyTransformRequest>,
1973    /// Whether enabled
1974    pub enabled: bool,
1975}
1976
1977/// List all proxy replacement rules
1978async fn list_proxy_rules(
1979    State(state): State<ManagementState>,
1980) -> Result<Json<serde_json::Value>, StatusCode> {
1981    let proxy_config = match &state.proxy_config {
1982        Some(config) => config,
1983        None => {
1984            return Ok(Json(serde_json::json!({
1985                "error": "Proxy not configured. Proxy config not available."
1986            })));
1987        }
1988    };
1989
1990    let config = proxy_config.read().await;
1991
1992    let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1993
1994    // Add request replacement rules
1995    for (idx, rule) in config.request_replacements.iter().enumerate() {
1996        rules.push(ProxyRuleResponse {
1997            id: idx,
1998            pattern: rule.pattern.clone(),
1999            rule_type: "request".to_string(),
2000            status_codes: Vec::new(),
2001            body_transforms: rule
2002                .body_transforms
2003                .iter()
2004                .map(|t| BodyTransformRequest {
2005                    path: t.path.clone(),
2006                    replace: t.replace.clone(),
2007                    operation: format!("{:?}", t.operation).to_lowercase(),
2008                })
2009                .collect(),
2010            enabled: rule.enabled,
2011        });
2012    }
2013
2014    // Add response replacement rules
2015    let request_count = config.request_replacements.len();
2016    for (idx, rule) in config.response_replacements.iter().enumerate() {
2017        rules.push(ProxyRuleResponse {
2018            id: request_count + idx,
2019            pattern: rule.pattern.clone(),
2020            rule_type: "response".to_string(),
2021            status_codes: rule.status_codes.clone(),
2022            body_transforms: rule
2023                .body_transforms
2024                .iter()
2025                .map(|t| BodyTransformRequest {
2026                    path: t.path.clone(),
2027                    replace: t.replace.clone(),
2028                    operation: format!("{:?}", t.operation).to_lowercase(),
2029                })
2030                .collect(),
2031            enabled: rule.enabled,
2032        });
2033    }
2034
2035    Ok(Json(serde_json::json!({
2036        "rules": rules
2037    })))
2038}
2039
2040/// Create a new proxy replacement rule
2041async fn create_proxy_rule(
2042    State(state): State<ManagementState>,
2043    Json(request): Json<ProxyRuleRequest>,
2044) -> Result<Json<serde_json::Value>, StatusCode> {
2045    let proxy_config = match &state.proxy_config {
2046        Some(config) => config,
2047        None => {
2048            return Ok(Json(serde_json::json!({
2049                "error": "Proxy not configured. Proxy config not available."
2050            })));
2051        }
2052    };
2053
2054    // Validate request
2055    if request.body_transforms.is_empty() {
2056        return Ok(Json(serde_json::json!({
2057            "error": "At least one body transform is required"
2058        })));
2059    }
2060
2061    let body_transforms: Vec<BodyTransform> = request
2062        .body_transforms
2063        .iter()
2064        .map(|t| {
2065            let op = match t.operation.as_str() {
2066                "replace" => TransformOperation::Replace,
2067                "add" => TransformOperation::Add,
2068                "remove" => TransformOperation::Remove,
2069                _ => TransformOperation::Replace,
2070            };
2071            BodyTransform {
2072                path: t.path.clone(),
2073                replace: t.replace.clone(),
2074                operation: op,
2075            }
2076        })
2077        .collect();
2078
2079    let new_rule = BodyTransformRule {
2080        pattern: request.pattern.clone(),
2081        status_codes: request.status_codes.clone(),
2082        body_transforms,
2083        enabled: request.enabled,
2084    };
2085
2086    let mut config = proxy_config.write().await;
2087
2088    let rule_id = if request.rule_type == "request" {
2089        config.request_replacements.push(new_rule);
2090        config.request_replacements.len() - 1
2091    } else if request.rule_type == "response" {
2092        config.response_replacements.push(new_rule);
2093        config.request_replacements.len() + config.response_replacements.len() - 1
2094    } else {
2095        return Ok(Json(serde_json::json!({
2096            "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
2097        })));
2098    };
2099
2100    Ok(Json(serde_json::json!({
2101        "id": rule_id,
2102        "message": "Rule created successfully"
2103    })))
2104}
2105
2106/// Get a specific proxy replacement rule
2107async fn get_proxy_rule(
2108    State(state): State<ManagementState>,
2109    Path(id): Path<String>,
2110) -> Result<Json<serde_json::Value>, StatusCode> {
2111    let proxy_config = match &state.proxy_config {
2112        Some(config) => config,
2113        None => {
2114            return Ok(Json(serde_json::json!({
2115                "error": "Proxy not configured. Proxy config not available."
2116            })));
2117        }
2118    };
2119
2120    let config = proxy_config.read().await;
2121    let rule_id: usize = match id.parse() {
2122        Ok(id) => id,
2123        Err(_) => {
2124            return Ok(Json(serde_json::json!({
2125                "error": format!("Invalid rule ID: {}", id)
2126            })));
2127        }
2128    };
2129
2130    let request_count = config.request_replacements.len();
2131
2132    if rule_id < request_count {
2133        // Request rule
2134        let rule = &config.request_replacements[rule_id];
2135        Ok(Json(serde_json::json!({
2136            "id": rule_id,
2137            "pattern": rule.pattern,
2138            "type": "request",
2139            "status_codes": [],
2140            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2141                "path": t.path,
2142                "replace": t.replace,
2143                "operation": format!("{:?}", t.operation).to_lowercase()
2144            })).collect::<Vec<_>>(),
2145            "enabled": rule.enabled
2146        })))
2147    } else if rule_id < request_count + config.response_replacements.len() {
2148        // Response rule
2149        let response_idx = rule_id - request_count;
2150        let rule = &config.response_replacements[response_idx];
2151        Ok(Json(serde_json::json!({
2152            "id": rule_id,
2153            "pattern": rule.pattern,
2154            "type": "response",
2155            "status_codes": rule.status_codes,
2156            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2157                "path": t.path,
2158                "replace": t.replace,
2159                "operation": format!("{:?}", t.operation).to_lowercase()
2160            })).collect::<Vec<_>>(),
2161            "enabled": rule.enabled
2162        })))
2163    } else {
2164        Ok(Json(serde_json::json!({
2165            "error": format!("Rule ID {} not found", rule_id)
2166        })))
2167    }
2168}
2169
2170/// Update a proxy replacement rule
2171async fn update_proxy_rule(
2172    State(state): State<ManagementState>,
2173    Path(id): Path<String>,
2174    Json(request): Json<ProxyRuleRequest>,
2175) -> Result<Json<serde_json::Value>, StatusCode> {
2176    let proxy_config = match &state.proxy_config {
2177        Some(config) => config,
2178        None => {
2179            return Ok(Json(serde_json::json!({
2180                "error": "Proxy not configured. Proxy config not available."
2181            })));
2182        }
2183    };
2184
2185    let mut config = proxy_config.write().await;
2186    let rule_id: usize = match id.parse() {
2187        Ok(id) => id,
2188        Err(_) => {
2189            return Ok(Json(serde_json::json!({
2190                "error": format!("Invalid rule ID: {}", id)
2191            })));
2192        }
2193    };
2194
2195    let body_transforms: Vec<BodyTransform> = request
2196        .body_transforms
2197        .iter()
2198        .map(|t| {
2199            let op = match t.operation.as_str() {
2200                "replace" => TransformOperation::Replace,
2201                "add" => TransformOperation::Add,
2202                "remove" => TransformOperation::Remove,
2203                _ => TransformOperation::Replace,
2204            };
2205            BodyTransform {
2206                path: t.path.clone(),
2207                replace: t.replace.clone(),
2208                operation: op,
2209            }
2210        })
2211        .collect();
2212
2213    let updated_rule = BodyTransformRule {
2214        pattern: request.pattern.clone(),
2215        status_codes: request.status_codes.clone(),
2216        body_transforms,
2217        enabled: request.enabled,
2218    };
2219
2220    let request_count = config.request_replacements.len();
2221
2222    if rule_id < request_count {
2223        // Update request rule
2224        config.request_replacements[rule_id] = updated_rule;
2225    } else if rule_id < request_count + config.response_replacements.len() {
2226        // Update response rule
2227        let response_idx = rule_id - request_count;
2228        config.response_replacements[response_idx] = updated_rule;
2229    } else {
2230        return Ok(Json(serde_json::json!({
2231            "error": format!("Rule ID {} not found", rule_id)
2232        })));
2233    }
2234
2235    Ok(Json(serde_json::json!({
2236        "id": rule_id,
2237        "message": "Rule updated successfully"
2238    })))
2239}
2240
2241/// Delete a proxy replacement rule
2242async fn delete_proxy_rule(
2243    State(state): State<ManagementState>,
2244    Path(id): Path<String>,
2245) -> Result<Json<serde_json::Value>, StatusCode> {
2246    let proxy_config = match &state.proxy_config {
2247        Some(config) => config,
2248        None => {
2249            return Ok(Json(serde_json::json!({
2250                "error": "Proxy not configured. Proxy config not available."
2251            })));
2252        }
2253    };
2254
2255    let mut config = proxy_config.write().await;
2256    let rule_id: usize = match id.parse() {
2257        Ok(id) => id,
2258        Err(_) => {
2259            return Ok(Json(serde_json::json!({
2260                "error": format!("Invalid rule ID: {}", id)
2261            })));
2262        }
2263    };
2264
2265    let request_count = config.request_replacements.len();
2266
2267    if rule_id < request_count {
2268        // Delete request rule
2269        config.request_replacements.remove(rule_id);
2270    } else if rule_id < request_count + config.response_replacements.len() {
2271        // Delete response rule
2272        let response_idx = rule_id - request_count;
2273        config.response_replacements.remove(response_idx);
2274    } else {
2275        return Ok(Json(serde_json::json!({
2276            "error": format!("Rule ID {} not found", rule_id)
2277        })));
2278    }
2279
2280    Ok(Json(serde_json::json!({
2281        "id": rule_id,
2282        "message": "Rule deleted successfully"
2283    })))
2284}
2285
2286/// Get proxy rules and transformation configuration for inspection
2287async fn get_proxy_inspect(
2288    State(state): State<ManagementState>,
2289    Query(params): Query<std::collections::HashMap<String, String>>,
2290) -> Result<Json<serde_json::Value>, StatusCode> {
2291    let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2292    let offset: usize = params.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);
2293
2294    let proxy_config = match &state.proxy_config {
2295        Some(config) => config.read().await,
2296        None => {
2297            return Ok(Json(serde_json::json!({
2298                "error": "Proxy not configured. Proxy config not available."
2299            })));
2300        }
2301    };
2302
2303    let mut rules = Vec::new();
2304    for (idx, rule) in proxy_config.request_replacements.iter().enumerate() {
2305        rules.push(serde_json::json!({
2306            "id": idx,
2307            "kind": "request",
2308            "pattern": rule.pattern,
2309            "enabled": rule.enabled,
2310            "status_codes": rule.status_codes,
2311            "transform_count": rule.body_transforms.len(),
2312            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2313                "path": t.path,
2314                "operation": t.operation,
2315                "replace": t.replace
2316            })).collect::<Vec<_>>()
2317        }));
2318    }
2319    let request_rule_count = rules.len();
2320    for (idx, rule) in proxy_config.response_replacements.iter().enumerate() {
2321        rules.push(serde_json::json!({
2322            "id": request_rule_count + idx,
2323            "kind": "response",
2324            "pattern": rule.pattern,
2325            "enabled": rule.enabled,
2326            "status_codes": rule.status_codes,
2327            "transform_count": rule.body_transforms.len(),
2328            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2329                "path": t.path,
2330                "operation": t.operation,
2331                "replace": t.replace
2332            })).collect::<Vec<_>>()
2333        }));
2334    }
2335
2336    let total = rules.len();
2337    let paged_rules: Vec<_> = rules.into_iter().skip(offset).take(limit).collect();
2338
2339    Ok(Json(serde_json::json!({
2340        "enabled": proxy_config.enabled,
2341        "target_url": proxy_config.target_url,
2342        "prefix": proxy_config.prefix,
2343        "timeout_seconds": proxy_config.timeout_seconds,
2344        "follow_redirects": proxy_config.follow_redirects,
2345        "passthrough_by_default": proxy_config.passthrough_by_default,
2346        "rules": paged_rules,
2347        "request_rule_count": request_rule_count,
2348        "response_rule_count": total.saturating_sub(request_rule_count),
2349        "limit": limit,
2350        "offset": offset,
2351        "total": total
2352    })))
2353}
2354
2355/// Build the management API router
2356pub fn management_router(state: ManagementState) -> Router {
2357    let router = Router::new()
2358        .route("/health", get(health_check))
2359        .route("/stats", get(get_stats))
2360        .route("/config", get(get_config))
2361        .route("/config/validate", post(validate_config))
2362        .route("/config/bulk", post(bulk_update_config))
2363        .route("/mocks", get(list_mocks))
2364        .route("/mocks", post(create_mock))
2365        .route("/mocks/{id}", get(get_mock))
2366        .route("/mocks/{id}", put(update_mock))
2367        .route("/mocks/{id}", delete(delete_mock))
2368        .route("/export", get(export_mocks))
2369        .route("/import", post(import_mocks));
2370
2371    #[cfg(feature = "smtp")]
2372    let router = router
2373        .route("/smtp/mailbox", get(list_smtp_emails))
2374        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
2375        .route("/smtp/mailbox/{id}", get(get_smtp_email))
2376        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
2377        .route("/smtp/mailbox/search", get(search_smtp_emails));
2378
2379    #[cfg(not(feature = "smtp"))]
2380    let router = router;
2381
2382    // MQTT routes
2383    #[cfg(feature = "mqtt")]
2384    let router = router
2385        .route("/mqtt/stats", get(get_mqtt_stats))
2386        .route("/mqtt/clients", get(get_mqtt_clients))
2387        .route("/mqtt/topics", get(get_mqtt_topics))
2388        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
2389        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
2390        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2391        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2392
2393    #[cfg(not(feature = "mqtt"))]
2394    let router = router
2395        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2396        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2397
2398    #[cfg(feature = "kafka")]
2399    let router = router
2400        .route("/kafka/stats", get(get_kafka_stats))
2401        .route("/kafka/topics", get(get_kafka_topics))
2402        .route("/kafka/topics/{topic}", get(get_kafka_topic))
2403        .route("/kafka/groups", get(get_kafka_groups))
2404        .route("/kafka/groups/{group_id}", get(get_kafka_group))
2405        .route("/kafka/produce", post(produce_kafka_message))
2406        .route("/kafka/produce/batch", post(produce_kafka_batch))
2407        .route("/kafka/messages/stream", get(kafka_messages_stream));
2408
2409    #[cfg(not(feature = "kafka"))]
2410    let router = router;
2411
2412    // Migration pipeline routes
2413    let router = router
2414        .route("/migration/routes", get(get_migration_routes))
2415        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
2416        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
2417        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
2418        .route("/migration/groups/{group}", put(set_group_migration_mode))
2419        .route("/migration/groups", get(get_migration_groups))
2420        .route("/migration/status", get(get_migration_status));
2421
2422    // Proxy replacement rules routes
2423    let router = router
2424        .route("/proxy/rules", get(list_proxy_rules))
2425        .route("/proxy/rules", post(create_proxy_rule))
2426        .route("/proxy/rules/{id}", get(get_proxy_rule))
2427        .route("/proxy/rules/{id}", put(update_proxy_rule))
2428        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
2429        .route("/proxy/inspect", get(get_proxy_inspect));
2430
2431    // AI-powered features
2432    let router = router.route("/ai/generate-spec", post(generate_ai_spec));
2433
2434    // Snapshot diff endpoints
2435    let router = router.nest(
2436        "/snapshot-diff",
2437        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
2438    );
2439
2440    #[cfg(feature = "behavioral-cloning")]
2441    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));
2442
2443    let router = router
2444        .route("/mockai/learn", post(learn_from_examples))
2445        .route("/mockai/rules/explanations", get(list_rule_explanations))
2446        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
2447        .route("/chaos/config", get(get_chaos_config))
2448        .route("/chaos/config", post(update_chaos_config))
2449        .route("/network/profiles", get(list_network_profiles))
2450        .route("/network/profile/apply", post(apply_network_profile));
2451
2452    // State machine API routes
2453    let router =
2454        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());
2455
2456    router.with_state(state)
2457}
2458
2459#[cfg(feature = "kafka")]
2460#[derive(Debug, Clone, Serialize, Deserialize)]
2461pub struct KafkaBrokerStats {
2462    /// Number of topics
2463    pub topics: usize,
2464    /// Total number of partitions
2465    pub partitions: usize,
2466    /// Number of consumer groups
2467    pub consumer_groups: usize,
2468    /// Total messages produced
2469    pub messages_produced: u64,
2470    /// Total messages consumed
2471    pub messages_consumed: u64,
2472}
2473
2474#[cfg(feature = "kafka")]
2475#[derive(Debug, Clone, Serialize, Deserialize)]
2476pub struct KafkaTopicInfo {
2477    pub name: String,
2478    pub partitions: usize,
2479    pub replication_factor: i32,
2480}
2481
2482#[cfg(feature = "kafka")]
2483#[derive(Debug, Clone, Serialize, Deserialize)]
2484pub struct KafkaConsumerGroupInfo {
2485    pub group_id: String,
2486    pub members: usize,
2487    pub state: String,
2488}
2489
2490#[cfg(feature = "kafka")]
2491/// Get Kafka broker statistics
2492async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
2493    if let Some(broker) = &state.kafka_broker {
2494        let topics = broker.topics.read().await;
2495        let consumer_groups = broker.consumer_groups.read().await;
2496
2497        let total_partitions: usize = topics.values().map(|t| t.partitions.len()).sum();
2498
2499        // Get metrics snapshot for message counts
2500        let metrics_snapshot = broker.metrics().snapshot();
2501
2502        let stats = KafkaBrokerStats {
2503            topics: topics.len(),
2504            partitions: total_partitions,
2505            consumer_groups: consumer_groups.groups().len(),
2506            messages_produced: metrics_snapshot.messages_produced_total,
2507            messages_consumed: metrics_snapshot.messages_consumed_total,
2508        };
2509
2510        Json(stats).into_response()
2511    } else {
2512        (
2513            StatusCode::SERVICE_UNAVAILABLE,
2514            Json(serde_json::json!({
2515                "error": "Kafka broker not available",
2516                "message": "Kafka broker is not enabled or not available."
2517            })),
2518        )
2519            .into_response()
2520    }
2521}
2522
2523#[cfg(feature = "kafka")]
2524/// List Kafka topics
2525async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
2526    if let Some(broker) = &state.kafka_broker {
2527        let topics = broker.topics.read().await;
2528        let topic_list: Vec<KafkaTopicInfo> = topics
2529            .iter()
2530            .map(|(name, topic)| KafkaTopicInfo {
2531                name: name.clone(),
2532                partitions: topic.partitions.len(),
2533                replication_factor: topic.config.replication_factor as i32,
2534            })
2535            .collect();
2536
2537        Json(serde_json::json!({
2538            "topics": topic_list
2539        }))
2540        .into_response()
2541    } else {
2542        (
2543            StatusCode::SERVICE_UNAVAILABLE,
2544            Json(serde_json::json!({
2545                "error": "Kafka broker not available",
2546                "message": "Kafka broker is not enabled or not available."
2547            })),
2548        )
2549            .into_response()
2550    }
2551}
2552
2553#[cfg(feature = "kafka")]
2554/// Get Kafka topic details
2555async fn get_kafka_topic(
2556    State(state): State<ManagementState>,
2557    Path(topic_name): Path<String>,
2558) -> impl IntoResponse {
2559    if let Some(broker) = &state.kafka_broker {
2560        let topics = broker.topics.read().await;
2561        if let Some(topic) = topics.get(&topic_name) {
2562            Json(serde_json::json!({
2563                "name": topic_name,
2564                "partitions": topic.partitions.len(),
2565                "replication_factor": topic.config.replication_factor,
2566                "partitions_detail": topic.partitions.iter().enumerate().map(|(idx, partition)| serde_json::json!({
2567                    "id": idx as i32,
2568                    "leader": 0,
2569                    "replicas": vec![0],
2570                    "message_count": partition.messages.len()
2571                })).collect::<Vec<_>>()
2572            })).into_response()
2573        } else {
2574            (
2575                StatusCode::NOT_FOUND,
2576                Json(serde_json::json!({
2577                    "error": "Topic not found",
2578                    "topic": topic_name
2579                })),
2580            )
2581                .into_response()
2582        }
2583    } else {
2584        (
2585            StatusCode::SERVICE_UNAVAILABLE,
2586            Json(serde_json::json!({
2587                "error": "Kafka broker not available",
2588                "message": "Kafka broker is not enabled or not available."
2589            })),
2590        )
2591            .into_response()
2592    }
2593}
2594
2595#[cfg(feature = "kafka")]
2596/// List Kafka consumer groups
2597async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
2598    if let Some(broker) = &state.kafka_broker {
2599        let consumer_groups = broker.consumer_groups.read().await;
2600        let groups: Vec<KafkaConsumerGroupInfo> = consumer_groups
2601            .groups()
2602            .iter()
2603            .map(|(group_id, group)| KafkaConsumerGroupInfo {
2604                group_id: group_id.clone(),
2605                members: group.members.len(),
2606                state: "Stable".to_string(), // Simplified - could be more detailed
2607            })
2608            .collect();
2609
2610        Json(serde_json::json!({
2611            "groups": groups
2612        }))
2613        .into_response()
2614    } else {
2615        (
2616            StatusCode::SERVICE_UNAVAILABLE,
2617            Json(serde_json::json!({
2618                "error": "Kafka broker not available",
2619                "message": "Kafka broker is not enabled or not available."
2620            })),
2621        )
2622            .into_response()
2623    }
2624}
2625
2626#[cfg(feature = "kafka")]
2627/// Get Kafka consumer group details
2628async fn get_kafka_group(
2629    State(state): State<ManagementState>,
2630    Path(group_id): Path<String>,
2631) -> impl IntoResponse {
2632    if let Some(broker) = &state.kafka_broker {
2633        let consumer_groups = broker.consumer_groups.read().await;
2634        if let Some(group) = consumer_groups.groups().get(&group_id) {
2635            Json(serde_json::json!({
2636                "group_id": group_id,
2637                "members": group.members.len(),
2638                "state": "Stable",
2639                "members_detail": group.members.iter().map(|(member_id, member)| serde_json::json!({
2640                    "member_id": member_id,
2641                    "client_id": member.client_id,
2642                    "assignments": member.assignment.iter().map(|a| serde_json::json!({
2643                        "topic": a.topic,
2644                        "partitions": a.partitions
2645                    })).collect::<Vec<_>>()
2646                })).collect::<Vec<_>>(),
2647                "offsets": group.offsets.iter().map(|((topic, partition), offset)| serde_json::json!({
2648                    "topic": topic,
2649                    "partition": partition,
2650                    "offset": offset
2651                })).collect::<Vec<_>>()
2652            })).into_response()
2653        } else {
2654            (
2655                StatusCode::NOT_FOUND,
2656                Json(serde_json::json!({
2657                    "error": "Consumer group not found",
2658                    "group_id": group_id
2659                })),
2660            )
2661                .into_response()
2662        }
2663    } else {
2664        (
2665            StatusCode::SERVICE_UNAVAILABLE,
2666            Json(serde_json::json!({
2667                "error": "Kafka broker not available",
2668                "message": "Kafka broker is not enabled or not available."
2669            })),
2670        )
2671            .into_response()
2672    }
2673}
2674
2675// ========== Kafka Produce Handler ==========
2676
2677#[cfg(feature = "kafka")]
2678#[derive(Debug, Deserialize)]
2679pub struct KafkaProduceRequest {
2680    /// Topic to produce to
2681    pub topic: String,
2682    /// Message key (optional)
2683    #[serde(default)]
2684    pub key: Option<String>,
2685    /// Message value (JSON string or plain string)
2686    pub value: String,
2687    /// Partition ID (optional, auto-assigned if not provided)
2688    #[serde(default)]
2689    pub partition: Option<i32>,
2690    /// Message headers (optional, key-value pairs)
2691    #[serde(default)]
2692    pub headers: Option<std::collections::HashMap<String, String>>,
2693}
2694
2695#[cfg(feature = "kafka")]
2696/// Produce a message to a Kafka topic
2697async fn produce_kafka_message(
2698    State(state): State<ManagementState>,
2699    Json(request): Json<KafkaProduceRequest>,
2700) -> impl IntoResponse {
2701    if let Some(broker) = &state.kafka_broker {
2702        let mut topics = broker.topics.write().await;
2703
2704        // Get or create the topic
2705        let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
2706            mockforge_kafka::topics::Topic::new(
2707                request.topic.clone(),
2708                mockforge_kafka::topics::TopicConfig::default(),
2709            )
2710        });
2711
2712        // Determine partition
2713        let partition_id = if let Some(partition) = request.partition {
2714            partition
2715        } else {
2716            topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes()))
2717        };
2718
2719        // Validate partition exists
2720        if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
2721            return (
2722                StatusCode::BAD_REQUEST,
2723                Json(serde_json::json!({
2724                    "error": "Invalid partition",
2725                    "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
2726                })),
2727            )
2728                .into_response();
2729        }
2730
2731        // Create the message
2732        let key_clone = request.key.clone();
2733        let headers_clone = request.headers.clone();
2734        let message = mockforge_kafka::partitions::KafkaMessage {
2735            offset: 0, // Will be set by partition.append
2736            timestamp: chrono::Utc::now().timestamp_millis(),
2737            key: key_clone.clone().map(|k| k.as_bytes().to_vec()),
2738            value: request.value.as_bytes().to_vec(),
2739            headers: headers_clone
2740                .clone()
2741                .unwrap_or_default()
2742                .into_iter()
2743                .map(|(k, v)| (k, v.as_bytes().to_vec()))
2744                .collect(),
2745        };
2746
2747        // Produce to partition
2748        match topic_entry.produce(partition_id, message).await {
2749            Ok(offset) => {
2750                // Record metrics for successful message production
2751                if let Some(broker) = &state.kafka_broker {
2752                    broker.metrics().record_messages_produced(1);
2753                }
2754
2755                // Emit message event for real-time monitoring
2756                #[cfg(feature = "kafka")]
2757                {
2758                    let event = MessageEvent::Kafka(KafkaMessageEvent {
2759                        topic: request.topic.clone(),
2760                        key: key_clone,
2761                        value: request.value.clone(),
2762                        partition: partition_id,
2763                        offset,
2764                        headers: headers_clone,
2765                        timestamp: chrono::Utc::now().to_rfc3339(),
2766                    });
2767                    let _ = state.message_events.send(event);
2768                }
2769
2770                Json(serde_json::json!({
2771                    "success": true,
2772                    "message": format!("Message produced to topic '{}'", request.topic),
2773                    "topic": request.topic,
2774                    "partition": partition_id,
2775                    "offset": offset
2776                }))
2777                .into_response()
2778            }
2779            Err(e) => (
2780                StatusCode::INTERNAL_SERVER_ERROR,
2781                Json(serde_json::json!({
2782                    "error": "Failed to produce message",
2783                    "message": e.to_string()
2784                })),
2785            )
2786                .into_response(),
2787        }
2788    } else {
2789        (
2790            StatusCode::SERVICE_UNAVAILABLE,
2791            Json(serde_json::json!({
2792                "error": "Kafka broker not available",
2793                "message": "Kafka broker is not enabled or not available."
2794            })),
2795        )
2796            .into_response()
2797    }
2798}
2799
2800#[cfg(feature = "kafka")]
2801#[derive(Debug, Deserialize)]
2802pub struct KafkaBatchProduceRequest {
2803    /// List of messages to produce
2804    pub messages: Vec<KafkaProduceRequest>,
2805    /// Delay between messages in milliseconds
2806    #[serde(default = "default_delay")]
2807    pub delay_ms: u64,
2808}
2809
2810#[cfg(feature = "kafka")]
2811/// Produce multiple messages to Kafka topics
2812async fn produce_kafka_batch(
2813    State(state): State<ManagementState>,
2814    Json(request): Json<KafkaBatchProduceRequest>,
2815) -> impl IntoResponse {
2816    if let Some(broker) = &state.kafka_broker {
2817        if request.messages.is_empty() {
2818            return (
2819                StatusCode::BAD_REQUEST,
2820                Json(serde_json::json!({
2821                    "error": "Empty batch",
2822                    "message": "At least one message is required"
2823                })),
2824            )
2825                .into_response();
2826        }
2827
2828        let mut results = Vec::new();
2829
2830        for (index, msg_request) in request.messages.iter().enumerate() {
2831            let mut topics = broker.topics.write().await;
2832
2833            // Get or create the topic
2834            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
2835                mockforge_kafka::topics::Topic::new(
2836                    msg_request.topic.clone(),
2837                    mockforge_kafka::topics::TopicConfig::default(),
2838                )
2839            });
2840
2841            // Determine partition
2842            let partition_id = if let Some(partition) = msg_request.partition {
2843                partition
2844            } else {
2845                topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
2846            };
2847
2848            // Validate partition exists
2849            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
2850                results.push(serde_json::json!({
2851                    "index": index,
2852                    "success": false,
2853                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
2854                }));
2855                continue;
2856            }
2857
2858            // Create the message
2859            let message = mockforge_kafka::partitions::KafkaMessage {
2860                offset: 0,
2861                timestamp: chrono::Utc::now().timestamp_millis(),
2862                key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
2863                value: msg_request.value.as_bytes().to_vec(),
2864                headers: msg_request
2865                    .headers
2866                    .clone()
2867                    .unwrap_or_default()
2868                    .into_iter()
2869                    .map(|(k, v)| (k, v.as_bytes().to_vec()))
2870                    .collect(),
2871            };
2872
2873            // Produce to partition
2874            match topic_entry.produce(partition_id, message).await {
2875                Ok(offset) => {
2876                    // Record metrics for successful message production
2877                    if let Some(broker) = &state.kafka_broker {
2878                        broker.metrics().record_messages_produced(1);
2879                    }
2880
2881                    // Emit message event
2882                    let event = MessageEvent::Kafka(KafkaMessageEvent {
2883                        topic: msg_request.topic.clone(),
2884                        key: msg_request.key.clone(),
2885                        value: msg_request.value.clone(),
2886                        partition: partition_id,
2887                        offset,
2888                        headers: msg_request.headers.clone(),
2889                        timestamp: chrono::Utc::now().to_rfc3339(),
2890                    });
2891                    let _ = state.message_events.send(event);
2892
2893                    results.push(serde_json::json!({
2894                        "index": index,
2895                        "success": true,
2896                        "topic": msg_request.topic,
2897                        "partition": partition_id,
2898                        "offset": offset
2899                    }));
2900                }
2901                Err(e) => {
2902                    results.push(serde_json::json!({
2903                        "index": index,
2904                        "success": false,
2905                        "error": e.to_string()
2906                    }));
2907                }
2908            }
2909
2910            // Add delay between messages (except for the last one)
2911            if index < request.messages.len() - 1 && request.delay_ms > 0 {
2912                tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
2913            }
2914        }
2915
2916        let success_count =
2917            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();
2918
2919        Json(serde_json::json!({
2920            "success": true,
2921            "total": request.messages.len(),
2922            "succeeded": success_count,
2923            "failed": request.messages.len() - success_count,
2924            "results": results
2925        }))
2926        .into_response()
2927    } else {
2928        (
2929            StatusCode::SERVICE_UNAVAILABLE,
2930            Json(serde_json::json!({
2931                "error": "Kafka broker not available",
2932                "message": "Kafka broker is not enabled or not available."
2933            })),
2934        )
2935            .into_response()
2936    }
2937}
2938
2939// ========== Real-time Message Streaming (SSE) ==========
2940
2941#[cfg(feature = "mqtt")]
2942/// SSE stream for MQTT messages
2943async fn mqtt_messages_stream(
2944    State(state): State<ManagementState>,
2945    Query(params): Query<std::collections::HashMap<String, String>>,
2946) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
2947    let rx = state.message_events.subscribe();
2948    let topic_filter = params.get("topic").cloned();
2949
2950    let stream = stream::unfold(rx, move |mut rx| {
2951        let topic_filter = topic_filter.clone();
2952
2953        async move {
2954            loop {
2955                match rx.recv().await {
2956                    Ok(MessageEvent::Mqtt(event)) => {
2957                        // Apply topic filter if specified
2958                        if let Some(filter) = &topic_filter {
2959                            if !event.topic.contains(filter) {
2960                                continue;
2961                            }
2962                        }
2963
2964                        let event_json = serde_json::json!({
2965                            "protocol": "mqtt",
2966                            "topic": event.topic,
2967                            "payload": event.payload,
2968                            "qos": event.qos,
2969                            "retain": event.retain,
2970                            "timestamp": event.timestamp,
2971                        });
2972
2973                        if let Ok(event_data) = serde_json::to_string(&event_json) {
2974                            let sse_event = Event::default().event("mqtt_message").data(event_data);
2975                            return Some((Ok(sse_event), rx));
2976                        }
2977                    }
2978                    #[cfg(feature = "kafka")]
2979                    Ok(MessageEvent::Kafka(_)) => {
2980                        // Skip Kafka events in MQTT stream
2981                        continue;
2982                    }
2983                    Err(broadcast::error::RecvError::Closed) => {
2984                        return None;
2985                    }
2986                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
2987                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
2988                        continue;
2989                    }
2990                }
2991            }
2992        }
2993    });
2994
2995    Sse::new(stream).keep_alive(
2996        axum::response::sse::KeepAlive::new()
2997            .interval(std::time::Duration::from_secs(15))
2998            .text("keep-alive-text"),
2999    )
3000}
3001
3002#[cfg(feature = "kafka")]
3003/// SSE stream for Kafka messages
3004async fn kafka_messages_stream(
3005    State(state): State<ManagementState>,
3006    Query(params): Query<std::collections::HashMap<String, String>>,
3007) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
3008    let mut rx = state.message_events.subscribe();
3009    let topic_filter = params.get("topic").cloned();
3010
3011    let stream = stream::unfold(rx, move |mut rx| {
3012        let topic_filter = topic_filter.clone();
3013
3014        async move {
3015            loop {
3016                match rx.recv().await {
3017                    #[cfg(feature = "mqtt")]
3018                    Ok(MessageEvent::Mqtt(_)) => {
3019                        // Skip MQTT events in Kafka stream
3020                        continue;
3021                    }
3022                    Ok(MessageEvent::Kafka(event)) => {
3023                        // Apply topic filter if specified
3024                        if let Some(filter) = &topic_filter {
3025                            if !event.topic.contains(filter) {
3026                                continue;
3027                            }
3028                        }
3029
3030                        let event_json = serde_json::json!({
3031                            "protocol": "kafka",
3032                            "topic": event.topic,
3033                            "key": event.key,
3034                            "value": event.value,
3035                            "partition": event.partition,
3036                            "offset": event.offset,
3037                            "headers": event.headers,
3038                            "timestamp": event.timestamp,
3039                        });
3040
3041                        if let Ok(event_data) = serde_json::to_string(&event_json) {
3042                            let sse_event =
3043                                Event::default().event("kafka_message").data(event_data);
3044                            return Some((Ok(sse_event), rx));
3045                        }
3046                    }
3047                    Err(broadcast::error::RecvError::Closed) => {
3048                        return None;
3049                    }
3050                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
3051                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
3052                        continue;
3053                    }
3054                }
3055            }
3056        }
3057    });
3058
3059    Sse::new(stream).keep_alive(
3060        axum::response::sse::KeepAlive::new()
3061            .interval(std::time::Duration::from_secs(15))
3062            .text("keep-alive-text"),
3063    )
3064}
3065
3066// ========== AI-Powered Features ==========
3067
3068/// Request for AI-powered API specification generation
3069#[derive(Debug, Deserialize)]
3070pub struct GenerateSpecRequest {
3071    /// Natural language description of the API to generate
3072    pub query: String,
3073    /// Type of specification to generate: "openapi", "graphql", or "asyncapi"
3074    pub spec_type: String,
3075    /// Optional API version (e.g., "3.0.0" for OpenAPI)
3076    pub api_version: Option<String>,
3077}
3078
3079/// Request for OpenAPI generation from recorded traffic
3080#[derive(Debug, Deserialize)]
3081pub struct GenerateOpenApiFromTrafficRequest {
3082    /// Path to recorder database (optional, defaults to ./recordings.db)
3083    #[serde(default)]
3084    pub database_path: Option<String>,
3085    /// Start time for filtering (ISO 8601 format, e.g., 2025-01-01T00:00:00Z)
3086    #[serde(default)]
3087    pub since: Option<String>,
3088    /// End time for filtering (ISO 8601 format)
3089    #[serde(default)]
3090    pub until: Option<String>,
3091    /// Path pattern filter (supports wildcards, e.g., /api/*)
3092    #[serde(default)]
3093    pub path_pattern: Option<String>,
3094    /// Minimum confidence score for including paths (0.0 to 1.0)
3095    #[serde(default = "default_min_confidence")]
3096    pub min_confidence: f64,
3097}
3098
3099fn default_min_confidence() -> f64 {
3100    0.7
3101}
3102
3103/// Generate API specification from natural language using AI
3104#[cfg(feature = "data-faker")]
3105async fn generate_ai_spec(
3106    State(_state): State<ManagementState>,
3107    Json(request): Json<GenerateSpecRequest>,
3108) -> impl IntoResponse {
3109    use mockforge_data::rag::{
3110        config::{LlmProvider, RagConfig},
3111        engine::RagEngine,
3112        storage::DocumentStorage,
3113    };
3114    use std::sync::Arc;
3115
3116    // Build RAG config from environment variables
3117    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
3118        .ok()
3119        .or_else(|| std::env::var("OPENAI_API_KEY").ok());
3120
3121    // Check if RAG is configured - require API key
3122    if api_key.is_none() {
3123        return (
3124            StatusCode::SERVICE_UNAVAILABLE,
3125            Json(serde_json::json!({
3126                "error": "AI service not configured",
3127                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
3128            })),
3129        )
3130            .into_response();
3131    }
3132
3133    // Build RAG configuration
3134    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
3135        .unwrap_or_else(|_| "openai".to_string())
3136        .to_lowercase();
3137
3138    let provider = match provider_str.as_str() {
3139        "openai" => LlmProvider::OpenAI,
3140        "anthropic" => LlmProvider::Anthropic,
3141        "ollama" => LlmProvider::Ollama,
3142        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
3143        _ => LlmProvider::OpenAI,
3144    };
3145
3146    let api_endpoint =
3147        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
3148            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
3149            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
3150            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
3151            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
3152        });
3153
3154    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
3155        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
3156        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
3157        LlmProvider::Ollama => "llama2".to_string(),
3158        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
3159    });
3160
3161    // Build RagConfig using default() and override fields
3162    let mut rag_config = RagConfig::default();
3163    rag_config.provider = provider;
3164    rag_config.api_endpoint = api_endpoint;
3165    rag_config.api_key = api_key;
3166    rag_config.model = model;
3167    rag_config.max_tokens = std::env::var("MOCKFORGE_RAG_MAX_TOKENS")
3168        .unwrap_or_else(|_| "4096".to_string())
3169        .parse()
3170        .unwrap_or(4096);
3171    rag_config.temperature = std::env::var("MOCKFORGE_RAG_TEMPERATURE")
3172        .unwrap_or_else(|_| "0.3".to_string())
3173        .parse()
3174        .unwrap_or(0.3); // Lower temperature for more structured output
3175    rag_config.timeout_secs = std::env::var("MOCKFORGE_RAG_TIMEOUT")
3176        .unwrap_or_else(|_| "60".to_string())
3177        .parse()
3178        .unwrap_or(60);
3179    rag_config.max_context_length = std::env::var("MOCKFORGE_RAG_CONTEXT_WINDOW")
3180        .unwrap_or_else(|_| "4000".to_string())
3181        .parse()
3182        .unwrap_or(4000);
3183
3184    // Build the prompt for spec generation
3185    let spec_type_label = match request.spec_type.as_str() {
3186        "openapi" => "OpenAPI 3.0",
3187        "graphql" => "GraphQL",
3188        "asyncapi" => "AsyncAPI",
3189        _ => "OpenAPI 3.0",
3190    };
3191
3192    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");
3193
3194    let prompt = format!(
3195        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.
3196
3197User Requirements:
3198{}
3199
3200Instructions:
32011. Generate a complete, valid {} specification
32022. Include all paths, operations, request/response schemas, and components
32033. Use realistic field names and data types
32044. Include proper descriptions and examples
32055. Follow {} best practices
32066. Return ONLY the specification, no additional explanation
32077. For OpenAPI, use version {}
3208
3209Return the specification in {} format."#,
3210        spec_type_label,
3211        request.query,
3212        spec_type_label,
3213        spec_type_label,
3214        api_version,
3215        if request.spec_type == "graphql" {
3216            "GraphQL SDL"
3217        } else {
3218            "YAML"
3219        }
3220    );
3221
3222    // Create in-memory storage for RAG engine
3223    // Note: StorageFactory::create_memory() returns Box<dyn DocumentStorage>
3224    // We need to use unsafe transmute or create a wrapper, but for now we'll use
3225    // a simpler approach: create InMemoryStorage directly
3226    use mockforge_data::rag::storage::InMemoryStorage;
3227    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());
3228
3229    // Create RAG engine
3230    let mut rag_engine = match RagEngine::new(rag_config.clone(), storage) {
3231        Ok(engine) => engine,
3232        Err(e) => {
3233            return (
3234                StatusCode::INTERNAL_SERVER_ERROR,
3235                Json(serde_json::json!({
3236                    "error": "Failed to initialize RAG engine",
3237                    "message": e.to_string()
3238                })),
3239            )
3240                .into_response();
3241        }
3242    };
3243
3244    // Generate using RAG engine
3245    match rag_engine.generate(&prompt, None).await {
3246        Ok(generated_text) => {
3247            // Try to extract just the YAML/JSON/SDL content if LLM added explanation
3248            let spec = if request.spec_type == "graphql" {
3249                // For GraphQL, extract SDL
3250                extract_graphql_schema(&generated_text)
3251            } else {
3252                // For OpenAPI/AsyncAPI, extract YAML
3253                extract_yaml_spec(&generated_text)
3254            };
3255
3256            Json(serde_json::json!({
3257                "success": true,
3258                "spec": spec,
3259                "spec_type": request.spec_type,
3260            }))
3261            .into_response()
3262        }
3263        Err(e) => (
3264            StatusCode::INTERNAL_SERVER_ERROR,
3265            Json(serde_json::json!({
3266                "error": "AI generation failed",
3267                "message": e.to_string()
3268            })),
3269        )
3270            .into_response(),
3271    }
3272}
3273
3274#[cfg(not(feature = "data-faker"))]
3275async fn generate_ai_spec(
3276    State(_state): State<ManagementState>,
3277    Json(_request): Json<GenerateSpecRequest>,
3278) -> impl IntoResponse {
3279    (
3280        StatusCode::NOT_IMPLEMENTED,
3281        Json(serde_json::json!({
3282            "error": "AI features not enabled",
3283            "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3284        })),
3285    )
3286        .into_response()
3287}
3288
3289/// Generate OpenAPI specification from recorded traffic
3290#[cfg(feature = "behavioral-cloning")]
3291async fn generate_openapi_from_traffic(
3292    State(_state): State<ManagementState>,
3293    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
3294) -> impl IntoResponse {
3295    use chrono::{DateTime, Utc};
3296    use mockforge_core::intelligent_behavior::{
3297        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
3298        IntelligentBehaviorConfig,
3299    };
3300    use mockforge_recorder::{
3301        database::RecorderDatabase,
3302        openapi_export::{QueryFilters, RecordingsToOpenApi},
3303    };
3304    use std::path::PathBuf;
3305
3306    // Determine database path
3307    let db_path = if let Some(ref path) = request.database_path {
3308        PathBuf::from(path)
3309    } else {
3310        std::env::current_dir()
3311            .unwrap_or_else(|_| PathBuf::from("."))
3312            .join("recordings.db")
3313    };
3314
3315    // Open database
3316    let db = match RecorderDatabase::new(&db_path).await {
3317        Ok(db) => db,
3318        Err(e) => {
3319            return (
3320                StatusCode::BAD_REQUEST,
3321                Json(serde_json::json!({
3322                    "error": "Database error",
3323                    "message": format!("Failed to open recorder database: {}", e)
3324                })),
3325            )
3326                .into_response();
3327        }
3328    };
3329
3330    // Parse time filters
3331    let since_dt = if let Some(ref since_str) = request.since {
3332        match DateTime::parse_from_rfc3339(since_str) {
3333            Ok(dt) => Some(dt.with_timezone(&Utc)),
3334            Err(e) => {
3335                return (
3336                    StatusCode::BAD_REQUEST,
3337                    Json(serde_json::json!({
3338                        "error": "Invalid date format",
3339                        "message": format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3340                    })),
3341                )
3342                    .into_response();
3343            }
3344        }
3345    } else {
3346        None
3347    };
3348
3349    let until_dt = if let Some(ref until_str) = request.until {
3350        match DateTime::parse_from_rfc3339(until_str) {
3351            Ok(dt) => Some(dt.with_timezone(&Utc)),
3352            Err(e) => {
3353                return (
3354                    StatusCode::BAD_REQUEST,
3355                    Json(serde_json::json!({
3356                        "error": "Invalid date format",
3357                        "message": format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3358                    })),
3359                )
3360                    .into_response();
3361            }
3362        }
3363    } else {
3364        None
3365    };
3366
3367    // Build query filters
3368    let query_filters = QueryFilters {
3369        since: since_dt,
3370        until: until_dt,
3371        path_pattern: request.path_pattern.clone(),
3372        min_status_code: None,
3373        max_requests: Some(1000),
3374    };
3375
3376    // Query HTTP exchanges
3377    // Note: We need to convert from mockforge-recorder's HttpExchange to mockforge-core's HttpExchange
3378    // to avoid version mismatch issues. The converter returns the version from mockforge-recorder's
3379    // dependency, so we need to manually convert to the local version.
3380    let exchanges_from_recorder =
3381        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
3382            Ok(exchanges) => exchanges,
3383            Err(e) => {
3384                return (
3385                    StatusCode::INTERNAL_SERVER_ERROR,
3386                    Json(serde_json::json!({
3387                        "error": "Query error",
3388                        "message": format!("Failed to query HTTP exchanges: {}", e)
3389                    })),
3390                )
3391                    .into_response();
3392            }
3393        };
3394
3395    if exchanges_from_recorder.is_empty() {
3396        return (
3397            StatusCode::NOT_FOUND,
3398            Json(serde_json::json!({
3399                "error": "No exchanges found",
3400                "message": "No HTTP exchanges found matching the specified filters"
3401            })),
3402        )
3403            .into_response();
3404    }
3405
3406    // Convert to local HttpExchange type to avoid version mismatch
3407    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
3408    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
3409        .into_iter()
3410        .map(|e| LocalHttpExchange {
3411            method: e.method,
3412            path: e.path,
3413            query_params: e.query_params,
3414            headers: e.headers,
3415            body: e.body,
3416            body_encoding: e.body_encoding,
3417            status_code: e.status_code,
3418            response_headers: e.response_headers,
3419            response_body: e.response_body,
3420            response_body_encoding: e.response_body_encoding,
3421            timestamp: e.timestamp,
3422        })
3423        .collect();
3424
3425    // Create OpenAPI generator config
3426    let behavior_config = IntelligentBehaviorConfig::default();
3427    let gen_config = OpenApiGenerationConfig {
3428        min_confidence: request.min_confidence,
3429        behavior_model: Some(behavior_config.behavior_model),
3430    };
3431
3432    // Generate OpenAPI spec
3433    let generator = OpenApiSpecGenerator::new(gen_config);
3434    let result = match generator.generate_from_exchanges(exchanges).await {
3435        Ok(result) => result,
3436        Err(e) => {
3437            return (
3438                StatusCode::INTERNAL_SERVER_ERROR,
3439                Json(serde_json::json!({
3440                    "error": "Generation error",
3441                    "message": format!("Failed to generate OpenAPI spec: {}", e)
3442                })),
3443            )
3444                .into_response();
3445        }
3446    };
3447
3448    // Prepare response
3449    let spec_json = if let Some(ref raw) = result.spec.raw_document {
3450        raw.clone()
3451    } else {
3452        match serde_json::to_value(&result.spec.spec) {
3453            Ok(json) => json,
3454            Err(e) => {
3455                return (
3456                    StatusCode::INTERNAL_SERVER_ERROR,
3457                    Json(serde_json::json!({
3458                        "error": "Serialization error",
3459                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
3460                    })),
3461                )
3462                    .into_response();
3463            }
3464        }
3465    };
3466
3467    // Build response with metadata
3468    let response = serde_json::json!({
3469        "spec": spec_json,
3470        "metadata": {
3471            "requests_analyzed": result.metadata.requests_analyzed,
3472            "paths_inferred": result.metadata.paths_inferred,
3473            "path_confidence": result.metadata.path_confidence,
3474            "generated_at": result.metadata.generated_at.to_rfc3339(),
3475            "duration_ms": result.metadata.duration_ms,
3476        }
3477    });
3478
3479    Json(response).into_response()
3480}
3481
3482/// List all rule explanations
3483async fn list_rule_explanations(
3484    State(state): State<ManagementState>,
3485    Query(params): Query<std::collections::HashMap<String, String>>,
3486) -> impl IntoResponse {
3487    use mockforge_core::intelligent_behavior::RuleType;
3488
3489    let explanations = state.rule_explanations.read().await;
3490    let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3491
3492    // Filter by rule type if provided
3493    if let Some(rule_type_str) = params.get("rule_type") {
3494        if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3495            explanations_vec.retain(|e| e.rule_type == rule_type);
3496        }
3497    }
3498
3499    // Filter by minimum confidence if provided
3500    if let Some(min_confidence_str) = params.get("min_confidence") {
3501        if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3502            explanations_vec.retain(|e| e.confidence >= min_confidence);
3503        }
3504    }
3505
3506    // Sort by confidence (descending) and then by generated_at (descending)
3507    explanations_vec.sort_by(|a, b| {
3508        b.confidence
3509            .partial_cmp(&a.confidence)
3510            .unwrap_or(std::cmp::Ordering::Equal)
3511            .then_with(|| b.generated_at.cmp(&a.generated_at))
3512    });
3513
3514    Json(serde_json::json!({
3515        "explanations": explanations_vec,
3516        "total": explanations_vec.len(),
3517    }))
3518    .into_response()
3519}
3520
3521/// Get a specific rule explanation by ID
3522async fn get_rule_explanation(
3523    State(state): State<ManagementState>,
3524    Path(rule_id): Path<String>,
3525) -> impl IntoResponse {
3526    let explanations = state.rule_explanations.read().await;
3527
3528    match explanations.get(&rule_id) {
3529        Some(explanation) => Json(serde_json::json!({
3530            "explanation": explanation,
3531        }))
3532        .into_response(),
3533        None => (
3534            StatusCode::NOT_FOUND,
3535            Json(serde_json::json!({
3536                "error": "Rule explanation not found",
3537                "message": format!("No explanation found for rule ID: {}", rule_id)
3538            })),
3539        )
3540            .into_response(),
3541    }
3542}
3543
3544/// Request for learning from examples
3545#[derive(Debug, Deserialize)]
3546pub struct LearnFromExamplesRequest {
3547    /// Example request/response pairs to learn from
3548    pub examples: Vec<ExamplePairRequest>,
3549    /// Optional configuration override
3550    #[serde(default)]
3551    pub config: Option<serde_json::Value>,
3552}
3553
3554/// Example pair request format
3555#[derive(Debug, Deserialize)]
3556pub struct ExamplePairRequest {
3557    /// Request data (method, path, body, etc.)
3558    pub request: serde_json::Value,
3559    /// Response data (status_code, body, etc.)
3560    pub response: serde_json::Value,
3561}
3562
3563/// Learn behavioral rules from example pairs
3564///
3565/// This endpoint accepts example request/response pairs, generates behavioral rules
3566/// with explanations, and stores the explanations for later retrieval.
3567async fn learn_from_examples(
3568    State(state): State<ManagementState>,
3569    Json(request): Json<LearnFromExamplesRequest>,
3570) -> impl IntoResponse {
3571    use mockforge_core::intelligent_behavior::{
3572        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3573        rule_generator::{ExamplePair, RuleGenerator},
3574    };
3575
3576    if request.examples.is_empty() {
3577        return (
3578            StatusCode::BAD_REQUEST,
3579            Json(serde_json::json!({
3580                "error": "No examples provided",
3581                "message": "At least one example pair is required"
3582            })),
3583        )
3584            .into_response();
3585    }
3586
3587    // Convert request examples to ExamplePair format
3588    let example_pairs: Result<Vec<ExamplePair>, String> = request
3589        .examples
3590        .into_iter()
3591        .enumerate()
3592        .map(|(idx, ex)| {
3593            // Parse request JSON to extract method, path, body, etc.
3594            let method = ex
3595                .request
3596                .get("method")
3597                .and_then(|v| v.as_str())
3598                .map(|s| s.to_string())
3599                .unwrap_or_else(|| "GET".to_string());
3600            let path = ex
3601                .request
3602                .get("path")
3603                .and_then(|v| v.as_str())
3604                .map(|s| s.to_string())
3605                .unwrap_or_else(|| "/".to_string());
3606            let request_body = ex.request.get("body").cloned();
3607            let query_params = ex
3608                .request
3609                .get("query_params")
3610                .and_then(|v| v.as_object())
3611                .map(|obj| {
3612                    obj.iter()
3613                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3614                        .collect()
3615                })
3616                .unwrap_or_default();
3617            let headers = ex
3618                .request
3619                .get("headers")
3620                .and_then(|v| v.as_object())
3621                .map(|obj| {
3622                    obj.iter()
3623                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3624                        .collect()
3625                })
3626                .unwrap_or_default();
3627
3628            // Parse response JSON to extract status, body, etc.
3629            let status = ex
3630                .response
3631                .get("status_code")
3632                .or_else(|| ex.response.get("status"))
3633                .and_then(|v| v.as_u64())
3634                .map(|n| n as u16)
3635                .unwrap_or(200);
3636            let response_body = ex.response.get("body").cloned();
3637
3638            Ok(ExamplePair {
3639                method,
3640                path,
3641                request: request_body,
3642                status,
3643                response: response_body,
3644                query_params,
3645                headers,
3646                metadata: {
3647                    let mut meta = std::collections::HashMap::new();
3648                    meta.insert("source".to_string(), "api".to_string());
3649                    meta.insert("example_index".to_string(), idx.to_string());
3650                    meta
3651                },
3652            })
3653        })
3654        .collect();
3655
3656    let example_pairs = match example_pairs {
3657        Ok(pairs) => pairs,
3658        Err(e) => {
3659            return (
3660                StatusCode::BAD_REQUEST,
3661                Json(serde_json::json!({
3662                    "error": "Invalid examples",
3663                    "message": e
3664                })),
3665            )
3666                .into_response();
3667        }
3668    };
3669
3670    // Create behavior config (use provided config or default)
3671    let behavior_config = if let Some(config_json) = request.config {
3672        // Try to deserialize custom config, fall back to default
3673        serde_json::from_value(config_json)
3674            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3675            .behavior_model
3676    } else {
3677        BehaviorModelConfig::default()
3678    };
3679
3680    // Create rule generator
3681    let generator = RuleGenerator::new(behavior_config);
3682
3683    // Generate rules with explanations
3684    let (rules, explanations) =
3685        match generator.generate_rules_with_explanations(example_pairs).await {
3686            Ok(result) => result,
3687            Err(e) => {
3688                return (
3689                    StatusCode::INTERNAL_SERVER_ERROR,
3690                    Json(serde_json::json!({
3691                        "error": "Rule generation failed",
3692                        "message": format!("Failed to generate rules: {}", e)
3693                    })),
3694                )
3695                    .into_response();
3696            }
3697        };
3698
3699    // Store explanations in ManagementState
3700    {
3701        let mut stored_explanations = state.rule_explanations.write().await;
3702        for explanation in &explanations {
3703            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3704        }
3705    }
3706
3707    // Prepare response
3708    let response = serde_json::json!({
3709        "success": true,
3710        "rules_generated": {
3711            "consistency_rules": rules.consistency_rules.len(),
3712            "schemas": rules.schemas.len(),
3713            "state_machines": rules.state_transitions.len(),
3714            "system_prompt": !rules.system_prompt.is_empty(),
3715        },
3716        "explanations": explanations.iter().map(|e| serde_json::json!({
3717            "rule_id": e.rule_id,
3718            "rule_type": e.rule_type,
3719            "confidence": e.confidence,
3720            "reasoning": e.reasoning,
3721        })).collect::<Vec<_>>(),
3722        "total_explanations": explanations.len(),
3723    });
3724
3725    Json(response).into_response()
3726}
3727
3728#[cfg(feature = "data-faker")]
3729fn extract_yaml_spec(text: &str) -> String {
3730    // Try to find YAML code blocks
3731    if let Some(start) = text.find("```yaml") {
3732        let yaml_start = text[start + 7..].trim_start();
3733        if let Some(end) = yaml_start.find("```") {
3734            return yaml_start[..end].trim().to_string();
3735        }
3736    }
3737    if let Some(start) = text.find("```") {
3738        let content_start = text[start + 3..].trim_start();
3739        if let Some(end) = content_start.find("```") {
3740            return content_start[..end].trim().to_string();
3741        }
3742    }
3743
3744    // Check if it starts with openapi: or asyncapi:
3745    if text.trim_start().starts_with("openapi:") || text.trim_start().starts_with("asyncapi:") {
3746        return text.trim().to_string();
3747    }
3748
3749    // Return as-is if no code blocks found
3750    text.trim().to_string()
3751}
3752
3753/// Extract GraphQL schema from text content
3754#[cfg(feature = "data-faker")]
3755fn extract_graphql_schema(text: &str) -> String {
3756    // Try to find GraphQL code blocks
3757    if let Some(start) = text.find("```graphql") {
3758        let schema_start = text[start + 10..].trim_start();
3759        if let Some(end) = schema_start.find("```") {
3760            return schema_start[..end].trim().to_string();
3761        }
3762    }
3763    if let Some(start) = text.find("```") {
3764        let content_start = text[start + 3..].trim_start();
3765        if let Some(end) = content_start.find("```") {
3766            return content_start[..end].trim().to_string();
3767        }
3768    }
3769
3770    // Check if it looks like GraphQL SDL (starts with type, schema, etc.)
3771    if text.trim_start().starts_with("type ") || text.trim_start().starts_with("schema ") {
3772        return text.trim().to_string();
3773    }
3774
3775    text.trim().to_string()
3776}
3777
3778// ========== Chaos Engineering Management ==========
3779
3780/// Get current chaos engineering configuration
3781async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
3782    #[cfg(feature = "chaos")]
3783    {
3784        if let Some(chaos_state) = &_state.chaos_api_state {
3785            let config = chaos_state.config.read().await;
3786            // Convert ChaosConfig to JSON response format
3787            Json(serde_json::json!({
3788                "enabled": config.enabled,
3789                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
3790                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
3791                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
3792                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
3793            }))
3794            .into_response()
3795        } else {
3796            // Chaos API not available, return default
3797            Json(serde_json::json!({
3798                "enabled": false,
3799                "latency": null,
3800                "fault_injection": null,
3801                "rate_limit": null,
3802                "traffic_shaping": null,
3803            }))
3804            .into_response()
3805        }
3806    }
3807    #[cfg(not(feature = "chaos"))]
3808    {
3809        // Chaos feature not enabled
3810        Json(serde_json::json!({
3811            "enabled": false,
3812            "latency": null,
3813            "fault_injection": null,
3814            "rate_limit": null,
3815            "traffic_shaping": null,
3816        }))
3817        .into_response()
3818    }
3819}
3820
3821/// Request to update chaos configuration
3822#[derive(Debug, Deserialize)]
3823pub struct ChaosConfigUpdate {
3824    /// Whether to enable chaos engineering
3825    pub enabled: Option<bool>,
3826    /// Latency configuration
3827    pub latency: Option<serde_json::Value>,
3828    /// Fault injection configuration
3829    pub fault_injection: Option<serde_json::Value>,
3830    /// Rate limiting configuration
3831    pub rate_limit: Option<serde_json::Value>,
3832    /// Traffic shaping configuration
3833    pub traffic_shaping: Option<serde_json::Value>,
3834}
3835
3836/// Update chaos engineering configuration
3837async fn update_chaos_config(
3838    State(_state): State<ManagementState>,
3839    Json(_config_update): Json<ChaosConfigUpdate>,
3840) -> impl IntoResponse {
3841    #[cfg(feature = "chaos")]
3842    {
3843        if let Some(chaos_state) = &_state.chaos_api_state {
3844            use mockforge_chaos::config::{
3845                ChaosConfig, FaultInjectionConfig, LatencyConfig, RateLimitConfig,
3846                TrafficShapingConfig,
3847            };
3848
3849            let mut config = chaos_state.config.write().await;
3850
3851            // Update enabled flag if provided
3852            if let Some(enabled) = _config_update.enabled {
3853                config.enabled = enabled;
3854            }
3855
3856            // Update latency config if provided
3857            if let Some(latency_json) = _config_update.latency {
3858                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3859                    config.latency = Some(latency);
3860                }
3861            }
3862
3863            // Update fault injection config if provided
3864            if let Some(fault_json) = _config_update.fault_injection {
3865                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3866                    config.fault_injection = Some(fault);
3867                }
3868            }
3869
3870            // Update rate limit config if provided
3871            if let Some(rate_json) = _config_update.rate_limit {
3872                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3873                    config.rate_limit = Some(rate);
3874                }
3875            }
3876
3877            // Update traffic shaping config if provided
3878            if let Some(traffic_json) = _config_update.traffic_shaping {
3879                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3880                    config.traffic_shaping = Some(traffic);
3881                }
3882            }
3883
3884            // Reinitialize middleware injectors with new config
3885            // The middleware will pick up the changes on the next request
3886            drop(config);
3887
3888            info!("Chaos configuration updated successfully");
3889            Json(serde_json::json!({
3890                "success": true,
3891                "message": "Chaos configuration updated and applied"
3892            }))
3893            .into_response()
3894        } else {
3895            (
3896                StatusCode::SERVICE_UNAVAILABLE,
3897                Json(serde_json::json!({
3898                    "success": false,
3899                    "error": "Chaos API not available",
3900                    "message": "Chaos engineering is not enabled or configured"
3901                })),
3902            )
3903                .into_response()
3904        }
3905    }
3906    #[cfg(not(feature = "chaos"))]
3907    {
3908        (
3909            StatusCode::NOT_IMPLEMENTED,
3910            Json(serde_json::json!({
3911                "success": false,
3912                "error": "Chaos feature not enabled",
3913                "message": "Chaos engineering feature is not compiled into this build"
3914            })),
3915        )
3916            .into_response()
3917    }
3918}
3919
3920// ========== Network Profile Management ==========
3921
3922/// List available network profiles
3923async fn list_network_profiles() -> impl IntoResponse {
3924    use mockforge_core::network_profiles::NetworkProfileCatalog;
3925
3926    let catalog = NetworkProfileCatalog::default();
3927    let profiles: Vec<serde_json::Value> = catalog
3928        .list_profiles_with_description()
3929        .iter()
3930        .map(|(name, description)| {
3931            serde_json::json!({
3932                "name": name,
3933                "description": description,
3934            })
3935        })
3936        .collect();
3937
3938    Json(serde_json::json!({
3939        "profiles": profiles
3940    }))
3941    .into_response()
3942}
3943
3944#[derive(Debug, Deserialize)]
3945/// Request to apply a network profile
3946pub struct ApplyNetworkProfileRequest {
3947    /// Name of the network profile to apply
3948    pub profile_name: String,
3949}
3950
3951/// Apply a network profile
3952async fn apply_network_profile(
3953    State(state): State<ManagementState>,
3954    Json(request): Json<ApplyNetworkProfileRequest>,
3955) -> impl IntoResponse {
3956    use mockforge_core::network_profiles::NetworkProfileCatalog;
3957
3958    let catalog = NetworkProfileCatalog::default();
3959    if let Some(profile) = catalog.get(&request.profile_name) {
3960        // Apply profile to server configuration if available
3961        // NetworkProfile contains latency and traffic_shaping configs
3962        if let Some(server_config) = &state.server_config {
3963            let mut config = server_config.write().await;
3964
3965            // Apply network profile's traffic shaping to core config
3966            use mockforge_core::config::NetworkShapingConfig;
3967
3968            // Convert NetworkProfile's TrafficShapingConfig to NetworkShapingConfig
3969            // NetworkProfile uses mockforge_core::traffic_shaping::TrafficShapingConfig
3970            // which has bandwidth and burst_loss fields
3971            let network_shaping = NetworkShapingConfig {
3972                enabled: profile.traffic_shaping.bandwidth.enabled
3973                    || profile.traffic_shaping.burst_loss.enabled,
3974                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
3975                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3976                max_connections: 1000, // Default value
3977            };
3978
3979            // Update chaos config if it exists, or create it
3980            // Chaos config is in observability.chaos, not core.chaos
3981            if let Some(ref mut chaos) = config.observability.chaos {
3982                chaos.traffic_shaping = Some(network_shaping);
3983            } else {
3984                // Create minimal chaos config with traffic shaping
3985                use mockforge_core::config::ChaosEngConfig;
3986                config.observability.chaos = Some(ChaosEngConfig {
3987                    enabled: true,
3988                    latency: None,
3989                    fault_injection: None,
3990                    rate_limit: None,
3991                    traffic_shaping: Some(network_shaping),
3992                    scenario: None,
3993                });
3994            }
3995
3996            info!("Network profile '{}' applied to server configuration", request.profile_name);
3997        } else {
3998            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
3999        }
4000
4001        // Also update chaos API state if available
4002        #[cfg(feature = "chaos")]
4003        {
4004            if let Some(chaos_state) = &state.chaos_api_state {
4005                use mockforge_chaos::config::TrafficShapingConfig;
4006
4007                let mut chaos_config = chaos_state.config.write().await;
4008                // Apply profile's traffic shaping to chaos API state
4009                let chaos_traffic_shaping = TrafficShapingConfig {
4010                    enabled: profile.traffic_shaping.bandwidth.enabled
4011                        || profile.traffic_shaping.burst_loss.enabled,
4012                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
4013                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
4014                    max_connections: 0,
4015                    connection_timeout_ms: 30000,
4016                };
4017                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
4018                chaos_config.enabled = true; // Enable chaos when applying a profile
4019                drop(chaos_config);
4020                info!("Network profile '{}' applied to chaos API state", request.profile_name);
4021            }
4022        }
4023
4024        Json(serde_json::json!({
4025            "success": true,
4026            "message": format!("Network profile '{}' applied", request.profile_name),
4027            "profile": {
4028                "name": profile.name,
4029                "description": profile.description,
4030            }
4031        }))
4032        .into_response()
4033    } else {
4034        (
4035            StatusCode::NOT_FOUND,
4036            Json(serde_json::json!({
4037                "error": "Profile not found",
4038                "message": format!("Network profile '{}' not found", request.profile_name)
4039            })),
4040        )
4041            .into_response()
4042    }
4043}
4044
4045/// Build the management API router with UI Builder support
4046pub fn management_router_with_ui_builder(
4047    state: ManagementState,
4048    server_config: mockforge_core::config::ServerConfig,
4049) -> Router {
4050    use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
4051
4052    // Create the base management router
4053    let management = management_router(state);
4054
4055    // Create UI Builder state and router
4056    let ui_builder_state = UIBuilderState::new(server_config);
4057    let ui_builder = create_ui_builder_router(ui_builder_state);
4058
4059    // Nest UI Builder under /ui-builder
4060    management.nest("/ui-builder", ui_builder)
4061}
4062
4063/// Build management router with spec import API
4064pub fn management_router_with_spec_import(state: ManagementState) -> Router {
4065    use crate::spec_import::{spec_import_router, SpecImportState};
4066
4067    // Create base management router
4068    let management = management_router(state);
4069
4070    // Merge with spec import router
4071    Router::new()
4072        .merge(management)
4073        .merge(spec_import_router(SpecImportState::new()))
4074}
4075
4076#[cfg(test)]
4077mod tests {
4078    use super::*;
4079
4080    #[tokio::test]
4081    async fn test_create_and_get_mock() {
4082        let state = ManagementState::new(None, None, 3000);
4083
4084        let mock = MockConfig {
4085            id: "test-1".to_string(),
4086            name: "Test Mock".to_string(),
4087            method: "GET".to_string(),
4088            path: "/test".to_string(),
4089            response: MockResponse {
4090                body: serde_json::json!({"message": "test"}),
4091                headers: None,
4092            },
4093            enabled: true,
4094            latency_ms: None,
4095            status_code: Some(200),
4096            request_match: None,
4097            priority: None,
4098            scenario: None,
4099            required_scenario_state: None,
4100            new_scenario_state: None,
4101        };
4102
4103        // Create mock
4104        {
4105            let mut mocks = state.mocks.write().await;
4106            mocks.push(mock.clone());
4107        }
4108
4109        // Get mock
4110        let mocks = state.mocks.read().await;
4111        let found = mocks.iter().find(|m| m.id == "test-1");
4112        assert!(found.is_some());
4113        assert_eq!(found.unwrap().name, "Test Mock");
4114    }
4115
4116    #[tokio::test]
4117    async fn test_server_stats() {
4118        let state = ManagementState::new(None, None, 3000);
4119
4120        // Add some mocks
4121        {
4122            let mut mocks = state.mocks.write().await;
4123            mocks.push(MockConfig {
4124                id: "1".to_string(),
4125                name: "Mock 1".to_string(),
4126                method: "GET".to_string(),
4127                path: "/test1".to_string(),
4128                response: MockResponse {
4129                    body: serde_json::json!({}),
4130                    headers: None,
4131                },
4132                enabled: true,
4133                latency_ms: None,
4134                status_code: Some(200),
4135                request_match: None,
4136                priority: None,
4137                scenario: None,
4138                required_scenario_state: None,
4139                new_scenario_state: None,
4140            });
4141            mocks.push(MockConfig {
4142                id: "2".to_string(),
4143                name: "Mock 2".to_string(),
4144                method: "POST".to_string(),
4145                path: "/test2".to_string(),
4146                response: MockResponse {
4147                    body: serde_json::json!({}),
4148                    headers: None,
4149                },
4150                enabled: false,
4151                latency_ms: None,
4152                status_code: Some(201),
4153                request_match: None,
4154                priority: None,
4155                scenario: None,
4156                required_scenario_state: None,
4157                new_scenario_state: None,
4158            });
4159        }
4160
4161        let mocks = state.mocks.read().await;
4162        assert_eq!(mocks.len(), 2);
4163        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
4164    }
4165
4166    #[test]
4167    fn test_mock_matches_request_with_xpath_absolute_path() {
4168        let mock = MockConfig {
4169            id: "xpath-1".to_string(),
4170            name: "XPath Match".to_string(),
4171            method: "POST".to_string(),
4172            path: "/xml".to_string(),
4173            response: MockResponse {
4174                body: serde_json::json!({"ok": true}),
4175                headers: None,
4176            },
4177            enabled: true,
4178            latency_ms: None,
4179            status_code: Some(200),
4180            request_match: Some(RequestMatchCriteria {
4181                xpath: Some("/root/order/id".to_string()),
4182                ..Default::default()
4183            }),
4184            priority: None,
4185            scenario: None,
4186            required_scenario_state: None,
4187            new_scenario_state: None,
4188        };
4189
4190        let body = br#"<root><order><id>123</id></order></root>"#;
4191        let headers = std::collections::HashMap::new();
4192        let query = std::collections::HashMap::new();
4193
4194        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4195    }
4196
4197    #[test]
4198    fn test_mock_matches_request_with_xpath_text_predicate() {
4199        let mock = MockConfig {
4200            id: "xpath-2".to_string(),
4201            name: "XPath Predicate Match".to_string(),
4202            method: "POST".to_string(),
4203            path: "/xml".to_string(),
4204            response: MockResponse {
4205                body: serde_json::json!({"ok": true}),
4206                headers: None,
4207            },
4208            enabled: true,
4209            latency_ms: None,
4210            status_code: Some(200),
4211            request_match: Some(RequestMatchCriteria {
4212                xpath: Some("//order/id[text()='123']".to_string()),
4213                ..Default::default()
4214            }),
4215            priority: None,
4216            scenario: None,
4217            required_scenario_state: None,
4218            new_scenario_state: None,
4219        };
4220
4221        let body = br#"<root><order><id>123</id></order></root>"#;
4222        let headers = std::collections::HashMap::new();
4223        let query = std::collections::HashMap::new();
4224
4225        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4226    }
4227
4228    #[test]
4229    fn test_mock_matches_request_with_xpath_no_match() {
4230        let mock = MockConfig {
4231            id: "xpath-3".to_string(),
4232            name: "XPath No Match".to_string(),
4233            method: "POST".to_string(),
4234            path: "/xml".to_string(),
4235            response: MockResponse {
4236                body: serde_json::json!({"ok": true}),
4237                headers: None,
4238            },
4239            enabled: true,
4240            latency_ms: None,
4241            status_code: Some(200),
4242            request_match: Some(RequestMatchCriteria {
4243                xpath: Some("//order/id[text()='456']".to_string()),
4244                ..Default::default()
4245            }),
4246            priority: None,
4247            scenario: None,
4248            required_scenario_state: None,
4249            new_scenario_state: None,
4250        };
4251
4252        let body = br#"<root><order><id>123</id></order></root>"#;
4253        let headers = std::collections::HashMap::new();
4254        let query = std::collections::HashMap::new();
4255
4256        assert!(!mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4257    }
4258}