Skip to main content

mockforge_http/
management.rs

1#[cfg(any(feature = "mqtt", feature = "kafka"))]
2use axum::response::sse::{Event, Sse};
3/// Management API for MockForge
4///
5/// Provides REST endpoints for controlling mocks, server configuration,
6/// and integration with developer tools (VS Code extension, CI/CD, etc.)
7use axum::{
8    extract::{Path, Query, State},
9    http::StatusCode,
10    response::{IntoResponse, Json},
11    routing::{delete, get, post, put},
12    Router,
13};
14#[cfg(any(feature = "mqtt", feature = "kafka"))]
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18    BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23#[cfg(any(feature = "mqtt", feature = "kafka"))]
24use std::convert::Infallible;
25use std::sync::Arc;
26use tokio::sync::{broadcast, RwLock};
27use tracing::*;
28
29/// Default broadcast channel capacity for message events
30const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;
31
32/// Get the broadcast channel capacity from environment or use default
33fn get_message_broadcast_capacity() -> usize {
34    std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY")
35        .ok()
36        .and_then(|s| s.parse().ok())
37        .unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY)
38}
39
40/// Message event types for real-time monitoring
41#[derive(Debug, Clone, Serialize, Deserialize)]
42#[serde(tag = "protocol", content = "data")]
43#[serde(rename_all = "lowercase")]
44pub enum MessageEvent {
45    #[cfg(feature = "mqtt")]
46    /// MQTT message event
47    Mqtt(MqttMessageEvent),
48    #[cfg(feature = "kafka")]
49    /// Kafka message event
50    Kafka(KafkaMessageEvent),
51}
52
53#[cfg(feature = "mqtt")]
54/// MQTT message event for real-time monitoring
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct MqttMessageEvent {
57    /// MQTT topic name
58    pub topic: String,
59    /// Message payload content
60    pub payload: String,
61    /// Quality of Service level (0, 1, or 2)
62    pub qos: u8,
63    /// Whether the message is retained
64    pub retain: bool,
65    /// RFC3339 formatted timestamp
66    pub timestamp: String,
67}
68
69#[cfg(feature = "kafka")]
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct KafkaMessageEvent {
72    pub topic: String,
73    pub key: Option<String>,
74    pub value: String,
75    pub partition: i32,
76    pub offset: i64,
77    pub headers: Option<std::collections::HashMap<String, String>>,
78    pub timestamp: String,
79}
80
81/// Mock configuration representation
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct MockConfig {
84    /// Unique identifier for the mock
85    #[serde(skip_serializing_if = "String::is_empty")]
86    pub id: String,
87    /// Human-readable name for the mock
88    pub name: String,
89    /// HTTP method (GET, POST, etc.)
90    pub method: String,
91    /// API path pattern to match
92    pub path: String,
93    /// Response configuration
94    pub response: MockResponse,
95    /// Whether this mock is currently enabled
96    #[serde(default = "default_true")]
97    pub enabled: bool,
98    /// Optional latency to inject in milliseconds
99    #[serde(skip_serializing_if = "Option::is_none")]
100    pub latency_ms: Option<u64>,
101    /// Optional HTTP status code override
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub status_code: Option<u16>,
104    /// Request matching criteria (headers, query params, body patterns)
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub request_match: Option<RequestMatchCriteria>,
107    /// Priority for mock ordering (higher priority mocks are matched first)
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub priority: Option<i32>,
110    /// Scenario name for stateful mocking
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub scenario: Option<String>,
113    /// Required scenario state for this mock to be active
114    #[serde(skip_serializing_if = "Option::is_none")]
115    pub required_scenario_state: Option<String>,
116    /// New scenario state after this mock is matched
117    #[serde(skip_serializing_if = "Option::is_none")]
118    pub new_scenario_state: Option<String>,
119}
120
121fn default_true() -> bool {
122    true
123}
124
125/// Mock response configuration
126#[derive(Debug, Clone, Serialize, Deserialize)]
127pub struct MockResponse {
128    /// Response body as JSON
129    pub body: serde_json::Value,
130    /// Optional custom response headers
131    #[serde(skip_serializing_if = "Option::is_none")]
132    pub headers: Option<std::collections::HashMap<String, String>>,
133}
134
135/// Request matching criteria for advanced request matching
136#[derive(Debug, Clone, Serialize, Deserialize, Default)]
137pub struct RequestMatchCriteria {
138    /// Headers that must be present and match (case-insensitive header names)
139    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
140    pub headers: std::collections::HashMap<String, String>,
141    /// Query parameters that must be present and match
142    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
143    pub query_params: std::collections::HashMap<String, String>,
144    /// Request body pattern (supports exact match or regex)
145    #[serde(skip_serializing_if = "Option::is_none")]
146    pub body_pattern: Option<String>,
147    /// JSONPath expression for JSON body matching
148    #[serde(skip_serializing_if = "Option::is_none")]
149    pub json_path: Option<String>,
150    /// XPath expression for XML body matching
151    #[serde(skip_serializing_if = "Option::is_none")]
152    pub xpath: Option<String>,
153    /// Custom matcher expression (e.g., "headers.content-type == \"application/json\"")
154    #[serde(skip_serializing_if = "Option::is_none")]
155    pub custom_matcher: Option<String>,
156}
157
158/// Check if a request matches the given mock configuration
159///
160/// This function implements comprehensive request matching including:
161/// - Method and path matching
162/// - Header matching (with regex support)
163/// - Query parameter matching
164/// - Body pattern matching (exact, regex, JSONPath, XPath)
165/// - Custom matcher expressions
166pub fn mock_matches_request(
167    mock: &MockConfig,
168    method: &str,
169    path: &str,
170    headers: &std::collections::HashMap<String, String>,
171    query_params: &std::collections::HashMap<String, String>,
172    body: Option<&[u8]>,
173) -> bool {
174    use regex::Regex;
175
176    // Check if mock is enabled
177    if !mock.enabled {
178        return false;
179    }
180
181    // Check method (case-insensitive)
182    if mock.method.to_uppercase() != method.to_uppercase() {
183        return false;
184    }
185
186    // Check path pattern (supports wildcards and path parameters)
187    if !path_matches_pattern(&mock.path, path) {
188        return false;
189    }
190
191    // Check request matching criteria if present
192    if let Some(criteria) = &mock.request_match {
193        // Check headers
194        for (key, expected_value) in &criteria.headers {
195            let header_key_lower = key.to_lowercase();
196            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
197
198            if let Some((_, actual_value)) = found {
199                // Try regex match first, then exact match
200                if let Ok(re) = Regex::new(expected_value) {
201                    if !re.is_match(actual_value) {
202                        return false;
203                    }
204                } else if actual_value != expected_value {
205                    return false;
206                }
207            } else {
208                return false; // Header not found
209            }
210        }
211
212        // Check query parameters
213        for (key, expected_value) in &criteria.query_params {
214            if let Some(actual_value) = query_params.get(key) {
215                if actual_value != expected_value {
216                    return false;
217                }
218            } else {
219                return false; // Query param not found
220            }
221        }
222
223        // Check body pattern
224        if let Some(pattern) = &criteria.body_pattern {
225            if let Some(body_bytes) = body {
226                let body_str = String::from_utf8_lossy(body_bytes);
227                // Try regex first, then exact match
228                if let Ok(re) = Regex::new(pattern) {
229                    if !re.is_match(&body_str) {
230                        return false;
231                    }
232                } else if body_str.as_ref() != pattern {
233                    return false;
234                }
235            } else {
236                return false; // Body required but not present
237            }
238        }
239
240        // Check JSONPath (simplified implementation)
241        if let Some(json_path) = &criteria.json_path {
242            if let Some(body_bytes) = body {
243                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
244                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
245                        // Simple JSONPath check
246                        if !json_path_exists(&json_value, json_path) {
247                            return false;
248                        }
249                    }
250                }
251            }
252        }
253
254        // Check XPath (supports a focused subset)
255        if let Some(xpath) = &criteria.xpath {
256            if let Some(body_bytes) = body {
257                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
258                    if !xml_xpath_exists(body_str, xpath) {
259                        return false;
260                    }
261                } else {
262                    return false;
263                }
264            } else {
265                return false; // Body required but not present
266            }
267        }
268
269        // Check custom matcher
270        if let Some(custom) = &criteria.custom_matcher {
271            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
272                return false;
273            }
274        }
275    }
276
277    true
278}
279
280/// Check if a path matches a pattern (supports wildcards and path parameters)
281fn path_matches_pattern(pattern: &str, path: &str) -> bool {
282    // Exact match
283    if pattern == path {
284        return true;
285    }
286
287    // Wildcard match
288    if pattern == "*" {
289        return true;
290    }
291
292    // Path parameter matching (e.g., /users/{id} matches /users/123)
293    let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
294    let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
295
296    if pattern_parts.len() != path_parts.len() {
297        // Check for wildcard patterns
298        if pattern.contains('*') {
299            return matches_wildcard_pattern(pattern, path);
300        }
301        return false;
302    }
303
304    for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
305        // Check for path parameters {param}
306        if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
307            continue; // Matches any value
308        }
309
310        if pattern_part != path_part {
311            return false;
312        }
313    }
314
315    true
316}
317
318/// Check if path matches a wildcard pattern
319fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
320    use regex::Regex;
321
322    // Convert pattern to regex
323    let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
324
325    if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
326        return re.is_match(path);
327    }
328
329    false
330}
331
332/// Check if a JSONPath exists in a JSON value (simplified implementation)
333fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
334    // Simple implementation - for full JSONPath support, use a library like jsonpath-rs
335    // This handles simple paths like $.field or $.field.subfield
336    if let Some(path) = json_path.strip_prefix("$.") {
337        let parts: Vec<&str> = path.split('.').collect();
338
339        let mut current = json;
340        for part in parts {
341            if let Some(obj) = current.as_object() {
342                if let Some(value) = obj.get(part) {
343                    current = value;
344                } else {
345                    return false;
346                }
347            } else {
348                return false;
349            }
350        }
351        true
352    } else {
353        // For complex JSONPath expressions, would need a proper JSONPath library
354        tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
355        false
356    }
357}
358
359#[derive(Debug, Clone, PartialEq, Eq)]
360struct XPathSegment {
361    name: String,
362    text_equals: Option<String>,
363}
364
365fn parse_xpath_segment(segment: &str) -> Option<XPathSegment> {
366    if segment.is_empty() {
367        return None;
368    }
369
370    let trimmed = segment.trim();
371    if let Some(bracket_start) = trimmed.find('[') {
372        if !trimmed.ends_with(']') {
373            return None;
374        }
375
376        let name = trimmed[..bracket_start].trim();
377        let predicate = &trimmed[bracket_start + 1..trimmed.len() - 1];
378        let predicate = predicate.trim();
379
380        // Support simple predicate: [text()="value"] or [text()='value']
381        if let Some(raw) = predicate.strip_prefix("text()=") {
382            let raw = raw.trim();
383            if raw.len() >= 2
384                && ((raw.starts_with('"') && raw.ends_with('"'))
385                    || (raw.starts_with('\'') && raw.ends_with('\'')))
386            {
387                let text = raw[1..raw.len() - 1].to_string();
388                if !name.is_empty() {
389                    return Some(XPathSegment {
390                        name: name.to_string(),
391                        text_equals: Some(text),
392                    });
393                }
394            }
395        }
396
397        None
398    } else {
399        Some(XPathSegment {
400            name: trimmed.to_string(),
401            text_equals: None,
402        })
403    }
404}
405
406fn segment_matches(node: roxmltree::Node<'_, '_>, segment: &XPathSegment) -> bool {
407    if !node.is_element() {
408        return false;
409    }
410    if node.tag_name().name() != segment.name {
411        return false;
412    }
413    match &segment.text_equals {
414        Some(expected) => node.text().map(str::trim).unwrap_or_default() == expected,
415        None => true,
416    }
417}
418
419/// Check if an XPath expression matches an XML body.
420///
421/// Supported subset:
422/// - Absolute paths: `/root/child/item`
423/// - Descendant search: `//item` and `//parent/child`
424/// - Optional text predicate per segment: `item[text()="value"]`
425fn xml_xpath_exists(xml_body: &str, xpath: &str) -> bool {
426    let doc = match roxmltree::Document::parse(xml_body) {
427        Ok(doc) => doc,
428        Err(err) => {
429            tracing::warn!("Failed to parse XML for XPath matching: {}", err);
430            return false;
431        }
432    };
433
434    let expr = xpath.trim();
435    if expr.is_empty() {
436        return false;
437    }
438
439    let (is_descendant, path_str) = if let Some(rest) = expr.strip_prefix("//") {
440        (true, rest)
441    } else if let Some(rest) = expr.strip_prefix('/') {
442        (false, rest)
443    } else {
444        tracing::warn!("Unsupported XPath expression (must start with / or //): {}", expr);
445        return false;
446    };
447
448    let segments: Vec<XPathSegment> = path_str
449        .split('/')
450        .filter(|s| !s.trim().is_empty())
451        .filter_map(parse_xpath_segment)
452        .collect();
453
454    if segments.is_empty() {
455        return false;
456    }
457
458    if is_descendant {
459        let first = &segments[0];
460        for node in doc.descendants().filter(|n| segment_matches(*n, first)) {
461            let mut frontier = vec![node];
462            for segment in &segments[1..] {
463                let mut next_frontier = Vec::new();
464                for parent in &frontier {
465                    for child in parent.children().filter(|n| segment_matches(*n, segment)) {
466                        next_frontier.push(child);
467                    }
468                }
469                if next_frontier.is_empty() {
470                    frontier.clear();
471                    break;
472                }
473                frontier = next_frontier;
474            }
475            if !frontier.is_empty() {
476                return true;
477            }
478        }
479        false
480    } else {
481        let mut frontier = vec![doc.root_element()];
482        for (index, segment) in segments.iter().enumerate() {
483            let mut next_frontier = Vec::new();
484            for parent in &frontier {
485                if index == 0 {
486                    if segment_matches(*parent, segment) {
487                        next_frontier.push(*parent);
488                    }
489                    continue;
490                }
491                for child in parent.children().filter(|n| segment_matches(*n, segment)) {
492                    next_frontier.push(child);
493                }
494            }
495            if next_frontier.is_empty() {
496                return false;
497            }
498            frontier = next_frontier;
499        }
500        !frontier.is_empty()
501    }
502}
503
504/// Evaluate a custom matcher expression
505fn evaluate_custom_matcher(
506    expression: &str,
507    method: &str,
508    path: &str,
509    headers: &std::collections::HashMap<String, String>,
510    query_params: &std::collections::HashMap<String, String>,
511    body: Option<&[u8]>,
512) -> bool {
513    use regex::Regex;
514
515    let expr = expression.trim();
516
517    // Handle equality expressions (field == "value")
518    if expr.contains("==") {
519        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
520        if parts.len() != 2 {
521            return false;
522        }
523
524        let field = parts[0];
525        let expected_value = parts[1].trim_matches('"').trim_matches('\'');
526
527        match field {
528            "method" => method == expected_value,
529            "path" => path == expected_value,
530            _ if field.starts_with("headers.") => {
531                let header_name = &field[8..];
532                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
533            }
534            _ if field.starts_with("query.") => {
535                let param_name = &field[6..];
536                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
537            }
538            _ => false,
539        }
540    }
541    // Handle regex match expressions (field =~ "pattern")
542    else if expr.contains("=~") {
543        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
544        if parts.len() != 2 {
545            return false;
546        }
547
548        let field = parts[0];
549        let pattern = parts[1].trim_matches('"').trim_matches('\'');
550
551        if let Ok(re) = Regex::new(pattern) {
552            match field {
553                "method" => re.is_match(method),
554                "path" => re.is_match(path),
555                _ if field.starts_with("headers.") => {
556                    let header_name = &field[8..];
557                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
558                }
559                _ if field.starts_with("query.") => {
560                    let param_name = &field[6..];
561                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
562                }
563                _ => false,
564            }
565        } else {
566            false
567        }
568    }
569    // Handle contains expressions (field contains "value")
570    else if expr.contains("contains") {
571        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
572        if parts.len() != 2 {
573            return false;
574        }
575
576        let field = parts[0];
577        let search_value = parts[1].trim_matches('"').trim_matches('\'');
578
579        match field {
580            "path" => path.contains(search_value),
581            _ if field.starts_with("headers.") => {
582                let header_name = &field[8..];
583                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
584            }
585            _ if field.starts_with("body") => {
586                if let Some(body_bytes) = body {
587                    let body_str = String::from_utf8_lossy(body_bytes);
588                    body_str.contains(search_value)
589                } else {
590                    false
591                }
592            }
593            _ => false,
594        }
595    } else {
596        // Unknown expression format
597        tracing::warn!("Unknown custom matcher expression format: {}", expr);
598        false
599    }
600}
601
602/// Server statistics
603#[derive(Debug, Clone, Serialize, Deserialize)]
604pub struct ServerStats {
605    /// Server uptime in seconds
606    pub uptime_seconds: u64,
607    /// Total number of requests processed
608    pub total_requests: u64,
609    /// Number of active mock configurations
610    pub active_mocks: usize,
611    /// Number of currently enabled mocks
612    pub enabled_mocks: usize,
613    /// Number of registered API routes
614    pub registered_routes: usize,
615}
616
617/// Server configuration info
618#[derive(Debug, Clone, Serialize, Deserialize)]
619pub struct ServerConfig {
620    /// MockForge version string
621    pub version: String,
622    /// Server port number
623    pub port: u16,
624    /// Whether an OpenAPI spec is loaded
625    pub has_openapi_spec: bool,
626    /// Optional path to the OpenAPI spec file
627    #[serde(skip_serializing_if = "Option::is_none")]
628    pub spec_path: Option<String>,
629}
630
631/// Shared state for the management API
632#[derive(Clone)]
633pub struct ManagementState {
634    /// Collection of mock configurations
635    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
636    /// Optional OpenAPI specification
637    pub spec: Option<Arc<OpenApiSpec>>,
638    /// Optional path to the OpenAPI spec file
639    pub spec_path: Option<String>,
640    /// Server port number
641    pub port: u16,
642    /// Server start time for uptime calculation
643    pub start_time: std::time::Instant,
644    /// Counter for total requests processed
645    pub request_counter: Arc<RwLock<u64>>,
646    /// Optional proxy configuration for migration pipeline
647    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
648    /// Optional SMTP registry for email mocking
649    #[cfg(feature = "smtp")]
650    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
651    /// Optional MQTT broker for message mocking
652    #[cfg(feature = "mqtt")]
653    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
654    /// Optional Kafka broker for event streaming
655    #[cfg(feature = "kafka")]
656    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
657    /// Broadcast channel for message events (MQTT & Kafka)
658    #[cfg(any(feature = "mqtt", feature = "kafka"))]
659    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
660    /// State machine manager for scenario state machines
661    pub state_machine_manager:
662        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
663    /// Optional WebSocket broadcast channel for real-time updates
664    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
665    /// Lifecycle hook registry for extensibility
666    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
667    /// Rule explanations storage (in-memory for now)
668    pub rule_explanations: Arc<
669        RwLock<
670            std::collections::HashMap<
671                String,
672                mockforge_core::intelligent_behavior::RuleExplanation,
673            >,
674        >,
675    >,
676    /// Optional chaos API state for chaos config management
677    #[cfg(feature = "chaos")]
678    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
679    /// Optional server configuration for profile application
680    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
681}
682
683impl ManagementState {
684    /// Create a new management state
685    ///
686    /// # Arguments
687    /// * `spec` - Optional OpenAPI specification
688    /// * `spec_path` - Optional path to the OpenAPI spec file
689    /// * `port` - Server port number
690    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
691        Self {
692            mocks: Arc::new(RwLock::new(Vec::new())),
693            spec,
694            spec_path,
695            port,
696            start_time: std::time::Instant::now(),
697            request_counter: Arc::new(RwLock::new(0)),
698            proxy_config: None,
699            #[cfg(feature = "smtp")]
700            smtp_registry: None,
701            #[cfg(feature = "mqtt")]
702            mqtt_broker: None,
703            #[cfg(feature = "kafka")]
704            kafka_broker: None,
705            #[cfg(any(feature = "mqtt", feature = "kafka"))]
706            message_events: {
707                let capacity = get_message_broadcast_capacity();
708                let (tx, _) = broadcast::channel(capacity);
709                Arc::new(tx)
710            },
711            state_machine_manager: Arc::new(RwLock::new(
712                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
713            )),
714            ws_broadcast: None,
715            lifecycle_hooks: None,
716            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
717            #[cfg(feature = "chaos")]
718            chaos_api_state: None,
719            server_config: None,
720        }
721    }
722
723    /// Add lifecycle hook registry to management state
724    pub fn with_lifecycle_hooks(
725        mut self,
726        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
727    ) -> Self {
728        self.lifecycle_hooks = Some(hooks);
729        self
730    }
731
732    /// Add WebSocket broadcast channel to management state
733    pub fn with_ws_broadcast(
734        mut self,
735        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
736    ) -> Self {
737        self.ws_broadcast = Some(ws_broadcast);
738        self
739    }
740
741    /// Add proxy configuration to management state
742    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
743        self.proxy_config = Some(proxy_config);
744        self
745    }
746
747    #[cfg(feature = "smtp")]
748    /// Add SMTP registry to management state
749    pub fn with_smtp_registry(
750        mut self,
751        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
752    ) -> Self {
753        self.smtp_registry = Some(smtp_registry);
754        self
755    }
756
757    #[cfg(feature = "mqtt")]
758    /// Add MQTT broker to management state
759    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
760        self.mqtt_broker = Some(mqtt_broker);
761        self
762    }
763
764    #[cfg(feature = "kafka")]
765    /// Add Kafka broker to management state
766    pub fn with_kafka_broker(
767        mut self,
768        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
769    ) -> Self {
770        self.kafka_broker = Some(kafka_broker);
771        self
772    }
773
774    #[cfg(feature = "chaos")]
775    /// Add chaos API state to management state
776    pub fn with_chaos_api_state(
777        mut self,
778        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
779    ) -> Self {
780        self.chaos_api_state = Some(chaos_api_state);
781        self
782    }
783
784    /// Add server configuration to management state
785    pub fn with_server_config(
786        mut self,
787        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
788    ) -> Self {
789        self.server_config = Some(server_config);
790        self
791    }
792}
793
794/// List all mocks
795async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
796    let mocks = state.mocks.read().await;
797    Json(serde_json::json!({
798        "mocks": *mocks,
799        "total": mocks.len(),
800        "enabled": mocks.iter().filter(|m| m.enabled).count()
801    }))
802}
803
804/// Get a specific mock by ID
805async fn get_mock(
806    State(state): State<ManagementState>,
807    Path(id): Path<String>,
808) -> Result<Json<MockConfig>, StatusCode> {
809    let mocks = state.mocks.read().await;
810    mocks
811        .iter()
812        .find(|m| m.id == id)
813        .cloned()
814        .map(Json)
815        .ok_or(StatusCode::NOT_FOUND)
816}
817
818/// Create a new mock
819async fn create_mock(
820    State(state): State<ManagementState>,
821    Json(mut mock): Json<MockConfig>,
822) -> Result<Json<MockConfig>, StatusCode> {
823    let mut mocks = state.mocks.write().await;
824
825    // Generate ID if not provided
826    if mock.id.is_empty() {
827        mock.id = uuid::Uuid::new_v4().to_string();
828    }
829
830    // Check for duplicate ID
831    if mocks.iter().any(|m| m.id == mock.id) {
832        return Err(StatusCode::CONFLICT);
833    }
834
835    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
836
837    // Invoke lifecycle hooks
838    if let Some(hooks) = &state.lifecycle_hooks {
839        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
840            id: mock.id.clone(),
841            name: mock.name.clone(),
842            config: serde_json::to_value(&mock).unwrap_or_default(),
843        };
844        hooks.invoke_mock_created(&event).await;
845    }
846
847    mocks.push(mock.clone());
848
849    // Broadcast WebSocket event
850    if let Some(tx) = &state.ws_broadcast {
851        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
852    }
853
854    Ok(Json(mock))
855}
856
857/// Update an existing mock
858async fn update_mock(
859    State(state): State<ManagementState>,
860    Path(id): Path<String>,
861    Json(updated_mock): Json<MockConfig>,
862) -> Result<Json<MockConfig>, StatusCode> {
863    let mut mocks = state.mocks.write().await;
864
865    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
866
867    // Get old mock for comparison
868    let old_mock = mocks[position].clone();
869
870    info!("Updating mock: {}", id);
871    mocks[position] = updated_mock.clone();
872
873    // Invoke lifecycle hooks
874    if let Some(hooks) = &state.lifecycle_hooks {
875        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
876            id: updated_mock.id.clone(),
877            name: updated_mock.name.clone(),
878            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
879        };
880        hooks.invoke_mock_updated(&event).await;
881
882        // Check if enabled state changed
883        if old_mock.enabled != updated_mock.enabled {
884            let state_event = if updated_mock.enabled {
885                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
886                    id: updated_mock.id.clone(),
887                }
888            } else {
889                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
890                    id: updated_mock.id.clone(),
891                }
892            };
893            hooks.invoke_mock_state_changed(&state_event).await;
894        }
895    }
896
897    // Broadcast WebSocket event
898    if let Some(tx) = &state.ws_broadcast {
899        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
900    }
901
902    Ok(Json(updated_mock))
903}
904
905/// Delete a mock
906async fn delete_mock(
907    State(state): State<ManagementState>,
908    Path(id): Path<String>,
909) -> Result<StatusCode, StatusCode> {
910    let mut mocks = state.mocks.write().await;
911
912    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
913
914    // Get mock info before deletion for lifecycle hooks
915    let deleted_mock = mocks[position].clone();
916
917    info!("Deleting mock: {}", id);
918    mocks.remove(position);
919
920    // Invoke lifecycle hooks
921    if let Some(hooks) = &state.lifecycle_hooks {
922        let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
923            id: deleted_mock.id.clone(),
924            name: deleted_mock.name.clone(),
925        };
926        hooks.invoke_mock_deleted(&event).await;
927    }
928
929    // Broadcast WebSocket event
930    if let Some(tx) = &state.ws_broadcast {
931        let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
932    }
933
934    Ok(StatusCode::NO_CONTENT)
935}
936
937/// Request to validate configuration
938#[derive(Debug, Deserialize)]
939pub struct ValidateConfigRequest {
940    /// Configuration to validate (as JSON)
941    pub config: serde_json::Value,
942    /// Format of the configuration ("json" or "yaml")
943    #[serde(default = "default_format")]
944    pub format: String,
945}
946
947fn default_format() -> String {
948    "json".to_string()
949}
950
951/// Validate configuration without applying it
952async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
953    use mockforge_core::config::ServerConfig;
954
955    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
956        "yaml" | "yml" => {
957            let yaml_str = match serde_json::to_string(&request.config) {
958                Ok(s) => s,
959                Err(e) => {
960                    return (
961                        StatusCode::BAD_REQUEST,
962                        Json(serde_json::json!({
963                            "valid": false,
964                            "error": format!("Failed to convert to string: {}", e),
965                            "message": "Configuration validation failed"
966                        })),
967                    )
968                        .into_response();
969                }
970            };
971            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
972        }
973        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
974    };
975
976    match config_result {
977        Ok(_) => Json(serde_json::json!({
978            "valid": true,
979            "message": "Configuration is valid"
980        }))
981        .into_response(),
982        Err(e) => (
983            StatusCode::BAD_REQUEST,
984            Json(serde_json::json!({
985                "valid": false,
986                "error": format!("Invalid configuration: {}", e),
987                "message": "Configuration validation failed"
988            })),
989        )
990            .into_response(),
991    }
992}
993
994/// Request for bulk configuration update
995#[derive(Debug, Deserialize)]
996pub struct BulkConfigUpdateRequest {
997    /// Partial configuration updates (only specified fields will be updated)
998    pub updates: serde_json::Value,
999}
1000
1001/// Bulk update configuration
1002///
1003/// This endpoint allows updating multiple configuration options at once.
1004/// Only the specified fields in the updates object will be modified.
1005///
1006/// Configuration updates are applied to the server configuration if available
1007/// in ManagementState. Changes take effect immediately for supported settings.
1008async fn bulk_update_config(
1009    State(_state): State<ManagementState>,
1010    Json(request): Json<BulkConfigUpdateRequest>,
1011) -> impl IntoResponse {
1012    // Validate the updates structure
1013    if !request.updates.is_object() {
1014        return (
1015            StatusCode::BAD_REQUEST,
1016            Json(serde_json::json!({
1017                "error": "Invalid request",
1018                "message": "Updates must be a JSON object"
1019            })),
1020        )
1021            .into_response();
1022    }
1023
1024    // Try to validate as partial ServerConfig
1025    use mockforge_core::config::ServerConfig;
1026
1027    // Create a minimal valid config and try to merge updates
1028    let base_config = ServerConfig::default();
1029    let base_json = match serde_json::to_value(&base_config) {
1030        Ok(v) => v,
1031        Err(e) => {
1032            return (
1033                StatusCode::INTERNAL_SERVER_ERROR,
1034                Json(serde_json::json!({
1035                    "error": "Internal error",
1036                    "message": format!("Failed to serialize base config: {}", e)
1037                })),
1038            )
1039                .into_response();
1040        }
1041    };
1042
1043    // Merge updates into base config (simplified merge)
1044    let mut merged = base_json.clone();
1045    if let (Some(merged_obj), Some(updates_obj)) =
1046        (merged.as_object_mut(), request.updates.as_object())
1047    {
1048        for (key, value) in updates_obj {
1049            merged_obj.insert(key.clone(), value.clone());
1050        }
1051    }
1052
1053    // Validate the merged config
1054    match serde_json::from_value::<ServerConfig>(merged) {
1055        Ok(_) => {
1056            // Config is valid
1057            // Note: Runtime application of config changes would require:
1058            // 1. Storing ServerConfig in ManagementState
1059            // 2. Implementing hot-reload mechanism for server configuration
1060            // 3. Updating router state and middleware based on new config
1061            // For now, this endpoint only validates the configuration structure
1062            Json(serde_json::json!({
1063                "success": true,
1064                "message": "Bulk configuration update validated successfully. Note: Runtime application requires ServerConfig in ManagementState and hot-reload support.",
1065                "updates_received": request.updates,
1066                "validated": true
1067            }))
1068            .into_response()
1069        }
1070        Err(e) => (
1071            StatusCode::BAD_REQUEST,
1072            Json(serde_json::json!({
1073                "error": "Invalid configuration",
1074                "message": format!("Configuration validation failed: {}", e),
1075                "validated": false
1076            })),
1077        )
1078            .into_response(),
1079    }
1080}
1081
1082/// Get server statistics
1083async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
1084    let mocks = state.mocks.read().await;
1085    let request_count = *state.request_counter.read().await;
1086
1087    Json(ServerStats {
1088        uptime_seconds: state.start_time.elapsed().as_secs(),
1089        total_requests: request_count,
1090        active_mocks: mocks.len(),
1091        enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
1092        registered_routes: mocks.len(), // This could be enhanced with actual route registry info
1093    })
1094}
1095
1096/// Get server configuration
1097async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
1098    Json(ServerConfig {
1099        version: env!("CARGO_PKG_VERSION").to_string(),
1100        port: state.port,
1101        has_openapi_spec: state.spec.is_some(),
1102        spec_path: state.spec_path.clone(),
1103    })
1104}
1105
1106/// Health check endpoint
1107async fn health_check() -> Json<serde_json::Value> {
1108    Json(serde_json::json!({
1109        "status": "healthy",
1110        "service": "mockforge-management",
1111        "timestamp": chrono::Utc::now().to_rfc3339()
1112    }))
1113}
1114
1115/// Export format for mock configurations
1116#[derive(Debug, Clone, Serialize, Deserialize)]
1117#[serde(rename_all = "lowercase")]
1118pub enum ExportFormat {
1119    /// JSON format
1120    Json,
1121    /// YAML format
1122    Yaml,
1123}
1124
1125/// Export mocks in specified format
1126async fn export_mocks(
1127    State(state): State<ManagementState>,
1128    Query(params): Query<std::collections::HashMap<String, String>>,
1129) -> Result<(StatusCode, String), StatusCode> {
1130    let mocks = state.mocks.read().await;
1131
1132    let format = params
1133        .get("format")
1134        .map(|f| match f.as_str() {
1135            "yaml" | "yml" => ExportFormat::Yaml,
1136            _ => ExportFormat::Json,
1137        })
1138        .unwrap_or(ExportFormat::Json);
1139
1140    match format {
1141        ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
1142            .map(|json| (StatusCode::OK, json))
1143            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1144        ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
1145            .map(|yaml| (StatusCode::OK, yaml))
1146            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1147    }
1148}
1149
1150/// Import mocks from JSON/YAML
1151async fn import_mocks(
1152    State(state): State<ManagementState>,
1153    Json(mocks): Json<Vec<MockConfig>>,
1154) -> impl IntoResponse {
1155    let mut current_mocks = state.mocks.write().await;
1156    current_mocks.clear();
1157    current_mocks.extend(mocks);
1158    Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1159}
1160
1161#[cfg(feature = "smtp")]
1162/// List SMTP emails in mailbox
1163async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
1164    if let Some(ref smtp_registry) = state.smtp_registry {
1165        match smtp_registry.get_emails() {
1166            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
1167            Err(e) => (
1168                StatusCode::INTERNAL_SERVER_ERROR,
1169                Json(serde_json::json!({
1170                    "error": "Failed to retrieve emails",
1171                    "message": e.to_string()
1172                })),
1173            ),
1174        }
1175    } else {
1176        (
1177            StatusCode::NOT_IMPLEMENTED,
1178            Json(serde_json::json!({
1179                "error": "SMTP mailbox management not available",
1180                "message": "SMTP server is not enabled or registry not available."
1181            })),
1182        )
1183    }
1184}
1185
1186/// Get specific SMTP email
1187#[cfg(feature = "smtp")]
1188async fn get_smtp_email(
1189    State(state): State<ManagementState>,
1190    Path(id): Path<String>,
1191) -> impl IntoResponse {
1192    if let Some(ref smtp_registry) = state.smtp_registry {
1193        match smtp_registry.get_email_by_id(&id) {
1194            Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
1195            Ok(None) => (
1196                StatusCode::NOT_FOUND,
1197                Json(serde_json::json!({
1198                    "error": "Email not found",
1199                    "id": id
1200                })),
1201            ),
1202            Err(e) => (
1203                StatusCode::INTERNAL_SERVER_ERROR,
1204                Json(serde_json::json!({
1205                    "error": "Failed to retrieve email",
1206                    "message": e.to_string()
1207                })),
1208            ),
1209        }
1210    } else {
1211        (
1212            StatusCode::NOT_IMPLEMENTED,
1213            Json(serde_json::json!({
1214                "error": "SMTP mailbox management not available",
1215                "message": "SMTP server is not enabled or registry not available."
1216            })),
1217        )
1218    }
1219}
1220
1221/// Clear SMTP mailbox
1222#[cfg(feature = "smtp")]
1223async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
1224    if let Some(ref smtp_registry) = state.smtp_registry {
1225        match smtp_registry.clear_mailbox() {
1226            Ok(()) => (
1227                StatusCode::OK,
1228                Json(serde_json::json!({
1229                    "message": "Mailbox cleared successfully"
1230                })),
1231            ),
1232            Err(e) => (
1233                StatusCode::INTERNAL_SERVER_ERROR,
1234                Json(serde_json::json!({
1235                    "error": "Failed to clear mailbox",
1236                    "message": e.to_string()
1237                })),
1238            ),
1239        }
1240    } else {
1241        (
1242            StatusCode::NOT_IMPLEMENTED,
1243            Json(serde_json::json!({
1244                "error": "SMTP mailbox management not available",
1245                "message": "SMTP server is not enabled or registry not available."
1246            })),
1247        )
1248    }
1249}
1250
1251/// Export SMTP mailbox
1252#[cfg(feature = "smtp")]
1253async fn export_smtp_mailbox(
1254    Query(params): Query<std::collections::HashMap<String, String>>,
1255) -> impl IntoResponse {
1256    let format = params.get("format").unwrap_or(&"json".to_string()).clone();
1257    (
1258        StatusCode::NOT_IMPLEMENTED,
1259        Json(serde_json::json!({
1260            "error": "SMTP mailbox management not available via HTTP API",
1261            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
1262            "requested_format": format
1263        })),
1264    )
1265}
1266
1267/// Search SMTP emails
1268#[cfg(feature = "smtp")]
1269async fn search_smtp_emails(
1270    State(state): State<ManagementState>,
1271    Query(params): Query<std::collections::HashMap<String, String>>,
1272) -> impl IntoResponse {
1273    if let Some(ref smtp_registry) = state.smtp_registry {
1274        let filters = EmailSearchFilters {
1275            sender: params.get("sender").cloned(),
1276            recipient: params.get("recipient").cloned(),
1277            subject: params.get("subject").cloned(),
1278            body: params.get("body").cloned(),
1279            since: params
1280                .get("since")
1281                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
1282                .map(|dt| dt.with_timezone(&chrono::Utc)),
1283            until: params
1284                .get("until")
1285                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
1286                .map(|dt| dt.with_timezone(&chrono::Utc)),
1287            use_regex: params.get("regex").map(|s| s == "true").unwrap_or(false),
1288            case_sensitive: params.get("case_sensitive").map(|s| s == "true").unwrap_or(false),
1289        };
1290
1291        match smtp_registry.search_emails(filters) {
1292            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
1293            Err(e) => (
1294                StatusCode::INTERNAL_SERVER_ERROR,
1295                Json(serde_json::json!({
1296                    "error": "Failed to search emails",
1297                    "message": e.to_string()
1298                })),
1299            ),
1300        }
1301    } else {
1302        (
1303            StatusCode::NOT_IMPLEMENTED,
1304            Json(serde_json::json!({
1305                "error": "SMTP mailbox management not available",
1306                "message": "SMTP server is not enabled or registry not available."
1307            })),
1308        )
1309    }
1310}
1311
1312/// MQTT broker statistics
1313#[cfg(feature = "mqtt")]
1314#[derive(Debug, Clone, Serialize, Deserialize)]
1315pub struct MqttBrokerStats {
1316    /// Number of connected MQTT clients
1317    pub connected_clients: usize,
1318    /// Number of active MQTT topics
1319    pub active_topics: usize,
1320    /// Number of retained messages
1321    pub retained_messages: usize,
1322    /// Total number of subscriptions
1323    pub total_subscriptions: usize,
1324}
1325
1326/// MQTT management handlers
1327#[cfg(feature = "mqtt")]
1328async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
1329    if let Some(broker) = &state.mqtt_broker {
1330        let connected_clients = broker.get_connected_clients().await.len();
1331        let active_topics = broker.get_active_topics().await.len();
1332        let stats = broker.get_topic_stats().await;
1333
1334        let broker_stats = MqttBrokerStats {
1335            connected_clients,
1336            active_topics,
1337            retained_messages: stats.retained_messages,
1338            total_subscriptions: stats.total_subscriptions,
1339        };
1340
1341        Json(broker_stats).into_response()
1342    } else {
1343        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1344    }
1345}
1346
1347#[cfg(feature = "mqtt")]
1348async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
1349    if let Some(broker) = &state.mqtt_broker {
1350        let clients = broker.get_connected_clients().await;
1351        Json(serde_json::json!({
1352            "clients": clients
1353        }))
1354        .into_response()
1355    } else {
1356        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1357    }
1358}
1359
1360#[cfg(feature = "mqtt")]
1361async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
1362    if let Some(broker) = &state.mqtt_broker {
1363        let topics = broker.get_active_topics().await;
1364        Json(serde_json::json!({
1365            "topics": topics
1366        }))
1367        .into_response()
1368    } else {
1369        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1370    }
1371}
1372
1373#[cfg(feature = "mqtt")]
1374async fn disconnect_mqtt_client(
1375    State(state): State<ManagementState>,
1376    Path(client_id): Path<String>,
1377) -> impl IntoResponse {
1378    if let Some(broker) = &state.mqtt_broker {
1379        match broker.disconnect_client(&client_id).await {
1380            Ok(_) => {
1381                (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response()
1382            }
1383            Err(e) => {
1384                (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
1385                    .into_response()
1386            }
1387        }
1388    } else {
1389        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1390    }
1391}
1392
1393// ========== MQTT Publish Handler ==========
1394
1395#[cfg(feature = "mqtt")]
1396/// Request to publish a single MQTT message
1397#[derive(Debug, Deserialize)]
1398pub struct MqttPublishRequest {
1399    /// Topic to publish to
1400    pub topic: String,
1401    /// Message payload (string or JSON)
1402    pub payload: String,
1403    /// QoS level (0, 1, or 2)
1404    #[serde(default = "default_qos")]
1405    pub qos: u8,
1406    /// Whether to retain the message
1407    #[serde(default)]
1408    pub retain: bool,
1409}
1410
1411#[cfg(feature = "mqtt")]
1412fn default_qos() -> u8 {
1413    0
1414}
1415
1416#[cfg(feature = "mqtt")]
1417/// Publish a message to an MQTT topic (only compiled when mqtt feature is enabled)
1418async fn publish_mqtt_message_handler(
1419    State(state): State<ManagementState>,
1420    Json(request): Json<serde_json::Value>,
1421) -> impl IntoResponse {
1422    // Extract fields from JSON manually
1423    let topic = request.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
1424    let payload = request.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
1425    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
1426    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);
1427
1428    if topic.is_none() || payload.is_none() {
1429        return (
1430            StatusCode::BAD_REQUEST,
1431            Json(serde_json::json!({
1432                "error": "Invalid request",
1433                "message": "Missing required fields: topic and payload"
1434            })),
1435        );
1436    }
1437
1438    let topic = topic.unwrap();
1439    let payload = payload.unwrap();
1440
1441    if let Some(broker) = &state.mqtt_broker {
1442        // Validate QoS
1443        if qos > 2 {
1444            return (
1445                StatusCode::BAD_REQUEST,
1446                Json(serde_json::json!({
1447                    "error": "Invalid QoS",
1448                    "message": "QoS must be 0, 1, or 2"
1449                })),
1450            );
1451        }
1452
1453        // Convert payload to bytes
1454        let payload_bytes = payload.as_bytes().to_vec();
1455        let client_id = "mockforge-management-api".to_string();
1456
1457        let publish_result = broker
1458            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
1459            .await
1460            .map_err(|e| format!("{}", e));
1461
1462        match publish_result {
1463            Ok(_) => {
1464                // Emit message event for real-time monitoring
1465                let event = MessageEvent::Mqtt(MqttMessageEvent {
1466                    topic: topic.clone(),
1467                    payload: payload.clone(),
1468                    qos,
1469                    retain,
1470                    timestamp: chrono::Utc::now().to_rfc3339(),
1471                });
1472                let _ = state.message_events.send(event);
1473
1474                (
1475                    StatusCode::OK,
1476                    Json(serde_json::json!({
1477                        "success": true,
1478                        "message": format!("Message published to topic '{}'", topic),
1479                        "topic": topic,
1480                        "qos": qos,
1481                        "retain": retain
1482                    })),
1483                )
1484            }
1485            Err(error_msg) => (
1486                StatusCode::INTERNAL_SERVER_ERROR,
1487                Json(serde_json::json!({
1488                    "error": "Failed to publish message",
1489                    "message": error_msg
1490                })),
1491            ),
1492        }
1493    } else {
1494        (
1495            StatusCode::SERVICE_UNAVAILABLE,
1496            Json(serde_json::json!({
1497                "error": "MQTT broker not available",
1498                "message": "MQTT broker is not enabled or not available."
1499            })),
1500        )
1501    }
1502}
1503
1504#[cfg(not(feature = "mqtt"))]
1505/// Publish a message to an MQTT topic (stub when mqtt feature is disabled)
1506async fn publish_mqtt_message_handler(
1507    State(_state): State<ManagementState>,
1508    Json(_request): Json<serde_json::Value>,
1509) -> impl IntoResponse {
1510    (
1511        StatusCode::SERVICE_UNAVAILABLE,
1512        Json(serde_json::json!({
1513            "error": "MQTT feature not enabled",
1514            "message": "MQTT support is not compiled into this build"
1515        })),
1516    )
1517}
1518
1519#[cfg(feature = "mqtt")]
1520/// Request to publish multiple MQTT messages
1521#[derive(Debug, Deserialize)]
1522pub struct MqttBatchPublishRequest {
1523    /// List of messages to publish
1524    pub messages: Vec<MqttPublishRequest>,
1525    /// Delay between messages in milliseconds
1526    #[serde(default = "default_delay")]
1527    pub delay_ms: u64,
1528}
1529
1530#[cfg(feature = "mqtt")]
1531fn default_delay() -> u64 {
1532    100
1533}
1534
1535#[cfg(feature = "mqtt")]
1536/// Publish multiple messages to MQTT topics (only compiled when mqtt feature is enabled)
1537async fn publish_mqtt_batch_handler(
1538    State(state): State<ManagementState>,
1539    Json(request): Json<serde_json::Value>,
1540) -> impl IntoResponse {
1541    // Extract fields from JSON manually
1542    let messages_json = request.get("messages").and_then(|v| v.as_array());
1543    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);
1544
1545    if messages_json.is_none() {
1546        return (
1547            StatusCode::BAD_REQUEST,
1548            Json(serde_json::json!({
1549                "error": "Invalid request",
1550                "message": "Missing required field: messages"
1551            })),
1552        );
1553    }
1554
1555    let messages_json = messages_json.unwrap();
1556
1557    if let Some(broker) = &state.mqtt_broker {
1558        if messages_json.is_empty() {
1559            return (
1560                StatusCode::BAD_REQUEST,
1561                Json(serde_json::json!({
1562                    "error": "Empty batch",
1563                    "message": "At least one message is required"
1564                })),
1565            );
1566        }
1567
1568        let mut results = Vec::new();
1569        let client_id = "mockforge-management-api".to_string();
1570
1571        for (index, msg_json) in messages_json.iter().enumerate() {
1572            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
1573            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
1574            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
1575            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);
1576
1577            if topic.is_none() || payload.is_none() {
1578                results.push(serde_json::json!({
1579                    "index": index,
1580                    "success": false,
1581                    "error": "Missing required fields: topic and payload"
1582                }));
1583                continue;
1584            }
1585
1586            let topic = topic.unwrap();
1587            let payload = payload.unwrap();
1588
1589            // Validate QoS
1590            if qos > 2 {
1591                results.push(serde_json::json!({
1592                    "index": index,
1593                    "success": false,
1594                    "error": "Invalid QoS (must be 0, 1, or 2)"
1595                }));
1596                continue;
1597            }
1598
1599            // Convert payload to bytes
1600            let payload_bytes = payload.as_bytes().to_vec();
1601
1602            let publish_result = broker
1603                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
1604                .await
1605                .map_err(|e| format!("{}", e));
1606
1607            match publish_result {
1608                Ok(_) => {
1609                    // Emit message event
1610                    let event = MessageEvent::Mqtt(MqttMessageEvent {
1611                        topic: topic.clone(),
1612                        payload: payload.clone(),
1613                        qos,
1614                        retain,
1615                        timestamp: chrono::Utc::now().to_rfc3339(),
1616                    });
1617                    let _ = state.message_events.send(event);
1618
1619                    results.push(serde_json::json!({
1620                        "index": index,
1621                        "success": true,
1622                        "topic": topic,
1623                        "qos": qos
1624                    }));
1625                }
1626                Err(error_msg) => {
1627                    results.push(serde_json::json!({
1628                        "index": index,
1629                        "success": false,
1630                        "error": error_msg
1631                    }));
1632                }
1633            }
1634
1635            // Add delay between messages (except for the last one)
1636            if index < messages_json.len() - 1 && delay_ms > 0 {
1637                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1638            }
1639        }
1640
1641        let success_count =
1642            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();
1643
1644        (
1645            StatusCode::OK,
1646            Json(serde_json::json!({
1647                "success": true,
1648                "total": messages_json.len(),
1649                "succeeded": success_count,
1650                "failed": messages_json.len() - success_count,
1651                "results": results
1652            })),
1653        )
1654    } else {
1655        (
1656            StatusCode::SERVICE_UNAVAILABLE,
1657            Json(serde_json::json!({
1658                "error": "MQTT broker not available",
1659                "message": "MQTT broker is not enabled or not available."
1660            })),
1661        )
1662    }
1663}
1664
1665#[cfg(not(feature = "mqtt"))]
1666/// Publish multiple messages to MQTT topics (stub when mqtt feature is disabled)
1667async fn publish_mqtt_batch_handler(
1668    State(_state): State<ManagementState>,
1669    Json(_request): Json<serde_json::Value>,
1670) -> impl IntoResponse {
1671    (
1672        StatusCode::SERVICE_UNAVAILABLE,
1673        Json(serde_json::json!({
1674            "error": "MQTT feature not enabled",
1675            "message": "MQTT support is not compiled into this build"
1676        })),
1677    )
1678}
1679
1680// Migration pipeline handlers
1681
1682/// Request to set migration mode
1683#[derive(Debug, Deserialize)]
1684struct SetMigrationModeRequest {
1685    mode: String,
1686}
1687
1688/// Get all migration routes
1689async fn get_migration_routes(
1690    State(state): State<ManagementState>,
1691) -> Result<Json<serde_json::Value>, StatusCode> {
1692    let proxy_config = match &state.proxy_config {
1693        Some(config) => config,
1694        None => {
1695            return Ok(Json(serde_json::json!({
1696                "error": "Migration not configured. Proxy config not available."
1697            })));
1698        }
1699    };
1700
1701    let config = proxy_config.read().await;
1702    let routes = config.get_migration_routes();
1703
1704    Ok(Json(serde_json::json!({
1705        "routes": routes
1706    })))
1707}
1708
1709/// Toggle a route's migration mode
1710async fn toggle_route_migration(
1711    State(state): State<ManagementState>,
1712    Path(pattern): Path<String>,
1713) -> Result<Json<serde_json::Value>, StatusCode> {
1714    let proxy_config = match &state.proxy_config {
1715        Some(config) => config,
1716        None => {
1717            return Ok(Json(serde_json::json!({
1718                "error": "Migration not configured. Proxy config not available."
1719            })));
1720        }
1721    };
1722
1723    let mut config = proxy_config.write().await;
1724    let new_mode = match config.toggle_route_migration(&pattern) {
1725        Some(mode) => mode,
1726        None => {
1727            return Ok(Json(serde_json::json!({
1728                "error": format!("Route pattern not found: {}", pattern)
1729            })));
1730        }
1731    };
1732
1733    Ok(Json(serde_json::json!({
1734        "pattern": pattern,
1735        "mode": format!("{:?}", new_mode).to_lowercase()
1736    })))
1737}
1738
1739/// Set a route's migration mode explicitly
1740async fn set_route_migration_mode(
1741    State(state): State<ManagementState>,
1742    Path(pattern): Path<String>,
1743    Json(request): Json<SetMigrationModeRequest>,
1744) -> Result<Json<serde_json::Value>, StatusCode> {
1745    let proxy_config = match &state.proxy_config {
1746        Some(config) => config,
1747        None => {
1748            return Ok(Json(serde_json::json!({
1749                "error": "Migration not configured. Proxy config not available."
1750            })));
1751        }
1752    };
1753
1754    use mockforge_core::proxy::config::MigrationMode;
1755    let mode = match request.mode.to_lowercase().as_str() {
1756        "mock" => MigrationMode::Mock,
1757        "shadow" => MigrationMode::Shadow,
1758        "real" => MigrationMode::Real,
1759        "auto" => MigrationMode::Auto,
1760        _ => {
1761            return Ok(Json(serde_json::json!({
1762                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1763            })));
1764        }
1765    };
1766
1767    let mut config = proxy_config.write().await;
1768    let updated = config.update_rule_migration_mode(&pattern, mode);
1769
1770    if !updated {
1771        return Ok(Json(serde_json::json!({
1772            "error": format!("Route pattern not found: {}", pattern)
1773        })));
1774    }
1775
1776    Ok(Json(serde_json::json!({
1777        "pattern": pattern,
1778        "mode": format!("{:?}", mode).to_lowercase()
1779    })))
1780}
1781
1782/// Toggle a group's migration mode
1783async fn toggle_group_migration(
1784    State(state): State<ManagementState>,
1785    Path(group): Path<String>,
1786) -> Result<Json<serde_json::Value>, StatusCode> {
1787    let proxy_config = match &state.proxy_config {
1788        Some(config) => config,
1789        None => {
1790            return Ok(Json(serde_json::json!({
1791                "error": "Migration not configured. Proxy config not available."
1792            })));
1793        }
1794    };
1795
1796    let mut config = proxy_config.write().await;
1797    let new_mode = config.toggle_group_migration(&group);
1798
1799    Ok(Json(serde_json::json!({
1800        "group": group,
1801        "mode": format!("{:?}", new_mode).to_lowercase()
1802    })))
1803}
1804
1805/// Set a group's migration mode explicitly
1806async fn set_group_migration_mode(
1807    State(state): State<ManagementState>,
1808    Path(group): Path<String>,
1809    Json(request): Json<SetMigrationModeRequest>,
1810) -> Result<Json<serde_json::Value>, StatusCode> {
1811    let proxy_config = match &state.proxy_config {
1812        Some(config) => config,
1813        None => {
1814            return Ok(Json(serde_json::json!({
1815                "error": "Migration not configured. Proxy config not available."
1816            })));
1817        }
1818    };
1819
1820    use mockforge_core::proxy::config::MigrationMode;
1821    let mode = match request.mode.to_lowercase().as_str() {
1822        "mock" => MigrationMode::Mock,
1823        "shadow" => MigrationMode::Shadow,
1824        "real" => MigrationMode::Real,
1825        "auto" => MigrationMode::Auto,
1826        _ => {
1827            return Ok(Json(serde_json::json!({
1828                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1829            })));
1830        }
1831    };
1832
1833    let mut config = proxy_config.write().await;
1834    config.update_group_migration_mode(&group, mode);
1835
1836    Ok(Json(serde_json::json!({
1837        "group": group,
1838        "mode": format!("{:?}", mode).to_lowercase()
1839    })))
1840}
1841
1842/// Get all migration groups
1843async fn get_migration_groups(
1844    State(state): State<ManagementState>,
1845) -> Result<Json<serde_json::Value>, StatusCode> {
1846    let proxy_config = match &state.proxy_config {
1847        Some(config) => config,
1848        None => {
1849            return Ok(Json(serde_json::json!({
1850                "error": "Migration not configured. Proxy config not available."
1851            })));
1852        }
1853    };
1854
1855    let config = proxy_config.read().await;
1856    let groups = config.get_migration_groups();
1857
1858    // Convert to JSON-serializable format
1859    let groups_json: serde_json::Map<String, serde_json::Value> = groups
1860        .into_iter()
1861        .map(|(name, info)| {
1862            (
1863                name,
1864                serde_json::json!({
1865                    "name": info.name,
1866                    "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1867                    "route_count": info.route_count
1868                }),
1869            )
1870        })
1871        .collect();
1872
1873    Ok(Json(serde_json::json!(groups_json)))
1874}
1875
1876/// Get overall migration status
1877async fn get_migration_status(
1878    State(state): State<ManagementState>,
1879) -> Result<Json<serde_json::Value>, StatusCode> {
1880    let proxy_config = match &state.proxy_config {
1881        Some(config) => config,
1882        None => {
1883            return Ok(Json(serde_json::json!({
1884                "error": "Migration not configured. Proxy config not available."
1885            })));
1886        }
1887    };
1888
1889    let config = proxy_config.read().await;
1890    let routes = config.get_migration_routes();
1891    let groups = config.get_migration_groups();
1892
1893    let mut mock_count = 0;
1894    let mut shadow_count = 0;
1895    let mut real_count = 0;
1896    let mut auto_count = 0;
1897
1898    for route in &routes {
1899        match route.migration_mode {
1900            mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1901            mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1902            mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1903            mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1904        }
1905    }
1906
1907    Ok(Json(serde_json::json!({
1908        "total_routes": routes.len(),
1909        "mock_routes": mock_count,
1910        "shadow_routes": shadow_count,
1911        "real_routes": real_count,
1912        "auto_routes": auto_count,
1913        "total_groups": groups.len(),
1914        "migration_enabled": config.migration_enabled
1915    })))
1916}
1917
1918// ========== Proxy Replacement Rules Management ==========
1919
1920/// Request body for creating/updating proxy replacement rules
1921#[derive(Debug, Deserialize, Serialize)]
1922pub struct ProxyRuleRequest {
1923    /// URL pattern to match (supports wildcards like "/api/users/*")
1924    pub pattern: String,
1925    /// Rule type: "request" or "response"
1926    #[serde(rename = "type")]
1927    pub rule_type: String,
1928    /// Optional status code filter for response rules
1929    #[serde(default)]
1930    pub status_codes: Vec<u16>,
1931    /// Body transformations to apply
1932    pub body_transforms: Vec<BodyTransformRequest>,
1933    /// Whether this rule is enabled
1934    #[serde(default = "default_true")]
1935    pub enabled: bool,
1936}
1937
1938/// Request body for individual body transformations
1939#[derive(Debug, Deserialize, Serialize)]
1940pub struct BodyTransformRequest {
1941    /// JSONPath expression to target (e.g., "$.userId", "$.email")
1942    pub path: String,
1943    /// Replacement value (supports template expansion like "{{uuid}}", "{{faker.email}}")
1944    pub replace: String,
1945    /// Operation to perform: "replace", "add", or "remove"
1946    #[serde(default)]
1947    pub operation: String,
1948}
1949
1950/// Response format for proxy rules
1951#[derive(Debug, Serialize)]
1952pub struct ProxyRuleResponse {
1953    /// Rule ID (index in the array)
1954    pub id: usize,
1955    /// URL pattern
1956    pub pattern: String,
1957    /// Rule type
1958    #[serde(rename = "type")]
1959    pub rule_type: String,
1960    /// Status codes (for response rules)
1961    pub status_codes: Vec<u16>,
1962    /// Body transformations
1963    pub body_transforms: Vec<BodyTransformRequest>,
1964    /// Whether enabled
1965    pub enabled: bool,
1966}
1967
1968/// List all proxy replacement rules
1969async fn list_proxy_rules(
1970    State(state): State<ManagementState>,
1971) -> Result<Json<serde_json::Value>, StatusCode> {
1972    let proxy_config = match &state.proxy_config {
1973        Some(config) => config,
1974        None => {
1975            return Ok(Json(serde_json::json!({
1976                "error": "Proxy not configured. Proxy config not available."
1977            })));
1978        }
1979    };
1980
1981    let config = proxy_config.read().await;
1982
1983    let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1984
1985    // Add request replacement rules
1986    for (idx, rule) in config.request_replacements.iter().enumerate() {
1987        rules.push(ProxyRuleResponse {
1988            id: idx,
1989            pattern: rule.pattern.clone(),
1990            rule_type: "request".to_string(),
1991            status_codes: Vec::new(),
1992            body_transforms: rule
1993                .body_transforms
1994                .iter()
1995                .map(|t| BodyTransformRequest {
1996                    path: t.path.clone(),
1997                    replace: t.replace.clone(),
1998                    operation: format!("{:?}", t.operation).to_lowercase(),
1999                })
2000                .collect(),
2001            enabled: rule.enabled,
2002        });
2003    }
2004
2005    // Add response replacement rules
2006    let request_count = config.request_replacements.len();
2007    for (idx, rule) in config.response_replacements.iter().enumerate() {
2008        rules.push(ProxyRuleResponse {
2009            id: request_count + idx,
2010            pattern: rule.pattern.clone(),
2011            rule_type: "response".to_string(),
2012            status_codes: rule.status_codes.clone(),
2013            body_transforms: rule
2014                .body_transforms
2015                .iter()
2016                .map(|t| BodyTransformRequest {
2017                    path: t.path.clone(),
2018                    replace: t.replace.clone(),
2019                    operation: format!("{:?}", t.operation).to_lowercase(),
2020                })
2021                .collect(),
2022            enabled: rule.enabled,
2023        });
2024    }
2025
2026    Ok(Json(serde_json::json!({
2027        "rules": rules
2028    })))
2029}
2030
2031/// Create a new proxy replacement rule
2032async fn create_proxy_rule(
2033    State(state): State<ManagementState>,
2034    Json(request): Json<ProxyRuleRequest>,
2035) -> Result<Json<serde_json::Value>, StatusCode> {
2036    let proxy_config = match &state.proxy_config {
2037        Some(config) => config,
2038        None => {
2039            return Ok(Json(serde_json::json!({
2040                "error": "Proxy not configured. Proxy config not available."
2041            })));
2042        }
2043    };
2044
2045    // Validate request
2046    if request.body_transforms.is_empty() {
2047        return Ok(Json(serde_json::json!({
2048            "error": "At least one body transform is required"
2049        })));
2050    }
2051
2052    let body_transforms: Vec<BodyTransform> = request
2053        .body_transforms
2054        .iter()
2055        .map(|t| {
2056            let op = match t.operation.as_str() {
2057                "replace" => TransformOperation::Replace,
2058                "add" => TransformOperation::Add,
2059                "remove" => TransformOperation::Remove,
2060                _ => TransformOperation::Replace,
2061            };
2062            BodyTransform {
2063                path: t.path.clone(),
2064                replace: t.replace.clone(),
2065                operation: op,
2066            }
2067        })
2068        .collect();
2069
2070    let new_rule = BodyTransformRule {
2071        pattern: request.pattern.clone(),
2072        status_codes: request.status_codes.clone(),
2073        body_transforms,
2074        enabled: request.enabled,
2075    };
2076
2077    let mut config = proxy_config.write().await;
2078
2079    let rule_id = if request.rule_type == "request" {
2080        config.request_replacements.push(new_rule);
2081        config.request_replacements.len() - 1
2082    } else if request.rule_type == "response" {
2083        config.response_replacements.push(new_rule);
2084        config.request_replacements.len() + config.response_replacements.len() - 1
2085    } else {
2086        return Ok(Json(serde_json::json!({
2087            "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
2088        })));
2089    };
2090
2091    Ok(Json(serde_json::json!({
2092        "id": rule_id,
2093        "message": "Rule created successfully"
2094    })))
2095}
2096
2097/// Get a specific proxy replacement rule
2098async fn get_proxy_rule(
2099    State(state): State<ManagementState>,
2100    Path(id): Path<String>,
2101) -> Result<Json<serde_json::Value>, StatusCode> {
2102    let proxy_config = match &state.proxy_config {
2103        Some(config) => config,
2104        None => {
2105            return Ok(Json(serde_json::json!({
2106                "error": "Proxy not configured. Proxy config not available."
2107            })));
2108        }
2109    };
2110
2111    let config = proxy_config.read().await;
2112    let rule_id: usize = match id.parse() {
2113        Ok(id) => id,
2114        Err(_) => {
2115            return Ok(Json(serde_json::json!({
2116                "error": format!("Invalid rule ID: {}", id)
2117            })));
2118        }
2119    };
2120
2121    let request_count = config.request_replacements.len();
2122
2123    if rule_id < request_count {
2124        // Request rule
2125        let rule = &config.request_replacements[rule_id];
2126        Ok(Json(serde_json::json!({
2127            "id": rule_id,
2128            "pattern": rule.pattern,
2129            "type": "request",
2130            "status_codes": [],
2131            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2132                "path": t.path,
2133                "replace": t.replace,
2134                "operation": format!("{:?}", t.operation).to_lowercase()
2135            })).collect::<Vec<_>>(),
2136            "enabled": rule.enabled
2137        })))
2138    } else if rule_id < request_count + config.response_replacements.len() {
2139        // Response rule
2140        let response_idx = rule_id - request_count;
2141        let rule = &config.response_replacements[response_idx];
2142        Ok(Json(serde_json::json!({
2143            "id": rule_id,
2144            "pattern": rule.pattern,
2145            "type": "response",
2146            "status_codes": rule.status_codes,
2147            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2148                "path": t.path,
2149                "replace": t.replace,
2150                "operation": format!("{:?}", t.operation).to_lowercase()
2151            })).collect::<Vec<_>>(),
2152            "enabled": rule.enabled
2153        })))
2154    } else {
2155        Ok(Json(serde_json::json!({
2156            "error": format!("Rule ID {} not found", rule_id)
2157        })))
2158    }
2159}
2160
2161/// Update a proxy replacement rule
2162async fn update_proxy_rule(
2163    State(state): State<ManagementState>,
2164    Path(id): Path<String>,
2165    Json(request): Json<ProxyRuleRequest>,
2166) -> Result<Json<serde_json::Value>, StatusCode> {
2167    let proxy_config = match &state.proxy_config {
2168        Some(config) => config,
2169        None => {
2170            return Ok(Json(serde_json::json!({
2171                "error": "Proxy not configured. Proxy config not available."
2172            })));
2173        }
2174    };
2175
2176    let mut config = proxy_config.write().await;
2177    let rule_id: usize = match id.parse() {
2178        Ok(id) => id,
2179        Err(_) => {
2180            return Ok(Json(serde_json::json!({
2181                "error": format!("Invalid rule ID: {}", id)
2182            })));
2183        }
2184    };
2185
2186    let body_transforms: Vec<BodyTransform> = request
2187        .body_transforms
2188        .iter()
2189        .map(|t| {
2190            let op = match t.operation.as_str() {
2191                "replace" => TransformOperation::Replace,
2192                "add" => TransformOperation::Add,
2193                "remove" => TransformOperation::Remove,
2194                _ => TransformOperation::Replace,
2195            };
2196            BodyTransform {
2197                path: t.path.clone(),
2198                replace: t.replace.clone(),
2199                operation: op,
2200            }
2201        })
2202        .collect();
2203
2204    let updated_rule = BodyTransformRule {
2205        pattern: request.pattern.clone(),
2206        status_codes: request.status_codes.clone(),
2207        body_transforms,
2208        enabled: request.enabled,
2209    };
2210
2211    let request_count = config.request_replacements.len();
2212
2213    if rule_id < request_count {
2214        // Update request rule
2215        config.request_replacements[rule_id] = updated_rule;
2216    } else if rule_id < request_count + config.response_replacements.len() {
2217        // Update response rule
2218        let response_idx = rule_id - request_count;
2219        config.response_replacements[response_idx] = updated_rule;
2220    } else {
2221        return Ok(Json(serde_json::json!({
2222            "error": format!("Rule ID {} not found", rule_id)
2223        })));
2224    }
2225
2226    Ok(Json(serde_json::json!({
2227        "id": rule_id,
2228        "message": "Rule updated successfully"
2229    })))
2230}
2231
2232/// Delete a proxy replacement rule
2233async fn delete_proxy_rule(
2234    State(state): State<ManagementState>,
2235    Path(id): Path<String>,
2236) -> Result<Json<serde_json::Value>, StatusCode> {
2237    let proxy_config = match &state.proxy_config {
2238        Some(config) => config,
2239        None => {
2240            return Ok(Json(serde_json::json!({
2241                "error": "Proxy not configured. Proxy config not available."
2242            })));
2243        }
2244    };
2245
2246    let mut config = proxy_config.write().await;
2247    let rule_id: usize = match id.parse() {
2248        Ok(id) => id,
2249        Err(_) => {
2250            return Ok(Json(serde_json::json!({
2251                "error": format!("Invalid rule ID: {}", id)
2252            })));
2253        }
2254    };
2255
2256    let request_count = config.request_replacements.len();
2257
2258    if rule_id < request_count {
2259        // Delete request rule
2260        config.request_replacements.remove(rule_id);
2261    } else if rule_id < request_count + config.response_replacements.len() {
2262        // Delete response rule
2263        let response_idx = rule_id - request_count;
2264        config.response_replacements.remove(response_idx);
2265    } else {
2266        return Ok(Json(serde_json::json!({
2267            "error": format!("Rule ID {} not found", rule_id)
2268        })));
2269    }
2270
2271    Ok(Json(serde_json::json!({
2272        "id": rule_id,
2273        "message": "Rule deleted successfully"
2274    })))
2275}
2276
2277/// Get proxy rules and transformation configuration for inspection
2278async fn get_proxy_inspect(
2279    State(state): State<ManagementState>,
2280    Query(params): Query<std::collections::HashMap<String, String>>,
2281) -> Result<Json<serde_json::Value>, StatusCode> {
2282    let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2283    let offset: usize = params.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);
2284
2285    let proxy_config = match &state.proxy_config {
2286        Some(config) => config.read().await,
2287        None => {
2288            return Ok(Json(serde_json::json!({
2289                "error": "Proxy not configured. Proxy config not available."
2290            })));
2291        }
2292    };
2293
2294    let mut rules = Vec::new();
2295    for (idx, rule) in proxy_config.request_replacements.iter().enumerate() {
2296        rules.push(serde_json::json!({
2297            "id": idx,
2298            "kind": "request",
2299            "pattern": rule.pattern,
2300            "enabled": rule.enabled,
2301            "status_codes": rule.status_codes,
2302            "transform_count": rule.body_transforms.len(),
2303            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2304                "path": t.path,
2305                "operation": t.operation,
2306                "replace": t.replace
2307            })).collect::<Vec<_>>()
2308        }));
2309    }
2310    let request_rule_count = rules.len();
2311    for (idx, rule) in proxy_config.response_replacements.iter().enumerate() {
2312        rules.push(serde_json::json!({
2313            "id": request_rule_count + idx,
2314            "kind": "response",
2315            "pattern": rule.pattern,
2316            "enabled": rule.enabled,
2317            "status_codes": rule.status_codes,
2318            "transform_count": rule.body_transforms.len(),
2319            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2320                "path": t.path,
2321                "operation": t.operation,
2322                "replace": t.replace
2323            })).collect::<Vec<_>>()
2324        }));
2325    }
2326
2327    let total = rules.len();
2328    let paged_rules: Vec<_> = rules.into_iter().skip(offset).take(limit).collect();
2329
2330    Ok(Json(serde_json::json!({
2331        "enabled": proxy_config.enabled,
2332        "target_url": proxy_config.target_url,
2333        "prefix": proxy_config.prefix,
2334        "timeout_seconds": proxy_config.timeout_seconds,
2335        "follow_redirects": proxy_config.follow_redirects,
2336        "passthrough_by_default": proxy_config.passthrough_by_default,
2337        "rules": paged_rules,
2338        "request_rule_count": request_rule_count,
2339        "response_rule_count": total.saturating_sub(request_rule_count),
2340        "limit": limit,
2341        "offset": offset,
2342        "total": total
2343    })))
2344}
2345
2346/// Build the management API router
2347pub fn management_router(state: ManagementState) -> Router {
2348    let router = Router::new()
2349        .route("/health", get(health_check))
2350        .route("/stats", get(get_stats))
2351        .route("/config", get(get_config))
2352        .route("/config/validate", post(validate_config))
2353        .route("/config/bulk", post(bulk_update_config))
2354        .route("/mocks", get(list_mocks))
2355        .route("/mocks", post(create_mock))
2356        .route("/mocks/{id}", get(get_mock))
2357        .route("/mocks/{id}", put(update_mock))
2358        .route("/mocks/{id}", delete(delete_mock))
2359        .route("/export", get(export_mocks))
2360        .route("/import", post(import_mocks));
2361
2362    #[cfg(feature = "smtp")]
2363    let router = router
2364        .route("/smtp/mailbox", get(list_smtp_emails))
2365        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
2366        .route("/smtp/mailbox/{id}", get(get_smtp_email))
2367        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
2368        .route("/smtp/mailbox/search", get(search_smtp_emails));
2369
2370    #[cfg(not(feature = "smtp"))]
2371    let router = router;
2372
2373    // MQTT routes
2374    #[cfg(feature = "mqtt")]
2375    let router = router
2376        .route("/mqtt/stats", get(get_mqtt_stats))
2377        .route("/mqtt/clients", get(get_mqtt_clients))
2378        .route("/mqtt/topics", get(get_mqtt_topics))
2379        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
2380        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
2381        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2382        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2383
2384    #[cfg(not(feature = "mqtt"))]
2385    let router = router
2386        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2387        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2388
2389    #[cfg(feature = "kafka")]
2390    let router = router
2391        .route("/kafka/stats", get(get_kafka_stats))
2392        .route("/kafka/topics", get(get_kafka_topics))
2393        .route("/kafka/topics/{topic}", get(get_kafka_topic))
2394        .route("/kafka/groups", get(get_kafka_groups))
2395        .route("/kafka/groups/{group_id}", get(get_kafka_group))
2396        .route("/kafka/produce", post(produce_kafka_message))
2397        .route("/kafka/produce/batch", post(produce_kafka_batch))
2398        .route("/kafka/messages/stream", get(kafka_messages_stream));
2399
2400    #[cfg(not(feature = "kafka"))]
2401    let router = router;
2402
2403    // Migration pipeline routes
2404    let router = router
2405        .route("/migration/routes", get(get_migration_routes))
2406        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
2407        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
2408        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
2409        .route("/migration/groups/{group}", put(set_group_migration_mode))
2410        .route("/migration/groups", get(get_migration_groups))
2411        .route("/migration/status", get(get_migration_status));
2412
2413    // Proxy replacement rules routes
2414    let router = router
2415        .route("/proxy/rules", get(list_proxy_rules))
2416        .route("/proxy/rules", post(create_proxy_rule))
2417        .route("/proxy/rules/{id}", get(get_proxy_rule))
2418        .route("/proxy/rules/{id}", put(update_proxy_rule))
2419        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
2420        .route("/proxy/inspect", get(get_proxy_inspect));
2421
2422    // AI-powered features
2423    let router = router.route("/ai/generate-spec", post(generate_ai_spec));
2424
2425    // Snapshot diff endpoints
2426    let router = router.nest(
2427        "/snapshot-diff",
2428        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
2429    );
2430
2431    #[cfg(feature = "behavioral-cloning")]
2432    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));
2433
2434    let router = router
2435        .route("/mockai/learn", post(learn_from_examples))
2436        .route("/mockai/rules/explanations", get(list_rule_explanations))
2437        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
2438        .route("/chaos/config", get(get_chaos_config))
2439        .route("/chaos/config", post(update_chaos_config))
2440        .route("/network/profiles", get(list_network_profiles))
2441        .route("/network/profile/apply", post(apply_network_profile));
2442
2443    // State machine API routes
2444    let router =
2445        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());
2446
2447    router.with_state(state)
2448}
2449
2450#[cfg(feature = "kafka")]
2451#[derive(Debug, Clone, Serialize, Deserialize)]
2452pub struct KafkaBrokerStats {
2453    /// Number of topics
2454    pub topics: usize,
2455    /// Total number of partitions
2456    pub partitions: usize,
2457    /// Number of consumer groups
2458    pub consumer_groups: usize,
2459    /// Total messages produced
2460    pub messages_produced: u64,
2461    /// Total messages consumed
2462    pub messages_consumed: u64,
2463}
2464
2465#[cfg(feature = "kafka")]
2466#[derive(Debug, Clone, Serialize, Deserialize)]
2467pub struct KafkaTopicInfo {
2468    pub name: String,
2469    pub partitions: usize,
2470    pub replication_factor: i32,
2471}
2472
2473#[cfg(feature = "kafka")]
2474#[derive(Debug, Clone, Serialize, Deserialize)]
2475pub struct KafkaConsumerGroupInfo {
2476    pub group_id: String,
2477    pub members: usize,
2478    pub state: String,
2479}
2480
2481#[cfg(feature = "kafka")]
2482/// Get Kafka broker statistics
2483async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
2484    if let Some(broker) = &state.kafka_broker {
2485        let topics = broker.topics.read().await;
2486        let consumer_groups = broker.consumer_groups.read().await;
2487
2488        let total_partitions: usize = topics.values().map(|t| t.partitions.len()).sum();
2489
2490        // Get metrics snapshot for message counts
2491        let metrics_snapshot = broker.metrics().snapshot();
2492
2493        let stats = KafkaBrokerStats {
2494            topics: topics.len(),
2495            partitions: total_partitions,
2496            consumer_groups: consumer_groups.groups().len(),
2497            messages_produced: metrics_snapshot.messages_produced_total,
2498            messages_consumed: metrics_snapshot.messages_consumed_total,
2499        };
2500
2501        Json(stats).into_response()
2502    } else {
2503        (
2504            StatusCode::SERVICE_UNAVAILABLE,
2505            Json(serde_json::json!({
2506                "error": "Kafka broker not available",
2507                "message": "Kafka broker is not enabled or not available."
2508            })),
2509        )
2510            .into_response()
2511    }
2512}
2513
2514#[cfg(feature = "kafka")]
2515/// List Kafka topics
2516async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
2517    if let Some(broker) = &state.kafka_broker {
2518        let topics = broker.topics.read().await;
2519        let topic_list: Vec<KafkaTopicInfo> = topics
2520            .iter()
2521            .map(|(name, topic)| KafkaTopicInfo {
2522                name: name.clone(),
2523                partitions: topic.partitions.len(),
2524                replication_factor: topic.config.replication_factor as i32,
2525            })
2526            .collect();
2527
2528        Json(serde_json::json!({
2529            "topics": topic_list
2530        }))
2531        .into_response()
2532    } else {
2533        (
2534            StatusCode::SERVICE_UNAVAILABLE,
2535            Json(serde_json::json!({
2536                "error": "Kafka broker not available",
2537                "message": "Kafka broker is not enabled or not available."
2538            })),
2539        )
2540            .into_response()
2541    }
2542}
2543
2544#[cfg(feature = "kafka")]
2545/// Get Kafka topic details
2546async fn get_kafka_topic(
2547    State(state): State<ManagementState>,
2548    Path(topic_name): Path<String>,
2549) -> impl IntoResponse {
2550    if let Some(broker) = &state.kafka_broker {
2551        let topics = broker.topics.read().await;
2552        if let Some(topic) = topics.get(&topic_name) {
2553            Json(serde_json::json!({
2554                "name": topic_name,
2555                "partitions": topic.partitions.len(),
2556                "replication_factor": topic.config.replication_factor,
2557                "partitions_detail": topic.partitions.iter().enumerate().map(|(idx, partition)| serde_json::json!({
2558                    "id": idx as i32,
2559                    "leader": 0,
2560                    "replicas": vec![0],
2561                    "message_count": partition.messages.len()
2562                })).collect::<Vec<_>>()
2563            })).into_response()
2564        } else {
2565            (
2566                StatusCode::NOT_FOUND,
2567                Json(serde_json::json!({
2568                    "error": "Topic not found",
2569                    "topic": topic_name
2570                })),
2571            )
2572                .into_response()
2573        }
2574    } else {
2575        (
2576            StatusCode::SERVICE_UNAVAILABLE,
2577            Json(serde_json::json!({
2578                "error": "Kafka broker not available",
2579                "message": "Kafka broker is not enabled or not available."
2580            })),
2581        )
2582            .into_response()
2583    }
2584}
2585
2586#[cfg(feature = "kafka")]
2587/// List Kafka consumer groups
2588async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
2589    if let Some(broker) = &state.kafka_broker {
2590        let consumer_groups = broker.consumer_groups.read().await;
2591        let groups: Vec<KafkaConsumerGroupInfo> = consumer_groups
2592            .groups()
2593            .iter()
2594            .map(|(group_id, group)| KafkaConsumerGroupInfo {
2595                group_id: group_id.clone(),
2596                members: group.members.len(),
2597                state: "Stable".to_string(), // Simplified - could be more detailed
2598            })
2599            .collect();
2600
2601        Json(serde_json::json!({
2602            "groups": groups
2603        }))
2604        .into_response()
2605    } else {
2606        (
2607            StatusCode::SERVICE_UNAVAILABLE,
2608            Json(serde_json::json!({
2609                "error": "Kafka broker not available",
2610                "message": "Kafka broker is not enabled or not available."
2611            })),
2612        )
2613            .into_response()
2614    }
2615}
2616
2617#[cfg(feature = "kafka")]
2618/// Get Kafka consumer group details
2619async fn get_kafka_group(
2620    State(state): State<ManagementState>,
2621    Path(group_id): Path<String>,
2622) -> impl IntoResponse {
2623    if let Some(broker) = &state.kafka_broker {
2624        let consumer_groups = broker.consumer_groups.read().await;
2625        if let Some(group) = consumer_groups.groups().get(&group_id) {
2626            Json(serde_json::json!({
2627                "group_id": group_id,
2628                "members": group.members.len(),
2629                "state": "Stable",
2630                "members_detail": group.members.iter().map(|(member_id, member)| serde_json::json!({
2631                    "member_id": member_id,
2632                    "client_id": member.client_id,
2633                    "assignments": member.assignment.iter().map(|a| serde_json::json!({
2634                        "topic": a.topic,
2635                        "partitions": a.partitions
2636                    })).collect::<Vec<_>>()
2637                })).collect::<Vec<_>>(),
2638                "offsets": group.offsets.iter().map(|((topic, partition), offset)| serde_json::json!({
2639                    "topic": topic,
2640                    "partition": partition,
2641                    "offset": offset
2642                })).collect::<Vec<_>>()
2643            })).into_response()
2644        } else {
2645            (
2646                StatusCode::NOT_FOUND,
2647                Json(serde_json::json!({
2648                    "error": "Consumer group not found",
2649                    "group_id": group_id
2650                })),
2651            )
2652                .into_response()
2653        }
2654    } else {
2655        (
2656            StatusCode::SERVICE_UNAVAILABLE,
2657            Json(serde_json::json!({
2658                "error": "Kafka broker not available",
2659                "message": "Kafka broker is not enabled or not available."
2660            })),
2661        )
2662            .into_response()
2663    }
2664}
2665
2666// ========== Kafka Produce Handler ==========
2667
2668#[cfg(feature = "kafka")]
2669#[derive(Debug, Deserialize)]
2670pub struct KafkaProduceRequest {
2671    /// Topic to produce to
2672    pub topic: String,
2673    /// Message key (optional)
2674    #[serde(default)]
2675    pub key: Option<String>,
2676    /// Message value (JSON string or plain string)
2677    pub value: String,
2678    /// Partition ID (optional, auto-assigned if not provided)
2679    #[serde(default)]
2680    pub partition: Option<i32>,
2681    /// Message headers (optional, key-value pairs)
2682    #[serde(default)]
2683    pub headers: Option<std::collections::HashMap<String, String>>,
2684}
2685
2686#[cfg(feature = "kafka")]
2687/// Produce a message to a Kafka topic
2688async fn produce_kafka_message(
2689    State(state): State<ManagementState>,
2690    Json(request): Json<KafkaProduceRequest>,
2691) -> impl IntoResponse {
2692    if let Some(broker) = &state.kafka_broker {
2693        let mut topics = broker.topics.write().await;
2694
2695        // Get or create the topic
2696        let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
2697            mockforge_kafka::topics::Topic::new(
2698                request.topic.clone(),
2699                mockforge_kafka::topics::TopicConfig::default(),
2700            )
2701        });
2702
2703        // Determine partition
2704        let partition_id = if let Some(partition) = request.partition {
2705            partition
2706        } else {
2707            topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes()))
2708        };
2709
2710        // Validate partition exists
2711        if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
2712            return (
2713                StatusCode::BAD_REQUEST,
2714                Json(serde_json::json!({
2715                    "error": "Invalid partition",
2716                    "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
2717                })),
2718            )
2719                .into_response();
2720        }
2721
2722        // Create the message
2723        let key_clone = request.key.clone();
2724        let headers_clone = request.headers.clone();
2725        let message = mockforge_kafka::partitions::KafkaMessage {
2726            offset: 0, // Will be set by partition.append
2727            timestamp: chrono::Utc::now().timestamp_millis(),
2728            key: key_clone.clone().map(|k| k.as_bytes().to_vec()),
2729            value: request.value.as_bytes().to_vec(),
2730            headers: headers_clone
2731                .clone()
2732                .unwrap_or_default()
2733                .into_iter()
2734                .map(|(k, v)| (k, v.as_bytes().to_vec()))
2735                .collect(),
2736        };
2737
2738        // Produce to partition
2739        match topic_entry.produce(partition_id, message).await {
2740            Ok(offset) => {
2741                // Record metrics for successful message production
2742                if let Some(broker) = &state.kafka_broker {
2743                    broker.metrics().record_messages_produced(1);
2744                }
2745
2746                // Emit message event for real-time monitoring
2747                #[cfg(feature = "kafka")]
2748                {
2749                    let event = MessageEvent::Kafka(KafkaMessageEvent {
2750                        topic: request.topic.clone(),
2751                        key: key_clone,
2752                        value: request.value.clone(),
2753                        partition: partition_id,
2754                        offset,
2755                        headers: headers_clone,
2756                        timestamp: chrono::Utc::now().to_rfc3339(),
2757                    });
2758                    let _ = state.message_events.send(event);
2759                }
2760
2761                Json(serde_json::json!({
2762                    "success": true,
2763                    "message": format!("Message produced to topic '{}'", request.topic),
2764                    "topic": request.topic,
2765                    "partition": partition_id,
2766                    "offset": offset
2767                }))
2768                .into_response()
2769            }
2770            Err(e) => (
2771                StatusCode::INTERNAL_SERVER_ERROR,
2772                Json(serde_json::json!({
2773                    "error": "Failed to produce message",
2774                    "message": e.to_string()
2775                })),
2776            )
2777                .into_response(),
2778        }
2779    } else {
2780        (
2781            StatusCode::SERVICE_UNAVAILABLE,
2782            Json(serde_json::json!({
2783                "error": "Kafka broker not available",
2784                "message": "Kafka broker is not enabled or not available."
2785            })),
2786        )
2787            .into_response()
2788    }
2789}
2790
2791#[cfg(feature = "kafka")]
2792#[derive(Debug, Deserialize)]
2793pub struct KafkaBatchProduceRequest {
2794    /// List of messages to produce
2795    pub messages: Vec<KafkaProduceRequest>,
2796    /// Delay between messages in milliseconds
2797    #[serde(default = "default_delay")]
2798    pub delay_ms: u64,
2799}
2800
2801#[cfg(feature = "kafka")]
2802/// Produce multiple messages to Kafka topics
2803async fn produce_kafka_batch(
2804    State(state): State<ManagementState>,
2805    Json(request): Json<KafkaBatchProduceRequest>,
2806) -> impl IntoResponse {
2807    if let Some(broker) = &state.kafka_broker {
2808        if request.messages.is_empty() {
2809            return (
2810                StatusCode::BAD_REQUEST,
2811                Json(serde_json::json!({
2812                    "error": "Empty batch",
2813                    "message": "At least one message is required"
2814                })),
2815            )
2816                .into_response();
2817        }
2818
2819        let mut results = Vec::new();
2820
2821        for (index, msg_request) in request.messages.iter().enumerate() {
2822            let mut topics = broker.topics.write().await;
2823
2824            // Get or create the topic
2825            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
2826                mockforge_kafka::topics::Topic::new(
2827                    msg_request.topic.clone(),
2828                    mockforge_kafka::topics::TopicConfig::default(),
2829                )
2830            });
2831
2832            // Determine partition
2833            let partition_id = if let Some(partition) = msg_request.partition {
2834                partition
2835            } else {
2836                topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
2837            };
2838
2839            // Validate partition exists
2840            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
2841                results.push(serde_json::json!({
2842                    "index": index,
2843                    "success": false,
2844                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
2845                }));
2846                continue;
2847            }
2848
2849            // Create the message
2850            let message = mockforge_kafka::partitions::KafkaMessage {
2851                offset: 0,
2852                timestamp: chrono::Utc::now().timestamp_millis(),
2853                key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
2854                value: msg_request.value.as_bytes().to_vec(),
2855                headers: msg_request
2856                    .headers
2857                    .clone()
2858                    .unwrap_or_default()
2859                    .into_iter()
2860                    .map(|(k, v)| (k, v.as_bytes().to_vec()))
2861                    .collect(),
2862            };
2863
2864            // Produce to partition
2865            match topic_entry.produce(partition_id, message).await {
2866                Ok(offset) => {
2867                    // Record metrics for successful message production
2868                    if let Some(broker) = &state.kafka_broker {
2869                        broker.metrics().record_messages_produced(1);
2870                    }
2871
2872                    // Emit message event
2873                    let event = MessageEvent::Kafka(KafkaMessageEvent {
2874                        topic: msg_request.topic.clone(),
2875                        key: msg_request.key.clone(),
2876                        value: msg_request.value.clone(),
2877                        partition: partition_id,
2878                        offset,
2879                        headers: msg_request.headers.clone(),
2880                        timestamp: chrono::Utc::now().to_rfc3339(),
2881                    });
2882                    let _ = state.message_events.send(event);
2883
2884                    results.push(serde_json::json!({
2885                        "index": index,
2886                        "success": true,
2887                        "topic": msg_request.topic,
2888                        "partition": partition_id,
2889                        "offset": offset
2890                    }));
2891                }
2892                Err(e) => {
2893                    results.push(serde_json::json!({
2894                        "index": index,
2895                        "success": false,
2896                        "error": e.to_string()
2897                    }));
2898                }
2899            }
2900
2901            // Add delay between messages (except for the last one)
2902            if index < request.messages.len() - 1 && request.delay_ms > 0 {
2903                tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
2904            }
2905        }
2906
2907        let success_count =
2908            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();
2909
2910        Json(serde_json::json!({
2911            "success": true,
2912            "total": request.messages.len(),
2913            "succeeded": success_count,
2914            "failed": request.messages.len() - success_count,
2915            "results": results
2916        }))
2917        .into_response()
2918    } else {
2919        (
2920            StatusCode::SERVICE_UNAVAILABLE,
2921            Json(serde_json::json!({
2922                "error": "Kafka broker not available",
2923                "message": "Kafka broker is not enabled or not available."
2924            })),
2925        )
2926            .into_response()
2927    }
2928}
2929
2930// ========== Real-time Message Streaming (SSE) ==========
2931
2932#[cfg(feature = "mqtt")]
2933/// SSE stream for MQTT messages
2934async fn mqtt_messages_stream(
2935    State(state): State<ManagementState>,
2936    Query(params): Query<std::collections::HashMap<String, String>>,
2937) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
2938    let rx = state.message_events.subscribe();
2939    let topic_filter = params.get("topic").cloned();
2940
2941    let stream = stream::unfold(rx, move |mut rx| {
2942        let topic_filter = topic_filter.clone();
2943
2944        async move {
2945            loop {
2946                match rx.recv().await {
2947                    Ok(MessageEvent::Mqtt(event)) => {
2948                        // Apply topic filter if specified
2949                        if let Some(filter) = &topic_filter {
2950                            if !event.topic.contains(filter) {
2951                                continue;
2952                            }
2953                        }
2954
2955                        let event_json = serde_json::json!({
2956                            "protocol": "mqtt",
2957                            "topic": event.topic,
2958                            "payload": event.payload,
2959                            "qos": event.qos,
2960                            "retain": event.retain,
2961                            "timestamp": event.timestamp,
2962                        });
2963
2964                        if let Ok(event_data) = serde_json::to_string(&event_json) {
2965                            let sse_event = Event::default().event("mqtt_message").data(event_data);
2966                            return Some((Ok(sse_event), rx));
2967                        }
2968                    }
2969                    #[cfg(feature = "kafka")]
2970                    Ok(MessageEvent::Kafka(_)) => {
2971                        // Skip Kafka events in MQTT stream
2972                        continue;
2973                    }
2974                    Err(broadcast::error::RecvError::Closed) => {
2975                        return None;
2976                    }
2977                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
2978                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
2979                        continue;
2980                    }
2981                }
2982            }
2983        }
2984    });
2985
2986    Sse::new(stream).keep_alive(
2987        axum::response::sse::KeepAlive::new()
2988            .interval(std::time::Duration::from_secs(15))
2989            .text("keep-alive-text"),
2990    )
2991}
2992
2993#[cfg(feature = "kafka")]
2994/// SSE stream for Kafka messages
2995async fn kafka_messages_stream(
2996    State(state): State<ManagementState>,
2997    Query(params): Query<std::collections::HashMap<String, String>>,
2998) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
2999    let mut rx = state.message_events.subscribe();
3000    let topic_filter = params.get("topic").cloned();
3001
3002    let stream = stream::unfold(rx, move |mut rx| {
3003        let topic_filter = topic_filter.clone();
3004
3005        async move {
3006            loop {
3007                match rx.recv().await {
3008                    #[cfg(feature = "mqtt")]
3009                    Ok(MessageEvent::Mqtt(_)) => {
3010                        // Skip MQTT events in Kafka stream
3011                        continue;
3012                    }
3013                    Ok(MessageEvent::Kafka(event)) => {
3014                        // Apply topic filter if specified
3015                        if let Some(filter) = &topic_filter {
3016                            if !event.topic.contains(filter) {
3017                                continue;
3018                            }
3019                        }
3020
3021                        let event_json = serde_json::json!({
3022                            "protocol": "kafka",
3023                            "topic": event.topic,
3024                            "key": event.key,
3025                            "value": event.value,
3026                            "partition": event.partition,
3027                            "offset": event.offset,
3028                            "headers": event.headers,
3029                            "timestamp": event.timestamp,
3030                        });
3031
3032                        if let Ok(event_data) = serde_json::to_string(&event_json) {
3033                            let sse_event =
3034                                Event::default().event("kafka_message").data(event_data);
3035                            return Some((Ok(sse_event), rx));
3036                        }
3037                    }
3038                    Err(broadcast::error::RecvError::Closed) => {
3039                        return None;
3040                    }
3041                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
3042                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
3043                        continue;
3044                    }
3045                }
3046            }
3047        }
3048    });
3049
3050    Sse::new(stream).keep_alive(
3051        axum::response::sse::KeepAlive::new()
3052            .interval(std::time::Duration::from_secs(15))
3053            .text("keep-alive-text"),
3054    )
3055}
3056
3057// ========== AI-Powered Features ==========
3058
3059/// Request for AI-powered API specification generation
3060#[derive(Debug, Deserialize)]
3061pub struct GenerateSpecRequest {
3062    /// Natural language description of the API to generate
3063    pub query: String,
3064    /// Type of specification to generate: "openapi", "graphql", or "asyncapi"
3065    pub spec_type: String,
3066    /// Optional API version (e.g., "3.0.0" for OpenAPI)
3067    pub api_version: Option<String>,
3068}
3069
3070/// Request for OpenAPI generation from recorded traffic
3071#[derive(Debug, Deserialize)]
3072pub struct GenerateOpenApiFromTrafficRequest {
3073    /// Path to recorder database (optional, defaults to ./recordings.db)
3074    #[serde(default)]
3075    pub database_path: Option<String>,
3076    /// Start time for filtering (ISO 8601 format, e.g., 2025-01-01T00:00:00Z)
3077    #[serde(default)]
3078    pub since: Option<String>,
3079    /// End time for filtering (ISO 8601 format)
3080    #[serde(default)]
3081    pub until: Option<String>,
3082    /// Path pattern filter (supports wildcards, e.g., /api/*)
3083    #[serde(default)]
3084    pub path_pattern: Option<String>,
3085    /// Minimum confidence score for including paths (0.0 to 1.0)
3086    #[serde(default = "default_min_confidence")]
3087    pub min_confidence: f64,
3088}
3089
3090fn default_min_confidence() -> f64 {
3091    0.7
3092}
3093
3094/// Generate API specification from natural language using AI
3095#[cfg(feature = "data-faker")]
3096async fn generate_ai_spec(
3097    State(_state): State<ManagementState>,
3098    Json(request): Json<GenerateSpecRequest>,
3099) -> impl IntoResponse {
3100    use mockforge_data::rag::{
3101        config::{LlmProvider, RagConfig},
3102        engine::RagEngine,
3103        storage::DocumentStorage,
3104    };
3105    use std::sync::Arc;
3106
3107    // Build RAG config from environment variables
3108    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
3109        .ok()
3110        .or_else(|| std::env::var("OPENAI_API_KEY").ok());
3111
3112    // Check if RAG is configured - require API key
3113    if api_key.is_none() {
3114        return (
3115            StatusCode::SERVICE_UNAVAILABLE,
3116            Json(serde_json::json!({
3117                "error": "AI service not configured",
3118                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
3119            })),
3120        )
3121            .into_response();
3122    }
3123
3124    // Build RAG configuration
3125    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
3126        .unwrap_or_else(|_| "openai".to_string())
3127        .to_lowercase();
3128
3129    let provider = match provider_str.as_str() {
3130        "openai" => LlmProvider::OpenAI,
3131        "anthropic" => LlmProvider::Anthropic,
3132        "ollama" => LlmProvider::Ollama,
3133        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
3134        _ => LlmProvider::OpenAI,
3135    };
3136
3137    let api_endpoint =
3138        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
3139            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
3140            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
3141            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
3142            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
3143        });
3144
3145    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
3146        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
3147        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
3148        LlmProvider::Ollama => "llama2".to_string(),
3149        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
3150    });
3151
3152    // Build RagConfig using default() and override fields
3153    let mut rag_config = RagConfig::default();
3154    rag_config.provider = provider;
3155    rag_config.api_endpoint = api_endpoint;
3156    rag_config.api_key = api_key;
3157    rag_config.model = model;
3158    rag_config.max_tokens = std::env::var("MOCKFORGE_RAG_MAX_TOKENS")
3159        .unwrap_or_else(|_| "4096".to_string())
3160        .parse()
3161        .unwrap_or(4096);
3162    rag_config.temperature = std::env::var("MOCKFORGE_RAG_TEMPERATURE")
3163        .unwrap_or_else(|_| "0.3".to_string())
3164        .parse()
3165        .unwrap_or(0.3); // Lower temperature for more structured output
3166    rag_config.timeout_secs = std::env::var("MOCKFORGE_RAG_TIMEOUT")
3167        .unwrap_or_else(|_| "60".to_string())
3168        .parse()
3169        .unwrap_or(60);
3170    rag_config.max_context_length = std::env::var("MOCKFORGE_RAG_CONTEXT_WINDOW")
3171        .unwrap_or_else(|_| "4000".to_string())
3172        .parse()
3173        .unwrap_or(4000);
3174
3175    // Build the prompt for spec generation
3176    let spec_type_label = match request.spec_type.as_str() {
3177        "openapi" => "OpenAPI 3.0",
3178        "graphql" => "GraphQL",
3179        "asyncapi" => "AsyncAPI",
3180        _ => "OpenAPI 3.0",
3181    };
3182
3183    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");
3184
3185    let prompt = format!(
3186        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.
3187
3188User Requirements:
3189{}
3190
3191Instructions:
31921. Generate a complete, valid {} specification
31932. Include all paths, operations, request/response schemas, and components
31943. Use realistic field names and data types
31954. Include proper descriptions and examples
31965. Follow {} best practices
31976. Return ONLY the specification, no additional explanation
31987. For OpenAPI, use version {}
3199
3200Return the specification in {} format."#,
3201        spec_type_label,
3202        request.query,
3203        spec_type_label,
3204        spec_type_label,
3205        api_version,
3206        if request.spec_type == "graphql" {
3207            "GraphQL SDL"
3208        } else {
3209            "YAML"
3210        }
3211    );
3212
3213    // Create in-memory storage for RAG engine
3214    // Note: StorageFactory::create_memory() returns Box<dyn DocumentStorage>
3215    // We need to use unsafe transmute or create a wrapper, but for now we'll use
3216    // a simpler approach: create InMemoryStorage directly
3217    use mockforge_data::rag::storage::InMemoryStorage;
3218    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());
3219
3220    // Create RAG engine
3221    let mut rag_engine = match RagEngine::new(rag_config.clone(), storage) {
3222        Ok(engine) => engine,
3223        Err(e) => {
3224            return (
3225                StatusCode::INTERNAL_SERVER_ERROR,
3226                Json(serde_json::json!({
3227                    "error": "Failed to initialize RAG engine",
3228                    "message": e.to_string()
3229                })),
3230            )
3231                .into_response();
3232        }
3233    };
3234
3235    // Generate using RAG engine
3236    match rag_engine.generate(&prompt, None).await {
3237        Ok(generated_text) => {
3238            // Try to extract just the YAML/JSON/SDL content if LLM added explanation
3239            let spec = if request.spec_type == "graphql" {
3240                // For GraphQL, extract SDL
3241                extract_graphql_schema(&generated_text)
3242            } else {
3243                // For OpenAPI/AsyncAPI, extract YAML
3244                extract_yaml_spec(&generated_text)
3245            };
3246
3247            Json(serde_json::json!({
3248                "success": true,
3249                "spec": spec,
3250                "spec_type": request.spec_type,
3251            }))
3252            .into_response()
3253        }
3254        Err(e) => (
3255            StatusCode::INTERNAL_SERVER_ERROR,
3256            Json(serde_json::json!({
3257                "error": "AI generation failed",
3258                "message": e.to_string()
3259            })),
3260        )
3261            .into_response(),
3262    }
3263}
3264
3265#[cfg(not(feature = "data-faker"))]
3266async fn generate_ai_spec(
3267    State(_state): State<ManagementState>,
3268    Json(_request): Json<GenerateSpecRequest>,
3269) -> impl IntoResponse {
3270    (
3271        StatusCode::NOT_IMPLEMENTED,
3272        Json(serde_json::json!({
3273            "error": "AI features not enabled",
3274            "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3275        })),
3276    )
3277        .into_response()
3278}
3279
3280/// Generate OpenAPI specification from recorded traffic
3281#[cfg(feature = "behavioral-cloning")]
3282async fn generate_openapi_from_traffic(
3283    State(_state): State<ManagementState>,
3284    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
3285) -> impl IntoResponse {
3286    use chrono::{DateTime, Utc};
3287    use mockforge_core::intelligent_behavior::{
3288        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
3289        IntelligentBehaviorConfig,
3290    };
3291    use mockforge_recorder::{
3292        database::RecorderDatabase,
3293        openapi_export::{QueryFilters, RecordingsToOpenApi},
3294    };
3295    use std::path::PathBuf;
3296
3297    // Determine database path
3298    let db_path = if let Some(ref path) = request.database_path {
3299        PathBuf::from(path)
3300    } else {
3301        std::env::current_dir()
3302            .unwrap_or_else(|_| PathBuf::from("."))
3303            .join("recordings.db")
3304    };
3305
3306    // Open database
3307    let db = match RecorderDatabase::new(&db_path).await {
3308        Ok(db) => db,
3309        Err(e) => {
3310            return (
3311                StatusCode::BAD_REQUEST,
3312                Json(serde_json::json!({
3313                    "error": "Database error",
3314                    "message": format!("Failed to open recorder database: {}", e)
3315                })),
3316            )
3317                .into_response();
3318        }
3319    };
3320
3321    // Parse time filters
3322    let since_dt = if let Some(ref since_str) = request.since {
3323        match DateTime::parse_from_rfc3339(since_str) {
3324            Ok(dt) => Some(dt.with_timezone(&Utc)),
3325            Err(e) => {
3326                return (
3327                    StatusCode::BAD_REQUEST,
3328                    Json(serde_json::json!({
3329                        "error": "Invalid date format",
3330                        "message": format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3331                    })),
3332                )
3333                    .into_response();
3334            }
3335        }
3336    } else {
3337        None
3338    };
3339
3340    let until_dt = if let Some(ref until_str) = request.until {
3341        match DateTime::parse_from_rfc3339(until_str) {
3342            Ok(dt) => Some(dt.with_timezone(&Utc)),
3343            Err(e) => {
3344                return (
3345                    StatusCode::BAD_REQUEST,
3346                    Json(serde_json::json!({
3347                        "error": "Invalid date format",
3348                        "message": format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3349                    })),
3350                )
3351                    .into_response();
3352            }
3353        }
3354    } else {
3355        None
3356    };
3357
3358    // Build query filters
3359    let query_filters = QueryFilters {
3360        since: since_dt,
3361        until: until_dt,
3362        path_pattern: request.path_pattern.clone(),
3363        min_status_code: None,
3364        max_requests: Some(1000),
3365    };
3366
3367    // Query HTTP exchanges
3368    // Note: We need to convert from mockforge-recorder's HttpExchange to mockforge-core's HttpExchange
3369    // to avoid version mismatch issues. The converter returns the version from mockforge-recorder's
3370    // dependency, so we need to manually convert to the local version.
3371    let exchanges_from_recorder =
3372        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
3373            Ok(exchanges) => exchanges,
3374            Err(e) => {
3375                return (
3376                    StatusCode::INTERNAL_SERVER_ERROR,
3377                    Json(serde_json::json!({
3378                        "error": "Query error",
3379                        "message": format!("Failed to query HTTP exchanges: {}", e)
3380                    })),
3381                )
3382                    .into_response();
3383            }
3384        };
3385
3386    if exchanges_from_recorder.is_empty() {
3387        return (
3388            StatusCode::NOT_FOUND,
3389            Json(serde_json::json!({
3390                "error": "No exchanges found",
3391                "message": "No HTTP exchanges found matching the specified filters"
3392            })),
3393        )
3394            .into_response();
3395    }
3396
3397    // Convert to local HttpExchange type to avoid version mismatch
3398    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
3399    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
3400        .into_iter()
3401        .map(|e| LocalHttpExchange {
3402            method: e.method,
3403            path: e.path,
3404            query_params: e.query_params,
3405            headers: e.headers,
3406            body: e.body,
3407            body_encoding: e.body_encoding,
3408            status_code: e.status_code,
3409            response_headers: e.response_headers,
3410            response_body: e.response_body,
3411            response_body_encoding: e.response_body_encoding,
3412            timestamp: e.timestamp,
3413        })
3414        .collect();
3415
3416    // Create OpenAPI generator config
3417    let behavior_config = IntelligentBehaviorConfig::default();
3418    let gen_config = OpenApiGenerationConfig {
3419        min_confidence: request.min_confidence,
3420        behavior_model: Some(behavior_config.behavior_model),
3421    };
3422
3423    // Generate OpenAPI spec
3424    let generator = OpenApiSpecGenerator::new(gen_config);
3425    let result = match generator.generate_from_exchanges(exchanges).await {
3426        Ok(result) => result,
3427        Err(e) => {
3428            return (
3429                StatusCode::INTERNAL_SERVER_ERROR,
3430                Json(serde_json::json!({
3431                    "error": "Generation error",
3432                    "message": format!("Failed to generate OpenAPI spec: {}", e)
3433                })),
3434            )
3435                .into_response();
3436        }
3437    };
3438
3439    // Prepare response
3440    let spec_json = if let Some(ref raw) = result.spec.raw_document {
3441        raw.clone()
3442    } else {
3443        match serde_json::to_value(&result.spec.spec) {
3444            Ok(json) => json,
3445            Err(e) => {
3446                return (
3447                    StatusCode::INTERNAL_SERVER_ERROR,
3448                    Json(serde_json::json!({
3449                        "error": "Serialization error",
3450                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
3451                    })),
3452                )
3453                    .into_response();
3454            }
3455        }
3456    };
3457
3458    // Build response with metadata
3459    let response = serde_json::json!({
3460        "spec": spec_json,
3461        "metadata": {
3462            "requests_analyzed": result.metadata.requests_analyzed,
3463            "paths_inferred": result.metadata.paths_inferred,
3464            "path_confidence": result.metadata.path_confidence,
3465            "generated_at": result.metadata.generated_at.to_rfc3339(),
3466            "duration_ms": result.metadata.duration_ms,
3467        }
3468    });
3469
3470    Json(response).into_response()
3471}
3472
3473/// List all rule explanations
3474async fn list_rule_explanations(
3475    State(state): State<ManagementState>,
3476    Query(params): Query<std::collections::HashMap<String, String>>,
3477) -> impl IntoResponse {
3478    use mockforge_core::intelligent_behavior::RuleType;
3479
3480    let explanations = state.rule_explanations.read().await;
3481    let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3482
3483    // Filter by rule type if provided
3484    if let Some(rule_type_str) = params.get("rule_type") {
3485        if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3486            explanations_vec.retain(|e| e.rule_type == rule_type);
3487        }
3488    }
3489
3490    // Filter by minimum confidence if provided
3491    if let Some(min_confidence_str) = params.get("min_confidence") {
3492        if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3493            explanations_vec.retain(|e| e.confidence >= min_confidence);
3494        }
3495    }
3496
3497    // Sort by confidence (descending) and then by generated_at (descending)
3498    explanations_vec.sort_by(|a, b| {
3499        b.confidence
3500            .partial_cmp(&a.confidence)
3501            .unwrap_or(std::cmp::Ordering::Equal)
3502            .then_with(|| b.generated_at.cmp(&a.generated_at))
3503    });
3504
3505    Json(serde_json::json!({
3506        "explanations": explanations_vec,
3507        "total": explanations_vec.len(),
3508    }))
3509    .into_response()
3510}
3511
3512/// Get a specific rule explanation by ID
3513async fn get_rule_explanation(
3514    State(state): State<ManagementState>,
3515    Path(rule_id): Path<String>,
3516) -> impl IntoResponse {
3517    let explanations = state.rule_explanations.read().await;
3518
3519    match explanations.get(&rule_id) {
3520        Some(explanation) => Json(serde_json::json!({
3521            "explanation": explanation,
3522        }))
3523        .into_response(),
3524        None => (
3525            StatusCode::NOT_FOUND,
3526            Json(serde_json::json!({
3527                "error": "Rule explanation not found",
3528                "message": format!("No explanation found for rule ID: {}", rule_id)
3529            })),
3530        )
3531            .into_response(),
3532    }
3533}
3534
3535/// Request for learning from examples
3536#[derive(Debug, Deserialize)]
3537pub struct LearnFromExamplesRequest {
3538    /// Example request/response pairs to learn from
3539    pub examples: Vec<ExamplePairRequest>,
3540    /// Optional configuration override
3541    #[serde(default)]
3542    pub config: Option<serde_json::Value>,
3543}
3544
3545/// Example pair request format
3546#[derive(Debug, Deserialize)]
3547pub struct ExamplePairRequest {
3548    /// Request data (method, path, body, etc.)
3549    pub request: serde_json::Value,
3550    /// Response data (status_code, body, etc.)
3551    pub response: serde_json::Value,
3552}
3553
3554/// Learn behavioral rules from example pairs
3555///
3556/// This endpoint accepts example request/response pairs, generates behavioral rules
3557/// with explanations, and stores the explanations for later retrieval.
3558async fn learn_from_examples(
3559    State(state): State<ManagementState>,
3560    Json(request): Json<LearnFromExamplesRequest>,
3561) -> impl IntoResponse {
3562    use mockforge_core::intelligent_behavior::{
3563        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3564        rule_generator::{ExamplePair, RuleGenerator},
3565    };
3566
3567    if request.examples.is_empty() {
3568        return (
3569            StatusCode::BAD_REQUEST,
3570            Json(serde_json::json!({
3571                "error": "No examples provided",
3572                "message": "At least one example pair is required"
3573            })),
3574        )
3575            .into_response();
3576    }
3577
3578    // Convert request examples to ExamplePair format
3579    let example_pairs: Result<Vec<ExamplePair>, String> = request
3580        .examples
3581        .into_iter()
3582        .enumerate()
3583        .map(|(idx, ex)| {
3584            // Parse request JSON to extract method, path, body, etc.
3585            let method = ex
3586                .request
3587                .get("method")
3588                .and_then(|v| v.as_str())
3589                .map(|s| s.to_string())
3590                .unwrap_or_else(|| "GET".to_string());
3591            let path = ex
3592                .request
3593                .get("path")
3594                .and_then(|v| v.as_str())
3595                .map(|s| s.to_string())
3596                .unwrap_or_else(|| "/".to_string());
3597            let request_body = ex.request.get("body").cloned();
3598            let query_params = ex
3599                .request
3600                .get("query_params")
3601                .and_then(|v| v.as_object())
3602                .map(|obj| {
3603                    obj.iter()
3604                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3605                        .collect()
3606                })
3607                .unwrap_or_default();
3608            let headers = ex
3609                .request
3610                .get("headers")
3611                .and_then(|v| v.as_object())
3612                .map(|obj| {
3613                    obj.iter()
3614                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3615                        .collect()
3616                })
3617                .unwrap_or_default();
3618
3619            // Parse response JSON to extract status, body, etc.
3620            let status = ex
3621                .response
3622                .get("status_code")
3623                .or_else(|| ex.response.get("status"))
3624                .and_then(|v| v.as_u64())
3625                .map(|n| n as u16)
3626                .unwrap_or(200);
3627            let response_body = ex.response.get("body").cloned();
3628
3629            Ok(ExamplePair {
3630                method,
3631                path,
3632                request: request_body,
3633                status,
3634                response: response_body,
3635                query_params,
3636                headers,
3637                metadata: {
3638                    let mut meta = std::collections::HashMap::new();
3639                    meta.insert("source".to_string(), "api".to_string());
3640                    meta.insert("example_index".to_string(), idx.to_string());
3641                    meta
3642                },
3643            })
3644        })
3645        .collect();
3646
3647    let example_pairs = match example_pairs {
3648        Ok(pairs) => pairs,
3649        Err(e) => {
3650            return (
3651                StatusCode::BAD_REQUEST,
3652                Json(serde_json::json!({
3653                    "error": "Invalid examples",
3654                    "message": e
3655                })),
3656            )
3657                .into_response();
3658        }
3659    };
3660
3661    // Create behavior config (use provided config or default)
3662    let behavior_config = if let Some(config_json) = request.config {
3663        // Try to deserialize custom config, fall back to default
3664        serde_json::from_value(config_json)
3665            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3666            .behavior_model
3667    } else {
3668        BehaviorModelConfig::default()
3669    };
3670
3671    // Create rule generator
3672    let generator = RuleGenerator::new(behavior_config);
3673
3674    // Generate rules with explanations
3675    let (rules, explanations) =
3676        match generator.generate_rules_with_explanations(example_pairs).await {
3677            Ok(result) => result,
3678            Err(e) => {
3679                return (
3680                    StatusCode::INTERNAL_SERVER_ERROR,
3681                    Json(serde_json::json!({
3682                        "error": "Rule generation failed",
3683                        "message": format!("Failed to generate rules: {}", e)
3684                    })),
3685                )
3686                    .into_response();
3687            }
3688        };
3689
3690    // Store explanations in ManagementState
3691    {
3692        let mut stored_explanations = state.rule_explanations.write().await;
3693        for explanation in &explanations {
3694            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3695        }
3696    }
3697
3698    // Prepare response
3699    let response = serde_json::json!({
3700        "success": true,
3701        "rules_generated": {
3702            "consistency_rules": rules.consistency_rules.len(),
3703            "schemas": rules.schemas.len(),
3704            "state_machines": rules.state_transitions.len(),
3705            "system_prompt": !rules.system_prompt.is_empty(),
3706        },
3707        "explanations": explanations.iter().map(|e| serde_json::json!({
3708            "rule_id": e.rule_id,
3709            "rule_type": e.rule_type,
3710            "confidence": e.confidence,
3711            "reasoning": e.reasoning,
3712        })).collect::<Vec<_>>(),
3713        "total_explanations": explanations.len(),
3714    });
3715
3716    Json(response).into_response()
3717}
3718
3719#[cfg(feature = "data-faker")]
3720fn extract_yaml_spec(text: &str) -> String {
3721    // Try to find YAML code blocks
3722    if let Some(start) = text.find("```yaml") {
3723        let yaml_start = text[start + 7..].trim_start();
3724        if let Some(end) = yaml_start.find("```") {
3725            return yaml_start[..end].trim().to_string();
3726        }
3727    }
3728    if let Some(start) = text.find("```") {
3729        let content_start = text[start + 3..].trim_start();
3730        if let Some(end) = content_start.find("```") {
3731            return content_start[..end].trim().to_string();
3732        }
3733    }
3734
3735    // Check if it starts with openapi: or asyncapi:
3736    if text.trim_start().starts_with("openapi:") || text.trim_start().starts_with("asyncapi:") {
3737        return text.trim().to_string();
3738    }
3739
3740    // Return as-is if no code blocks found
3741    text.trim().to_string()
3742}
3743
3744/// Extract GraphQL schema from text content
3745#[cfg(feature = "data-faker")]
3746fn extract_graphql_schema(text: &str) -> String {
3747    // Try to find GraphQL code blocks
3748    if let Some(start) = text.find("```graphql") {
3749        let schema_start = text[start + 10..].trim_start();
3750        if let Some(end) = schema_start.find("```") {
3751            return schema_start[..end].trim().to_string();
3752        }
3753    }
3754    if let Some(start) = text.find("```") {
3755        let content_start = text[start + 3..].trim_start();
3756        if let Some(end) = content_start.find("```") {
3757            return content_start[..end].trim().to_string();
3758        }
3759    }
3760
3761    // Check if it looks like GraphQL SDL (starts with type, schema, etc.)
3762    if text.trim_start().starts_with("type ") || text.trim_start().starts_with("schema ") {
3763        return text.trim().to_string();
3764    }
3765
3766    text.trim().to_string()
3767}
3768
3769// ========== Chaos Engineering Management ==========
3770
3771/// Get current chaos engineering configuration
3772async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
3773    #[cfg(feature = "chaos")]
3774    {
3775        if let Some(chaos_state) = &_state.chaos_api_state {
3776            let config = chaos_state.config.read().await;
3777            // Convert ChaosConfig to JSON response format
3778            Json(serde_json::json!({
3779                "enabled": config.enabled,
3780                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
3781                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
3782                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
3783                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
3784            }))
3785            .into_response()
3786        } else {
3787            // Chaos API not available, return default
3788            Json(serde_json::json!({
3789                "enabled": false,
3790                "latency": null,
3791                "fault_injection": null,
3792                "rate_limit": null,
3793                "traffic_shaping": null,
3794            }))
3795            .into_response()
3796        }
3797    }
3798    #[cfg(not(feature = "chaos"))]
3799    {
3800        // Chaos feature not enabled
3801        Json(serde_json::json!({
3802            "enabled": false,
3803            "latency": null,
3804            "fault_injection": null,
3805            "rate_limit": null,
3806            "traffic_shaping": null,
3807        }))
3808        .into_response()
3809    }
3810}
3811
3812/// Request to update chaos configuration
3813#[derive(Debug, Deserialize)]
3814pub struct ChaosConfigUpdate {
3815    /// Whether to enable chaos engineering
3816    pub enabled: Option<bool>,
3817    /// Latency configuration
3818    pub latency: Option<serde_json::Value>,
3819    /// Fault injection configuration
3820    pub fault_injection: Option<serde_json::Value>,
3821    /// Rate limiting configuration
3822    pub rate_limit: Option<serde_json::Value>,
3823    /// Traffic shaping configuration
3824    pub traffic_shaping: Option<serde_json::Value>,
3825}
3826
3827/// Update chaos engineering configuration
3828async fn update_chaos_config(
3829    State(_state): State<ManagementState>,
3830    Json(_config_update): Json<ChaosConfigUpdate>,
3831) -> impl IntoResponse {
3832    #[cfg(feature = "chaos")]
3833    {
3834        if let Some(chaos_state) = &_state.chaos_api_state {
3835            use mockforge_chaos::config::{
3836                ChaosConfig, FaultInjectionConfig, LatencyConfig, RateLimitConfig,
3837                TrafficShapingConfig,
3838            };
3839
3840            let mut config = chaos_state.config.write().await;
3841
3842            // Update enabled flag if provided
3843            if let Some(enabled) = _config_update.enabled {
3844                config.enabled = enabled;
3845            }
3846
3847            // Update latency config if provided
3848            if let Some(latency_json) = _config_update.latency {
3849                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3850                    config.latency = Some(latency);
3851                }
3852            }
3853
3854            // Update fault injection config if provided
3855            if let Some(fault_json) = _config_update.fault_injection {
3856                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3857                    config.fault_injection = Some(fault);
3858                }
3859            }
3860
3861            // Update rate limit config if provided
3862            if let Some(rate_json) = _config_update.rate_limit {
3863                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3864                    config.rate_limit = Some(rate);
3865                }
3866            }
3867
3868            // Update traffic shaping config if provided
3869            if let Some(traffic_json) = _config_update.traffic_shaping {
3870                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3871                    config.traffic_shaping = Some(traffic);
3872                }
3873            }
3874
3875            // Reinitialize middleware injectors with new config
3876            // The middleware will pick up the changes on the next request
3877            drop(config);
3878
3879            info!("Chaos configuration updated successfully");
3880            Json(serde_json::json!({
3881                "success": true,
3882                "message": "Chaos configuration updated and applied"
3883            }))
3884            .into_response()
3885        } else {
3886            (
3887                StatusCode::SERVICE_UNAVAILABLE,
3888                Json(serde_json::json!({
3889                    "success": false,
3890                    "error": "Chaos API not available",
3891                    "message": "Chaos engineering is not enabled or configured"
3892                })),
3893            )
3894                .into_response()
3895        }
3896    }
3897    #[cfg(not(feature = "chaos"))]
3898    {
3899        (
3900            StatusCode::NOT_IMPLEMENTED,
3901            Json(serde_json::json!({
3902                "success": false,
3903                "error": "Chaos feature not enabled",
3904                "message": "Chaos engineering feature is not compiled into this build"
3905            })),
3906        )
3907            .into_response()
3908    }
3909}
3910
3911// ========== Network Profile Management ==========
3912
3913/// List available network profiles
3914async fn list_network_profiles() -> impl IntoResponse {
3915    use mockforge_core::network_profiles::NetworkProfileCatalog;
3916
3917    let catalog = NetworkProfileCatalog::default();
3918    let profiles: Vec<serde_json::Value> = catalog
3919        .list_profiles_with_description()
3920        .iter()
3921        .map(|(name, description)| {
3922            serde_json::json!({
3923                "name": name,
3924                "description": description,
3925            })
3926        })
3927        .collect();
3928
3929    Json(serde_json::json!({
3930        "profiles": profiles
3931    }))
3932    .into_response()
3933}
3934
3935#[derive(Debug, Deserialize)]
3936/// Request to apply a network profile
3937pub struct ApplyNetworkProfileRequest {
3938    /// Name of the network profile to apply
3939    pub profile_name: String,
3940}
3941
3942/// Apply a network profile
3943async fn apply_network_profile(
3944    State(state): State<ManagementState>,
3945    Json(request): Json<ApplyNetworkProfileRequest>,
3946) -> impl IntoResponse {
3947    use mockforge_core::network_profiles::NetworkProfileCatalog;
3948
3949    let catalog = NetworkProfileCatalog::default();
3950    if let Some(profile) = catalog.get(&request.profile_name) {
3951        // Apply profile to server configuration if available
3952        // NetworkProfile contains latency and traffic_shaping configs
3953        if let Some(server_config) = &state.server_config {
3954            let mut config = server_config.write().await;
3955
3956            // Apply network profile's traffic shaping to core config
3957            use mockforge_core::config::NetworkShapingConfig;
3958
3959            // Convert NetworkProfile's TrafficShapingConfig to NetworkShapingConfig
3960            // NetworkProfile uses mockforge_core::traffic_shaping::TrafficShapingConfig
3961            // which has bandwidth and burst_loss fields
3962            let network_shaping = NetworkShapingConfig {
3963                enabled: profile.traffic_shaping.bandwidth.enabled
3964                    || profile.traffic_shaping.burst_loss.enabled,
3965                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
3966                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3967                max_connections: 1000, // Default value
3968            };
3969
3970            // Update chaos config if it exists, or create it
3971            // Chaos config is in observability.chaos, not core.chaos
3972            if let Some(ref mut chaos) = config.observability.chaos {
3973                chaos.traffic_shaping = Some(network_shaping);
3974            } else {
3975                // Create minimal chaos config with traffic shaping
3976                use mockforge_core::config::ChaosEngConfig;
3977                config.observability.chaos = Some(ChaosEngConfig {
3978                    enabled: true,
3979                    latency: None,
3980                    fault_injection: None,
3981                    rate_limit: None,
3982                    traffic_shaping: Some(network_shaping),
3983                    scenario: None,
3984                });
3985            }
3986
3987            info!("Network profile '{}' applied to server configuration", request.profile_name);
3988        } else {
3989            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
3990        }
3991
3992        // Also update chaos API state if available
3993        #[cfg(feature = "chaos")]
3994        {
3995            if let Some(chaos_state) = &state.chaos_api_state {
3996                use mockforge_chaos::config::TrafficShapingConfig;
3997
3998                let mut chaos_config = chaos_state.config.write().await;
3999                // Apply profile's traffic shaping to chaos API state
4000                let chaos_traffic_shaping = TrafficShapingConfig {
4001                    enabled: profile.traffic_shaping.bandwidth.enabled
4002                        || profile.traffic_shaping.burst_loss.enabled,
4003                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
4004                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
4005                    max_connections: 0,
4006                    connection_timeout_ms: 30000,
4007                };
4008                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
4009                chaos_config.enabled = true; // Enable chaos when applying a profile
4010                drop(chaos_config);
4011                info!("Network profile '{}' applied to chaos API state", request.profile_name);
4012            }
4013        }
4014
4015        Json(serde_json::json!({
4016            "success": true,
4017            "message": format!("Network profile '{}' applied", request.profile_name),
4018            "profile": {
4019                "name": profile.name,
4020                "description": profile.description,
4021            }
4022        }))
4023        .into_response()
4024    } else {
4025        (
4026            StatusCode::NOT_FOUND,
4027            Json(serde_json::json!({
4028                "error": "Profile not found",
4029                "message": format!("Network profile '{}' not found", request.profile_name)
4030            })),
4031        )
4032            .into_response()
4033    }
4034}
4035
4036/// Build the management API router with UI Builder support
4037pub fn management_router_with_ui_builder(
4038    state: ManagementState,
4039    server_config: mockforge_core::config::ServerConfig,
4040) -> Router {
4041    use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
4042
4043    // Create the base management router
4044    let management = management_router(state);
4045
4046    // Create UI Builder state and router
4047    let ui_builder_state = UIBuilderState::new(server_config);
4048    let ui_builder = create_ui_builder_router(ui_builder_state);
4049
4050    // Nest UI Builder under /ui-builder
4051    management.nest("/ui-builder", ui_builder)
4052}
4053
4054/// Build management router with spec import API
4055pub fn management_router_with_spec_import(state: ManagementState) -> Router {
4056    use crate::spec_import::{spec_import_router, SpecImportState};
4057
4058    // Create base management router
4059    let management = management_router(state);
4060
4061    // Merge with spec import router
4062    Router::new()
4063        .merge(management)
4064        .merge(spec_import_router(SpecImportState::new()))
4065}
4066
4067#[cfg(test)]
4068mod tests {
4069    use super::*;
4070
4071    #[tokio::test]
4072    async fn test_create_and_get_mock() {
4073        let state = ManagementState::new(None, None, 3000);
4074
4075        let mock = MockConfig {
4076            id: "test-1".to_string(),
4077            name: "Test Mock".to_string(),
4078            method: "GET".to_string(),
4079            path: "/test".to_string(),
4080            response: MockResponse {
4081                body: serde_json::json!({"message": "test"}),
4082                headers: None,
4083            },
4084            enabled: true,
4085            latency_ms: None,
4086            status_code: Some(200),
4087            request_match: None,
4088            priority: None,
4089            scenario: None,
4090            required_scenario_state: None,
4091            new_scenario_state: None,
4092        };
4093
4094        // Create mock
4095        {
4096            let mut mocks = state.mocks.write().await;
4097            mocks.push(mock.clone());
4098        }
4099
4100        // Get mock
4101        let mocks = state.mocks.read().await;
4102        let found = mocks.iter().find(|m| m.id == "test-1");
4103        assert!(found.is_some());
4104        assert_eq!(found.unwrap().name, "Test Mock");
4105    }
4106
4107    #[tokio::test]
4108    async fn test_server_stats() {
4109        let state = ManagementState::new(None, None, 3000);
4110
4111        // Add some mocks
4112        {
4113            let mut mocks = state.mocks.write().await;
4114            mocks.push(MockConfig {
4115                id: "1".to_string(),
4116                name: "Mock 1".to_string(),
4117                method: "GET".to_string(),
4118                path: "/test1".to_string(),
4119                response: MockResponse {
4120                    body: serde_json::json!({}),
4121                    headers: None,
4122                },
4123                enabled: true,
4124                latency_ms: None,
4125                status_code: Some(200),
4126                request_match: None,
4127                priority: None,
4128                scenario: None,
4129                required_scenario_state: None,
4130                new_scenario_state: None,
4131            });
4132            mocks.push(MockConfig {
4133                id: "2".to_string(),
4134                name: "Mock 2".to_string(),
4135                method: "POST".to_string(),
4136                path: "/test2".to_string(),
4137                response: MockResponse {
4138                    body: serde_json::json!({}),
4139                    headers: None,
4140                },
4141                enabled: false,
4142                latency_ms: None,
4143                status_code: Some(201),
4144                request_match: None,
4145                priority: None,
4146                scenario: None,
4147                required_scenario_state: None,
4148                new_scenario_state: None,
4149            });
4150        }
4151
4152        let mocks = state.mocks.read().await;
4153        assert_eq!(mocks.len(), 2);
4154        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
4155    }
4156
4157    #[test]
4158    fn test_mock_matches_request_with_xpath_absolute_path() {
4159        let mock = MockConfig {
4160            id: "xpath-1".to_string(),
4161            name: "XPath Match".to_string(),
4162            method: "POST".to_string(),
4163            path: "/xml".to_string(),
4164            response: MockResponse {
4165                body: serde_json::json!({"ok": true}),
4166                headers: None,
4167            },
4168            enabled: true,
4169            latency_ms: None,
4170            status_code: Some(200),
4171            request_match: Some(RequestMatchCriteria {
4172                xpath: Some("/root/order/id".to_string()),
4173                ..Default::default()
4174            }),
4175            priority: None,
4176            scenario: None,
4177            required_scenario_state: None,
4178            new_scenario_state: None,
4179        };
4180
4181        let body = br#"<root><order><id>123</id></order></root>"#;
4182        let headers = std::collections::HashMap::new();
4183        let query = std::collections::HashMap::new();
4184
4185        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4186    }
4187
4188    #[test]
4189    fn test_mock_matches_request_with_xpath_text_predicate() {
4190        let mock = MockConfig {
4191            id: "xpath-2".to_string(),
4192            name: "XPath Predicate Match".to_string(),
4193            method: "POST".to_string(),
4194            path: "/xml".to_string(),
4195            response: MockResponse {
4196                body: serde_json::json!({"ok": true}),
4197                headers: None,
4198            },
4199            enabled: true,
4200            latency_ms: None,
4201            status_code: Some(200),
4202            request_match: Some(RequestMatchCriteria {
4203                xpath: Some("//order/id[text()='123']".to_string()),
4204                ..Default::default()
4205            }),
4206            priority: None,
4207            scenario: None,
4208            required_scenario_state: None,
4209            new_scenario_state: None,
4210        };
4211
4212        let body = br#"<root><order><id>123</id></order></root>"#;
4213        let headers = std::collections::HashMap::new();
4214        let query = std::collections::HashMap::new();
4215
4216        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4217    }
4218
4219    #[test]
4220    fn test_mock_matches_request_with_xpath_no_match() {
4221        let mock = MockConfig {
4222            id: "xpath-3".to_string(),
4223            name: "XPath No Match".to_string(),
4224            method: "POST".to_string(),
4225            path: "/xml".to_string(),
4226            response: MockResponse {
4227                body: serde_json::json!({"ok": true}),
4228                headers: None,
4229            },
4230            enabled: true,
4231            latency_ms: None,
4232            status_code: Some(200),
4233            request_match: Some(RequestMatchCriteria {
4234                xpath: Some("//order/id[text()='456']".to_string()),
4235                ..Default::default()
4236            }),
4237            priority: None,
4238            scenario: None,
4239            required_scenario_state: None,
4240            new_scenario_state: None,
4241        };
4242
4243        let body = br#"<root><order><id>123</id></order></root>"#;
4244        let headers = std::collections::HashMap::new();
4245        let query = std::collections::HashMap::new();
4246
4247        assert!(!mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4248    }
4249}