Skip to main content

mockforge_http/
management.rs

1#[cfg(any(feature = "mqtt", feature = "kafka"))]
2use axum::response::sse::{Event, Sse};
3/// Management API for MockForge
4///
5/// Provides REST endpoints for controlling mocks, server configuration,
6/// and integration with developer tools (VS Code extension, CI/CD, etc.)
7use axum::{
8    extract::{Path, Query, State},
9    http::StatusCode,
10    response::{IntoResponse, Json},
11    routing::{delete, get, post, put},
12    Router,
13};
14#[cfg(any(feature = "mqtt", feature = "kafka"))]
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18    BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23#[cfg(any(feature = "mqtt", feature = "kafka"))]
24use std::convert::Infallible;
25use std::sync::Arc;
26use tokio::sync::{broadcast, RwLock};
27use tracing::*;
28
/// Default broadcast channel capacity for message events
const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;

/// Get the broadcast channel capacity from the environment, or use the default.
///
/// Reads `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY` and parses it as a `usize`
/// (surrounding whitespace is ignored). Falls back to
/// `DEFAULT_MESSAGE_BROADCAST_CAPACITY` when the variable is unset, is not
/// valid Unicode, is not a number, or lies outside `1..=usize::MAX / 2`.
///
/// The range check matters because the value is handed to
/// `tokio::sync::broadcast::channel`, which panics on a capacity of `0` or one
/// larger than `usize::MAX / 2`; a bad environment value must not crash startup.
fn get_message_broadcast_capacity() -> usize {
    std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|capacity| (1..=usize::MAX / 2).contains(capacity))
        .unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY)
}
39
40/// Message event types for real-time monitoring
41#[derive(Debug, Clone, Serialize, Deserialize)]
42#[serde(tag = "protocol", content = "data")]
43#[serde(rename_all = "lowercase")]
44pub enum MessageEvent {
45    #[cfg(feature = "mqtt")]
46    /// MQTT message event
47    Mqtt(MqttMessageEvent),
48    #[cfg(feature = "kafka")]
49    /// Kafka message event
50    Kafka(KafkaMessageEvent),
51}
52
/// Payload of an MQTT message event used for real-time monitoring.
///
/// Only compiled when the `mqtt` feature is enabled.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// Name of the MQTT topic
    pub topic: String,
    /// Content of the message payload
    pub payload: String,
    /// Quality of Service level (0, 1, or 2)
    pub qos: u8,
    /// `true` when the message is retained
    pub retain: bool,
    /// Timestamp formatted as RFC3339
    pub timestamp: String,
}
68
/// Payload of a Kafka message event used for real-time monitoring.
///
/// Only compiled when the `kafka` feature is enabled.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Kafka topic name
    pub topic: String,
    /// Optional message key
    pub key: Option<String>,
    /// Message value
    pub value: String,
    /// Partition number
    pub partition: i32,
    /// Message offset
    pub offset: i64,
    /// Optional message headers as name/value pairs
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// Timestamp string (presumably RFC3339 like `MqttMessageEvent::timestamp` — confirm with the producer)
    pub timestamp: String,
}
80
81/// Mock configuration representation
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct MockConfig {
84    /// Unique identifier for the mock
85    #[serde(skip_serializing_if = "String::is_empty")]
86    pub id: String,
87    /// Human-readable name for the mock
88    pub name: String,
89    /// HTTP method (GET, POST, etc.)
90    pub method: String,
91    /// API path pattern to match
92    pub path: String,
93    /// Response configuration
94    pub response: MockResponse,
95    /// Whether this mock is currently enabled
96    #[serde(default = "default_true")]
97    pub enabled: bool,
98    /// Optional latency to inject in milliseconds
99    #[serde(skip_serializing_if = "Option::is_none")]
100    pub latency_ms: Option<u64>,
101    /// Optional HTTP status code override
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub status_code: Option<u16>,
104    /// Request matching criteria (headers, query params, body patterns)
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub request_match: Option<RequestMatchCriteria>,
107    /// Priority for mock ordering (higher priority mocks are matched first)
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub priority: Option<i32>,
110    /// Scenario name for stateful mocking
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub scenario: Option<String>,
113    /// Required scenario state for this mock to be active
114    #[serde(skip_serializing_if = "Option::is_none")]
115    pub required_scenario_state: Option<String>,
116    /// New scenario state after this mock is matched
117    #[serde(skip_serializing_if = "Option::is_none")]
118    pub new_scenario_state: Option<String>,
119}
120
/// Serde default provider for boolean fields that should be `true` when the
/// field is missing from the input (referenced as `default = "default_true"`).
fn default_true() -> bool {
    const ENABLED_WHEN_ABSENT: bool = true;
    ENABLED_WHEN_ABSENT
}
124
125/// Mock response configuration
126#[derive(Debug, Clone, Serialize, Deserialize)]
127pub struct MockResponse {
128    /// Response body as JSON
129    pub body: serde_json::Value,
130    /// Optional custom response headers
131    #[serde(skip_serializing_if = "Option::is_none")]
132    pub headers: Option<std::collections::HashMap<String, String>>,
133}
134
135/// Request matching criteria for advanced request matching
136#[derive(Debug, Clone, Serialize, Deserialize, Default)]
137pub struct RequestMatchCriteria {
138    /// Headers that must be present and match (case-insensitive header names)
139    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
140    pub headers: std::collections::HashMap<String, String>,
141    /// Query parameters that must be present and match
142    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
143    pub query_params: std::collections::HashMap<String, String>,
144    /// Request body pattern (supports exact match or regex)
145    #[serde(skip_serializing_if = "Option::is_none")]
146    pub body_pattern: Option<String>,
147    /// JSONPath expression for JSON body matching
148    #[serde(skip_serializing_if = "Option::is_none")]
149    pub json_path: Option<String>,
150    /// XPath expression for XML body matching
151    #[serde(skip_serializing_if = "Option::is_none")]
152    pub xpath: Option<String>,
153    /// Custom matcher expression (e.g., "headers.content-type == \"application/json\"")
154    #[serde(skip_serializing_if = "Option::is_none")]
155    pub custom_matcher: Option<String>,
156}
157
158/// Check if a request matches the given mock configuration
159///
160/// This function implements comprehensive request matching including:
161/// - Method and path matching
162/// - Header matching (with regex support)
163/// - Query parameter matching
164/// - Body pattern matching (exact, regex, JSONPath, XPath)
165/// - Custom matcher expressions
166pub fn mock_matches_request(
167    mock: &MockConfig,
168    method: &str,
169    path: &str,
170    headers: &std::collections::HashMap<String, String>,
171    query_params: &std::collections::HashMap<String, String>,
172    body: Option<&[u8]>,
173) -> bool {
174    use regex::Regex;
175
176    // Check if mock is enabled
177    if !mock.enabled {
178        return false;
179    }
180
181    // Check method (case-insensitive)
182    if mock.method.to_uppercase() != method.to_uppercase() {
183        return false;
184    }
185
186    // Check path pattern (supports wildcards and path parameters)
187    if !path_matches_pattern(&mock.path, path) {
188        return false;
189    }
190
191    // Check request matching criteria if present
192    if let Some(criteria) = &mock.request_match {
193        // Check headers
194        for (key, expected_value) in &criteria.headers {
195            let header_key_lower = key.to_lowercase();
196            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
197
198            if let Some((_, actual_value)) = found {
199                // Try regex match first, then exact match
200                if let Ok(re) = Regex::new(expected_value) {
201                    if !re.is_match(actual_value) {
202                        return false;
203                    }
204                } else if actual_value != expected_value {
205                    return false;
206                }
207            } else {
208                return false; // Header not found
209            }
210        }
211
212        // Check query parameters
213        for (key, expected_value) in &criteria.query_params {
214            if let Some(actual_value) = query_params.get(key) {
215                if actual_value != expected_value {
216                    return false;
217                }
218            } else {
219                return false; // Query param not found
220            }
221        }
222
223        // Check body pattern
224        if let Some(pattern) = &criteria.body_pattern {
225            if let Some(body_bytes) = body {
226                let body_str = String::from_utf8_lossy(body_bytes);
227                // Try regex first, then exact match
228                if let Ok(re) = Regex::new(pattern) {
229                    if !re.is_match(&body_str) {
230                        return false;
231                    }
232                } else if body_str.as_ref() != pattern {
233                    return false;
234                }
235            } else {
236                return false; // Body required but not present
237            }
238        }
239
240        // Check JSONPath (simplified implementation)
241        if let Some(json_path) = &criteria.json_path {
242            if let Some(body_bytes) = body {
243                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
244                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
245                        // Simple JSONPath check
246                        if !json_path_exists(&json_value, json_path) {
247                            return false;
248                        }
249                    }
250                }
251            }
252        }
253
254        // Check XPath (supports a focused subset)
255        if let Some(xpath) = &criteria.xpath {
256            if let Some(body_bytes) = body {
257                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
258                    if !xml_xpath_exists(body_str, xpath) {
259                        return false;
260                    }
261                } else {
262                    return false;
263                }
264            } else {
265                return false; // Body required but not present
266            }
267        }
268
269        // Check custom matcher
270        if let Some(custom) = &criteria.custom_matcher {
271            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
272                return false;
273            }
274        }
275    }
276
277    true
278}
279
280/// Check if a path matches a pattern (supports wildcards and path parameters)
281fn path_matches_pattern(pattern: &str, path: &str) -> bool {
282    // Exact match
283    if pattern == path {
284        return true;
285    }
286
287    // Wildcard match
288    if pattern == "*" {
289        return true;
290    }
291
292    // Path parameter matching (e.g., /users/{id} matches /users/123)
293    let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
294    let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
295
296    if pattern_parts.len() != path_parts.len() {
297        // Check for wildcard patterns
298        if pattern.contains('*') {
299            return matches_wildcard_pattern(pattern, path);
300        }
301        return false;
302    }
303
304    for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
305        // Check for path parameters {param}
306        if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
307            continue; // Matches any value
308        }
309
310        if pattern_part != path_part {
311            return false;
312        }
313    }
314
315    true
316}
317
318/// Check if path matches a wildcard pattern
319fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
320    use regex::Regex;
321
322    // Convert pattern to regex
323    let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
324
325    if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
326        return re.is_match(path);
327    }
328
329    false
330}
331
332/// Check if a JSONPath exists in a JSON value (simplified implementation)
333fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
334    // Simple implementation - for full JSONPath support, use a library like jsonpath-rs
335    // This handles simple paths like $.field or $.field.subfield
336    if let Some(path) = json_path.strip_prefix("$.") {
337        let parts: Vec<&str> = path.split('.').collect();
338
339        let mut current = json;
340        for part in parts {
341            if let Some(obj) = current.as_object() {
342                if let Some(value) = obj.get(part) {
343                    current = value;
344                } else {
345                    return false;
346                }
347            } else {
348                return false;
349            }
350        }
351        true
352    } else {
353        // For complex JSONPath expressions, would need a proper JSONPath library
354        tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
355        false
356    }
357}
358
/// One parsed step of a supported XPath expression.
#[derive(Debug, Clone, PartialEq, Eq)]
struct XPathSegment {
    /// Element name the step must match
    name: String,
    /// Value from a `[text()="..."]` predicate, when the step has one
    text_equals: Option<String>,
}
364
365fn parse_xpath_segment(segment: &str) -> Option<XPathSegment> {
366    if segment.is_empty() {
367        return None;
368    }
369
370    let trimmed = segment.trim();
371    if let Some(bracket_start) = trimmed.find('[') {
372        if !trimmed.ends_with(']') {
373            return None;
374        }
375
376        let name = trimmed[..bracket_start].trim();
377        let predicate = &trimmed[bracket_start + 1..trimmed.len() - 1];
378        let predicate = predicate.trim();
379
380        // Support simple predicate: [text()="value"] or [text()='value']
381        if let Some(raw) = predicate.strip_prefix("text()=") {
382            let raw = raw.trim();
383            if raw.len() >= 2
384                && ((raw.starts_with('"') && raw.ends_with('"'))
385                    || (raw.starts_with('\'') && raw.ends_with('\'')))
386            {
387                let text = raw[1..raw.len() - 1].to_string();
388                if !name.is_empty() {
389                    return Some(XPathSegment {
390                        name: name.to_string(),
391                        text_equals: Some(text),
392                    });
393                }
394            }
395        }
396
397        None
398    } else {
399        Some(XPathSegment {
400            name: trimmed.to_string(),
401            text_equals: None,
402        })
403    }
404}
405
406fn segment_matches(node: roxmltree::Node<'_, '_>, segment: &XPathSegment) -> bool {
407    if !node.is_element() {
408        return false;
409    }
410    if node.tag_name().name() != segment.name {
411        return false;
412    }
413    match &segment.text_equals {
414        Some(expected) => node.text().map(str::trim).unwrap_or_default() == expected,
415        None => true,
416    }
417}
418
419/// Check if an XPath expression matches an XML body.
420///
421/// Supported subset:
422/// - Absolute paths: `/root/child/item`
423/// - Descendant search: `//item` and `//parent/child`
424/// - Optional text predicate per segment: `item[text()="value"]`
425fn xml_xpath_exists(xml_body: &str, xpath: &str) -> bool {
426    let doc = match roxmltree::Document::parse(xml_body) {
427        Ok(doc) => doc,
428        Err(err) => {
429            tracing::warn!("Failed to parse XML for XPath matching: {}", err);
430            return false;
431        }
432    };
433
434    let expr = xpath.trim();
435    if expr.is_empty() {
436        return false;
437    }
438
439    let (is_descendant, path_str) = if let Some(rest) = expr.strip_prefix("//") {
440        (true, rest)
441    } else if let Some(rest) = expr.strip_prefix('/') {
442        (false, rest)
443    } else {
444        tracing::warn!("Unsupported XPath expression (must start with / or //): {}", expr);
445        return false;
446    };
447
448    let segments: Vec<XPathSegment> = path_str
449        .split('/')
450        .filter(|s| !s.trim().is_empty())
451        .filter_map(parse_xpath_segment)
452        .collect();
453
454    if segments.is_empty() {
455        return false;
456    }
457
458    if is_descendant {
459        let first = &segments[0];
460        for node in doc.descendants().filter(|n| segment_matches(*n, first)) {
461            let mut frontier = vec![node];
462            for segment in &segments[1..] {
463                let mut next_frontier = Vec::new();
464                for parent in &frontier {
465                    for child in parent.children().filter(|n| segment_matches(*n, segment)) {
466                        next_frontier.push(child);
467                    }
468                }
469                if next_frontier.is_empty() {
470                    frontier.clear();
471                    break;
472                }
473                frontier = next_frontier;
474            }
475            if !frontier.is_empty() {
476                return true;
477            }
478        }
479        false
480    } else {
481        let mut frontier = vec![doc.root_element()];
482        for (index, segment) in segments.iter().enumerate() {
483            let mut next_frontier = Vec::new();
484            for parent in &frontier {
485                if index == 0 {
486                    if segment_matches(*parent, segment) {
487                        next_frontier.push(*parent);
488                    }
489                    continue;
490                }
491                for child in parent.children().filter(|n| segment_matches(*n, segment)) {
492                    next_frontier.push(child);
493                }
494            }
495            if next_frontier.is_empty() {
496                return false;
497            }
498            frontier = next_frontier;
499        }
500        !frontier.is_empty()
501    }
502}
503
504/// Evaluate a custom matcher expression
505fn evaluate_custom_matcher(
506    expression: &str,
507    method: &str,
508    path: &str,
509    headers: &std::collections::HashMap<String, String>,
510    query_params: &std::collections::HashMap<String, String>,
511    body: Option<&[u8]>,
512) -> bool {
513    use regex::Regex;
514
515    let expr = expression.trim();
516
517    // Handle equality expressions (field == "value")
518    if expr.contains("==") {
519        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
520        if parts.len() != 2 {
521            return false;
522        }
523
524        let field = parts[0];
525        let expected_value = parts[1].trim_matches('"').trim_matches('\'');
526
527        match field {
528            "method" => method == expected_value,
529            "path" => path == expected_value,
530            _ if field.starts_with("headers.") => {
531                let header_name = &field[8..];
532                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
533            }
534            _ if field.starts_with("query.") => {
535                let param_name = &field[6..];
536                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
537            }
538            _ => false,
539        }
540    }
541    // Handle regex match expressions (field =~ "pattern")
542    else if expr.contains("=~") {
543        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
544        if parts.len() != 2 {
545            return false;
546        }
547
548        let field = parts[0];
549        let pattern = parts[1].trim_matches('"').trim_matches('\'');
550
551        if let Ok(re) = Regex::new(pattern) {
552            match field {
553                "method" => re.is_match(method),
554                "path" => re.is_match(path),
555                _ if field.starts_with("headers.") => {
556                    let header_name = &field[8..];
557                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
558                }
559                _ if field.starts_with("query.") => {
560                    let param_name = &field[6..];
561                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
562                }
563                _ => false,
564            }
565        } else {
566            false
567        }
568    }
569    // Handle contains expressions (field contains "value")
570    else if expr.contains("contains") {
571        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
572        if parts.len() != 2 {
573            return false;
574        }
575
576        let field = parts[0];
577        let search_value = parts[1].trim_matches('"').trim_matches('\'');
578
579        match field {
580            "path" => path.contains(search_value),
581            _ if field.starts_with("headers.") => {
582                let header_name = &field[8..];
583                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
584            }
585            _ if field.starts_with("body") => {
586                if let Some(body_bytes) = body {
587                    let body_str = String::from_utf8_lossy(body_bytes);
588                    body_str.contains(search_value)
589                } else {
590                    false
591                }
592            }
593            _ => false,
594        }
595    } else {
596        // Unknown expression format
597        tracing::warn!("Unknown custom matcher expression format: {}", expr);
598        false
599    }
600}
601
602/// Server statistics
603#[derive(Debug, Clone, Serialize, Deserialize)]
604pub struct ServerStats {
605    /// Server uptime in seconds
606    pub uptime_seconds: u64,
607    /// Total number of requests processed
608    pub total_requests: u64,
609    /// Number of active mock configurations
610    pub active_mocks: usize,
611    /// Number of currently enabled mocks
612    pub enabled_mocks: usize,
613    /// Number of registered API routes
614    pub registered_routes: usize,
615}
616
617/// Server configuration info
618#[derive(Debug, Clone, Serialize, Deserialize)]
619pub struct ServerConfig {
620    /// MockForge version string
621    pub version: String,
622    /// Server port number
623    pub port: u16,
624    /// Whether an OpenAPI spec is loaded
625    pub has_openapi_spec: bool,
626    /// Optional path to the OpenAPI spec file
627    #[serde(skip_serializing_if = "Option::is_none")]
628    pub spec_path: Option<String>,
629}
630
631/// Shared state for the management API
632#[derive(Clone)]
633pub struct ManagementState {
634    /// Collection of mock configurations
635    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
636    /// Optional OpenAPI specification
637    pub spec: Option<Arc<OpenApiSpec>>,
638    /// Optional path to the OpenAPI spec file
639    pub spec_path: Option<String>,
640    /// Server port number
641    pub port: u16,
642    /// Server start time for uptime calculation
643    pub start_time: std::time::Instant,
644    /// Counter for total requests processed
645    pub request_counter: Arc<RwLock<u64>>,
646    /// Optional proxy configuration for migration pipeline
647    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
648    /// Optional SMTP registry for email mocking
649    #[cfg(feature = "smtp")]
650    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
651    /// Optional MQTT broker for message mocking
652    #[cfg(feature = "mqtt")]
653    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
654    /// Optional Kafka broker for event streaming
655    #[cfg(feature = "kafka")]
656    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
657    /// Broadcast channel for message events (MQTT & Kafka)
658    #[cfg(any(feature = "mqtt", feature = "kafka"))]
659    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
660    /// State machine manager for scenario state machines
661    pub state_machine_manager:
662        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
663    /// Optional WebSocket broadcast channel for real-time updates
664    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
665    /// Lifecycle hook registry for extensibility
666    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
667    /// Rule explanations storage (in-memory for now)
668    pub rule_explanations: Arc<
669        RwLock<
670            std::collections::HashMap<
671                String,
672                mockforge_core::intelligent_behavior::RuleExplanation,
673            >,
674        >,
675    >,
676    /// Optional chaos API state for chaos config management
677    #[cfg(feature = "chaos")]
678    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
679    /// Optional server configuration for profile application
680    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
681}
682
683impl ManagementState {
684    /// Create a new management state
685    ///
686    /// # Arguments
687    /// * `spec` - Optional OpenAPI specification
688    /// * `spec_path` - Optional path to the OpenAPI spec file
689    /// * `port` - Server port number
690    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
691        Self {
692            mocks: Arc::new(RwLock::new(Vec::new())),
693            spec,
694            spec_path,
695            port,
696            start_time: std::time::Instant::now(),
697            request_counter: Arc::new(RwLock::new(0)),
698            proxy_config: None,
699            #[cfg(feature = "smtp")]
700            smtp_registry: None,
701            #[cfg(feature = "mqtt")]
702            mqtt_broker: None,
703            #[cfg(feature = "kafka")]
704            kafka_broker: None,
705            #[cfg(any(feature = "mqtt", feature = "kafka"))]
706            message_events: {
707                let capacity = get_message_broadcast_capacity();
708                let (tx, _) = broadcast::channel(capacity);
709                Arc::new(tx)
710            },
711            state_machine_manager: Arc::new(RwLock::new(
712                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
713            )),
714            ws_broadcast: None,
715            lifecycle_hooks: None,
716            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
717            #[cfg(feature = "chaos")]
718            chaos_api_state: None,
719            server_config: None,
720        }
721    }
722
723    /// Add lifecycle hook registry to management state
724    pub fn with_lifecycle_hooks(
725        mut self,
726        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
727    ) -> Self {
728        self.lifecycle_hooks = Some(hooks);
729        self
730    }
731
732    /// Add WebSocket broadcast channel to management state
733    pub fn with_ws_broadcast(
734        mut self,
735        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
736    ) -> Self {
737        self.ws_broadcast = Some(ws_broadcast);
738        self
739    }
740
741    /// Add proxy configuration to management state
742    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
743        self.proxy_config = Some(proxy_config);
744        self
745    }
746
747    #[cfg(feature = "smtp")]
748    /// Add SMTP registry to management state
749    pub fn with_smtp_registry(
750        mut self,
751        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
752    ) -> Self {
753        self.smtp_registry = Some(smtp_registry);
754        self
755    }
756
757    #[cfg(feature = "mqtt")]
758    /// Add MQTT broker to management state
759    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
760        self.mqtt_broker = Some(mqtt_broker);
761        self
762    }
763
764    #[cfg(feature = "kafka")]
765    /// Add Kafka broker to management state
766    pub fn with_kafka_broker(
767        mut self,
768        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
769    ) -> Self {
770        self.kafka_broker = Some(kafka_broker);
771        self
772    }
773
774    #[cfg(feature = "chaos")]
775    /// Add chaos API state to management state
776    pub fn with_chaos_api_state(
777        mut self,
778        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
779    ) -> Self {
780        self.chaos_api_state = Some(chaos_api_state);
781        self
782    }
783
784    /// Add server configuration to management state
785    pub fn with_server_config(
786        mut self,
787        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
788    ) -> Self {
789        self.server_config = Some(server_config);
790        self
791    }
792}
793
794/// List all mocks
795async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
796    let mocks = state.mocks.read().await;
797    Json(serde_json::json!({
798        "mocks": *mocks,
799        "total": mocks.len(),
800        "enabled": mocks.iter().filter(|m| m.enabled).count()
801    }))
802}
803
804/// Get a specific mock by ID
805async fn get_mock(
806    State(state): State<ManagementState>,
807    Path(id): Path<String>,
808) -> Result<Json<MockConfig>, StatusCode> {
809    let mocks = state.mocks.read().await;
810    mocks
811        .iter()
812        .find(|m| m.id == id)
813        .cloned()
814        .map(Json)
815        .ok_or(StatusCode::NOT_FOUND)
816}
817
818/// Create a new mock
819async fn create_mock(
820    State(state): State<ManagementState>,
821    Json(mut mock): Json<MockConfig>,
822) -> Result<Json<MockConfig>, StatusCode> {
823    let mut mocks = state.mocks.write().await;
824
825    // Generate ID if not provided
826    if mock.id.is_empty() {
827        mock.id = uuid::Uuid::new_v4().to_string();
828    }
829
830    // Check for duplicate ID
831    if mocks.iter().any(|m| m.id == mock.id) {
832        return Err(StatusCode::CONFLICT);
833    }
834
835    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
836
837    // Invoke lifecycle hooks
838    if let Some(hooks) = &state.lifecycle_hooks {
839        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
840            id: mock.id.clone(),
841            name: mock.name.clone(),
842            config: serde_json::to_value(&mock).unwrap_or_default(),
843        };
844        hooks.invoke_mock_created(&event).await;
845    }
846
847    mocks.push(mock.clone());
848
849    // Broadcast WebSocket event
850    if let Some(tx) = &state.ws_broadcast {
851        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
852    }
853
854    Ok(Json(mock))
855}
856
857/// Update an existing mock
858async fn update_mock(
859    State(state): State<ManagementState>,
860    Path(id): Path<String>,
861    Json(updated_mock): Json<MockConfig>,
862) -> Result<Json<MockConfig>, StatusCode> {
863    let mut mocks = state.mocks.write().await;
864
865    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
866
867    // Get old mock for comparison
868    let old_mock = mocks[position].clone();
869
870    info!("Updating mock: {}", id);
871    mocks[position] = updated_mock.clone();
872
873    // Invoke lifecycle hooks
874    if let Some(hooks) = &state.lifecycle_hooks {
875        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
876            id: updated_mock.id.clone(),
877            name: updated_mock.name.clone(),
878            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
879        };
880        hooks.invoke_mock_updated(&event).await;
881
882        // Check if enabled state changed
883        if old_mock.enabled != updated_mock.enabled {
884            let state_event = if updated_mock.enabled {
885                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
886                    id: updated_mock.id.clone(),
887                }
888            } else {
889                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
890                    id: updated_mock.id.clone(),
891                }
892            };
893            hooks.invoke_mock_state_changed(&state_event).await;
894        }
895    }
896
897    // Broadcast WebSocket event
898    if let Some(tx) = &state.ws_broadcast {
899        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
900    }
901
902    Ok(Json(updated_mock))
903}
904
905/// Delete a mock
906async fn delete_mock(
907    State(state): State<ManagementState>,
908    Path(id): Path<String>,
909) -> Result<StatusCode, StatusCode> {
910    let mut mocks = state.mocks.write().await;
911
912    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
913
914    // Get mock info before deletion for lifecycle hooks
915    let deleted_mock = mocks[position].clone();
916
917    info!("Deleting mock: {}", id);
918    mocks.remove(position);
919
920    // Invoke lifecycle hooks
921    if let Some(hooks) = &state.lifecycle_hooks {
922        let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
923            id: deleted_mock.id.clone(),
924            name: deleted_mock.name.clone(),
925        };
926        hooks.invoke_mock_deleted(&event).await;
927    }
928
929    // Broadcast WebSocket event
930    if let Some(tx) = &state.ws_broadcast {
931        let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
932    }
933
934    Ok(StatusCode::NO_CONTENT)
935}
936
/// Request to validate configuration
///
/// Request body accepted by `validate_config`. The configuration is always
/// transported as a JSON value; `format` only selects which parser is used
/// to interpret it.
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// Configuration to validate (as JSON)
    pub config: serde_json::Value,
    /// Format of the configuration ("json" or "yaml")
    ///
    /// Defaults to "json" when omitted. `validate_config` treats "yaml" and
    /// "yml" as YAML and any other value as JSON.
    #[serde(default = "default_format")]
    pub format: String,
}
946
/// Serde default for `ValidateConfigRequest::format`: the string "json".
fn default_format() -> String {
    String::from("json")
}
950
951/// Validate configuration without applying it
952async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
953    use mockforge_core::config::ServerConfig;
954
955    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
956        "yaml" | "yml" => {
957            let yaml_str = match serde_json::to_string(&request.config) {
958                Ok(s) => s,
959                Err(e) => {
960                    return (
961                        StatusCode::BAD_REQUEST,
962                        Json(serde_json::json!({
963                            "valid": false,
964                            "error": format!("Failed to convert to string: {}", e),
965                            "message": "Configuration validation failed"
966                        })),
967                    )
968                        .into_response();
969                }
970            };
971            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
972        }
973        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
974    };
975
976    match config_result {
977        Ok(_) => Json(serde_json::json!({
978            "valid": true,
979            "message": "Configuration is valid"
980        }))
981        .into_response(),
982        Err(e) => (
983            StatusCode::BAD_REQUEST,
984            Json(serde_json::json!({
985                "valid": false,
986                "error": format!("Invalid configuration: {}", e),
987                "message": "Configuration validation failed"
988            })),
989        )
990            .into_response(),
991    }
992}
993
/// Request for bulk configuration update
///
/// Request body accepted by `bulk_update_config`.
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    /// Partial configuration updates (only specified fields will be updated)
    ///
    /// Must be a JSON object; `bulk_update_config` rejects any other JSON
    /// value with `400 Bad Request`.
    pub updates: serde_json::Value,
}
1000
1001/// Bulk update configuration
1002///
1003/// This endpoint allows updating multiple configuration options at once.
1004/// Only the specified fields in the updates object will be modified.
1005///
1006/// Configuration updates are applied to the server configuration if available
1007/// in ManagementState. Changes take effect immediately for supported settings.
1008async fn bulk_update_config(
1009    State(_state): State<ManagementState>,
1010    Json(request): Json<BulkConfigUpdateRequest>,
1011) -> impl IntoResponse {
1012    // Validate the updates structure
1013    if !request.updates.is_object() {
1014        return (
1015            StatusCode::BAD_REQUEST,
1016            Json(serde_json::json!({
1017                "error": "Invalid request",
1018                "message": "Updates must be a JSON object"
1019            })),
1020        )
1021            .into_response();
1022    }
1023
1024    // Try to validate as partial ServerConfig
1025    use mockforge_core::config::ServerConfig;
1026
1027    // Create a minimal valid config and try to merge updates
1028    let base_config = ServerConfig::default();
1029    let base_json = match serde_json::to_value(&base_config) {
1030        Ok(v) => v,
1031        Err(e) => {
1032            return (
1033                StatusCode::INTERNAL_SERVER_ERROR,
1034                Json(serde_json::json!({
1035                    "error": "Internal error",
1036                    "message": format!("Failed to serialize base config: {}", e)
1037                })),
1038            )
1039                .into_response();
1040        }
1041    };
1042
1043    // Merge updates into base config (simplified merge)
1044    let mut merged = base_json.clone();
1045    if let (Some(merged_obj), Some(updates_obj)) =
1046        (merged.as_object_mut(), request.updates.as_object())
1047    {
1048        for (key, value) in updates_obj {
1049            merged_obj.insert(key.clone(), value.clone());
1050        }
1051    }
1052
1053    // Validate the merged config
1054    match serde_json::from_value::<ServerConfig>(merged) {
1055        Ok(_) => {
1056            // Config is valid
1057            // Note: Runtime application of config changes would require:
1058            // 1. Storing ServerConfig in ManagementState
1059            // 2. Implementing hot-reload mechanism for server configuration
1060            // 3. Updating router state and middleware based on new config
1061            // For now, this endpoint only validates the configuration structure
1062            Json(serde_json::json!({
1063                "success": true,
1064                "message": "Bulk configuration update validated successfully. Note: Runtime application requires ServerConfig in ManagementState and hot-reload support.",
1065                "updates_received": request.updates,
1066                "validated": true
1067            }))
1068            .into_response()
1069        }
1070        Err(e) => (
1071            StatusCode::BAD_REQUEST,
1072            Json(serde_json::json!({
1073                "error": "Invalid configuration",
1074                "message": format!("Configuration validation failed: {}", e),
1075                "validated": false
1076            })),
1077        )
1078            .into_response(),
1079    }
1080}
1081
1082/// Get server statistics
1083async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
1084    let mocks = state.mocks.read().await;
1085    let request_count = *state.request_counter.read().await;
1086
1087    Json(ServerStats {
1088        uptime_seconds: state.start_time.elapsed().as_secs(),
1089        total_requests: request_count,
1090        active_mocks: mocks.len(),
1091        enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
1092        registered_routes: mocks.len(), // This could be enhanced with actual route registry info
1093    })
1094}
1095
1096/// Get server configuration
1097async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
1098    Json(ServerConfig {
1099        version: env!("CARGO_PKG_VERSION").to_string(),
1100        port: state.port,
1101        has_openapi_spec: state.spec.is_some(),
1102        spec_path: state.spec_path.clone(),
1103    })
1104}
1105
1106/// Health check endpoint
1107async fn health_check() -> Json<serde_json::Value> {
1108    Json(serde_json::json!({
1109        "status": "healthy",
1110        "service": "mockforge-management",
1111        "timestamp": chrono::Utc::now().to_rfc3339()
1112    }))
1113}
1114
/// Export format for mock configurations
///
/// With `rename_all = "lowercase"` the variants (de)serialize as the strings
/// "json" and "yaml".
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// JSON format
    Json,
    /// YAML format
    Yaml,
}
1124
1125/// Export mocks in specified format
1126async fn export_mocks(
1127    State(state): State<ManagementState>,
1128    Query(params): Query<std::collections::HashMap<String, String>>,
1129) -> Result<(StatusCode, String), StatusCode> {
1130    let mocks = state.mocks.read().await;
1131
1132    let format = params
1133        .get("format")
1134        .map(|f| match f.as_str() {
1135            "yaml" | "yml" => ExportFormat::Yaml,
1136            _ => ExportFormat::Json,
1137        })
1138        .unwrap_or(ExportFormat::Json);
1139
1140    match format {
1141        ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
1142            .map(|json| (StatusCode::OK, json))
1143            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1144        ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
1145            .map(|yaml| (StatusCode::OK, yaml))
1146            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1147    }
1148}
1149
1150/// Import mocks from JSON/YAML
1151async fn import_mocks(
1152    State(state): State<ManagementState>,
1153    Json(mocks): Json<Vec<MockConfig>>,
1154) -> impl IntoResponse {
1155    let mut current_mocks = state.mocks.write().await;
1156    current_mocks.clear();
1157    current_mocks.extend(mocks);
1158    Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1159}
1160
#[cfg(feature = "smtp")]
/// List SMTP emails in mailbox
///
/// Responds with the registry's emails as JSON, `500` if the registry
/// reports an error, or `501 Not Implemented` when no SMTP registry is
/// attached to the management state.
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    let (status, body) = match &state.smtp_registry {
        None => (
            StatusCode::NOT_IMPLEMENTED,
            serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            }),
        ),
        Some(registry) => match registry.get_emails() {
            Ok(emails) => (StatusCode::OK, serde_json::json!(emails)),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                serde_json::json!({
                    "error": "Failed to retrieve emails",
                    "message": e.to_string()
                }),
            ),
        },
    };

    (status, Json(body))
}
1185
/// Get specific SMTP email
///
/// Looks the email up by the `id` path segment. Responds with the email as
/// JSON, `404` when the registry has no such email, `500` when the lookup
/// fails, or `501 Not Implemented` when no SMTP registry is attached.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let registry = match &state.smtp_registry {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    let (status, body) = match registry.get_email_by_id(&id) {
        Ok(Some(email)) => (StatusCode::OK, serde_json::json!(email)),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            serde_json::json!({
                "error": "Email not found",
                "id": id
            }),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            serde_json::json!({
                "error": "Failed to retrieve email",
                "message": e.to_string()
            }),
        ),
    };

    (status, Json(body))
}
1220
/// Clear SMTP mailbox
///
/// Asks the SMTP registry to clear its mailbox. Responds with a success
/// message, `500` when clearing fails, or `501 Not Implemented` when no SMTP
/// registry is attached to the management state.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match &state.smtp_registry {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    let (status, body) = match registry.clear_mailbox() {
        Ok(()) => (
            StatusCode::OK,
            serde_json::json!({
                "message": "Mailbox cleared successfully"
            }),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            serde_json::json!({
                "error": "Failed to clear mailbox",
                "message": e.to_string()
            }),
        ),
    };

    (status, Json(body))
}
1250
/// Export SMTP mailbox
///
/// Not supported over HTTP: always answers `501 Not Implemented` and echoes
/// the requested `format` query parameter ("json" when absent).
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let requested_format = params
        .get("format")
        .cloned()
        .unwrap_or_else(|| "json".to_string());

    let body = serde_json::json!({
        "error": "SMTP mailbox management not available via HTTP API",
        "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
        "requested_format": requested_format
    });

    (StatusCode::NOT_IMPLEMENTED, Json(body))
}
1266
/// Search SMTP emails
///
/// Builds `EmailSearchFilters` from the query string and runs the search on
/// the SMTP registry. Recognized parameters: `sender`, `recipient`,
/// `subject`, `body`, `since`/`until` (RFC 3339 timestamps; values that do
/// not parse are ignored), `regex` and `case_sensitive` (enabled only by the
/// exact string "true").
///
/// Responds with the matching emails, `500` when the search fails, or
/// `501 Not Implemented` when no SMTP registry is attached.
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let registry = match &state.smtp_registry {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    // Small accessors over the query map, one per value kind
    let text = |key: &str| params.get(key).cloned();
    let instant = |key: &str| {
        params
            .get(key)
            .and_then(|raw| chrono::DateTime::parse_from_rfc3339(raw).ok())
            .map(|dt| dt.with_timezone(&chrono::Utc))
    };
    let flag = |key: &str| params.get(key).map(|raw| raw == "true").unwrap_or(false);

    let filters = EmailSearchFilters {
        sender: text("sender"),
        recipient: text("recipient"),
        subject: text("subject"),
        body: text("body"),
        since: instant("since"),
        until: instant("until"),
        use_regex: flag("regex"),
        case_sensitive: flag("case_sensitive"),
    };

    let (status, body) = match registry.search_emails(filters) {
        Ok(emails) => (StatusCode::OK, serde_json::json!(emails)),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            serde_json::json!({
                "error": "Failed to search emails",
                "message": e.to_string()
            }),
        ),
    };

    (status, Json(body))
}
1311
/// MQTT broker statistics
///
/// Snapshot assembled by `get_mqtt_stats` and returned as its JSON body.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of connected MQTT clients
    pub connected_clients: usize,
    /// Number of active MQTT topics
    pub active_topics: usize,
    /// Number of retained messages
    pub retained_messages: usize,
    /// Total number of subscriptions
    pub total_subscriptions: usize,
}
1325
/// Get MQTT broker statistics
///
/// Combines the broker's connected-client count, active-topic count and
/// topic statistics into an `MqttBrokerStats` JSON body. Responds with
/// `503 Service Unavailable` when no broker is attached to the state.
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let broker = match &state.mqtt_broker {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let connected_clients = broker.get_connected_clients().await.len();
    let active_topics = broker.get_active_topics().await.len();
    let topic_stats = broker.get_topic_stats().await;

    Json(MqttBrokerStats {
        connected_clients,
        active_topics,
        retained_messages: topic_stats.retained_messages,
        total_subscriptions: topic_stats.total_subscriptions,
    })
    .into_response()
}
1346
#[cfg(feature = "mqtt")]
/// List connected MQTT clients
///
/// Responds with `{"clients": [...]}` as reported by the broker, or
/// `503 Service Unavailable` when no broker is attached to the state.
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
        Some(broker) => {
            let clients = broker.get_connected_clients().await;
            let body = serde_json::json!({
                "clients": clients
            });
            Json(body).into_response()
        }
    }
}
1359
#[cfg(feature = "mqtt")]
/// List active MQTT topics
///
/// Responds with `{"topics": [...]}` as reported by the broker, or
/// `503 Service Unavailable` when no broker is attached to the state.
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
        Some(broker) => {
            let topics = broker.get_active_topics().await;
            let body = serde_json::json!({
                "topics": topics
            });
            Json(body).into_response()
        }
    }
}
1372
#[cfg(feature = "mqtt")]
/// Disconnect an MQTT client
///
/// Asks the broker to disconnect the client named by the `client_id` path
/// segment. Responds with a plain-text confirmation, `500` with the broker's
/// error text on failure, or `503 Service Unavailable` when no broker is
/// attached to the state.
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let broker = match &state.mqtt_broker {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let outcome = match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("Failed to disconnect client: {}", e),
        ),
    };

    outcome.into_response()
}
1392
1393// ========== MQTT Publish Handler ==========
1394
#[cfg(feature = "mqtt")]
/// Request to publish a single MQTT message
///
/// Also the element type of `MqttBatchPublishRequest::messages`.
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Topic to publish to
    pub topic: String,
    /// Message payload (string or JSON)
    pub payload: String,
    /// QoS level (0, 1, or 2); defaults to 0 via `default_qos` when omitted
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Whether to retain the message; defaults to `false` when omitted
    #[serde(default)]
    pub retain: bool,
}
1410
#[cfg(feature = "mqtt")]
/// Serde default for `MqttPublishRequest::qos`: QoS level 0.
fn default_qos() -> u8 {
    0
}
1415
#[cfg(feature = "mqtt")]
/// Publish a message to an MQTT topic (only compiled when mqtt feature is enabled)
///
/// Reads `topic` and `payload` (required strings), `qos` (optional integer,
/// default 0) and `retain` (optional boolean, default `false`) from the raw
/// JSON body, publishes through the broker as client
/// `mockforge-management-api`, and emits a `MessageEvent::Mqtt` for real-time
/// monitoring on success.
///
/// Responses:
/// - `200` with the published topic, QoS and retain flag.
/// - `400 Bad Request` when `topic`/`payload` are missing or not strings, or
///   when `qos` is greater than 2.
/// - `500 Internal Server Error` when the broker rejects the publish.
/// - `503 Service Unavailable` when no broker is attached to the state.
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Extract fields from JSON manually
    let topic = request.get("topic").and_then(|v| v.as_str());
    let payload = request.get("payload").and_then(|v| v.as_str());
    // Keep the full-width integer until it has been range-checked: narrowing
    // first would wrap values such as 256 or 258 into a valid-looking QoS.
    let requested_qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0);
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    let (topic, payload) = match (topic, payload) {
        (Some(topic), Some(payload)) => (topic.to_string(), payload.to_string()),
        _ => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid request",
                    "message": "Missing required fields: topic and payload"
                })),
            );
        }
    };

    let broker = match &state.mqtt_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "MQTT broker not available",
                    "message": "MQTT broker is not enabled or not available."
                })),
            );
        }
    };

    // Validate QoS
    if requested_qos > 2 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid QoS",
                "message": "QoS must be 0, 1, or 2"
            })),
        );
    }
    // Lossless: the value was just checked to be within 0..=2
    let qos = requested_qos as u8;

    // Convert payload to bytes
    let payload_bytes = payload.as_bytes().to_vec();
    let client_id = "mockforge-management-api".to_string();

    let publish_result = broker
        .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
        .await
        .map_err(|e| e.to_string());

    match publish_result {
        Ok(_) => {
            // Emit message event for real-time monitoring; a send error
            // (e.g. no subscribers) must not fail the publish
            let event = MessageEvent::Mqtt(MqttMessageEvent {
                topic: topic.clone(),
                payload,
                qos,
                retain,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            (
                StatusCode::OK,
                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message published to topic '{}'", topic),
                    "topic": topic,
                    "qos": qos,
                    "retain": retain
                })),
            )
        }
        Err(error_msg) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to publish message",
                "message": error_msg
            })),
        ),
    }
}
1503
1504#[cfg(not(feature = "mqtt"))]
1505/// Publish a message to an MQTT topic (stub when mqtt feature is disabled)
1506async fn publish_mqtt_message_handler(
1507    State(_state): State<ManagementState>,
1508    Json(_request): Json<serde_json::Value>,
1509) -> impl IntoResponse {
1510    (
1511        StatusCode::SERVICE_UNAVAILABLE,
1512        Json(serde_json::json!({
1513            "error": "MQTT feature not enabled",
1514            "message": "MQTT support is not compiled into this build"
1515        })),
1516    )
1517}
1518
#[cfg(feature = "mqtt")]
/// Request to publish multiple MQTT messages
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// List of messages to publish
    pub messages: Vec<MqttPublishRequest>,
    /// Delay between messages in milliseconds; defaults to 100 via
    /// `default_delay` when omitted
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1529
#[cfg(feature = "mqtt")]
/// Serde default for `MqttBatchPublishRequest::delay_ms`: 100 milliseconds.
fn default_delay() -> u64 {
    100
}
1534
#[cfg(feature = "mqtt")]
/// Publish multiple messages to MQTT topics (only compiled when mqtt feature is enabled)
///
/// Reads `messages` (required array) and `delay_ms` (optional integer,
/// default 100) from the raw JSON body. Each element is handled like a single
/// publish request (`topic`, `payload`, optional `qos` and `retain`) and is
/// published in order as client `mockforge-management-api`. After every
/// publish attempt except the one for the final element, the handler sleeps
/// for `delay_ms` milliseconds.
///
/// A bad element does not abort the batch: its failure is recorded in
/// `results` under its index and processing continues with the next element.
///
/// Responses:
/// - `200` with `total`, `succeeded`, `failed` and the per-message `results`.
/// - `400 Bad Request` when `messages` is missing, not an array, or empty.
/// - `503 Service Unavailable` when no broker is attached to the state.
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Extract fields from JSON manually
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);
    let messages_json = match request.get("messages").and_then(|v| v.as_array()) {
        Some(messages) => messages,
        None => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid request",
                    "message": "Missing required field: messages"
                })),
            );
        }
    };

    let broker = match &state.mqtt_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "MQTT broker not available",
                    "message": "MQTT broker is not enabled or not available."
                })),
            );
        }
    };

    if messages_json.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        );
    }

    let mut results = Vec::with_capacity(messages_json.len());
    let mut success_count = 0usize;
    let client_id = "mockforge-management-api".to_string();

    for (index, msg_json) in messages_json.iter().enumerate() {
        let topic = msg_json.get("topic").and_then(|v| v.as_str());
        let payload = msg_json.get("payload").and_then(|v| v.as_str());
        // Keep the full-width integer until it has been range-checked:
        // narrowing first would wrap values such as 256 or 258 into a
        // valid-looking QoS.
        let requested_qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0);
        let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

        let (topic, payload) = match (topic, payload) {
            (Some(topic), Some(payload)) => (topic.to_string(), payload.to_string()),
            _ => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }
        };

        // Validate QoS
        if requested_qos > 2 {
            results.push(serde_json::json!({
                "index": index,
                "success": false,
                "error": "Invalid QoS (must be 0, 1, or 2)"
            }));
            continue;
        }
        // Lossless: the value was just checked to be within 0..=2
        let qos = requested_qos as u8;

        // Convert payload to bytes
        let payload_bytes = payload.as_bytes().to_vec();

        let publish_result = broker
            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
            .await
            .map_err(|e| e.to_string());

        match publish_result {
            Ok(_) => {
                // Emit message event; a send error (e.g. no subscribers)
                // must not fail the publish
                let event = MessageEvent::Mqtt(MqttMessageEvent {
                    topic: topic.clone(),
                    payload,
                    qos,
                    retain,
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                success_count += 1;
                results.push(serde_json::json!({
                    "index": index,
                    "success": true,
                    "topic": topic,
                    "qos": qos
                }));
            }
            Err(error_msg) => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": error_msg
                }));
            }
        }

        // Add delay between messages (except for the last one)
        if index + 1 < messages_json.len() && delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
        }
    }

    (
        StatusCode::OK,
        Json(serde_json::json!({
            "success": true,
            "total": messages_json.len(),
            "succeeded": success_count,
            "failed": messages_json.len() - success_count,
            "results": results
        })),
    )
}
1664
1665#[cfg(not(feature = "mqtt"))]
1666/// Publish multiple messages to MQTT topics (stub when mqtt feature is disabled)
1667async fn publish_mqtt_batch_handler(
1668    State(_state): State<ManagementState>,
1669    Json(_request): Json<serde_json::Value>,
1670) -> impl IntoResponse {
1671    (
1672        StatusCode::SERVICE_UNAVAILABLE,
1673        Json(serde_json::json!({
1674            "error": "MQTT feature not enabled",
1675            "message": "MQTT support is not compiled into this build"
1676        })),
1677    )
1678}
1679
1680// Migration pipeline handlers
1681
/// Request to set migration mode
///
/// Request body accepted by `set_route_migration_mode`.
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    /// Target mode name. `set_route_migration_mode` lowercases it and accepts
    /// "mock", "shadow", "real" or "auto".
    mode: String,
}
1687
1688/// Get all migration routes
1689async fn get_migration_routes(
1690    State(state): State<ManagementState>,
1691) -> Result<Json<serde_json::Value>, StatusCode> {
1692    let proxy_config = match &state.proxy_config {
1693        Some(config) => config,
1694        None => {
1695            return Ok(Json(serde_json::json!({
1696                "error": "Migration not configured. Proxy config not available."
1697            })));
1698        }
1699    };
1700
1701    let config = proxy_config.read().await;
1702    let routes = config.get_migration_routes();
1703
1704    Ok(Json(serde_json::json!({
1705        "routes": routes
1706    })))
1707}
1708
1709/// Toggle a route's migration mode
1710async fn toggle_route_migration(
1711    State(state): State<ManagementState>,
1712    Path(pattern): Path<String>,
1713) -> Result<Json<serde_json::Value>, StatusCode> {
1714    let proxy_config = match &state.proxy_config {
1715        Some(config) => config,
1716        None => {
1717            return Ok(Json(serde_json::json!({
1718                "error": "Migration not configured. Proxy config not available."
1719            })));
1720        }
1721    };
1722
1723    let mut config = proxy_config.write().await;
1724    let new_mode = match config.toggle_route_migration(&pattern) {
1725        Some(mode) => mode,
1726        None => {
1727            return Ok(Json(serde_json::json!({
1728                "error": format!("Route pattern not found: {}", pattern)
1729            })));
1730        }
1731    };
1732
1733    Ok(Json(serde_json::json!({
1734        "pattern": pattern,
1735        "mode": format!("{:?}", new_mode).to_lowercase()
1736    })))
1737}
1738
1739/// Set a route's migration mode explicitly
1740async fn set_route_migration_mode(
1741    State(state): State<ManagementState>,
1742    Path(pattern): Path<String>,
1743    Json(request): Json<SetMigrationModeRequest>,
1744) -> Result<Json<serde_json::Value>, StatusCode> {
1745    let proxy_config = match &state.proxy_config {
1746        Some(config) => config,
1747        None => {
1748            return Ok(Json(serde_json::json!({
1749                "error": "Migration not configured. Proxy config not available."
1750            })));
1751        }
1752    };
1753
1754    use mockforge_core::proxy::config::MigrationMode;
1755    let mode = match request.mode.to_lowercase().as_str() {
1756        "mock" => MigrationMode::Mock,
1757        "shadow" => MigrationMode::Shadow,
1758        "real" => MigrationMode::Real,
1759        "auto" => MigrationMode::Auto,
1760        _ => {
1761            return Ok(Json(serde_json::json!({
1762                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1763            })));
1764        }
1765    };
1766
1767    let mut config = proxy_config.write().await;
1768    let updated = config.update_rule_migration_mode(&pattern, mode);
1769
1770    if !updated {
1771        return Ok(Json(serde_json::json!({
1772            "error": format!("Route pattern not found: {}", pattern)
1773        })));
1774    }
1775
1776    Ok(Json(serde_json::json!({
1777        "pattern": pattern,
1778        "mode": format!("{:?}", mode).to_lowercase()
1779    })))
1780}
1781
1782/// Toggle a group's migration mode
1783async fn toggle_group_migration(
1784    State(state): State<ManagementState>,
1785    Path(group): Path<String>,
1786) -> Result<Json<serde_json::Value>, StatusCode> {
1787    let proxy_config = match &state.proxy_config {
1788        Some(config) => config,
1789        None => {
1790            return Ok(Json(serde_json::json!({
1791                "error": "Migration not configured. Proxy config not available."
1792            })));
1793        }
1794    };
1795
1796    let mut config = proxy_config.write().await;
1797    let new_mode = config.toggle_group_migration(&group);
1798
1799    Ok(Json(serde_json::json!({
1800        "group": group,
1801        "mode": format!("{:?}", new_mode).to_lowercase()
1802    })))
1803}
1804
1805/// Set a group's migration mode explicitly
1806async fn set_group_migration_mode(
1807    State(state): State<ManagementState>,
1808    Path(group): Path<String>,
1809    Json(request): Json<SetMigrationModeRequest>,
1810) -> Result<Json<serde_json::Value>, StatusCode> {
1811    let proxy_config = match &state.proxy_config {
1812        Some(config) => config,
1813        None => {
1814            return Ok(Json(serde_json::json!({
1815                "error": "Migration not configured. Proxy config not available."
1816            })));
1817        }
1818    };
1819
1820    use mockforge_core::proxy::config::MigrationMode;
1821    let mode = match request.mode.to_lowercase().as_str() {
1822        "mock" => MigrationMode::Mock,
1823        "shadow" => MigrationMode::Shadow,
1824        "real" => MigrationMode::Real,
1825        "auto" => MigrationMode::Auto,
1826        _ => {
1827            return Ok(Json(serde_json::json!({
1828                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1829            })));
1830        }
1831    };
1832
1833    let mut config = proxy_config.write().await;
1834    config.update_group_migration_mode(&group, mode);
1835
1836    Ok(Json(serde_json::json!({
1837        "group": group,
1838        "mode": format!("{:?}", mode).to_lowercase()
1839    })))
1840}
1841
1842/// Get all migration groups
1843async fn get_migration_groups(
1844    State(state): State<ManagementState>,
1845) -> Result<Json<serde_json::Value>, StatusCode> {
1846    let proxy_config = match &state.proxy_config {
1847        Some(config) => config,
1848        None => {
1849            return Ok(Json(serde_json::json!({
1850                "error": "Migration not configured. Proxy config not available."
1851            })));
1852        }
1853    };
1854
1855    let config = proxy_config.read().await;
1856    let groups = config.get_migration_groups();
1857
1858    // Convert to JSON-serializable format
1859    let groups_json: serde_json::Map<String, serde_json::Value> = groups
1860        .into_iter()
1861        .map(|(name, info)| {
1862            (
1863                name,
1864                serde_json::json!({
1865                    "name": info.name,
1866                    "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1867                    "route_count": info.route_count
1868                }),
1869            )
1870        })
1871        .collect();
1872
1873    Ok(Json(serde_json::json!(groups_json)))
1874}
1875
1876/// Get overall migration status
1877async fn get_migration_status(
1878    State(state): State<ManagementState>,
1879) -> Result<Json<serde_json::Value>, StatusCode> {
1880    let proxy_config = match &state.proxy_config {
1881        Some(config) => config,
1882        None => {
1883            return Ok(Json(serde_json::json!({
1884                "error": "Migration not configured. Proxy config not available."
1885            })));
1886        }
1887    };
1888
1889    let config = proxy_config.read().await;
1890    let routes = config.get_migration_routes();
1891    let groups = config.get_migration_groups();
1892
1893    let mut mock_count = 0;
1894    let mut shadow_count = 0;
1895    let mut real_count = 0;
1896    let mut auto_count = 0;
1897
1898    for route in &routes {
1899        match route.migration_mode {
1900            mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1901            mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1902            mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1903            mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1904        }
1905    }
1906
1907    Ok(Json(serde_json::json!({
1908        "total_routes": routes.len(),
1909        "mock_routes": mock_count,
1910        "shadow_routes": shadow_count,
1911        "real_routes": real_count,
1912        "auto_routes": auto_count,
1913        "total_groups": groups.len(),
1914        "migration_enabled": config.migration_enabled
1915    })))
1916}
1917
1918// ========== Proxy Replacement Rules Management ==========
1919
1920/// Request body for creating/updating proxy replacement rules
1921#[derive(Debug, Deserialize, Serialize)]
1922pub struct ProxyRuleRequest {
1923    /// URL pattern to match (supports wildcards like "/api/users/*")
1924    pub pattern: String,
1925    /// Rule type: "request" or "response"
1926    #[serde(rename = "type")]
1927    pub rule_type: String,
1928    /// Optional status code filter for response rules
1929    #[serde(default)]
1930    pub status_codes: Vec<u16>,
1931    /// Body transformations to apply
1932    pub body_transforms: Vec<BodyTransformRequest>,
1933    /// Whether this rule is enabled
1934    #[serde(default = "default_true")]
1935    pub enabled: bool,
1936}
1937
1938/// Request body for individual body transformations
1939#[derive(Debug, Deserialize, Serialize)]
1940pub struct BodyTransformRequest {
1941    /// JSONPath expression to target (e.g., "$.userId", "$.email")
1942    pub path: String,
1943    /// Replacement value (supports template expansion like "{{uuid}}", "{{faker.email}}")
1944    pub replace: String,
1945    /// Operation to perform: "replace", "add", or "remove"
1946    #[serde(default)]
1947    pub operation: String,
1948}
1949
1950/// Response format for proxy rules
1951#[derive(Debug, Serialize)]
1952pub struct ProxyRuleResponse {
1953    /// Rule ID (index in the array)
1954    pub id: usize,
1955    /// URL pattern
1956    pub pattern: String,
1957    /// Rule type
1958    #[serde(rename = "type")]
1959    pub rule_type: String,
1960    /// Status codes (for response rules)
1961    pub status_codes: Vec<u16>,
1962    /// Body transformations
1963    pub body_transforms: Vec<BodyTransformRequest>,
1964    /// Whether enabled
1965    pub enabled: bool,
1966}
1967
1968/// List all proxy replacement rules
1969async fn list_proxy_rules(
1970    State(state): State<ManagementState>,
1971) -> Result<Json<serde_json::Value>, StatusCode> {
1972    let proxy_config = match &state.proxy_config {
1973        Some(config) => config,
1974        None => {
1975            return Ok(Json(serde_json::json!({
1976                "error": "Proxy not configured. Proxy config not available."
1977            })));
1978        }
1979    };
1980
1981    let config = proxy_config.read().await;
1982
1983    let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1984
1985    // Add request replacement rules
1986    for (idx, rule) in config.request_replacements.iter().enumerate() {
1987        rules.push(ProxyRuleResponse {
1988            id: idx,
1989            pattern: rule.pattern.clone(),
1990            rule_type: "request".to_string(),
1991            status_codes: Vec::new(),
1992            body_transforms: rule
1993                .body_transforms
1994                .iter()
1995                .map(|t| BodyTransformRequest {
1996                    path: t.path.clone(),
1997                    replace: t.replace.clone(),
1998                    operation: format!("{:?}", t.operation).to_lowercase(),
1999                })
2000                .collect(),
2001            enabled: rule.enabled,
2002        });
2003    }
2004
2005    // Add response replacement rules
2006    let request_count = config.request_replacements.len();
2007    for (idx, rule) in config.response_replacements.iter().enumerate() {
2008        rules.push(ProxyRuleResponse {
2009            id: request_count + idx,
2010            pattern: rule.pattern.clone(),
2011            rule_type: "response".to_string(),
2012            status_codes: rule.status_codes.clone(),
2013            body_transforms: rule
2014                .body_transforms
2015                .iter()
2016                .map(|t| BodyTransformRequest {
2017                    path: t.path.clone(),
2018                    replace: t.replace.clone(),
2019                    operation: format!("{:?}", t.operation).to_lowercase(),
2020                })
2021                .collect(),
2022            enabled: rule.enabled,
2023        });
2024    }
2025
2026    Ok(Json(serde_json::json!({
2027        "rules": rules
2028    })))
2029}
2030
2031/// Create a new proxy replacement rule
2032async fn create_proxy_rule(
2033    State(state): State<ManagementState>,
2034    Json(request): Json<ProxyRuleRequest>,
2035) -> Result<Json<serde_json::Value>, StatusCode> {
2036    let proxy_config = match &state.proxy_config {
2037        Some(config) => config,
2038        None => {
2039            return Ok(Json(serde_json::json!({
2040                "error": "Proxy not configured. Proxy config not available."
2041            })));
2042        }
2043    };
2044
2045    // Validate request
2046    if request.body_transforms.is_empty() {
2047        return Ok(Json(serde_json::json!({
2048            "error": "At least one body transform is required"
2049        })));
2050    }
2051
2052    let body_transforms: Vec<BodyTransform> = request
2053        .body_transforms
2054        .iter()
2055        .map(|t| {
2056            let op = match t.operation.as_str() {
2057                "replace" => TransformOperation::Replace,
2058                "add" => TransformOperation::Add,
2059                "remove" => TransformOperation::Remove,
2060                _ => TransformOperation::Replace,
2061            };
2062            BodyTransform {
2063                path: t.path.clone(),
2064                replace: t.replace.clone(),
2065                operation: op,
2066            }
2067        })
2068        .collect();
2069
2070    let new_rule = BodyTransformRule {
2071        pattern: request.pattern.clone(),
2072        status_codes: request.status_codes.clone(),
2073        body_transforms,
2074        enabled: request.enabled,
2075    };
2076
2077    let mut config = proxy_config.write().await;
2078
2079    let rule_id = if request.rule_type == "request" {
2080        config.request_replacements.push(new_rule);
2081        config.request_replacements.len() - 1
2082    } else if request.rule_type == "response" {
2083        config.response_replacements.push(new_rule);
2084        config.request_replacements.len() + config.response_replacements.len() - 1
2085    } else {
2086        return Ok(Json(serde_json::json!({
2087            "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
2088        })));
2089    };
2090
2091    Ok(Json(serde_json::json!({
2092        "id": rule_id,
2093        "message": "Rule created successfully"
2094    })))
2095}
2096
2097/// Get a specific proxy replacement rule
2098async fn get_proxy_rule(
2099    State(state): State<ManagementState>,
2100    Path(id): Path<String>,
2101) -> Result<Json<serde_json::Value>, StatusCode> {
2102    let proxy_config = match &state.proxy_config {
2103        Some(config) => config,
2104        None => {
2105            return Ok(Json(serde_json::json!({
2106                "error": "Proxy not configured. Proxy config not available."
2107            })));
2108        }
2109    };
2110
2111    let config = proxy_config.read().await;
2112    let rule_id: usize = match id.parse() {
2113        Ok(id) => id,
2114        Err(_) => {
2115            return Ok(Json(serde_json::json!({
2116                "error": format!("Invalid rule ID: {}", id)
2117            })));
2118        }
2119    };
2120
2121    let request_count = config.request_replacements.len();
2122
2123    if rule_id < request_count {
2124        // Request rule
2125        let rule = &config.request_replacements[rule_id];
2126        Ok(Json(serde_json::json!({
2127            "id": rule_id,
2128            "pattern": rule.pattern,
2129            "type": "request",
2130            "status_codes": [],
2131            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2132                "path": t.path,
2133                "replace": t.replace,
2134                "operation": format!("{:?}", t.operation).to_lowercase()
2135            })).collect::<Vec<_>>(),
2136            "enabled": rule.enabled
2137        })))
2138    } else if rule_id < request_count + config.response_replacements.len() {
2139        // Response rule
2140        let response_idx = rule_id - request_count;
2141        let rule = &config.response_replacements[response_idx];
2142        Ok(Json(serde_json::json!({
2143            "id": rule_id,
2144            "pattern": rule.pattern,
2145            "type": "response",
2146            "status_codes": rule.status_codes,
2147            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2148                "path": t.path,
2149                "replace": t.replace,
2150                "operation": format!("{:?}", t.operation).to_lowercase()
2151            })).collect::<Vec<_>>(),
2152            "enabled": rule.enabled
2153        })))
2154    } else {
2155        Ok(Json(serde_json::json!({
2156            "error": format!("Rule ID {} not found", rule_id)
2157        })))
2158    }
2159}
2160
2161/// Update a proxy replacement rule
2162async fn update_proxy_rule(
2163    State(state): State<ManagementState>,
2164    Path(id): Path<String>,
2165    Json(request): Json<ProxyRuleRequest>,
2166) -> Result<Json<serde_json::Value>, StatusCode> {
2167    let proxy_config = match &state.proxy_config {
2168        Some(config) => config,
2169        None => {
2170            return Ok(Json(serde_json::json!({
2171                "error": "Proxy not configured. Proxy config not available."
2172            })));
2173        }
2174    };
2175
2176    let mut config = proxy_config.write().await;
2177    let rule_id: usize = match id.parse() {
2178        Ok(id) => id,
2179        Err(_) => {
2180            return Ok(Json(serde_json::json!({
2181                "error": format!("Invalid rule ID: {}", id)
2182            })));
2183        }
2184    };
2185
2186    let body_transforms: Vec<BodyTransform> = request
2187        .body_transforms
2188        .iter()
2189        .map(|t| {
2190            let op = match t.operation.as_str() {
2191                "replace" => TransformOperation::Replace,
2192                "add" => TransformOperation::Add,
2193                "remove" => TransformOperation::Remove,
2194                _ => TransformOperation::Replace,
2195            };
2196            BodyTransform {
2197                path: t.path.clone(),
2198                replace: t.replace.clone(),
2199                operation: op,
2200            }
2201        })
2202        .collect();
2203
2204    let updated_rule = BodyTransformRule {
2205        pattern: request.pattern.clone(),
2206        status_codes: request.status_codes.clone(),
2207        body_transforms,
2208        enabled: request.enabled,
2209    };
2210
2211    let request_count = config.request_replacements.len();
2212
2213    if rule_id < request_count {
2214        // Update request rule
2215        config.request_replacements[rule_id] = updated_rule;
2216    } else if rule_id < request_count + config.response_replacements.len() {
2217        // Update response rule
2218        let response_idx = rule_id - request_count;
2219        config.response_replacements[response_idx] = updated_rule;
2220    } else {
2221        return Ok(Json(serde_json::json!({
2222            "error": format!("Rule ID {} not found", rule_id)
2223        })));
2224    }
2225
2226    Ok(Json(serde_json::json!({
2227        "id": rule_id,
2228        "message": "Rule updated successfully"
2229    })))
2230}
2231
2232/// Delete a proxy replacement rule
2233async fn delete_proxy_rule(
2234    State(state): State<ManagementState>,
2235    Path(id): Path<String>,
2236) -> Result<Json<serde_json::Value>, StatusCode> {
2237    let proxy_config = match &state.proxy_config {
2238        Some(config) => config,
2239        None => {
2240            return Ok(Json(serde_json::json!({
2241                "error": "Proxy not configured. Proxy config not available."
2242            })));
2243        }
2244    };
2245
2246    let mut config = proxy_config.write().await;
2247    let rule_id: usize = match id.parse() {
2248        Ok(id) => id,
2249        Err(_) => {
2250            return Ok(Json(serde_json::json!({
2251                "error": format!("Invalid rule ID: {}", id)
2252            })));
2253        }
2254    };
2255
2256    let request_count = config.request_replacements.len();
2257
2258    if rule_id < request_count {
2259        // Delete request rule
2260        config.request_replacements.remove(rule_id);
2261    } else if rule_id < request_count + config.response_replacements.len() {
2262        // Delete response rule
2263        let response_idx = rule_id - request_count;
2264        config.response_replacements.remove(response_idx);
2265    } else {
2266        return Ok(Json(serde_json::json!({
2267            "error": format!("Rule ID {} not found", rule_id)
2268        })));
2269    }
2270
2271    Ok(Json(serde_json::json!({
2272        "id": rule_id,
2273        "message": "Rule deleted successfully"
2274    })))
2275}
2276
2277/// Get recent intercepted requests/responses for inspection
2278/// This is a placeholder - in a full implementation, you'd track intercepted traffic
2279async fn get_proxy_inspect(
2280    State(state): State<ManagementState>,
2281    Query(params): Query<std::collections::HashMap<String, String>>,
2282) -> Result<Json<serde_json::Value>, StatusCode> {
2283    let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2284    let offset: usize = params.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);
2285
2286    let proxy_config = match &state.proxy_config {
2287        Some(config) => config.read().await,
2288        None => {
2289            return Ok(Json(serde_json::json!({
2290                "error": "Proxy not configured. Proxy config not available."
2291            })));
2292        }
2293    };
2294
2295    let mut rules = Vec::new();
2296    for (idx, rule) in proxy_config.request_replacements.iter().enumerate() {
2297        rules.push(serde_json::json!({
2298            "id": idx,
2299            "kind": "request",
2300            "pattern": rule.pattern,
2301            "enabled": rule.enabled,
2302            "status_codes": rule.status_codes,
2303            "transform_count": rule.body_transforms.len(),
2304            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2305                "path": t.path,
2306                "operation": t.operation,
2307                "replace": t.replace
2308            })).collect::<Vec<_>>()
2309        }));
2310    }
2311    let request_rule_count = rules.len();
2312    for (idx, rule) in proxy_config.response_replacements.iter().enumerate() {
2313        rules.push(serde_json::json!({
2314            "id": request_rule_count + idx,
2315            "kind": "response",
2316            "pattern": rule.pattern,
2317            "enabled": rule.enabled,
2318            "status_codes": rule.status_codes,
2319            "transform_count": rule.body_transforms.len(),
2320            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2321                "path": t.path,
2322                "operation": t.operation,
2323                "replace": t.replace
2324            })).collect::<Vec<_>>()
2325        }));
2326    }
2327
2328    let total = rules.len();
2329    let paged_rules: Vec<_> = rules.into_iter().skip(offset).take(limit).collect();
2330
2331    Ok(Json(serde_json::json!({
2332        "enabled": proxy_config.enabled,
2333        "target_url": proxy_config.target_url,
2334        "prefix": proxy_config.prefix,
2335        "timeout_seconds": proxy_config.timeout_seconds,
2336        "follow_redirects": proxy_config.follow_redirects,
2337        "passthrough_by_default": proxy_config.passthrough_by_default,
2338        "rules": paged_rules,
2339        "request_rule_count": request_rule_count,
2340        "response_rule_count": total.saturating_sub(request_rule_count),
2341        "limit": limit,
2342        "offset": offset,
2343        "total": total
2344    })))
2345}
2346
2347/// Build the management API router
2348pub fn management_router(state: ManagementState) -> Router {
2349    let router = Router::new()
2350        .route("/health", get(health_check))
2351        .route("/stats", get(get_stats))
2352        .route("/config", get(get_config))
2353        .route("/config/validate", post(validate_config))
2354        .route("/config/bulk", post(bulk_update_config))
2355        .route("/mocks", get(list_mocks))
2356        .route("/mocks", post(create_mock))
2357        .route("/mocks/{id}", get(get_mock))
2358        .route("/mocks/{id}", put(update_mock))
2359        .route("/mocks/{id}", delete(delete_mock))
2360        .route("/export", get(export_mocks))
2361        .route("/import", post(import_mocks));
2362
2363    #[cfg(feature = "smtp")]
2364    let router = router
2365        .route("/smtp/mailbox", get(list_smtp_emails))
2366        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
2367        .route("/smtp/mailbox/{id}", get(get_smtp_email))
2368        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
2369        .route("/smtp/mailbox/search", get(search_smtp_emails));
2370
2371    #[cfg(not(feature = "smtp"))]
2372    let router = router;
2373
2374    // MQTT routes
2375    #[cfg(feature = "mqtt")]
2376    let router = router
2377        .route("/mqtt/stats", get(get_mqtt_stats))
2378        .route("/mqtt/clients", get(get_mqtt_clients))
2379        .route("/mqtt/topics", get(get_mqtt_topics))
2380        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
2381        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
2382        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2383        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2384
2385    #[cfg(not(feature = "mqtt"))]
2386    let router = router
2387        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2388        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2389
2390    #[cfg(feature = "kafka")]
2391    let router = router
2392        .route("/kafka/stats", get(get_kafka_stats))
2393        .route("/kafka/topics", get(get_kafka_topics))
2394        .route("/kafka/topics/{topic}", get(get_kafka_topic))
2395        .route("/kafka/groups", get(get_kafka_groups))
2396        .route("/kafka/groups/{group_id}", get(get_kafka_group))
2397        .route("/kafka/produce", post(produce_kafka_message))
2398        .route("/kafka/produce/batch", post(produce_kafka_batch))
2399        .route("/kafka/messages/stream", get(kafka_messages_stream));
2400
2401    #[cfg(not(feature = "kafka"))]
2402    let router = router;
2403
2404    // Migration pipeline routes
2405    let router = router
2406        .route("/migration/routes", get(get_migration_routes))
2407        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
2408        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
2409        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
2410        .route("/migration/groups/{group}", put(set_group_migration_mode))
2411        .route("/migration/groups", get(get_migration_groups))
2412        .route("/migration/status", get(get_migration_status));
2413
2414    // Proxy replacement rules routes
2415    let router = router
2416        .route("/proxy/rules", get(list_proxy_rules))
2417        .route("/proxy/rules", post(create_proxy_rule))
2418        .route("/proxy/rules/{id}", get(get_proxy_rule))
2419        .route("/proxy/rules/{id}", put(update_proxy_rule))
2420        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
2421        .route("/proxy/inspect", get(get_proxy_inspect));
2422
2423    // AI-powered features
2424    let router = router.route("/ai/generate-spec", post(generate_ai_spec));
2425
2426    // Snapshot diff endpoints
2427    let router = router.nest(
2428        "/snapshot-diff",
2429        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
2430    );
2431
2432    #[cfg(feature = "behavioral-cloning")]
2433    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));
2434
2435    let router = router
2436        .route("/mockai/learn", post(learn_from_examples))
2437        .route("/mockai/rules/explanations", get(list_rule_explanations))
2438        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
2439        .route("/chaos/config", get(get_chaos_config))
2440        .route("/chaos/config", post(update_chaos_config))
2441        .route("/network/profiles", get(list_network_profiles))
2442        .route("/network/profile/apply", post(apply_network_profile));
2443
2444    // State machine API routes
2445    let router =
2446        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());
2447
2448    router.with_state(state)
2449}
2450
#[cfg(feature = "kafka")]
/// Aggregate statistics reported by the Kafka stats endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// How many topics the broker holds
    pub topics: usize,
    /// Partition count summed across all topics
    pub partitions: usize,
    /// How many consumer groups exist
    pub consumer_groups: usize,
    /// Running total of produced messages
    pub messages_produced: u64,
    /// Running total of consumed messages
    pub messages_consumed: u64,
}
2465
#[cfg(feature = "kafka")]
/// Summary of a single Kafka topic as returned by the topic listing endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name
    pub name: String,
    /// Number of partitions in the topic
    pub partitions: usize,
    /// Replication factor taken from the topic's configuration
    pub replication_factor: i32,
}
2473
#[cfg(feature = "kafka")]
/// Summary of a single Kafka consumer group as returned by the group listing endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group identifier
    pub group_id: String,
    /// Number of members in the group
    pub members: usize,
    /// Group state label (the group listing endpoint currently always reports "Stable")
    pub state: String,
}
2481
#[cfg(feature = "kafka")]
/// Report aggregate Kafka broker statistics.
///
/// Responds with a serialized `KafkaBrokerStats` built from the broker's
/// topics, consumer groups and metrics snapshot, or with
/// `503 Service Unavailable` and an error body when the management state has
/// no Kafka broker.
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        let unavailable = Json(serde_json::json!({
            "error": "Kafka broker not available",
            "message": "Kafka broker is not enabled or not available."
        }));
        return (StatusCode::SERVICE_UNAVAILABLE, unavailable).into_response();
    };

    let topics = broker.topics.read().await;
    let consumer_groups = broker.consumer_groups.read().await;

    let partition_total = topics.values().map(|topic| topic.partitions.len()).sum::<usize>();

    // Message counters come from the broker's metrics snapshot.
    let snapshot = broker.metrics().snapshot();

    Json(KafkaBrokerStats {
        topics: topics.len(),
        partitions: partition_total,
        consumer_groups: consumer_groups.groups().len(),
        messages_produced: snapshot.messages_produced_total,
        messages_consumed: snapshot.messages_consumed_total,
    })
    .into_response()
}
2514
#[cfg(feature = "kafka")]
/// List all Kafka topics as `{"topics": [...]}`.
///
/// Each entry is a `KafkaTopicInfo` with the topic's name, partition count and
/// configured replication factor. Responds with `503 Service Unavailable` and
/// an error body when the management state has no Kafka broker.
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        let unavailable = Json(serde_json::json!({
            "error": "Kafka broker not available",
            "message": "Kafka broker is not enabled or not available."
        }));
        return (StatusCode::SERVICE_UNAVAILABLE, unavailable).into_response();
    };

    let topics = broker.topics.read().await;

    let mut topic_list: Vec<KafkaTopicInfo> = Vec::new();
    for (name, topic) in topics.iter() {
        topic_list.push(KafkaTopicInfo {
            name: name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor as i32,
        });
    }

    let body = serde_json::json!({
        "topics": topic_list
    });
    Json(body).into_response()
}
2544
#[cfg(feature = "kafka")]
/// Describe a single Kafka topic.
///
/// The response contains the topic's name, partition count, replication
/// factor, and a `partitions_detail` array with one entry per partition
/// (its index as `id`, a fixed leader `0`, fixed replicas `[0]`, and the
/// number of stored messages). Responds with `404 Not Found` when the topic
/// does not exist and `503 Service Unavailable` when the management state has
/// no Kafka broker.
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        let unavailable = Json(serde_json::json!({
            "error": "Kafka broker not available",
            "message": "Kafka broker is not enabled or not available."
        }));
        return (StatusCode::SERVICE_UNAVAILABLE, unavailable).into_response();
    };

    let topics = broker.topics.read().await;
    let Some(topic) = topics.get(&topic_name) else {
        let missing = Json(serde_json::json!({
            "error": "Topic not found",
            "topic": topic_name
        }));
        return (StatusCode::NOT_FOUND, missing).into_response();
    };

    let partitions_detail: Vec<serde_json::Value> = topic
        .partitions
        .iter()
        .enumerate()
        .map(|(idx, partition)| {
            serde_json::json!({
                "id": idx as i32,
                "leader": 0,
                "replicas": vec![0],
                "message_count": partition.messages.len()
            })
        })
        .collect();

    Json(serde_json::json!({
        "name": topic_name,
        "partitions": topic.partitions.len(),
        "replication_factor": topic.config.replication_factor,
        "partitions_detail": partitions_detail
    }))
    .into_response()
}
2586
#[cfg(feature = "kafka")]
/// List Kafka consumer groups
///
/// Returns every consumer group known to the broker with its member count.
/// Answers `503 Service Unavailable` when no Kafka broker is attached.
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let broker = match &state.kafka_broker {
        Some(broker) => broker,
        None => {
            let body = serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            });
            return (StatusCode::SERVICE_UNAVAILABLE, Json(body)).into_response();
        }
    };

    let registry = broker.consumer_groups.read().await;
    let mut groups: Vec<KafkaConsumerGroupInfo> = Vec::new();
    for (group_id, group) in registry.groups().iter() {
        groups.push(KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            // The state is not tracked in detail here; every group is reported as "Stable".
            state: "Stable".to_string(),
        });
    }

    let body = serde_json::json!({
        "groups": groups
    });
    Json(body).into_response()
}
2617
#[cfg(feature = "kafka")]
/// Get Kafka consumer group details
///
/// Describes the members of the group named in the path (with their topic and
/// partition assignments) and the group's committed offsets. Answers
/// `404 Not Found` for an unknown group and `503 Service Unavailable` when no
/// Kafka broker is attached.
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    let broker = match &state.kafka_broker {
        Some(broker) => broker,
        None => {
            let body = serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            });
            return (StatusCode::SERVICE_UNAVAILABLE, Json(body)).into_response();
        }
    };

    let registry = broker.consumer_groups.read().await;
    let group = match registry.groups().get(&group_id) {
        Some(group) => group,
        None => {
            let body = serde_json::json!({
                "error": "Consumer group not found",
                "group_id": group_id
            });
            return (StatusCode::NOT_FOUND, Json(body)).into_response();
        }
    };

    // One entry per member, each listing its topic/partition assignments.
    let members_detail = group
        .members
        .iter()
        .map(|(member_id, member)| {
            let assignments = member
                .assignment
                .iter()
                .map(|a| {
                    serde_json::json!({
                        "topic": a.topic,
                        "partitions": a.partitions
                    })
                })
                .collect::<Vec<_>>();

            serde_json::json!({
                "member_id": member_id,
                "client_id": member.client_id,
                "assignments": assignments
            })
        })
        .collect::<Vec<_>>();

    // One entry per (topic, partition) pair stored in the group's offsets.
    let offsets = group
        .offsets
        .iter()
        .map(|((topic, partition), offset)| {
            serde_json::json!({
                "topic": topic,
                "partition": partition,
                "offset": offset
            })
        })
        .collect::<Vec<_>>();

    let body = serde_json::json!({
        "group_id": group_id,
        "members": group.members.len(),
        "state": "Stable",
        "members_detail": members_detail,
        "offsets": offsets
    });
    Json(body).into_response()
}
2666
2667// ========== Kafka Produce Handler ==========
2668
#[cfg(feature = "kafka")]
/// Request body for producing one message to a Kafka topic via the management API
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Name of the destination topic
    pub topic: String,
    /// Optional message key; absent when omitted from the request
    #[serde(default)]
    pub key: Option<String>,
    /// Message value (a JSON document or any plain string, carried as text)
    pub value: String,
    /// Optional target partition ID; when omitted a partition is assigned automatically
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional message headers as string key-value pairs
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2686
#[cfg(feature = "kafka")]
/// Produce a message to a Kafka topic
///
/// The topic is created with the default topic configuration when it does not
/// exist yet. When the request names no partition, one is assigned by the topic
/// based on the message key. On success the broker's produced-message metric is
/// incremented and a [`MessageEvent::Kafka`] is broadcast for real-time monitoring.
///
/// # Responses
///
/// * `200 OK` - JSON with `success`, `message`, `topic`, `partition` and `offset`
/// * `400 Bad Request` - the selected partition does not exist on the topic
/// * `500 Internal Server Error` - the topic rejected the message
/// * `503 Service Unavailable` - no Kafka broker is attached to the management state
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    let broker = match &state.kafka_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "Kafka broker not available",
                    "message": "Kafka broker is not enabled or not available."
                })),
            )
                .into_response();
        }
    };

    let mut topics = broker.topics.write().await;

    // Get or create the topic
    let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
        mockforge_kafka::topics::Topic::new(
            request.topic.clone(),
            mockforge_kafka::topics::TopicConfig::default(),
        )
    });

    // Determine partition: an explicit request wins, otherwise let the topic pick one
    let partition_id = match request.partition {
        Some(partition) => partition,
        None => topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes())),
    };

    // Validate partition exists
    if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            })),
        )
            .into_response();
    }

    // Build the stored message from borrowed request data; the owned strings are
    // moved into the monitoring event below, so nothing is cloned twice.
    let message = mockforge_kafka::partitions::KafkaMessage {
        offset: 0, // Will be set by partition.append
        timestamp: chrono::Utc::now().timestamp_millis(),
        key: request.key.as_ref().map(|k| k.as_bytes().to_vec()),
        value: request.value.as_bytes().to_vec(),
        headers: request
            .headers
            .iter()
            .flatten()
            .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
            .collect(),
    };

    // Produce to partition
    match topic_entry.produce(partition_id, message).await {
        Ok(offset) => {
            // Record metrics for successful message production
            broker.metrics().record_messages_produced(1);

            // Emit message event for real-time monitoring. The send result is
            // deliberately ignored so producing never fails because of monitoring.
            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: request.topic.clone(),
                key: request.key,
                value: request.value,
                partition: partition_id,
                offset,
                headers: request.headers,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", request.topic),
                "topic": request.topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2791
#[cfg(feature = "kafka")]
/// Request body for producing several Kafka messages in one management API call
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// Messages to produce, handled in the order given
    pub messages: Vec<KafkaProduceRequest>,
    /// Pause between consecutive messages, in milliseconds
    /// (filled in by `default_delay` when the field is omitted)
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2801
#[cfg(feature = "kafka")]
/// Produce multiple messages to Kafka topics
///
/// Messages are processed in request order. Each one is handled independently:
/// a failure is recorded in that message's result entry and does not abort the
/// rest of the batch. Missing topics are created with the default configuration.
/// `delay_ms` is slept between consecutive messages; the topic map's write lock
/// is released before sleeping so other broker users are not blocked meanwhile.
///
/// # Responses
///
/// * `200 OK` - JSON with `total`, `succeeded`, `failed` and per-message `results`
/// * `400 Bad Request` - the batch contains no messages
/// * `503 Service Unavailable` - no Kafka broker is attached to the management state
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    let broker = match &state.kafka_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "Kafka broker not available",
                    "message": "Kafka broker is not enabled or not available."
                })),
            )
                .into_response();
        }
    };

    if request.messages.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        )
            .into_response();
    }

    let total = request.messages.len();
    let mut results = Vec::with_capacity(total);
    let mut success_count: usize = 0;

    for (index, msg_request) in request.messages.iter().enumerate() {
        let mut topics = broker.topics.write().await;

        // Get or create the topic
        let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
            mockforge_kafka::topics::Topic::new(
                msg_request.topic.clone(),
                mockforge_kafka::topics::TopicConfig::default(),
            )
        });

        // Determine partition
        let partition_id = match msg_request.partition {
            Some(partition) => partition,
            None => topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes())),
        };

        // Validate partition exists. A rejected message moves straight on to the
        // next one (no delay); `continue` also releases the write guard.
        if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
            results.push(serde_json::json!({
                "index": index,
                "success": false,
                "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            }));
            continue;
        }

        // Create the message
        let message = mockforge_kafka::partitions::KafkaMessage {
            offset: 0,
            timestamp: chrono::Utc::now().timestamp_millis(),
            key: msg_request.key.as_ref().map(|k| k.as_bytes().to_vec()),
            value: msg_request.value.as_bytes().to_vec(),
            headers: msg_request
                .headers
                .iter()
                .flatten()
                .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
                .collect(),
        };

        // Produce to partition
        match topic_entry.produce(partition_id, message).await {
            Ok(offset) => {
                // Record metrics for successful message production
                broker.metrics().record_messages_produced(1);

                // Emit message event; the send result is deliberately ignored
                let event = MessageEvent::Kafka(KafkaMessageEvent {
                    topic: msg_request.topic.clone(),
                    key: msg_request.key.clone(),
                    value: msg_request.value.clone(),
                    partition: partition_id,
                    offset,
                    headers: msg_request.headers.clone(),
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                success_count += 1;
                results.push(serde_json::json!({
                    "index": index,
                    "success": true,
                    "topic": msg_request.topic,
                    "partition": partition_id,
                    "offset": offset
                }));
            }
            Err(e) => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": e.to_string()
                }));
            }
        }

        // Release the write lock before pausing: holding it across the sleep would
        // block every other reader and writer of the topic map for `delay_ms`.
        drop(topics);

        // Add delay between messages (except for the last one)
        if index + 1 < total && request.delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
        }
    }

    Json(serde_json::json!({
        "success": true,
        "total": total,
        "succeeded": success_count,
        "failed": total - success_count,
        "results": results
    }))
    .into_response()
}
2930
2931// ========== Real-time Message Streaming (SSE) ==========
2932
#[cfg(feature = "mqtt")]
/// SSE stream for MQTT messages
///
/// Subscribes to the shared message-event broadcast channel and forwards each
/// MQTT event as an `mqtt_message` server-sent event carrying a JSON payload.
/// The optional `topic` query parameter keeps only events whose topic contains
/// the given text. Kafka events are skipped, lag is logged and skipped over, and
/// the stream ends once the channel is closed.
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let receiver = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let events = stream::unfold(receiver, move |mut receiver| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                // Wait for the next MQTT event, stepping over anything else.
                let event = match receiver.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => event,
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => continue,
                    Err(broadcast::error::RecvError::Closed) => return None,
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                };

                // Substring match against the requested topic, when one was given.
                let filtered_out = match &topic_filter {
                    Some(filter) => !event.topic.contains(filter),
                    None => false,
                };
                if filtered_out {
                    continue;
                }

                let payload = serde_json::json!({
                    "protocol": "mqtt",
                    "topic": event.topic,
                    "payload": event.payload,
                    "qos": event.qos,
                    "retain": event.retain,
                    "timestamp": event.timestamp,
                });

                // An event that cannot be serialized is dropped silently.
                match serde_json::to_string(&payload) {
                    Ok(data) => {
                        let sse_event = Event::default().event("mqtt_message").data(data);
                        return Some((Ok(sse_event), receiver));
                    }
                    Err(_) => continue,
                }
            }
        }
    });

    let keep_alive = axum::response::sse::KeepAlive::new()
        .interval(std::time::Duration::from_secs(15))
        .text("keep-alive-text");
    Sse::new(events).keep_alive(keep_alive)
}
2993
#[cfg(feature = "kafka")]
/// SSE stream for Kafka messages
///
/// Subscribes to the shared message-event broadcast channel and forwards each
/// Kafka event as a `kafka_message` server-sent event carrying a JSON payload.
/// The optional `topic` query parameter keeps only events whose topic contains
/// the given text. MQTT events are skipped, lag is logged and skipped over, and
/// the stream ends once the channel is closed.
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // The receiver is moved into `unfold`, which rebinds it mutably, so the
    // outer binding does not need `mut`.
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => {
                        // Skip MQTT events in Kafka stream
                        continue;
                    }
                    Ok(MessageEvent::Kafka(event)) => {
                        // Apply topic filter if specified (substring match)
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "kafka",
                            "topic": event.topic,
                            "key": event.key,
                            "value": event.value,
                            "partition": event.partition,
                            "offset": event.offset,
                            "headers": event.headers,
                            "timestamp": event.timestamp,
                        });

                        // An event that cannot be serialized is dropped and the
                        // loop waits for the next one.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event =
                                Event::default().event("kafka_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
3057
3058// ========== AI-Powered Features ==========
3059
3060/// Request for AI-powered API specification generation
3061#[derive(Debug, Deserialize)]
3062pub struct GenerateSpecRequest {
3063    /// Natural language description of the API to generate
3064    pub query: String,
3065    /// Type of specification to generate: "openapi", "graphql", or "asyncapi"
3066    pub spec_type: String,
3067    /// Optional API version (e.g., "3.0.0" for OpenAPI)
3068    pub api_version: Option<String>,
3069}
3070
3071/// Request for OpenAPI generation from recorded traffic
3072#[derive(Debug, Deserialize)]
3073pub struct GenerateOpenApiFromTrafficRequest {
3074    /// Path to recorder database (optional, defaults to ./recordings.db)
3075    #[serde(default)]
3076    pub database_path: Option<String>,
3077    /// Start time for filtering (ISO 8601 format, e.g., 2025-01-01T00:00:00Z)
3078    #[serde(default)]
3079    pub since: Option<String>,
3080    /// End time for filtering (ISO 8601 format)
3081    #[serde(default)]
3082    pub until: Option<String>,
3083    /// Path pattern filter (supports wildcards, e.g., /api/*)
3084    #[serde(default)]
3085    pub path_pattern: Option<String>,
3086    /// Minimum confidence score for including paths (0.0 to 1.0)
3087    #[serde(default = "default_min_confidence")]
3088    pub min_confidence: f64,
3089}
3090
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`.
fn default_min_confidence() -> f64 {
    const DEFAULT_MIN_CONFIDENCE: f64 = 0.7;
    DEFAULT_MIN_CONFIDENCE
}
3094
/// Generate API specification from natural language using AI
///
/// The LLM backend is configured entirely from environment variables:
/// `MOCKFORGE_RAG_API_KEY` (falling back to `OPENAI_API_KEY`),
/// `MOCKFORGE_RAG_PROVIDER`, `MOCKFORGE_RAG_API_ENDPOINT`, `MOCKFORGE_RAG_MODEL`,
/// `MOCKFORGE_RAG_MAX_TOKENS`, `MOCKFORGE_RAG_TEMPERATURE`, `MOCKFORGE_RAG_TIMEOUT`
/// and `MOCKFORGE_RAG_CONTEXT_WINDOW`. Unknown providers and unknown
/// `spec_type` values are treated as OpenAI and OpenAPI 3.0 respectively.
///
/// # Responses
///
/// * `200 OK` - JSON with `success`, the extracted `spec` and the echoed `spec_type`
/// * `500 Internal Server Error` - the RAG engine could not be created or generation failed
/// * `503 Service Unavailable` - no API key is configured
#[cfg(feature = "data-faker")]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::{DocumentStorage, InMemoryStorage},
    };

    // Read `name` from the environment and parse it; an unset or unparsable
    // value yields `fallback`.
    fn env_or<T: std::str::FromStr>(name: &str, fallback: T) -> T {
        std::env::var(name).ok().and_then(|raw| raw.parse().ok()).unwrap_or(fallback)
    }

    // Build RAG config from environment variables
    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    // Check if RAG is configured - require API key
    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    }

    // Resolve the provider; anything unrecognized falls back to OpenAI
    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    // Endpoint and model default per provider unless overridden explicitly
    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    // Build RagConfig using default() and override fields
    let mut rag_config = RagConfig::default();
    rag_config.provider = provider;
    rag_config.api_endpoint = api_endpoint;
    rag_config.api_key = api_key;
    rag_config.model = model;
    rag_config.max_tokens = env_or("MOCKFORGE_RAG_MAX_TOKENS", 4096);
    // Lower temperature for more structured output
    rag_config.temperature = env_or("MOCKFORGE_RAG_TEMPERATURE", 0.3);
    rag_config.timeout_secs = env_or("MOCKFORGE_RAG_TIMEOUT", 60);
    rag_config.max_context_length = env_or("MOCKFORGE_RAG_CONTEXT_WINDOW", 4000);

    // Build the prompt for spec generation
    let is_graphql = request.spec_type == "graphql";
    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        if is_graphql { "GraphQL SDL" } else { "YAML" }
    );

    // The engine takes shared document storage; a fresh in-memory store is
    // created for this request.
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    // Create RAG engine (the config is not needed afterwards, so it is moved in)
    let mut rag_engine = match RagEngine::new(rag_config, storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    // Generate using RAG engine
    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Try to extract just the YAML/JSON/SDL content if LLM added explanation
            let spec = if is_graphql {
                // For GraphQL, extract SDL
                extract_graphql_schema(&generated_text)
            } else {
                // For OpenAPI/AsyncAPI, extract YAML
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
3265
3266#[cfg(not(feature = "data-faker"))]
3267async fn generate_ai_spec(
3268    State(_state): State<ManagementState>,
3269    Json(_request): Json<GenerateSpecRequest>,
3270) -> impl IntoResponse {
3271    (
3272        StatusCode::NOT_IMPLEMENTED,
3273        Json(serde_json::json!({
3274            "error": "AI features not enabled",
3275            "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3276        })),
3277    )
3278        .into_response()
3279}
3280
3281/// Generate OpenAPI specification from recorded traffic
3282#[cfg(feature = "behavioral-cloning")]
3283async fn generate_openapi_from_traffic(
3284    State(_state): State<ManagementState>,
3285    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
3286) -> impl IntoResponse {
3287    use chrono::{DateTime, Utc};
3288    use mockforge_core::intelligent_behavior::{
3289        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
3290        IntelligentBehaviorConfig,
3291    };
3292    use mockforge_recorder::{
3293        database::RecorderDatabase,
3294        openapi_export::{QueryFilters, RecordingsToOpenApi},
3295    };
3296    use std::path::PathBuf;
3297
3298    // Determine database path
3299    let db_path = if let Some(ref path) = request.database_path {
3300        PathBuf::from(path)
3301    } else {
3302        std::env::current_dir()
3303            .unwrap_or_else(|_| PathBuf::from("."))
3304            .join("recordings.db")
3305    };
3306
3307    // Open database
3308    let db = match RecorderDatabase::new(&db_path).await {
3309        Ok(db) => db,
3310        Err(e) => {
3311            return (
3312                StatusCode::BAD_REQUEST,
3313                Json(serde_json::json!({
3314                    "error": "Database error",
3315                    "message": format!("Failed to open recorder database: {}", e)
3316                })),
3317            )
3318                .into_response();
3319        }
3320    };
3321
3322    // Parse time filters
3323    let since_dt = if let Some(ref since_str) = request.since {
3324        match DateTime::parse_from_rfc3339(since_str) {
3325            Ok(dt) => Some(dt.with_timezone(&Utc)),
3326            Err(e) => {
3327                return (
3328                    StatusCode::BAD_REQUEST,
3329                    Json(serde_json::json!({
3330                        "error": "Invalid date format",
3331                        "message": format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3332                    })),
3333                )
3334                    .into_response();
3335            }
3336        }
3337    } else {
3338        None
3339    };
3340
3341    let until_dt = if let Some(ref until_str) = request.until {
3342        match DateTime::parse_from_rfc3339(until_str) {
3343            Ok(dt) => Some(dt.with_timezone(&Utc)),
3344            Err(e) => {
3345                return (
3346                    StatusCode::BAD_REQUEST,
3347                    Json(serde_json::json!({
3348                        "error": "Invalid date format",
3349                        "message": format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3350                    })),
3351                )
3352                    .into_response();
3353            }
3354        }
3355    } else {
3356        None
3357    };
3358
3359    // Build query filters
3360    let query_filters = QueryFilters {
3361        since: since_dt,
3362        until: until_dt,
3363        path_pattern: request.path_pattern.clone(),
3364        min_status_code: None,
3365        max_requests: Some(1000),
3366    };
3367
3368    // Query HTTP exchanges
3369    // Note: We need to convert from mockforge-recorder's HttpExchange to mockforge-core's HttpExchange
3370    // to avoid version mismatch issues. The converter returns the version from mockforge-recorder's
3371    // dependency, so we need to manually convert to the local version.
3372    let exchanges_from_recorder =
3373        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
3374            Ok(exchanges) => exchanges,
3375            Err(e) => {
3376                return (
3377                    StatusCode::INTERNAL_SERVER_ERROR,
3378                    Json(serde_json::json!({
3379                        "error": "Query error",
3380                        "message": format!("Failed to query HTTP exchanges: {}", e)
3381                    })),
3382                )
3383                    .into_response();
3384            }
3385        };
3386
3387    if exchanges_from_recorder.is_empty() {
3388        return (
3389            StatusCode::NOT_FOUND,
3390            Json(serde_json::json!({
3391                "error": "No exchanges found",
3392                "message": "No HTTP exchanges found matching the specified filters"
3393            })),
3394        )
3395            .into_response();
3396    }
3397
3398    // Convert to local HttpExchange type to avoid version mismatch
3399    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
3400    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
3401        .into_iter()
3402        .map(|e| LocalHttpExchange {
3403            method: e.method,
3404            path: e.path,
3405            query_params: e.query_params,
3406            headers: e.headers,
3407            body: e.body,
3408            body_encoding: e.body_encoding,
3409            status_code: e.status_code,
3410            response_headers: e.response_headers,
3411            response_body: e.response_body,
3412            response_body_encoding: e.response_body_encoding,
3413            timestamp: e.timestamp,
3414        })
3415        .collect();
3416
3417    // Create OpenAPI generator config
3418    let behavior_config = IntelligentBehaviorConfig::default();
3419    let gen_config = OpenApiGenerationConfig {
3420        min_confidence: request.min_confidence,
3421        behavior_model: Some(behavior_config.behavior_model),
3422    };
3423
3424    // Generate OpenAPI spec
3425    let generator = OpenApiSpecGenerator::new(gen_config);
3426    let result = match generator.generate_from_exchanges(exchanges).await {
3427        Ok(result) => result,
3428        Err(e) => {
3429            return (
3430                StatusCode::INTERNAL_SERVER_ERROR,
3431                Json(serde_json::json!({
3432                    "error": "Generation error",
3433                    "message": format!("Failed to generate OpenAPI spec: {}", e)
3434                })),
3435            )
3436                .into_response();
3437        }
3438    };
3439
3440    // Prepare response
3441    let spec_json = if let Some(ref raw) = result.spec.raw_document {
3442        raw.clone()
3443    } else {
3444        match serde_json::to_value(&result.spec.spec) {
3445            Ok(json) => json,
3446            Err(e) => {
3447                return (
3448                    StatusCode::INTERNAL_SERVER_ERROR,
3449                    Json(serde_json::json!({
3450                        "error": "Serialization error",
3451                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
3452                    })),
3453                )
3454                    .into_response();
3455            }
3456        }
3457    };
3458
3459    // Build response with metadata
3460    let response = serde_json::json!({
3461        "spec": spec_json,
3462        "metadata": {
3463            "requests_analyzed": result.metadata.requests_analyzed,
3464            "paths_inferred": result.metadata.paths_inferred,
3465            "path_confidence": result.metadata.path_confidence,
3466            "generated_at": result.metadata.generated_at.to_rfc3339(),
3467            "duration_ms": result.metadata.duration_ms,
3468        }
3469    });
3470
3471    Json(response).into_response()
3472}
3473
3474/// List all rule explanations
3475async fn list_rule_explanations(
3476    State(state): State<ManagementState>,
3477    Query(params): Query<std::collections::HashMap<String, String>>,
3478) -> impl IntoResponse {
3479    use mockforge_core::intelligent_behavior::RuleType;
3480
3481    let explanations = state.rule_explanations.read().await;
3482    let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3483
3484    // Filter by rule type if provided
3485    if let Some(rule_type_str) = params.get("rule_type") {
3486        if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3487            explanations_vec.retain(|e| e.rule_type == rule_type);
3488        }
3489    }
3490
3491    // Filter by minimum confidence if provided
3492    if let Some(min_confidence_str) = params.get("min_confidence") {
3493        if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3494            explanations_vec.retain(|e| e.confidence >= min_confidence);
3495        }
3496    }
3497
3498    // Sort by confidence (descending) and then by generated_at (descending)
3499    explanations_vec.sort_by(|a, b| {
3500        b.confidence
3501            .partial_cmp(&a.confidence)
3502            .unwrap_or(std::cmp::Ordering::Equal)
3503            .then_with(|| b.generated_at.cmp(&a.generated_at))
3504    });
3505
3506    Json(serde_json::json!({
3507        "explanations": explanations_vec,
3508        "total": explanations_vec.len(),
3509    }))
3510    .into_response()
3511}
3512
3513/// Get a specific rule explanation by ID
3514async fn get_rule_explanation(
3515    State(state): State<ManagementState>,
3516    Path(rule_id): Path<String>,
3517) -> impl IntoResponse {
3518    let explanations = state.rule_explanations.read().await;
3519
3520    match explanations.get(&rule_id) {
3521        Some(explanation) => Json(serde_json::json!({
3522            "explanation": explanation,
3523        }))
3524        .into_response(),
3525        None => (
3526            StatusCode::NOT_FOUND,
3527            Json(serde_json::json!({
3528                "error": "Rule explanation not found",
3529                "message": format!("No explanation found for rule ID: {}", rule_id)
3530            })),
3531        )
3532            .into_response(),
3533    }
3534}
3535
/// Request for learning from examples
///
/// JSON body accepted by the `learn_from_examples` handler.
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example request/response pairs to learn from.
    ///
    /// The handler answers `400 Bad Request` when this list is empty.
    pub examples: Vec<ExamplePairRequest>,
    /// Optional configuration override.
    ///
    /// The handler tries to deserialize this value as an
    /// `IntelligentBehaviorConfig` and uses only its `behavior_model` part;
    /// when the field is absent the default behavior model config is used.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3545
/// Example pair request format
///
/// Both halves are free-form JSON; the `learn_from_examples` handler picks
/// individual keys out of them rather than deserializing a fixed schema.
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request data (method, path, body, etc.)
    ///
    /// Keys read by the handler: `method` (string, defaults to `"GET"`),
    /// `path` (string, defaults to `"/"`), `body` (any JSON), and
    /// `query_params` / `headers` (objects; only string-valued entries are kept).
    pub request: serde_json::Value,
    /// Response data (status_code, body, etc.)
    ///
    /// Keys read by the handler: `status_code`, or `status` when `status_code`
    /// is absent (unsigned integer, defaults to 200), and `body` (any JSON).
    pub response: serde_json::Value,
}
3554
3555/// Learn behavioral rules from example pairs
3556///
3557/// This endpoint accepts example request/response pairs, generates behavioral rules
3558/// with explanations, and stores the explanations for later retrieval.
3559async fn learn_from_examples(
3560    State(state): State<ManagementState>,
3561    Json(request): Json<LearnFromExamplesRequest>,
3562) -> impl IntoResponse {
3563    use mockforge_core::intelligent_behavior::{
3564        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3565        rule_generator::{ExamplePair, RuleGenerator},
3566    };
3567
3568    if request.examples.is_empty() {
3569        return (
3570            StatusCode::BAD_REQUEST,
3571            Json(serde_json::json!({
3572                "error": "No examples provided",
3573                "message": "At least one example pair is required"
3574            })),
3575        )
3576            .into_response();
3577    }
3578
3579    // Convert request examples to ExamplePair format
3580    let example_pairs: Result<Vec<ExamplePair>, String> = request
3581        .examples
3582        .into_iter()
3583        .enumerate()
3584        .map(|(idx, ex)| {
3585            // Parse request JSON to extract method, path, body, etc.
3586            let method = ex
3587                .request
3588                .get("method")
3589                .and_then(|v| v.as_str())
3590                .map(|s| s.to_string())
3591                .unwrap_or_else(|| "GET".to_string());
3592            let path = ex
3593                .request
3594                .get("path")
3595                .and_then(|v| v.as_str())
3596                .map(|s| s.to_string())
3597                .unwrap_or_else(|| "/".to_string());
3598            let request_body = ex.request.get("body").cloned();
3599            let query_params = ex
3600                .request
3601                .get("query_params")
3602                .and_then(|v| v.as_object())
3603                .map(|obj| {
3604                    obj.iter()
3605                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3606                        .collect()
3607                })
3608                .unwrap_or_default();
3609            let headers = ex
3610                .request
3611                .get("headers")
3612                .and_then(|v| v.as_object())
3613                .map(|obj| {
3614                    obj.iter()
3615                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3616                        .collect()
3617                })
3618                .unwrap_or_default();
3619
3620            // Parse response JSON to extract status, body, etc.
3621            let status = ex
3622                .response
3623                .get("status_code")
3624                .or_else(|| ex.response.get("status"))
3625                .and_then(|v| v.as_u64())
3626                .map(|n| n as u16)
3627                .unwrap_or(200);
3628            let response_body = ex.response.get("body").cloned();
3629
3630            Ok(ExamplePair {
3631                method,
3632                path,
3633                request: request_body,
3634                status,
3635                response: response_body,
3636                query_params,
3637                headers,
3638                metadata: {
3639                    let mut meta = std::collections::HashMap::new();
3640                    meta.insert("source".to_string(), "api".to_string());
3641                    meta.insert("example_index".to_string(), idx.to_string());
3642                    meta
3643                },
3644            })
3645        })
3646        .collect();
3647
3648    let example_pairs = match example_pairs {
3649        Ok(pairs) => pairs,
3650        Err(e) => {
3651            return (
3652                StatusCode::BAD_REQUEST,
3653                Json(serde_json::json!({
3654                    "error": "Invalid examples",
3655                    "message": e
3656                })),
3657            )
3658                .into_response();
3659        }
3660    };
3661
3662    // Create behavior config (use provided config or default)
3663    let behavior_config = if let Some(config_json) = request.config {
3664        // Try to deserialize custom config, fall back to default
3665        serde_json::from_value(config_json)
3666            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3667            .behavior_model
3668    } else {
3669        BehaviorModelConfig::default()
3670    };
3671
3672    // Create rule generator
3673    let generator = RuleGenerator::new(behavior_config);
3674
3675    // Generate rules with explanations
3676    let (rules, explanations) =
3677        match generator.generate_rules_with_explanations(example_pairs).await {
3678            Ok(result) => result,
3679            Err(e) => {
3680                return (
3681                    StatusCode::INTERNAL_SERVER_ERROR,
3682                    Json(serde_json::json!({
3683                        "error": "Rule generation failed",
3684                        "message": format!("Failed to generate rules: {}", e)
3685                    })),
3686                )
3687                    .into_response();
3688            }
3689        };
3690
3691    // Store explanations in ManagementState
3692    {
3693        let mut stored_explanations = state.rule_explanations.write().await;
3694        for explanation in &explanations {
3695            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3696        }
3697    }
3698
3699    // Prepare response
3700    let response = serde_json::json!({
3701        "success": true,
3702        "rules_generated": {
3703            "consistency_rules": rules.consistency_rules.len(),
3704            "schemas": rules.schemas.len(),
3705            "state_machines": rules.state_transitions.len(),
3706            "system_prompt": !rules.system_prompt.is_empty(),
3707        },
3708        "explanations": explanations.iter().map(|e| serde_json::json!({
3709            "rule_id": e.rule_id,
3710            "rule_type": e.rule_type,
3711            "confidence": e.confidence,
3712            "reasoning": e.reasoning,
3713        })).collect::<Vec<_>>(),
3714        "total_explanations": explanations.len(),
3715    });
3716
3717    Json(response).into_response()
3718}
3719
3720#[cfg(feature = "data-faker")]
/// Extract a YAML specification from free-form text.
///
/// Resolution order:
/// 1. the first closed code block whose opening triple-backtick fence is
///    tagged `yaml`;
/// 2. otherwise the first closed code block opened by any triple-backtick
///    fence; a language tag on the opening fence line (e.g. `yml`, `json`)
///    is not part of the result;
/// 3. otherwise the whole text.
///
/// The result is always trimmed. An opening fence without a closing fence is
/// not treated as a code block.
fn extract_yaml_spec(text: &str) -> String {
    const FENCE: &str = "```";

    /// Content of the first block opened by `opener` and closed by a fence,
    /// or `None` when there is no such closed block. With `skip_language_tag`
    /// the rest of the opening fence line is dropped when it is a single
    /// tag-like word.
    fn fenced_block(text: &str, opener: &str, skip_language_tag: bool) -> Option<String> {
        let start = text.find(opener)?;
        let mut body = &text[start + opener.len()..];

        if skip_language_tag {
            if let Some((first_line, rest)) = body.split_once('\n') {
                let tag = first_line.trim();
                // Only a single word made of identifier-like characters counts
                // as a tag, so content that starts on the fence line is kept.
                let looks_like_tag = tag.starts_with(|c: char| c.is_ascii_alphanumeric())
                    && tag
                        .chars()
                        .all(|c| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '+' | '.' | '#'));
                if looks_like_tag {
                    body = rest;
                }
            }
        }

        let body = body.trim_start();
        let end = body.find(FENCE)?;
        Some(body[..end].trim().to_string())
    }

    fenced_block(text, "```yaml", false)
        .or_else(|| fenced_block(text, FENCE, true))
        // No closed code block: treat the whole text as the spec.
        .unwrap_or_else(|| text.trim().to_string())
}
3744
3745/// Extract GraphQL schema from text content
3746#[cfg(feature = "data-faker")]
///
/// Resolution order:
/// 1. the first closed code block whose opening triple-backtick fence is
///    tagged `graphql`;
/// 2. otherwise the first closed code block opened by any triple-backtick
///    fence; a language tag on the opening fence line (e.g. `gql`) is not
///    part of the result;
/// 3. otherwise the whole text.
///
/// The result is always trimmed. An opening fence without a closing fence is
/// not treated as a code block.
fn extract_graphql_schema(text: &str) -> String {
    const FENCE: &str = "```";

    /// Content of the first block opened by `opener` and closed by a fence,
    /// or `None` when there is no such closed block. With `skip_language_tag`
    /// the rest of the opening fence line is dropped when it is a single
    /// tag-like word.
    fn fenced_block(text: &str, opener: &str, skip_language_tag: bool) -> Option<String> {
        let start = text.find(opener)?;
        let mut body = &text[start + opener.len()..];

        if skip_language_tag {
            if let Some((first_line, rest)) = body.split_once('\n') {
                let tag = first_line.trim();
                // Only a single word made of identifier-like characters counts
                // as a tag, so SDL that starts on the fence line is kept.
                let looks_like_tag = tag.starts_with(|c: char| c.is_ascii_alphanumeric())
                    && tag
                        .chars()
                        .all(|c| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '+' | '.' | '#'));
                if looks_like_tag {
                    body = rest;
                }
            }
        }

        let body = body.trim_start();
        let end = body.find(FENCE)?;
        Some(body[..end].trim().to_string())
    }

    fenced_block(text, "```graphql", false)
        .or_else(|| fenced_block(text, FENCE, true))
        // No closed code block: treat the whole text as the schema.
        .unwrap_or_else(|| text.trim().to_string())
}
3769
3770// ========== Chaos Engineering Management ==========
3771
3772/// Get current chaos engineering configuration
3773async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
3774    #[cfg(feature = "chaos")]
3775    {
3776        if let Some(chaos_state) = &_state.chaos_api_state {
3777            let config = chaos_state.config.read().await;
3778            // Convert ChaosConfig to JSON response format
3779            Json(serde_json::json!({
3780                "enabled": config.enabled,
3781                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
3782                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
3783                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
3784                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
3785            }))
3786            .into_response()
3787        } else {
3788            // Chaos API not available, return default
3789            Json(serde_json::json!({
3790                "enabled": false,
3791                "latency": null,
3792                "fault_injection": null,
3793                "rate_limit": null,
3794                "traffic_shaping": null,
3795            }))
3796            .into_response()
3797        }
3798    }
3799    #[cfg(not(feature = "chaos"))]
3800    {
3801        // Chaos feature not enabled
3802        Json(serde_json::json!({
3803            "enabled": false,
3804            "latency": null,
3805            "fault_injection": null,
3806            "rate_limit": null,
3807            "traffic_shaping": null,
3808        }))
3809        .into_response()
3810    }
3811}
3812
/// Request to update chaos configuration
///
/// Every field is optional; the `update_chaos_config` handler only touches the
/// parts of the configuration whose field is present in the request. The
/// section fields are carried as raw JSON and deserialized by the handler into
/// the corresponding `mockforge_chaos::config` types.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Whether to enable chaos engineering
    pub enabled: Option<bool>,
    /// Latency configuration (raw JSON for `LatencyConfig`)
    pub latency: Option<serde_json::Value>,
    /// Fault injection configuration (raw JSON for `FaultInjectionConfig`)
    pub fault_injection: Option<serde_json::Value>,
    /// Rate limiting configuration (raw JSON for `RateLimitConfig`)
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic shaping configuration (raw JSON for `TrafficShapingConfig`)
    pub traffic_shaping: Option<serde_json::Value>,
}
3827
3828/// Update chaos engineering configuration
3829async fn update_chaos_config(
3830    State(_state): State<ManagementState>,
3831    Json(_config_update): Json<ChaosConfigUpdate>,
3832) -> impl IntoResponse {
3833    #[cfg(feature = "chaos")]
3834    {
3835        if let Some(chaos_state) = &_state.chaos_api_state {
3836            use mockforge_chaos::config::{
3837                ChaosConfig, FaultInjectionConfig, LatencyConfig, RateLimitConfig,
3838                TrafficShapingConfig,
3839            };
3840
3841            let mut config = chaos_state.config.write().await;
3842
3843            // Update enabled flag if provided
3844            if let Some(enabled) = _config_update.enabled {
3845                config.enabled = enabled;
3846            }
3847
3848            // Update latency config if provided
3849            if let Some(latency_json) = _config_update.latency {
3850                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3851                    config.latency = Some(latency);
3852                }
3853            }
3854
3855            // Update fault injection config if provided
3856            if let Some(fault_json) = _config_update.fault_injection {
3857                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3858                    config.fault_injection = Some(fault);
3859                }
3860            }
3861
3862            // Update rate limit config if provided
3863            if let Some(rate_json) = _config_update.rate_limit {
3864                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3865                    config.rate_limit = Some(rate);
3866                }
3867            }
3868
3869            // Update traffic shaping config if provided
3870            if let Some(traffic_json) = _config_update.traffic_shaping {
3871                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3872                    config.traffic_shaping = Some(traffic);
3873                }
3874            }
3875
3876            // Reinitialize middleware injectors with new config
3877            // The middleware will pick up the changes on the next request
3878            drop(config);
3879
3880            info!("Chaos configuration updated successfully");
3881            Json(serde_json::json!({
3882                "success": true,
3883                "message": "Chaos configuration updated and applied"
3884            }))
3885            .into_response()
3886        } else {
3887            (
3888                StatusCode::SERVICE_UNAVAILABLE,
3889                Json(serde_json::json!({
3890                    "success": false,
3891                    "error": "Chaos API not available",
3892                    "message": "Chaos engineering is not enabled or configured"
3893                })),
3894            )
3895                .into_response()
3896        }
3897    }
3898    #[cfg(not(feature = "chaos"))]
3899    {
3900        (
3901            StatusCode::NOT_IMPLEMENTED,
3902            Json(serde_json::json!({
3903                "success": false,
3904                "error": "Chaos feature not enabled",
3905                "message": "Chaos engineering feature is not compiled into this build"
3906            })),
3907        )
3908            .into_response()
3909    }
3910}
3911
3912// ========== Network Profile Management ==========
3913
3914/// List available network profiles
3915async fn list_network_profiles() -> impl IntoResponse {
3916    use mockforge_core::network_profiles::NetworkProfileCatalog;
3917
3918    let catalog = NetworkProfileCatalog::default();
3919    let profiles: Vec<serde_json::Value> = catalog
3920        .list_profiles_with_description()
3921        .iter()
3922        .map(|(name, description)| {
3923            serde_json::json!({
3924                "name": name,
3925                "description": description,
3926            })
3927        })
3928        .collect();
3929
3930    Json(serde_json::json!({
3931        "profiles": profiles
3932    }))
3933    .into_response()
3934}
3935
#[derive(Debug, Deserialize)]
/// Request to apply a network profile
///
/// JSON body accepted by the `apply_network_profile` handler.
pub struct ApplyNetworkProfileRequest {
    /// Name of the network profile to apply.
    ///
    /// The handler looks this name up in the default `NetworkProfileCatalog`
    /// and answers `404 Not Found` when no profile has that name.
    pub profile_name: String,
}
3942
3943/// Apply a network profile
3944async fn apply_network_profile(
3945    State(state): State<ManagementState>,
3946    Json(request): Json<ApplyNetworkProfileRequest>,
3947) -> impl IntoResponse {
3948    use mockforge_core::network_profiles::NetworkProfileCatalog;
3949
3950    let catalog = NetworkProfileCatalog::default();
3951    if let Some(profile) = catalog.get(&request.profile_name) {
3952        // Apply profile to server configuration if available
3953        // NetworkProfile contains latency and traffic_shaping configs
3954        if let Some(server_config) = &state.server_config {
3955            let mut config = server_config.write().await;
3956
3957            // Apply network profile's traffic shaping to core config
3958            use mockforge_core::config::NetworkShapingConfig;
3959
3960            // Convert NetworkProfile's TrafficShapingConfig to NetworkShapingConfig
3961            // NetworkProfile uses mockforge_core::traffic_shaping::TrafficShapingConfig
3962            // which has bandwidth and burst_loss fields
3963            let network_shaping = NetworkShapingConfig {
3964                enabled: profile.traffic_shaping.bandwidth.enabled
3965                    || profile.traffic_shaping.burst_loss.enabled,
3966                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
3967                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3968                max_connections: 1000, // Default value
3969            };
3970
3971            // Update chaos config if it exists, or create it
3972            // Chaos config is in observability.chaos, not core.chaos
3973            if let Some(ref mut chaos) = config.observability.chaos {
3974                chaos.traffic_shaping = Some(network_shaping);
3975            } else {
3976                // Create minimal chaos config with traffic shaping
3977                use mockforge_core::config::ChaosEngConfig;
3978                config.observability.chaos = Some(ChaosEngConfig {
3979                    enabled: true,
3980                    latency: None,
3981                    fault_injection: None,
3982                    rate_limit: None,
3983                    traffic_shaping: Some(network_shaping),
3984                    scenario: None,
3985                });
3986            }
3987
3988            info!("Network profile '{}' applied to server configuration", request.profile_name);
3989        } else {
3990            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
3991        }
3992
3993        // Also update chaos API state if available
3994        #[cfg(feature = "chaos")]
3995        {
3996            if let Some(chaos_state) = &state.chaos_api_state {
3997                use mockforge_chaos::config::TrafficShapingConfig;
3998
3999                let mut chaos_config = chaos_state.config.write().await;
4000                // Apply profile's traffic shaping to chaos API state
4001                let chaos_traffic_shaping = TrafficShapingConfig {
4002                    enabled: profile.traffic_shaping.bandwidth.enabled
4003                        || profile.traffic_shaping.burst_loss.enabled,
4004                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
4005                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
4006                    max_connections: 0,
4007                    connection_timeout_ms: 30000,
4008                };
4009                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
4010                chaos_config.enabled = true; // Enable chaos when applying a profile
4011                drop(chaos_config);
4012                info!("Network profile '{}' applied to chaos API state", request.profile_name);
4013            }
4014        }
4015
4016        Json(serde_json::json!({
4017            "success": true,
4018            "message": format!("Network profile '{}' applied", request.profile_name),
4019            "profile": {
4020                "name": profile.name,
4021                "description": profile.description,
4022            }
4023        }))
4024        .into_response()
4025    } else {
4026        (
4027            StatusCode::NOT_FOUND,
4028            Json(serde_json::json!({
4029                "error": "Profile not found",
4030                "message": format!("Network profile '{}' not found", request.profile_name)
4031            })),
4032        )
4033            .into_response()
4034    }
4035}
4036
4037/// Build the management API router with UI Builder support
4038pub fn management_router_with_ui_builder(
4039    state: ManagementState,
4040    server_config: mockforge_core::config::ServerConfig,
4041) -> Router {
4042    use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
4043
4044    // Create the base management router
4045    let management = management_router(state);
4046
4047    // Create UI Builder state and router
4048    let ui_builder_state = UIBuilderState::new(server_config);
4049    let ui_builder = create_ui_builder_router(ui_builder_state);
4050
4051    // Nest UI Builder under /ui-builder
4052    management.nest("/ui-builder", ui_builder)
4053}
4054
4055/// Build management router with spec import API
4056pub fn management_router_with_spec_import(state: ManagementState) -> Router {
4057    use crate::spec_import::{spec_import_router, SpecImportState};
4058
4059    // Create base management router
4060    let management = management_router(state);
4061
4062    // Merge with spec import router
4063    Router::new()
4064        .merge(management)
4065        .merge(spec_import_router(SpecImportState::new()))
4066}
4067
4068#[cfg(test)]
4069mod tests {
4070    use super::*;
4071
4072    #[tokio::test]
4073    async fn test_create_and_get_mock() {
4074        let state = ManagementState::new(None, None, 3000);
4075
4076        let mock = MockConfig {
4077            id: "test-1".to_string(),
4078            name: "Test Mock".to_string(),
4079            method: "GET".to_string(),
4080            path: "/test".to_string(),
4081            response: MockResponse {
4082                body: serde_json::json!({"message": "test"}),
4083                headers: None,
4084            },
4085            enabled: true,
4086            latency_ms: None,
4087            status_code: Some(200),
4088            request_match: None,
4089            priority: None,
4090            scenario: None,
4091            required_scenario_state: None,
4092            new_scenario_state: None,
4093        };
4094
4095        // Create mock
4096        {
4097            let mut mocks = state.mocks.write().await;
4098            mocks.push(mock.clone());
4099        }
4100
4101        // Get mock
4102        let mocks = state.mocks.read().await;
4103        let found = mocks.iter().find(|m| m.id == "test-1");
4104        assert!(found.is_some());
4105        assert_eq!(found.unwrap().name, "Test Mock");
4106    }
4107
4108    #[tokio::test]
4109    async fn test_server_stats() {
4110        let state = ManagementState::new(None, None, 3000);
4111
4112        // Add some mocks
4113        {
4114            let mut mocks = state.mocks.write().await;
4115            mocks.push(MockConfig {
4116                id: "1".to_string(),
4117                name: "Mock 1".to_string(),
4118                method: "GET".to_string(),
4119                path: "/test1".to_string(),
4120                response: MockResponse {
4121                    body: serde_json::json!({}),
4122                    headers: None,
4123                },
4124                enabled: true,
4125                latency_ms: None,
4126                status_code: Some(200),
4127                request_match: None,
4128                priority: None,
4129                scenario: None,
4130                required_scenario_state: None,
4131                new_scenario_state: None,
4132            });
4133            mocks.push(MockConfig {
4134                id: "2".to_string(),
4135                name: "Mock 2".to_string(),
4136                method: "POST".to_string(),
4137                path: "/test2".to_string(),
4138                response: MockResponse {
4139                    body: serde_json::json!({}),
4140                    headers: None,
4141                },
4142                enabled: false,
4143                latency_ms: None,
4144                status_code: Some(201),
4145                request_match: None,
4146                priority: None,
4147                scenario: None,
4148                required_scenario_state: None,
4149                new_scenario_state: None,
4150            });
4151        }
4152
4153        let mocks = state.mocks.read().await;
4154        assert_eq!(mocks.len(), 2);
4155        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
4156    }
4157
4158    #[test]
4159    fn test_mock_matches_request_with_xpath_absolute_path() {
4160        let mock = MockConfig {
4161            id: "xpath-1".to_string(),
4162            name: "XPath Match".to_string(),
4163            method: "POST".to_string(),
4164            path: "/xml".to_string(),
4165            response: MockResponse {
4166                body: serde_json::json!({"ok": true}),
4167                headers: None,
4168            },
4169            enabled: true,
4170            latency_ms: None,
4171            status_code: Some(200),
4172            request_match: Some(RequestMatchCriteria {
4173                xpath: Some("/root/order/id".to_string()),
4174                ..Default::default()
4175            }),
4176            priority: None,
4177            scenario: None,
4178            required_scenario_state: None,
4179            new_scenario_state: None,
4180        };
4181
4182        let body = br#"<root><order><id>123</id></order></root>"#;
4183        let headers = std::collections::HashMap::new();
4184        let query = std::collections::HashMap::new();
4185
4186        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4187    }
4188
4189    #[test]
4190    fn test_mock_matches_request_with_xpath_text_predicate() {
4191        let mock = MockConfig {
4192            id: "xpath-2".to_string(),
4193            name: "XPath Predicate Match".to_string(),
4194            method: "POST".to_string(),
4195            path: "/xml".to_string(),
4196            response: MockResponse {
4197                body: serde_json::json!({"ok": true}),
4198                headers: None,
4199            },
4200            enabled: true,
4201            latency_ms: None,
4202            status_code: Some(200),
4203            request_match: Some(RequestMatchCriteria {
4204                xpath: Some("//order/id[text()='123']".to_string()),
4205                ..Default::default()
4206            }),
4207            priority: None,
4208            scenario: None,
4209            required_scenario_state: None,
4210            new_scenario_state: None,
4211        };
4212
4213        let body = br#"<root><order><id>123</id></order></root>"#;
4214        let headers = std::collections::HashMap::new();
4215        let query = std::collections::HashMap::new();
4216
4217        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4218    }
4219
4220    #[test]
4221    fn test_mock_matches_request_with_xpath_no_match() {
4222        let mock = MockConfig {
4223            id: "xpath-3".to_string(),
4224            name: "XPath No Match".to_string(),
4225            method: "POST".to_string(),
4226            path: "/xml".to_string(),
4227            response: MockResponse {
4228                body: serde_json::json!({"ok": true}),
4229                headers: None,
4230            },
4231            enabled: true,
4232            latency_ms: None,
4233            status_code: Some(200),
4234            request_match: Some(RequestMatchCriteria {
4235                xpath: Some("//order/id[text()='456']".to_string()),
4236                ..Default::default()
4237            }),
4238            priority: None,
4239            scenario: None,
4240            required_scenario_state: None,
4241            new_scenario_state: None,
4242        };
4243
4244        let body = br#"<root><order><id>123</id></order></root>"#;
4245        let headers = std::collections::HashMap::new();
4246        let query = std::collections::HashMap::new();
4247
4248        assert!(!mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4249    }
4250}