Skip to main content

mockforge_http/
management.rs

1#[cfg(any(feature = "mqtt", feature = "kafka"))]
2use axum::response::sse::{Event, Sse};
3/// Management API for MockForge
4///
5/// Provides REST endpoints for controlling mocks, server configuration,
6/// and integration with developer tools (VS Code extension, CI/CD, etc.)
7use axum::{
8    extract::{Path, Query, State},
9    http::StatusCode,
10    response::{IntoResponse, Json},
11    routing::{delete, get, post, put},
12    Router,
13};
14#[cfg(any(feature = "mqtt", feature = "kafka"))]
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18    BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23#[cfg(any(feature = "mqtt", feature = "kafka"))]
24use std::convert::Infallible;
25use std::sync::Arc;
26use tokio::sync::{broadcast, RwLock};
27use tracing::*;
28
/// Default broadcast channel capacity for message events.
///
/// Used by `get_message_broadcast_capacity` when the
/// `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY` environment variable is unset or
/// cannot be parsed.
#[cfg(any(feature = "mqtt", feature = "kafka"))]
const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;
32
/// Get the broadcast channel capacity from the environment, or use the default.
///
/// Reads `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY` and parses it as a `usize`.
/// Falls back to `DEFAULT_MESSAGE_BROADCAST_CAPACITY` when the variable is
/// unset, is not a valid number, or lies outside `1..=usize::MAX / 2`.
///
/// The range check matters because the value is handed to
/// `tokio::sync::broadcast::channel`, which panics for a capacity of `0` or
/// one larger than `usize::MAX / 2`; a bad environment value must not be able
/// to crash state construction.
#[cfg(any(feature = "mqtt", feature = "kafka"))]
fn get_message_broadcast_capacity() -> usize {
    std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|capacity| (1..=usize::MAX / 2).contains(capacity))
        .unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY)
}
41
/// Message event types for real-time monitoring
///
/// Serialized in serde's adjacently-tagged form: the variant name (lowercased)
/// is written under the `protocol` key and the variant payload under the
/// `data` key, e.g. `{"protocol": "mqtt", "data": { ... }}`.
///
/// Each variant only exists when the corresponding cargo feature is enabled.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    #[cfg(feature = "mqtt")]
    /// MQTT message event (requires the `mqtt` feature)
    Mqtt(MqttMessageEvent),
    #[cfg(feature = "kafka")]
    /// Kafka message event (requires the `kafka` feature)
    Kafka(KafkaMessageEvent),
}
54
#[cfg(feature = "mqtt")]
/// MQTT message event for real-time monitoring
///
/// Payload of the `MessageEvent::Mqtt` variant; only compiled with the `mqtt`
/// feature.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// MQTT topic name
    pub topic: String,
    /// Message payload content, carried as text
    pub payload: String,
    /// Quality of Service level (0, 1, or 2)
    pub qos: u8,
    /// Whether the message is retained
    pub retain: bool,
    /// RFC3339 formatted timestamp
    pub timestamp: String,
}
70
/// Kafka message event for real-time monitoring
///
/// Payload of the `MessageEvent::Kafka` variant; only compiled with the
/// `kafka` feature.
#[cfg(feature = "kafka")]
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Kafka topic name
    pub topic: String,
    /// Optional message key
    pub key: Option<String>,
    /// Message value, carried as text
    pub value: String,
    /// Partition number
    pub partition: i32,
    /// Message offset
    pub offset: i64,
    /// Optional message headers as name/value pairs
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// Timestamp string
    // NOTE(review): presumably RFC3339 like the MQTT event — confirm against the producer.
    pub timestamp: String,
}
83
84/// Mock configuration representation
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct MockConfig {
87    /// Unique identifier for the mock
88    #[serde(skip_serializing_if = "String::is_empty")]
89    pub id: String,
90    /// Human-readable name for the mock
91    pub name: String,
92    /// HTTP method (GET, POST, etc.)
93    pub method: String,
94    /// API path pattern to match
95    pub path: String,
96    /// Response configuration
97    pub response: MockResponse,
98    /// Whether this mock is currently enabled
99    #[serde(default = "default_true")]
100    pub enabled: bool,
101    /// Optional latency to inject in milliseconds
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub latency_ms: Option<u64>,
104    /// Optional HTTP status code override
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub status_code: Option<u16>,
107    /// Request matching criteria (headers, query params, body patterns)
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub request_match: Option<RequestMatchCriteria>,
110    /// Priority for mock ordering (higher priority mocks are matched first)
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub priority: Option<i32>,
113    /// Scenario name for stateful mocking
114    #[serde(skip_serializing_if = "Option::is_none")]
115    pub scenario: Option<String>,
116    /// Required scenario state for this mock to be active
117    #[serde(skip_serializing_if = "Option::is_none")]
118    pub required_scenario_state: Option<String>,
119    /// New scenario state after this mock is matched
120    #[serde(skip_serializing_if = "Option::is_none")]
121    pub new_scenario_state: Option<String>,
122}
123
/// Serde default provider that yields `true`.
///
/// Referenced by name from `#[serde(default = "default_true")]` so that a
/// missing boolean field deserializes as enabled.
fn default_true() -> bool {
    true
}
127
/// Mock response configuration
///
/// Describes what a matched mock sends back: a JSON body and, optionally,
/// extra response headers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Response body as JSON
    pub body: serde_json::Value,
    /// Optional custom response headers (omitted from serialized output when unset)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
137
138/// Request matching criteria for advanced request matching
139#[derive(Debug, Clone, Serialize, Deserialize, Default)]
140pub struct RequestMatchCriteria {
141    /// Headers that must be present and match (case-insensitive header names)
142    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
143    pub headers: std::collections::HashMap<String, String>,
144    /// Query parameters that must be present and match
145    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
146    pub query_params: std::collections::HashMap<String, String>,
147    /// Request body pattern (supports exact match or regex)
148    #[serde(skip_serializing_if = "Option::is_none")]
149    pub body_pattern: Option<String>,
150    /// JSONPath expression for JSON body matching
151    #[serde(skip_serializing_if = "Option::is_none")]
152    pub json_path: Option<String>,
153    /// XPath expression for XML body matching
154    #[serde(skip_serializing_if = "Option::is_none")]
155    pub xpath: Option<String>,
156    /// Custom matcher expression (e.g., "headers.content-type == \"application/json\"")
157    #[serde(skip_serializing_if = "Option::is_none")]
158    pub custom_matcher: Option<String>,
159}
160
161/// Check if a request matches the given mock configuration
162///
163/// This function implements comprehensive request matching including:
164/// - Method and path matching
165/// - Header matching (with regex support)
166/// - Query parameter matching
167/// - Body pattern matching (exact, regex, JSONPath, XPath)
168/// - Custom matcher expressions
169pub fn mock_matches_request(
170    mock: &MockConfig,
171    method: &str,
172    path: &str,
173    headers: &std::collections::HashMap<String, String>,
174    query_params: &std::collections::HashMap<String, String>,
175    body: Option<&[u8]>,
176) -> bool {
177    use regex::Regex;
178
179    // Check if mock is enabled
180    if !mock.enabled {
181        return false;
182    }
183
184    // Check method (case-insensitive)
185    if mock.method.to_uppercase() != method.to_uppercase() {
186        return false;
187    }
188
189    // Check path pattern (supports wildcards and path parameters)
190    if !path_matches_pattern(&mock.path, path) {
191        return false;
192    }
193
194    // Check request matching criteria if present
195    if let Some(criteria) = &mock.request_match {
196        // Check headers
197        for (key, expected_value) in &criteria.headers {
198            let header_key_lower = key.to_lowercase();
199            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
200
201            if let Some((_, actual_value)) = found {
202                // Try regex match first, then exact match
203                if let Ok(re) = Regex::new(expected_value) {
204                    if !re.is_match(actual_value) {
205                        return false;
206                    }
207                } else if actual_value != expected_value {
208                    return false;
209                }
210            } else {
211                return false; // Header not found
212            }
213        }
214
215        // Check query parameters
216        for (key, expected_value) in &criteria.query_params {
217            if let Some(actual_value) = query_params.get(key) {
218                if actual_value != expected_value {
219                    return false;
220                }
221            } else {
222                return false; // Query param not found
223            }
224        }
225
226        // Check body pattern
227        if let Some(pattern) = &criteria.body_pattern {
228            if let Some(body_bytes) = body {
229                let body_str = String::from_utf8_lossy(body_bytes);
230                // Try regex first, then exact match
231                if let Ok(re) = Regex::new(pattern) {
232                    if !re.is_match(&body_str) {
233                        return false;
234                    }
235                } else if body_str.as_ref() != pattern {
236                    return false;
237                }
238            } else {
239                return false; // Body required but not present
240            }
241        }
242
243        // Check JSONPath (simplified implementation)
244        if let Some(json_path) = &criteria.json_path {
245            if let Some(body_bytes) = body {
246                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
247                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
248                        // Simple JSONPath check
249                        if !json_path_exists(&json_value, json_path) {
250                            return false;
251                        }
252                    }
253                }
254            }
255        }
256
257        // Check XPath (supports a focused subset)
258        if let Some(xpath) = &criteria.xpath {
259            if let Some(body_bytes) = body {
260                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
261                    if !xml_xpath_exists(body_str, xpath) {
262                        return false;
263                    }
264                } else {
265                    return false;
266                }
267            } else {
268                return false; // Body required but not present
269            }
270        }
271
272        // Check custom matcher
273        if let Some(custom) = &criteria.custom_matcher {
274            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
275                return false;
276            }
277        }
278    }
279
280    true
281}
282
283/// Check if a path matches a pattern (supports wildcards and path parameters)
284fn path_matches_pattern(pattern: &str, path: &str) -> bool {
285    // Exact match
286    if pattern == path {
287        return true;
288    }
289
290    // Wildcard match
291    if pattern == "*" {
292        return true;
293    }
294
295    // Path parameter matching (e.g., /users/{id} matches /users/123)
296    let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
297    let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
298
299    if pattern_parts.len() != path_parts.len() {
300        // Check for wildcard patterns
301        if pattern.contains('*') {
302            return matches_wildcard_pattern(pattern, path);
303        }
304        return false;
305    }
306
307    for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
308        // Check for path parameters {param}
309        if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
310            continue; // Matches any value
311        }
312
313        if pattern_part != path_part {
314            return false;
315        }
316    }
317
318    true
319}
320
321/// Check if path matches a wildcard pattern
322fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
323    use regex::Regex;
324
325    // Convert pattern to regex
326    let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
327
328    if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
329        return re.is_match(path);
330    }
331
332    false
333}
334
335/// Check if a JSONPath exists in a JSON value
336///
337/// Supports:
338/// - `$` — root element
339/// - `$.field.subfield` — nested object access
340/// - `$.items[0].name` — array index access
341/// - `$.items[*]` — array wildcard (checks array is non-empty)
342fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
343    let path = if json_path == "$" {
344        return true;
345    } else if let Some(p) = json_path.strip_prefix("$.") {
346        p
347    } else if let Some(p) = json_path.strip_prefix('$') {
348        p.strip_prefix('.').unwrap_or(p)
349    } else {
350        json_path
351    };
352
353    let mut current = json;
354    for segment in split_json_path_segments(path) {
355        match segment {
356            JsonPathSegment::Field(name) => {
357                if let Some(obj) = current.as_object() {
358                    if let Some(value) = obj.get(name) {
359                        current = value;
360                    } else {
361                        return false;
362                    }
363                } else {
364                    return false;
365                }
366            }
367            JsonPathSegment::Index(idx) => {
368                if let Some(arr) = current.as_array() {
369                    if let Some(value) = arr.get(idx) {
370                        current = value;
371                    } else {
372                        return false;
373                    }
374                } else {
375                    return false;
376                }
377            }
378            JsonPathSegment::Wildcard => {
379                if let Some(arr) = current.as_array() {
380                    return !arr.is_empty();
381                }
382                return false;
383            }
384        }
385    }
386    true
387}
388
/// One step of a parsed JSONPath expression.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JsonPathSegment<'a> {
    /// Object member access by key
    Field(&'a str),
    /// Array element access by zero-based index
    Index(usize),
    /// `[*]` — any element of an array
    Wildcard,
}

/// Split a JSONPath (without the leading `$`) into segments
///
/// The path is split on `.`; empty parts are skipped. Each part is either a
/// plain field name or a field name followed by one or more bracket
/// selectors: `[N]` (index), `[*]` (wildcard) or `['name']` / `["name"]`
/// (quoted field).
///
/// Malformed parts (unterminated bracket, unknown selector, trailing text
/// after a bracket) never panic and are never silently dropped: the whole
/// part is kept as a literal field name, so a lookup with it simply fails
/// unless a key with exactly that text exists.
fn split_json_path_segments(path: &str) -> Vec<JsonPathSegment<'_>> {
    let mut segments = Vec::new();
    for part in path.split('.').filter(|part| !part.is_empty()) {
        match parse_json_path_part(part) {
            Some(parsed) => segments.extend(parsed),
            // Fail closed: keep the unparseable text as a (most likely absent) key.
            None => segments.push(JsonPathSegment::Field(part)),
        }
    }
    segments
}

/// Parse one dot-separated part into its segments, or `None` if it is malformed.
fn parse_json_path_part(part: &str) -> Option<Vec<JsonPathSegment<'_>>> {
    let bracket_start = match part.find('[') {
        Some(position) => position,
        None => return Some(vec![JsonPathSegment::Field(part)]),
    };

    let mut parsed = Vec::new();
    let field_name = &part[..bracket_start];
    if !field_name.is_empty() {
        parsed.push(JsonPathSegment::Field(field_name));
    }

    // Consume `[selector]` groups until the part is exhausted.
    let mut rest = &part[bracket_start..];
    while !rest.is_empty() {
        let (content, remainder) = rest.strip_prefix('[')?.split_once(']')?;
        let content = content.trim();
        let segment = if content == "*" {
            JsonPathSegment::Wildcard
        } else if let Ok(index) = content.parse::<usize>() {
            JsonPathSegment::Index(index)
        } else {
            JsonPathSegment::Field(strip_matching_quotes(content)?)
        };
        parsed.push(segment);
        rest = remainder;
    }

    Some(parsed)
}

/// Return the text between a matching pair of single or double quotes.
fn strip_matching_quotes(text: &str) -> Option<&str> {
    ['\'', '"']
        .iter()
        .find_map(|&quote| text.strip_prefix(quote).and_then(|inner| inner.strip_suffix(quote)))
}
419
/// One step of the supported XPath subset: an element name plus an optional
/// `text()` equality predicate.
#[derive(Debug, Clone, PartialEq, Eq)]
struct XPathSegment {
    /// Element (tag) name the step must match
    name: String,
    /// When set, the element's trimmed text must equal this value
    text_equals: Option<String>,
}

/// Parse a single XPath step such as `item` or `item[text()="value"]`.
///
/// Surrounding whitespace is ignored. Returns `None` when the step is empty
/// or whitespace-only, when a bracket is not closed, when the element name in
/// front of a predicate is empty, or when the predicate is anything other
/// than `text()=` followed by a single- or double-quoted value.
fn parse_xpath_segment(segment: &str) -> Option<XPathSegment> {
    // Trim before the emptiness check so "   " is rejected instead of
    // producing a segment with an empty name.
    let trimmed = segment.trim();
    if trimmed.is_empty() {
        return None;
    }

    let bracket_start = match trimmed.find('[') {
        Some(position) => position,
        None => {
            return Some(XPathSegment {
                name: trimmed.to_string(),
                text_equals: None,
            })
        }
    };

    let name = trimmed[..bracket_start].trim();
    let predicate = trimmed[bracket_start + 1..].strip_suffix(']')?.trim();
    if name.is_empty() {
        return None;
    }

    // Support simple predicate: [text()="value"] or [text()='value']
    let raw = predicate.strip_prefix("text()=")?.trim();
    let text = raw
        .strip_prefix('"')
        .and_then(|inner| inner.strip_suffix('"'))
        .or_else(|| raw.strip_prefix('\'').and_then(|inner| inner.strip_suffix('\'')))?;

    Some(XPathSegment {
        name: name.to_string(),
        text_equals: Some(text.to_string()),
    })
}
466
467fn segment_matches(node: roxmltree::Node<'_, '_>, segment: &XPathSegment) -> bool {
468    if !node.is_element() {
469        return false;
470    }
471    if node.tag_name().name() != segment.name {
472        return false;
473    }
474    match &segment.text_equals {
475        Some(expected) => node.text().map(str::trim).unwrap_or_default() == expected,
476        None => true,
477    }
478}
479
480/// Check if an XPath expression matches an XML body.
481///
482/// Supported subset:
483/// - Absolute paths: `/root/child/item`
484/// - Descendant search: `//item` and `//parent/child`
485/// - Optional text predicate per segment: `item[text()="value"]`
486fn xml_xpath_exists(xml_body: &str, xpath: &str) -> bool {
487    let doc = match roxmltree::Document::parse(xml_body) {
488        Ok(doc) => doc,
489        Err(err) => {
490            tracing::warn!("Failed to parse XML for XPath matching: {}", err);
491            return false;
492        }
493    };
494
495    let expr = xpath.trim();
496    if expr.is_empty() {
497        return false;
498    }
499
500    let (is_descendant, path_str) = if let Some(rest) = expr.strip_prefix("//") {
501        (true, rest)
502    } else if let Some(rest) = expr.strip_prefix('/') {
503        (false, rest)
504    } else {
505        tracing::warn!("Unsupported XPath expression (must start with / or //): {}", expr);
506        return false;
507    };
508
509    let segments: Vec<XPathSegment> = path_str
510        .split('/')
511        .filter(|s| !s.trim().is_empty())
512        .filter_map(parse_xpath_segment)
513        .collect();
514
515    if segments.is_empty() {
516        return false;
517    }
518
519    if is_descendant {
520        let first = &segments[0];
521        for node in doc.descendants().filter(|n| segment_matches(*n, first)) {
522            let mut frontier = vec![node];
523            for segment in &segments[1..] {
524                let mut next_frontier = Vec::new();
525                for parent in &frontier {
526                    for child in parent.children().filter(|n| segment_matches(*n, segment)) {
527                        next_frontier.push(child);
528                    }
529                }
530                if next_frontier.is_empty() {
531                    frontier.clear();
532                    break;
533                }
534                frontier = next_frontier;
535            }
536            if !frontier.is_empty() {
537                return true;
538            }
539        }
540        false
541    } else {
542        let mut frontier = vec![doc.root_element()];
543        for (index, segment) in segments.iter().enumerate() {
544            let mut next_frontier = Vec::new();
545            for parent in &frontier {
546                if index == 0 {
547                    if segment_matches(*parent, segment) {
548                        next_frontier.push(*parent);
549                    }
550                    continue;
551                }
552                for child in parent.children().filter(|n| segment_matches(*n, segment)) {
553                    next_frontier.push(child);
554                }
555            }
556            if next_frontier.is_empty() {
557                return false;
558            }
559            frontier = next_frontier;
560        }
561        !frontier.is_empty()
562    }
563}
564
565/// Evaluate a custom matcher expression
566fn evaluate_custom_matcher(
567    expression: &str,
568    method: &str,
569    path: &str,
570    headers: &std::collections::HashMap<String, String>,
571    query_params: &std::collections::HashMap<String, String>,
572    body: Option<&[u8]>,
573) -> bool {
574    use regex::Regex;
575
576    let expr = expression.trim();
577
578    // Handle equality expressions (field == "value")
579    if expr.contains("==") {
580        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
581        if parts.len() != 2 {
582            return false;
583        }
584
585        let field = parts[0];
586        let expected_value = parts[1].trim_matches('"').trim_matches('\'');
587
588        match field {
589            "method" => method == expected_value,
590            "path" => path == expected_value,
591            _ if field.starts_with("headers.") => {
592                let header_name = &field[8..];
593                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
594            }
595            _ if field.starts_with("query.") => {
596                let param_name = &field[6..];
597                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
598            }
599            _ => false,
600        }
601    }
602    // Handle regex match expressions (field =~ "pattern")
603    else if expr.contains("=~") {
604        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
605        if parts.len() != 2 {
606            return false;
607        }
608
609        let field = parts[0];
610        let pattern = parts[1].trim_matches('"').trim_matches('\'');
611
612        if let Ok(re) = Regex::new(pattern) {
613            match field {
614                "method" => re.is_match(method),
615                "path" => re.is_match(path),
616                _ if field.starts_with("headers.") => {
617                    let header_name = &field[8..];
618                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
619                }
620                _ if field.starts_with("query.") => {
621                    let param_name = &field[6..];
622                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
623                }
624                _ => false,
625            }
626        } else {
627            false
628        }
629    }
630    // Handle contains expressions (field contains "value")
631    else if expr.contains("contains") {
632        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
633        if parts.len() != 2 {
634            return false;
635        }
636
637        let field = parts[0];
638        let search_value = parts[1].trim_matches('"').trim_matches('\'');
639
640        match field {
641            "path" => path.contains(search_value),
642            _ if field.starts_with("headers.") => {
643                let header_name = &field[8..];
644                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
645            }
646            _ if field.starts_with("body") => {
647                if let Some(body_bytes) = body {
648                    let body_str = String::from_utf8_lossy(body_bytes);
649                    body_str.contains(search_value)
650                } else {
651                    false
652                }
653            }
654            _ => false,
655        }
656    } else {
657        // Unknown expression format
658        tracing::warn!("Unknown custom matcher expression format: {}", expr);
659        false
660    }
661}
662
/// Server statistics
///
/// Plain, serializable snapshot of counters describing the running server.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Server uptime in seconds
    pub uptime_seconds: u64,
    /// Total number of requests processed
    pub total_requests: u64,
    /// Number of active mock configurations
    pub active_mocks: usize,
    /// Number of currently enabled mocks
    pub enabled_mocks: usize,
    /// Number of registered API routes
    pub registered_routes: usize,
}
677
/// Server configuration info
///
/// Serializable summary of how the server was started. Not to be confused
/// with `mockforge_core::config::ServerConfig`, which `ManagementState` also
/// references.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// MockForge version string
    pub version: String,
    /// Server port number
    pub port: u16,
    /// Whether an OpenAPI spec is loaded
    pub has_openapi_spec: bool,
    /// Optional path to the OpenAPI spec file (omitted from serialized output when unset)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
691
/// Shared state for the management API
///
/// Handlers receive this by value through axum's `State` extractor, so the
/// type is `Clone`. Mutable collections and integrations are held behind
/// `Arc`, which means clones share them; the plain fields (`spec_path`,
/// `port`, `start_time`) are copied.
///
/// Several fields only exist when the matching cargo feature is enabled.
#[derive(Clone)]
pub struct ManagementState {
    /// Collection of mock configurations
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// Optional OpenAPI specification
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Optional path to the OpenAPI spec file
    pub spec_path: Option<String>,
    /// Server port number
    pub port: u16,
    /// Server start time for uptime calculation
    pub start_time: std::time::Instant,
    /// Counter for total requests processed
    pub request_counter: Arc<RwLock<u64>>,
    /// Optional proxy configuration for migration pipeline
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// Optional SMTP registry for email mocking
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// Optional MQTT broker for message mocking
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Optional Kafka broker for event streaming
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel for message events (MQTT & Kafka)
    ///
    /// Only the sending half is stored.
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// State machine manager for scenario state machines
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// Optional WebSocket broadcast channel for real-time updates
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Lifecycle hook registry for extensibility
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Rule explanations storage (in-memory for now), keyed by string
    // NOTE(review): the key is presumably a rule id — confirm against the handlers that fill it.
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
    /// Optional chaos API state for chaos config management
    #[cfg(feature = "chaos")]
    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
    /// Optional server configuration for profile application
    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
    /// Conformance testing state
    #[cfg(feature = "conformance")]
    pub conformance_state: crate::handlers::conformance::ConformanceState,
}
746
impl ManagementState {
    /// Create a new management state
    ///
    /// Starts with an empty mock list, a zeroed request counter, the start
    /// time set to "now", and every optional integration (proxy, brokers,
    /// hooks, WebSocket broadcast, server config, ...) unset. Use the
    /// `with_*` builder methods to attach them.
    ///
    /// # Arguments
    /// * `spec` - Optional OpenAPI specification
    /// * `spec_path` - Optional path to the OpenAPI spec file
    /// * `port` - Server port number
    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
        Self {
            mocks: Arc::new(RwLock::new(Vec::new())),
            spec,
            spec_path,
            port,
            start_time: std::time::Instant::now(),
            request_counter: Arc::new(RwLock::new(0)),
            proxy_config: None,
            #[cfg(feature = "smtp")]
            smtp_registry: None,
            #[cfg(feature = "mqtt")]
            mqtt_broker: None,
            #[cfg(feature = "kafka")]
            kafka_broker: None,
            #[cfg(any(feature = "mqtt", feature = "kafka"))]
            message_events: {
                // Only the sender is kept; the initial receiver is dropped here.
                let capacity = get_message_broadcast_capacity();
                let (tx, _) = broadcast::channel(capacity);
                Arc::new(tx)
            },
            state_machine_manager: Arc::new(RwLock::new(
                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
            )),
            ws_broadcast: None,
            lifecycle_hooks: None,
            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
            #[cfg(feature = "chaos")]
            chaos_api_state: None,
            server_config: None,
            #[cfg(feature = "conformance")]
            conformance_state: crate::handlers::conformance::ConformanceState::new(),
        }
    }

    /// Add lifecycle hook registry to management state
    ///
    /// Builder-style: consumes `self` and returns it with the registry set.
    pub fn with_lifecycle_hooks(
        mut self,
        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
    ) -> Self {
        self.lifecycle_hooks = Some(hooks);
        self
    }

    /// Add WebSocket broadcast channel to management state
    ///
    /// Builder-style: consumes `self` and returns it with the channel set.
    pub fn with_ws_broadcast(
        mut self,
        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
    ) -> Self {
        self.ws_broadcast = Some(ws_broadcast);
        self
    }

    /// Add proxy configuration to management state
    ///
    /// Builder-style: consumes `self` and returns it with the configuration set.
    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
        self.proxy_config = Some(proxy_config);
        self
    }

    #[cfg(feature = "smtp")]
    /// Add SMTP registry to management state (requires the `smtp` feature)
    pub fn with_smtp_registry(
        mut self,
        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
    ) -> Self {
        self.smtp_registry = Some(smtp_registry);
        self
    }

    #[cfg(feature = "mqtt")]
    /// Add MQTT broker to management state (requires the `mqtt` feature)
    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
        self.mqtt_broker = Some(mqtt_broker);
        self
    }

    #[cfg(feature = "kafka")]
    /// Add Kafka broker to management state (requires the `kafka` feature)
    pub fn with_kafka_broker(
        mut self,
        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
    ) -> Self {
        self.kafka_broker = Some(kafka_broker);
        self
    }

    #[cfg(feature = "chaos")]
    /// Add chaos API state to management state (requires the `chaos` feature)
    pub fn with_chaos_api_state(
        mut self,
        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
    ) -> Self {
        self.chaos_api_state = Some(chaos_api_state);
        self
    }

    /// Add server configuration to management state
    ///
    /// Builder-style: consumes `self` and returns it with the configuration set.
    pub fn with_server_config(
        mut self,
        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
    ) -> Self {
        self.server_config = Some(server_config);
        self
    }
}
859
860/// List all mocks
861async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
862    let mocks = state.mocks.read().await;
863    Json(serde_json::json!({
864        "mocks": *mocks,
865        "total": mocks.len(),
866        "enabled": mocks.iter().filter(|m| m.enabled).count()
867    }))
868}
869
870/// Get a specific mock by ID
871async fn get_mock(
872    State(state): State<ManagementState>,
873    Path(id): Path<String>,
874) -> Result<Json<MockConfig>, StatusCode> {
875    let mocks = state.mocks.read().await;
876    mocks
877        .iter()
878        .find(|m| m.id == id)
879        .cloned()
880        .map(Json)
881        .ok_or(StatusCode::NOT_FOUND)
882}
883
884/// Create a new mock
885async fn create_mock(
886    State(state): State<ManagementState>,
887    Json(mut mock): Json<MockConfig>,
888) -> Result<Json<MockConfig>, StatusCode> {
889    let mut mocks = state.mocks.write().await;
890
891    // Generate ID if not provided
892    if mock.id.is_empty() {
893        mock.id = uuid::Uuid::new_v4().to_string();
894    }
895
896    // Check for duplicate ID
897    if mocks.iter().any(|m| m.id == mock.id) {
898        return Err(StatusCode::CONFLICT);
899    }
900
901    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
902
903    // Invoke lifecycle hooks
904    if let Some(hooks) = &state.lifecycle_hooks {
905        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
906            id: mock.id.clone(),
907            name: mock.name.clone(),
908            config: serde_json::to_value(&mock).unwrap_or_default(),
909        };
910        hooks.invoke_mock_created(&event).await;
911    }
912
913    mocks.push(mock.clone());
914
915    // Broadcast WebSocket event
916    if let Some(tx) = &state.ws_broadcast {
917        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
918    }
919
920    Ok(Json(mock))
921}
922
923/// Update an existing mock
924async fn update_mock(
925    State(state): State<ManagementState>,
926    Path(id): Path<String>,
927    Json(updated_mock): Json<MockConfig>,
928) -> Result<Json<MockConfig>, StatusCode> {
929    let mut mocks = state.mocks.write().await;
930
931    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
932
933    // Get old mock for comparison
934    let old_mock = mocks[position].clone();
935
936    info!("Updating mock: {}", id);
937    mocks[position] = updated_mock.clone();
938
939    // Invoke lifecycle hooks
940    if let Some(hooks) = &state.lifecycle_hooks {
941        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
942            id: updated_mock.id.clone(),
943            name: updated_mock.name.clone(),
944            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
945        };
946        hooks.invoke_mock_updated(&event).await;
947
948        // Check if enabled state changed
949        if old_mock.enabled != updated_mock.enabled {
950            let state_event = if updated_mock.enabled {
951                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
952                    id: updated_mock.id.clone(),
953                }
954            } else {
955                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
956                    id: updated_mock.id.clone(),
957                }
958            };
959            hooks.invoke_mock_state_changed(&state_event).await;
960        }
961    }
962
963    // Broadcast WebSocket event
964    if let Some(tx) = &state.ws_broadcast {
965        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
966    }
967
968    Ok(Json(updated_mock))
969}
970
971/// Delete a mock
972async fn delete_mock(
973    State(state): State<ManagementState>,
974    Path(id): Path<String>,
975) -> Result<StatusCode, StatusCode> {
976    let mut mocks = state.mocks.write().await;
977
978    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
979
980    // Get mock info before deletion for lifecycle hooks
981    let deleted_mock = mocks[position].clone();
982
983    info!("Deleting mock: {}", id);
984    mocks.remove(position);
985
986    // Invoke lifecycle hooks
987    if let Some(hooks) = &state.lifecycle_hooks {
988        let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
989            id: deleted_mock.id.clone(),
990            name: deleted_mock.name.clone(),
991        };
992        hooks.invoke_mock_deleted(&event).await;
993    }
994
995    // Broadcast WebSocket event
996    if let Some(tx) = &state.ws_broadcast {
997        let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
998    }
999
1000    Ok(StatusCode::NO_CONTENT)
1001}
1002
1003/// Request to validate configuration
1004#[derive(Debug, Deserialize)]
1005pub struct ValidateConfigRequest {
1006    /// Configuration to validate (as JSON)
1007    pub config: serde_json::Value,
1008    /// Format of the configuration ("json" or "yaml")
1009    #[serde(default = "default_format")]
1010    pub format: String,
1011}
1012
/// Serde default for the `format` field of `ValidateConfigRequest`: `"json"`.
fn default_format() -> String {
    String::from("json")
}
1016
1017/// Validate configuration without applying it
1018async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
1019    use mockforge_core::config::ServerConfig;
1020
1021    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
1022        "yaml" | "yml" => {
1023            let yaml_str = match serde_json::to_string(&request.config) {
1024                Ok(s) => s,
1025                Err(e) => {
1026                    return (
1027                        StatusCode::BAD_REQUEST,
1028                        Json(serde_json::json!({
1029                            "valid": false,
1030                            "error": format!("Failed to convert to string: {}", e),
1031                            "message": "Configuration validation failed"
1032                        })),
1033                    )
1034                        .into_response();
1035                }
1036            };
1037            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
1038        }
1039        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
1040    };
1041
1042    match config_result {
1043        Ok(_) => Json(serde_json::json!({
1044            "valid": true,
1045            "message": "Configuration is valid"
1046        }))
1047        .into_response(),
1048        Err(e) => (
1049            StatusCode::BAD_REQUEST,
1050            Json(serde_json::json!({
1051                "valid": false,
1052                "error": format!("Invalid configuration: {}", e),
1053                "message": "Configuration validation failed"
1054            })),
1055        )
1056            .into_response(),
1057    }
1058}
1059
1060/// Request for bulk configuration update
1061#[derive(Debug, Deserialize)]
1062pub struct BulkConfigUpdateRequest {
1063    /// Partial configuration updates (only specified fields will be updated)
1064    pub updates: serde_json::Value,
1065}
1066
1067/// Bulk update configuration
1068///
1069/// This endpoint allows updating multiple configuration options at once.
1070/// Only the specified fields in the updates object will be modified.
1071///
1072/// Configuration updates are applied to the server configuration if available
1073/// in ManagementState. Changes take effect immediately for supported settings.
1074async fn bulk_update_config(
1075    State(state): State<ManagementState>,
1076    Json(request): Json<BulkConfigUpdateRequest>,
1077) -> impl IntoResponse {
1078    // Validate the updates structure
1079    if !request.updates.is_object() {
1080        return (
1081            StatusCode::BAD_REQUEST,
1082            Json(serde_json::json!({
1083                "error": "Invalid request",
1084                "message": "Updates must be a JSON object"
1085            })),
1086        )
1087            .into_response();
1088    }
1089
1090    // Try to validate as partial ServerConfig
1091    use mockforge_core::config::ServerConfig;
1092
1093    // Create a minimal valid config and try to merge updates
1094    let base_config = ServerConfig::default();
1095    let base_json = match serde_json::to_value(&base_config) {
1096        Ok(v) => v,
1097        Err(e) => {
1098            return (
1099                StatusCode::INTERNAL_SERVER_ERROR,
1100                Json(serde_json::json!({
1101                    "error": "Internal error",
1102                    "message": format!("Failed to serialize base config: {}", e)
1103                })),
1104            )
1105                .into_response();
1106        }
1107    };
1108
1109    // Merge updates into base config (simplified merge)
1110    let mut merged = base_json.clone();
1111    if let (Some(merged_obj), Some(updates_obj)) =
1112        (merged.as_object_mut(), request.updates.as_object())
1113    {
1114        for (key, value) in updates_obj {
1115            merged_obj.insert(key.clone(), value.clone());
1116        }
1117    }
1118
1119    // Validate the merged config
1120    match serde_json::from_value::<ServerConfig>(merged) {
1121        Ok(validated_config) => {
1122            // Apply config if server_config is available in ManagementState
1123            if let Some(ref config_lock) = state.server_config {
1124                let mut config = config_lock.write().await;
1125                *config = validated_config;
1126                Json(serde_json::json!({
1127                    "success": true,
1128                    "message": "Bulk configuration update applied successfully",
1129                    "updates_received": request.updates,
1130                    "validated": true,
1131                    "applied": true
1132                }))
1133                .into_response()
1134            } else {
1135                Json(serde_json::json!({
1136                    "success": true,
1137                    "message": "Bulk configuration update validated but not applied (no server config in state). Use .with_server_config() when building ManagementState.",
1138                    "updates_received": request.updates,
1139                    "validated": true,
1140                    "applied": false
1141                }))
1142                .into_response()
1143            }
1144        }
1145        Err(e) => (
1146            StatusCode::BAD_REQUEST,
1147            Json(serde_json::json!({
1148                "error": "Invalid configuration",
1149                "message": format!("Configuration validation failed: {}", e),
1150                "validated": false
1151            })),
1152        )
1153            .into_response(),
1154    }
1155}
1156
1157/// Get server statistics
1158async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
1159    let mocks = state.mocks.read().await;
1160    let request_count = *state.request_counter.read().await;
1161
1162    Json(ServerStats {
1163        uptime_seconds: state.start_time.elapsed().as_secs(),
1164        total_requests: request_count,
1165        active_mocks: mocks.len(),
1166        enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
1167        registered_routes: mocks.len(), // This could be enhanced with actual route registry info
1168    })
1169}
1170
1171/// Get server configuration
1172async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
1173    Json(ServerConfig {
1174        version: env!("CARGO_PKG_VERSION").to_string(),
1175        port: state.port,
1176        has_openapi_spec: state.spec.is_some(),
1177        spec_path: state.spec_path.clone(),
1178    })
1179}
1180
1181/// Health check endpoint
1182async fn health_check() -> Json<serde_json::Value> {
1183    Json(serde_json::json!({
1184        "status": "healthy",
1185        "service": "mockforge-management",
1186        "timestamp": chrono::Utc::now().to_rfc3339()
1187    }))
1188}
1189
1190/// Export format for mock configurations
1191#[derive(Debug, Clone, Serialize, Deserialize)]
1192#[serde(rename_all = "lowercase")]
1193pub enum ExportFormat {
1194    /// JSON format
1195    Json,
1196    /// YAML format
1197    Yaml,
1198}
1199
1200/// Export mocks in specified format
1201async fn export_mocks(
1202    State(state): State<ManagementState>,
1203    Query(params): Query<std::collections::HashMap<String, String>>,
1204) -> Result<(StatusCode, String), StatusCode> {
1205    let mocks = state.mocks.read().await;
1206
1207    let format = params
1208        .get("format")
1209        .map(|f| match f.as_str() {
1210            "yaml" | "yml" => ExportFormat::Yaml,
1211            _ => ExportFormat::Json,
1212        })
1213        .unwrap_or(ExportFormat::Json);
1214
1215    match format {
1216        ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
1217            .map(|json| (StatusCode::OK, json))
1218            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1219        ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
1220            .map(|yaml| (StatusCode::OK, yaml))
1221            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1222    }
1223}
1224
1225/// Import mocks from JSON/YAML
1226async fn import_mocks(
1227    State(state): State<ManagementState>,
1228    Json(mocks): Json<Vec<MockConfig>>,
1229) -> impl IntoResponse {
1230    let mut current_mocks = state.mocks.write().await;
1231    current_mocks.clear();
1232    current_mocks.extend(mocks);
1233    Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1234}
1235
#[cfg(feature = "smtp")]
/// Return every email currently held by the SMTP registry.
///
/// Responds `501 Not Implemented` when no SMTP registry is attached to the
/// state, `500 Internal Server Error` when the registry reports an error, and
/// `200 OK` with the emails as JSON otherwise.
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    let (status, body) = match registry.get_emails() {
        Ok(emails) => (StatusCode::OK, serde_json::json!(emails)),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            serde_json::json!({
                "error": "Failed to retrieve emails",
                "message": err.to_string()
            }),
        ),
    };
    (status, Json(body))
}
1260
/// Fetch one email from the SMTP registry by its ID.
///
/// Responds `501 Not Implemented` when no SMTP registry is attached to the
/// state, `404 Not Found` when the registry has no email with that ID,
/// `500 Internal Server Error` when the lookup fails, and `200 OK` with the
/// email as JSON otherwise.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    let (status, body) = match registry.get_email_by_id(&id) {
        Ok(Some(email)) => (StatusCode::OK, serde_json::json!(email)),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            serde_json::json!({
                "error": "Email not found",
                "id": id
            }),
        ),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            serde_json::json!({
                "error": "Failed to retrieve email",
                "message": err.to_string()
            }),
        ),
    };
    (status, Json(body))
}
1295
/// Delete every email held by the SMTP registry.
///
/// Responds `501 Not Implemented` when no SMTP registry is attached to the
/// state, `500 Internal Server Error` when clearing fails, and `200 OK` with a
/// confirmation message otherwise.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    let (status, body) = match registry.clear_mailbox() {
        Ok(()) => (
            StatusCode::OK,
            serde_json::json!({
                "message": "Mailbox cleared successfully"
            }),
        ),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            serde_json::json!({
                "error": "Failed to clear mailbox",
                "message": err.to_string()
            }),
        ),
    };
    (status, Json(body))
}
1325
/// Placeholder for mailbox export: always answers `501 Not Implemented`.
///
/// The response echoes the `format` query parameter (defaulting to `"json"`)
/// under `requested_format` and points the caller at the CLI instead.
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let requested_format = params
        .get("format")
        .cloned()
        .unwrap_or_else(|| "json".to_string());

    let body = serde_json::json!({
        "error": "SMTP mailbox management not available via HTTP API",
        "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
        "requested_format": requested_format
    });
    (StatusCode::NOT_IMPLEMENTED, Json(body))
}
1341
/// Search the SMTP mailbox using filters taken from the query string.
///
/// Recognized parameters:
/// - `sender`, `recipient`, `subject`, `body`: text filters, passed through as given.
/// - `since`, `until`: RFC 3339 timestamps, converted to UTC. A value that
///   does not parse is ignored (the filter is left unset).
/// - `regex`, `case_sensitive`: enabled only when the value is exactly `true`.
///
/// Responds `501 Not Implemented` when no SMTP registry is attached to the
/// state, `500 Internal Server Error` when the search fails, and `200 OK` with
/// the matching emails as JSON otherwise.
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    // Small accessors so each filter field reads as a single lookup.
    let text = |key: &str| params.get(key).cloned();
    let timestamp = |key: &str| {
        params
            .get(key)
            .and_then(|raw| chrono::DateTime::parse_from_rfc3339(raw).ok())
            .map(|parsed| parsed.with_timezone(&chrono::Utc))
    };
    let flag = |key: &str| params.get(key).map(String::as_str) == Some("true");

    let filters = EmailSearchFilters {
        sender: text("sender"),
        recipient: text("recipient"),
        subject: text("subject"),
        body: text("body"),
        since: timestamp("since"),
        until: timestamp("until"),
        use_regex: flag("regex"),
        case_sensitive: flag("case_sensitive"),
    };

    let (status, body) = match registry.search_emails(filters) {
        Ok(emails) => (StatusCode::OK, serde_json::json!(emails)),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            serde_json::json!({
                "error": "Failed to search emails",
                "message": err.to_string()
            }),
        ),
    };
    (status, Json(body))
}
1386
/// Snapshot of MQTT broker counters returned by the MQTT stats endpoint.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// How many MQTT clients are connected.
    pub connected_clients: usize,
    /// How many MQTT topics are active.
    pub active_topics: usize,
    /// How many retained messages the broker holds.
    pub retained_messages: usize,
    /// Total subscription count across the broker.
    pub total_subscriptions: usize,
}
1400
/// Report MQTT broker statistics.
///
/// Combines the number of connected clients and active topics with the
/// broker's topic statistics into an `MqttBrokerStats` JSON body. Responds
/// `503 Service Unavailable` when no MQTT broker is attached to the state.
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let broker = match state.mqtt_broker.as_ref() {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let connected_clients = broker.get_connected_clients().await.len();
    let active_topics = broker.get_active_topics().await.len();
    let topic_stats = broker.get_topic_stats().await;

    Json(MqttBrokerStats {
        connected_clients,
        active_topics,
        retained_messages: topic_stats.retained_messages,
        total_subscriptions: topic_stats.total_subscriptions,
    })
    .into_response()
}
1421
#[cfg(feature = "mqtt")]
/// List the clients currently connected to the MQTT broker.
///
/// The JSON body has a single `clients` key. Responds
/// `503 Service Unavailable` when no MQTT broker is attached to the state.
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    let broker = match state.mqtt_broker.as_ref() {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let clients = broker.get_connected_clients().await;
    let body = serde_json::json!({
        "clients": clients
    });
    Json(body).into_response()
}
1434
#[cfg(feature = "mqtt")]
/// List the topics currently active on the MQTT broker.
///
/// The JSON body has a single `topics` key. Responds
/// `503 Service Unavailable` when no MQTT broker is attached to the state.
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let broker = match state.mqtt_broker.as_ref() {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let topics = broker.get_active_topics().await;
    let body = serde_json::json!({
        "topics": topics
    });
    Json(body).into_response()
}
1447
#[cfg(feature = "mqtt")]
/// Ask the MQTT broker to disconnect the client named in the URL path.
///
/// Responds `200 OK` with a plain-text confirmation on success,
/// `500 Internal Server Error` with the broker's error text on failure, and
/// `503 Service Unavailable` when no MQTT broker is attached to the state.
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let broker = match state.mqtt_broker.as_ref() {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let (status, text) = match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("Failed to disconnect client: {}", e),
        ),
    };
    (status, text).into_response()
}
1467
1468// ========== MQTT Publish Handler ==========
1469
#[cfg(feature = "mqtt")]
/// Description of one MQTT message to publish.
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Destination topic.
    pub topic: String,
    /// Message payload, carried as a string (plain text or serialized JSON).
    pub payload: String,
    /// MQTT quality-of-service level: 0, 1, or 2. Defaults to 0 when omitted.
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Whether the message should be retained. Defaults to `false` when omitted.
    #[serde(default)]
    pub retain: bool,
}
1485
1486#[cfg(feature = "mqtt")]
/// Serde default for the `qos` field of `MqttPublishRequest`: QoS level 0.
fn default_qos() -> u8 {
    const DEFAULT_QOS_LEVEL: u8 = 0;
    DEFAULT_QOS_LEVEL
}
1490
#[cfg(feature = "mqtt")]
/// Publish a message to an MQTT topic (only compiled when mqtt feature is enabled)
///
/// The JSON body is read field by field:
/// - `topic` and `payload` are required strings; if either is missing or not a
///   string the request is rejected with `400 Bad Request`.
/// - `qos` is optional and defaults to 0 when absent or not an unsigned
///   integer; any value above 2 is rejected with `400 Bad Request`.
/// - `retain` is optional and defaults to `false`.
///
/// Responds `503 Service Unavailable` when no MQTT broker is attached to the
/// state and `500 Internal Server Error` when the broker rejects the publish.
/// On success a `MessageEvent::Mqtt` is sent to the message-event channel for
/// real-time monitoring (send errors are ignored) and `200 OK` is returned.
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Extract fields from JSON manually
    let topic = request.get("topic").and_then(|v| v.as_str());
    let payload = request.get("payload").and_then(|v| v.as_str());
    // Keep the QoS at full width until it has been range-checked: narrowing to
    // `u8` first would wrap values such as 256 around to 0 and let them pass.
    let raw_qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0);
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    let (topic, payload) = match (topic, payload) {
        (Some(topic), Some(payload)) => (topic.to_string(), payload.to_string()),
        _ => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid request",
                    "message": "Missing required fields: topic and payload"
                })),
            );
        }
    };

    let broker = match &state.mqtt_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "MQTT broker not available",
                    "message": "MQTT broker is not enabled or not available."
                })),
            );
        }
    };

    // Validate QoS
    if raw_qos > 2 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid QoS",
                "message": "QoS must be 0, 1, or 2"
            })),
        );
    }
    // Lossless: the value was just checked to be at most 2.
    let qos = raw_qos as u8;

    // Convert payload to bytes
    let payload_bytes = payload.as_bytes().to_vec();
    let client_id = "mockforge-management-api".to_string();

    let publish_result = broker
        .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
        .await
        .map_err(|e| format!("{}", e));

    match publish_result {
        Ok(_) => {
            // Emit message event for real-time monitoring
            let event = MessageEvent::Mqtt(MqttMessageEvent {
                topic: topic.clone(),
                payload: payload.clone(),
                qos,
                retain,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            (
                StatusCode::OK,
                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message published to topic '{}'", topic),
                    "topic": topic,
                    "qos": qos,
                    "retain": retain
                })),
            )
        }
        Err(error_msg) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to publish message",
                "message": error_msg
            })),
        ),
    }
}
1578
1579#[cfg(not(feature = "mqtt"))]
1580/// Publish a message to an MQTT topic (stub when mqtt feature is disabled)
1581async fn publish_mqtt_message_handler(
1582    State(_state): State<ManagementState>,
1583    Json(_request): Json<serde_json::Value>,
1584) -> impl IntoResponse {
1585    (
1586        StatusCode::SERVICE_UNAVAILABLE,
1587        Json(serde_json::json!({
1588            "error": "MQTT feature not enabled",
1589            "message": "MQTT support is not compiled into this build"
1590        })),
1591    )
1592}
1593
#[cfg(feature = "mqtt")]
/// Description of a batch of MQTT messages to publish in order.
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// The messages to publish.
    pub messages: Vec<MqttPublishRequest>,
    /// Pause between consecutive messages, in milliseconds. Defaults to 100
    /// when omitted.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1604
1605#[cfg(feature = "mqtt")]
/// Serde default for the `delay_ms` field of `MqttBatchPublishRequest`:
/// 100 milliseconds.
fn default_delay() -> u64 {
    const DEFAULT_DELAY_MS: u64 = 100;
    DEFAULT_DELAY_MS
}
1609
#[cfg(feature = "mqtt")]
/// Publish multiple messages to MQTT topics (only compiled when mqtt feature is enabled)
///
/// The JSON body must contain a `messages` array; `delay_ms` is optional and
/// defaults to 100. Each array element is read field by field:
/// - `topic` and `payload` are required strings.
/// - `qos` is optional and defaults to 0 when absent or not an unsigned
///   integer; any value above 2 marks that message as failed.
/// - `retain` is optional and defaults to `false`.
///
/// Messages are published in order. A message that fails validation or
/// publishing is recorded in `results` and does not stop the batch. After each
/// publish attempt except the last, the handler sleeps for `delay_ms`
/// milliseconds (messages skipped by validation do not trigger a delay).
///
/// Responds `400 Bad Request` when `messages` is missing, not an array, or
/// empty; `503 Service Unavailable` when no MQTT broker is attached to the
/// state; otherwise `200 OK` with per-message results and success/failure
/// totals.
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Extract fields from JSON manually
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    let messages_json = match request.get("messages").and_then(|v| v.as_array()) {
        Some(messages) => messages,
        None => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid request",
                    "message": "Missing required field: messages"
                })),
            );
        }
    };

    let broker = match &state.mqtt_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "MQTT broker not available",
                    "message": "MQTT broker is not enabled or not available."
                })),
            );
        }
    };

    if messages_json.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        );
    }

    let total = messages_json.len();
    let mut results = Vec::with_capacity(total);
    let client_id = "mockforge-management-api".to_string();

    for (index, msg_json) in messages_json.iter().enumerate() {
        let topic = msg_json.get("topic").and_then(|v| v.as_str());
        let payload = msg_json.get("payload").and_then(|v| v.as_str());
        // Keep the QoS at full width until it has been range-checked: narrowing
        // to `u8` first would wrap values such as 256 around to 0.
        let raw_qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0);
        let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

        let (topic, payload) = match (topic, payload) {
            (Some(topic), Some(payload)) => (topic.to_string(), payload.to_string()),
            _ => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }
        };

        // Validate QoS
        if raw_qos > 2 {
            results.push(serde_json::json!({
                "index": index,
                "success": false,
                "error": "Invalid QoS (must be 0, 1, or 2)"
            }));
            continue;
        }
        // Lossless: the value was just checked to be at most 2.
        let qos = raw_qos as u8;

        // Convert payload to bytes
        let payload_bytes = payload.as_bytes().to_vec();

        let publish_result = broker
            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
            .await
            .map_err(|e| format!("{}", e));

        match publish_result {
            Ok(_) => {
                // Emit message event
                let event = MessageEvent::Mqtt(MqttMessageEvent {
                    topic: topic.clone(),
                    payload: payload.clone(),
                    qos,
                    retain,
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                results.push(serde_json::json!({
                    "index": index,
                    "success": true,
                    "topic": topic,
                    "qos": qos
                }));
            }
            Err(error_msg) => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": error_msg
                }));
            }
        }

        // Add delay between messages (except for the last one)
        if index + 1 < total && delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
        }
    }

    let success_count =
        results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

    (
        StatusCode::OK,
        Json(serde_json::json!({
            "success": true,
            "total": total,
            "succeeded": success_count,
            "failed": total - success_count,
            "results": results
        })),
    )
}
1739
1740#[cfg(not(feature = "mqtt"))]
1741/// Publish multiple messages to MQTT topics (stub when mqtt feature is disabled)
1742async fn publish_mqtt_batch_handler(
1743    State(_state): State<ManagementState>,
1744    Json(_request): Json<serde_json::Value>,
1745) -> impl IntoResponse {
1746    (
1747        StatusCode::SERVICE_UNAVAILABLE,
1748        Json(serde_json::json!({
1749            "error": "MQTT feature not enabled",
1750            "message": "MQTT support is not compiled into this build"
1751        })),
1752    )
1753}
1754
1755// Migration pipeline handlers
1756
1757/// Request to set migration mode
1758#[derive(Debug, Deserialize)]
1759struct SetMigrationModeRequest {
1760    mode: String,
1761}
1762
1763/// Get all migration routes
1764async fn get_migration_routes(
1765    State(state): State<ManagementState>,
1766) -> Result<Json<serde_json::Value>, StatusCode> {
1767    let proxy_config = match &state.proxy_config {
1768        Some(config) => config,
1769        None => {
1770            return Ok(Json(serde_json::json!({
1771                "error": "Migration not configured. Proxy config not available."
1772            })));
1773        }
1774    };
1775
1776    let config = proxy_config.read().await;
1777    let routes = config.get_migration_routes();
1778
1779    Ok(Json(serde_json::json!({
1780        "routes": routes
1781    })))
1782}
1783
1784/// Toggle a route's migration mode
1785async fn toggle_route_migration(
1786    State(state): State<ManagementState>,
1787    Path(pattern): Path<String>,
1788) -> Result<Json<serde_json::Value>, StatusCode> {
1789    let proxy_config = match &state.proxy_config {
1790        Some(config) => config,
1791        None => {
1792            return Ok(Json(serde_json::json!({
1793                "error": "Migration not configured. Proxy config not available."
1794            })));
1795        }
1796    };
1797
1798    let mut config = proxy_config.write().await;
1799    let new_mode = match config.toggle_route_migration(&pattern) {
1800        Some(mode) => mode,
1801        None => {
1802            return Ok(Json(serde_json::json!({
1803                "error": format!("Route pattern not found: {}", pattern)
1804            })));
1805        }
1806    };
1807
1808    Ok(Json(serde_json::json!({
1809        "pattern": pattern,
1810        "mode": format!("{:?}", new_mode).to_lowercase()
1811    })))
1812}
1813
1814/// Set a route's migration mode explicitly
1815async fn set_route_migration_mode(
1816    State(state): State<ManagementState>,
1817    Path(pattern): Path<String>,
1818    Json(request): Json<SetMigrationModeRequest>,
1819) -> Result<Json<serde_json::Value>, StatusCode> {
1820    let proxy_config = match &state.proxy_config {
1821        Some(config) => config,
1822        None => {
1823            return Ok(Json(serde_json::json!({
1824                "error": "Migration not configured. Proxy config not available."
1825            })));
1826        }
1827    };
1828
1829    use mockforge_core::proxy::config::MigrationMode;
1830    let mode = match request.mode.to_lowercase().as_str() {
1831        "mock" => MigrationMode::Mock,
1832        "shadow" => MigrationMode::Shadow,
1833        "real" => MigrationMode::Real,
1834        "auto" => MigrationMode::Auto,
1835        _ => {
1836            return Ok(Json(serde_json::json!({
1837                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1838            })));
1839        }
1840    };
1841
1842    let mut config = proxy_config.write().await;
1843    let updated = config.update_rule_migration_mode(&pattern, mode);
1844
1845    if !updated {
1846        return Ok(Json(serde_json::json!({
1847            "error": format!("Route pattern not found: {}", pattern)
1848        })));
1849    }
1850
1851    Ok(Json(serde_json::json!({
1852        "pattern": pattern,
1853        "mode": format!("{:?}", mode).to_lowercase()
1854    })))
1855}
1856
1857/// Toggle a group's migration mode
1858async fn toggle_group_migration(
1859    State(state): State<ManagementState>,
1860    Path(group): Path<String>,
1861) -> Result<Json<serde_json::Value>, StatusCode> {
1862    let proxy_config = match &state.proxy_config {
1863        Some(config) => config,
1864        None => {
1865            return Ok(Json(serde_json::json!({
1866                "error": "Migration not configured. Proxy config not available."
1867            })));
1868        }
1869    };
1870
1871    let mut config = proxy_config.write().await;
1872    let new_mode = config.toggle_group_migration(&group);
1873
1874    Ok(Json(serde_json::json!({
1875        "group": group,
1876        "mode": format!("{:?}", new_mode).to_lowercase()
1877    })))
1878}
1879
1880/// Set a group's migration mode explicitly
1881async fn set_group_migration_mode(
1882    State(state): State<ManagementState>,
1883    Path(group): Path<String>,
1884    Json(request): Json<SetMigrationModeRequest>,
1885) -> Result<Json<serde_json::Value>, StatusCode> {
1886    let proxy_config = match &state.proxy_config {
1887        Some(config) => config,
1888        None => {
1889            return Ok(Json(serde_json::json!({
1890                "error": "Migration not configured. Proxy config not available."
1891            })));
1892        }
1893    };
1894
1895    use mockforge_core::proxy::config::MigrationMode;
1896    let mode = match request.mode.to_lowercase().as_str() {
1897        "mock" => MigrationMode::Mock,
1898        "shadow" => MigrationMode::Shadow,
1899        "real" => MigrationMode::Real,
1900        "auto" => MigrationMode::Auto,
1901        _ => {
1902            return Ok(Json(serde_json::json!({
1903                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1904            })));
1905        }
1906    };
1907
1908    let mut config = proxy_config.write().await;
1909    config.update_group_migration_mode(&group, mode);
1910
1911    Ok(Json(serde_json::json!({
1912        "group": group,
1913        "mode": format!("{:?}", mode).to_lowercase()
1914    })))
1915}
1916
1917/// Get all migration groups
1918async fn get_migration_groups(
1919    State(state): State<ManagementState>,
1920) -> Result<Json<serde_json::Value>, StatusCode> {
1921    let proxy_config = match &state.proxy_config {
1922        Some(config) => config,
1923        None => {
1924            return Ok(Json(serde_json::json!({
1925                "error": "Migration not configured. Proxy config not available."
1926            })));
1927        }
1928    };
1929
1930    let config = proxy_config.read().await;
1931    let groups = config.get_migration_groups();
1932
1933    // Convert to JSON-serializable format
1934    let groups_json: serde_json::Map<String, serde_json::Value> = groups
1935        .into_iter()
1936        .map(|(name, info)| {
1937            (
1938                name,
1939                serde_json::json!({
1940                    "name": info.name,
1941                    "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1942                    "route_count": info.route_count
1943                }),
1944            )
1945        })
1946        .collect();
1947
1948    Ok(Json(serde_json::json!(groups_json)))
1949}
1950
1951/// Get overall migration status
1952async fn get_migration_status(
1953    State(state): State<ManagementState>,
1954) -> Result<Json<serde_json::Value>, StatusCode> {
1955    let proxy_config = match &state.proxy_config {
1956        Some(config) => config,
1957        None => {
1958            return Ok(Json(serde_json::json!({
1959                "error": "Migration not configured. Proxy config not available."
1960            })));
1961        }
1962    };
1963
1964    let config = proxy_config.read().await;
1965    let routes = config.get_migration_routes();
1966    let groups = config.get_migration_groups();
1967
1968    let mut mock_count = 0;
1969    let mut shadow_count = 0;
1970    let mut real_count = 0;
1971    let mut auto_count = 0;
1972
1973    for route in &routes {
1974        match route.migration_mode {
1975            mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1976            mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1977            mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1978            mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1979        }
1980    }
1981
1982    Ok(Json(serde_json::json!({
1983        "total_routes": routes.len(),
1984        "mock_routes": mock_count,
1985        "shadow_routes": shadow_count,
1986        "real_routes": real_count,
1987        "auto_routes": auto_count,
1988        "total_groups": groups.len(),
1989        "migration_enabled": config.migration_enabled
1990    })))
1991}
1992
1993// ========== Proxy Replacement Rules Management ==========
1994
1995/// Request body for creating/updating proxy replacement rules
1996#[derive(Debug, Deserialize, Serialize)]
1997pub struct ProxyRuleRequest {
1998    /// URL pattern to match (supports wildcards like "/api/users/*")
1999    pub pattern: String,
2000    /// Rule type: "request" or "response"
2001    #[serde(rename = "type")]
2002    pub rule_type: String,
2003    /// Optional status code filter for response rules
2004    #[serde(default)]
2005    pub status_codes: Vec<u16>,
2006    /// Body transformations to apply
2007    pub body_transforms: Vec<BodyTransformRequest>,
2008    /// Whether this rule is enabled
2009    #[serde(default = "default_true")]
2010    pub enabled: bool,
2011}
2012
2013/// Request body for individual body transformations
2014#[derive(Debug, Deserialize, Serialize)]
2015pub struct BodyTransformRequest {
2016    /// JSONPath expression to target (e.g., "$.userId", "$.email")
2017    pub path: String,
2018    /// Replacement value (supports template expansion like "{{uuid}}", "{{faker.email}}")
2019    pub replace: String,
2020    /// Operation to perform: "replace", "add", or "remove"
2021    #[serde(default)]
2022    pub operation: String,
2023}
2024
2025/// Response format for proxy rules
2026#[derive(Debug, Serialize)]
2027pub struct ProxyRuleResponse {
2028    /// Rule ID (index in the array)
2029    pub id: usize,
2030    /// URL pattern
2031    pub pattern: String,
2032    /// Rule type
2033    #[serde(rename = "type")]
2034    pub rule_type: String,
2035    /// Status codes (for response rules)
2036    pub status_codes: Vec<u16>,
2037    /// Body transformations
2038    pub body_transforms: Vec<BodyTransformRequest>,
2039    /// Whether enabled
2040    pub enabled: bool,
2041}
2042
2043/// List all proxy replacement rules
2044async fn list_proxy_rules(
2045    State(state): State<ManagementState>,
2046) -> Result<Json<serde_json::Value>, StatusCode> {
2047    let proxy_config = match &state.proxy_config {
2048        Some(config) => config,
2049        None => {
2050            return Ok(Json(serde_json::json!({
2051                "error": "Proxy not configured. Proxy config not available."
2052            })));
2053        }
2054    };
2055
2056    let config = proxy_config.read().await;
2057
2058    let mut rules: Vec<ProxyRuleResponse> = Vec::new();
2059
2060    // Add request replacement rules
2061    for (idx, rule) in config.request_replacements.iter().enumerate() {
2062        rules.push(ProxyRuleResponse {
2063            id: idx,
2064            pattern: rule.pattern.clone(),
2065            rule_type: "request".to_string(),
2066            status_codes: Vec::new(),
2067            body_transforms: rule
2068                .body_transforms
2069                .iter()
2070                .map(|t| BodyTransformRequest {
2071                    path: t.path.clone(),
2072                    replace: t.replace.clone(),
2073                    operation: format!("{:?}", t.operation).to_lowercase(),
2074                })
2075                .collect(),
2076            enabled: rule.enabled,
2077        });
2078    }
2079
2080    // Add response replacement rules
2081    let request_count = config.request_replacements.len();
2082    for (idx, rule) in config.response_replacements.iter().enumerate() {
2083        rules.push(ProxyRuleResponse {
2084            id: request_count + idx,
2085            pattern: rule.pattern.clone(),
2086            rule_type: "response".to_string(),
2087            status_codes: rule.status_codes.clone(),
2088            body_transforms: rule
2089                .body_transforms
2090                .iter()
2091                .map(|t| BodyTransformRequest {
2092                    path: t.path.clone(),
2093                    replace: t.replace.clone(),
2094                    operation: format!("{:?}", t.operation).to_lowercase(),
2095                })
2096                .collect(),
2097            enabled: rule.enabled,
2098        });
2099    }
2100
2101    Ok(Json(serde_json::json!({
2102        "rules": rules
2103    })))
2104}
2105
2106/// Create a new proxy replacement rule
2107async fn create_proxy_rule(
2108    State(state): State<ManagementState>,
2109    Json(request): Json<ProxyRuleRequest>,
2110) -> Result<Json<serde_json::Value>, StatusCode> {
2111    let proxy_config = match &state.proxy_config {
2112        Some(config) => config,
2113        None => {
2114            return Ok(Json(serde_json::json!({
2115                "error": "Proxy not configured. Proxy config not available."
2116            })));
2117        }
2118    };
2119
2120    // Validate request
2121    if request.body_transforms.is_empty() {
2122        return Ok(Json(serde_json::json!({
2123            "error": "At least one body transform is required"
2124        })));
2125    }
2126
2127    let body_transforms: Vec<BodyTransform> = request
2128        .body_transforms
2129        .iter()
2130        .map(|t| {
2131            let op = match t.operation.as_str() {
2132                "replace" => TransformOperation::Replace,
2133                "add" => TransformOperation::Add,
2134                "remove" => TransformOperation::Remove,
2135                _ => TransformOperation::Replace,
2136            };
2137            BodyTransform {
2138                path: t.path.clone(),
2139                replace: t.replace.clone(),
2140                operation: op,
2141            }
2142        })
2143        .collect();
2144
2145    let new_rule = BodyTransformRule {
2146        pattern: request.pattern.clone(),
2147        status_codes: request.status_codes.clone(),
2148        body_transforms,
2149        enabled: request.enabled,
2150    };
2151
2152    let mut config = proxy_config.write().await;
2153
2154    let rule_id = if request.rule_type == "request" {
2155        config.request_replacements.push(new_rule);
2156        config.request_replacements.len() - 1
2157    } else if request.rule_type == "response" {
2158        config.response_replacements.push(new_rule);
2159        config.request_replacements.len() + config.response_replacements.len() - 1
2160    } else {
2161        return Ok(Json(serde_json::json!({
2162            "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
2163        })));
2164    };
2165
2166    Ok(Json(serde_json::json!({
2167        "id": rule_id,
2168        "message": "Rule created successfully"
2169    })))
2170}
2171
2172/// Get a specific proxy replacement rule
2173async fn get_proxy_rule(
2174    State(state): State<ManagementState>,
2175    Path(id): Path<String>,
2176) -> Result<Json<serde_json::Value>, StatusCode> {
2177    let proxy_config = match &state.proxy_config {
2178        Some(config) => config,
2179        None => {
2180            return Ok(Json(serde_json::json!({
2181                "error": "Proxy not configured. Proxy config not available."
2182            })));
2183        }
2184    };
2185
2186    let config = proxy_config.read().await;
2187    let rule_id: usize = match id.parse() {
2188        Ok(id) => id,
2189        Err(_) => {
2190            return Ok(Json(serde_json::json!({
2191                "error": format!("Invalid rule ID: {}", id)
2192            })));
2193        }
2194    };
2195
2196    let request_count = config.request_replacements.len();
2197
2198    if rule_id < request_count {
2199        // Request rule
2200        let rule = &config.request_replacements[rule_id];
2201        Ok(Json(serde_json::json!({
2202            "id": rule_id,
2203            "pattern": rule.pattern,
2204            "type": "request",
2205            "status_codes": [],
2206            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2207                "path": t.path,
2208                "replace": t.replace,
2209                "operation": format!("{:?}", t.operation).to_lowercase()
2210            })).collect::<Vec<_>>(),
2211            "enabled": rule.enabled
2212        })))
2213    } else if rule_id < request_count + config.response_replacements.len() {
2214        // Response rule
2215        let response_idx = rule_id - request_count;
2216        let rule = &config.response_replacements[response_idx];
2217        Ok(Json(serde_json::json!({
2218            "id": rule_id,
2219            "pattern": rule.pattern,
2220            "type": "response",
2221            "status_codes": rule.status_codes,
2222            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2223                "path": t.path,
2224                "replace": t.replace,
2225                "operation": format!("{:?}", t.operation).to_lowercase()
2226            })).collect::<Vec<_>>(),
2227            "enabled": rule.enabled
2228        })))
2229    } else {
2230        Ok(Json(serde_json::json!({
2231            "error": format!("Rule ID {} not found", rule_id)
2232        })))
2233    }
2234}
2235
2236/// Update a proxy replacement rule
2237async fn update_proxy_rule(
2238    State(state): State<ManagementState>,
2239    Path(id): Path<String>,
2240    Json(request): Json<ProxyRuleRequest>,
2241) -> Result<Json<serde_json::Value>, StatusCode> {
2242    let proxy_config = match &state.proxy_config {
2243        Some(config) => config,
2244        None => {
2245            return Ok(Json(serde_json::json!({
2246                "error": "Proxy not configured. Proxy config not available."
2247            })));
2248        }
2249    };
2250
2251    let mut config = proxy_config.write().await;
2252    let rule_id: usize = match id.parse() {
2253        Ok(id) => id,
2254        Err(_) => {
2255            return Ok(Json(serde_json::json!({
2256                "error": format!("Invalid rule ID: {}", id)
2257            })));
2258        }
2259    };
2260
2261    let body_transforms: Vec<BodyTransform> = request
2262        .body_transforms
2263        .iter()
2264        .map(|t| {
2265            let op = match t.operation.as_str() {
2266                "replace" => TransformOperation::Replace,
2267                "add" => TransformOperation::Add,
2268                "remove" => TransformOperation::Remove,
2269                _ => TransformOperation::Replace,
2270            };
2271            BodyTransform {
2272                path: t.path.clone(),
2273                replace: t.replace.clone(),
2274                operation: op,
2275            }
2276        })
2277        .collect();
2278
2279    let updated_rule = BodyTransformRule {
2280        pattern: request.pattern.clone(),
2281        status_codes: request.status_codes.clone(),
2282        body_transforms,
2283        enabled: request.enabled,
2284    };
2285
2286    let request_count = config.request_replacements.len();
2287
2288    if rule_id < request_count {
2289        // Update request rule
2290        config.request_replacements[rule_id] = updated_rule;
2291    } else if rule_id < request_count + config.response_replacements.len() {
2292        // Update response rule
2293        let response_idx = rule_id - request_count;
2294        config.response_replacements[response_idx] = updated_rule;
2295    } else {
2296        return Ok(Json(serde_json::json!({
2297            "error": format!("Rule ID {} not found", rule_id)
2298        })));
2299    }
2300
2301    Ok(Json(serde_json::json!({
2302        "id": rule_id,
2303        "message": "Rule updated successfully"
2304    })))
2305}
2306
2307/// Delete a proxy replacement rule
2308async fn delete_proxy_rule(
2309    State(state): State<ManagementState>,
2310    Path(id): Path<String>,
2311) -> Result<Json<serde_json::Value>, StatusCode> {
2312    let proxy_config = match &state.proxy_config {
2313        Some(config) => config,
2314        None => {
2315            return Ok(Json(serde_json::json!({
2316                "error": "Proxy not configured. Proxy config not available."
2317            })));
2318        }
2319    };
2320
2321    let mut config = proxy_config.write().await;
2322    let rule_id: usize = match id.parse() {
2323        Ok(id) => id,
2324        Err(_) => {
2325            return Ok(Json(serde_json::json!({
2326                "error": format!("Invalid rule ID: {}", id)
2327            })));
2328        }
2329    };
2330
2331    let request_count = config.request_replacements.len();
2332
2333    if rule_id < request_count {
2334        // Delete request rule
2335        config.request_replacements.remove(rule_id);
2336    } else if rule_id < request_count + config.response_replacements.len() {
2337        // Delete response rule
2338        let response_idx = rule_id - request_count;
2339        config.response_replacements.remove(response_idx);
2340    } else {
2341        return Ok(Json(serde_json::json!({
2342            "error": format!("Rule ID {} not found", rule_id)
2343        })));
2344    }
2345
2346    Ok(Json(serde_json::json!({
2347        "id": rule_id,
2348        "message": "Rule deleted successfully"
2349    })))
2350}
2351
2352/// Get proxy rules and transformation configuration for inspection
2353async fn get_proxy_inspect(
2354    State(state): State<ManagementState>,
2355    Query(params): Query<std::collections::HashMap<String, String>>,
2356) -> Result<Json<serde_json::Value>, StatusCode> {
2357    let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2358    let offset: usize = params.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);
2359
2360    let proxy_config = match &state.proxy_config {
2361        Some(config) => config.read().await,
2362        None => {
2363            return Ok(Json(serde_json::json!({
2364                "error": "Proxy not configured. Proxy config not available."
2365            })));
2366        }
2367    };
2368
2369    let mut rules = Vec::new();
2370    for (idx, rule) in proxy_config.request_replacements.iter().enumerate() {
2371        rules.push(serde_json::json!({
2372            "id": idx,
2373            "kind": "request",
2374            "pattern": rule.pattern,
2375            "enabled": rule.enabled,
2376            "status_codes": rule.status_codes,
2377            "transform_count": rule.body_transforms.len(),
2378            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2379                "path": t.path,
2380                "operation": t.operation,
2381                "replace": t.replace
2382            })).collect::<Vec<_>>()
2383        }));
2384    }
2385    let request_rule_count = rules.len();
2386    for (idx, rule) in proxy_config.response_replacements.iter().enumerate() {
2387        rules.push(serde_json::json!({
2388            "id": request_rule_count + idx,
2389            "kind": "response",
2390            "pattern": rule.pattern,
2391            "enabled": rule.enabled,
2392            "status_codes": rule.status_codes,
2393            "transform_count": rule.body_transforms.len(),
2394            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2395                "path": t.path,
2396                "operation": t.operation,
2397                "replace": t.replace
2398            })).collect::<Vec<_>>()
2399        }));
2400    }
2401
2402    let total = rules.len();
2403    let paged_rules: Vec<_> = rules.into_iter().skip(offset).take(limit).collect();
2404
2405    Ok(Json(serde_json::json!({
2406        "enabled": proxy_config.enabled,
2407        "target_url": proxy_config.target_url,
2408        "prefix": proxy_config.prefix,
2409        "timeout_seconds": proxy_config.timeout_seconds,
2410        "follow_redirects": proxy_config.follow_redirects,
2411        "passthrough_by_default": proxy_config.passthrough_by_default,
2412        "rules": paged_rules,
2413        "request_rule_count": request_rule_count,
2414        "response_rule_count": total.saturating_sub(request_rule_count),
2415        "limit": limit,
2416        "offset": offset,
2417        "total": total
2418    })))
2419}
2420
2421/// Build the management API router
2422pub fn management_router(state: ManagementState) -> Router {
2423    let router = Router::new()
2424        .route("/health", get(health_check))
2425        .route("/stats", get(get_stats))
2426        .route("/config", get(get_config))
2427        .route("/config/validate", post(validate_config))
2428        .route("/config/bulk", post(bulk_update_config))
2429        .route("/mocks", get(list_mocks))
2430        .route("/mocks", post(create_mock))
2431        .route("/mocks/{id}", get(get_mock))
2432        .route("/mocks/{id}", put(update_mock))
2433        .route("/mocks/{id}", delete(delete_mock))
2434        .route("/export", get(export_mocks))
2435        .route("/import", post(import_mocks));
2436
2437    #[cfg(feature = "smtp")]
2438    let router = router
2439        .route("/smtp/mailbox", get(list_smtp_emails))
2440        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
2441        .route("/smtp/mailbox/{id}", get(get_smtp_email))
2442        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
2443        .route("/smtp/mailbox/search", get(search_smtp_emails));
2444
2445    #[cfg(not(feature = "smtp"))]
2446    let router = router;
2447
2448    // MQTT routes
2449    #[cfg(feature = "mqtt")]
2450    let router = router
2451        .route("/mqtt/stats", get(get_mqtt_stats))
2452        .route("/mqtt/clients", get(get_mqtt_clients))
2453        .route("/mqtt/topics", get(get_mqtt_topics))
2454        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
2455        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
2456        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2457        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2458
2459    #[cfg(not(feature = "mqtt"))]
2460    let router = router
2461        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2462        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2463
2464    #[cfg(feature = "kafka")]
2465    let router = router
2466        .route("/kafka/stats", get(get_kafka_stats))
2467        .route("/kafka/topics", get(get_kafka_topics))
2468        .route("/kafka/topics/{topic}", get(get_kafka_topic))
2469        .route("/kafka/groups", get(get_kafka_groups))
2470        .route("/kafka/groups/{group_id}", get(get_kafka_group))
2471        .route("/kafka/produce", post(produce_kafka_message))
2472        .route("/kafka/produce/batch", post(produce_kafka_batch))
2473        .route("/kafka/messages/stream", get(kafka_messages_stream));
2474
2475    #[cfg(not(feature = "kafka"))]
2476    let router = router;
2477
2478    // Migration pipeline routes
2479    let router = router
2480        .route("/migration/routes", get(get_migration_routes))
2481        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
2482        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
2483        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
2484        .route("/migration/groups/{group}", put(set_group_migration_mode))
2485        .route("/migration/groups", get(get_migration_groups))
2486        .route("/migration/status", get(get_migration_status));
2487
2488    // Proxy replacement rules routes
2489    let router = router
2490        .route("/proxy/rules", get(list_proxy_rules))
2491        .route("/proxy/rules", post(create_proxy_rule))
2492        .route("/proxy/rules/{id}", get(get_proxy_rule))
2493        .route("/proxy/rules/{id}", put(update_proxy_rule))
2494        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
2495        .route("/proxy/inspect", get(get_proxy_inspect));
2496
2497    // AI-powered features
2498    let router = router.route("/ai/generate-spec", post(generate_ai_spec));
2499
2500    // Snapshot diff endpoints
2501    let router = router.nest(
2502        "/snapshot-diff",
2503        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
2504    );
2505
2506    #[cfg(feature = "behavioral-cloning")]
2507    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));
2508
2509    let router = router
2510        .route("/mockai/learn", post(learn_from_examples))
2511        .route("/mockai/rules/explanations", get(list_rule_explanations))
2512        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
2513        .route("/chaos/config", get(get_chaos_config))
2514        .route("/chaos/config", post(update_chaos_config))
2515        .route("/network/profiles", get(list_network_profiles))
2516        .route("/network/profile/apply", post(apply_network_profile));
2517
2518    // State machine API routes
2519    let router =
2520        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());
2521
2522    // Conformance testing API routes
2523    #[cfg(feature = "conformance")]
2524    let router = router.nest_service(
2525        "/conformance",
2526        crate::handlers::conformance::conformance_router(state.conformance_state.clone()),
2527    );
2528    #[cfg(not(feature = "conformance"))]
2529    let router = router;
2530
2531    router.with_state(state)
2532}
2533
#[cfg(feature = "kafka")]
/// Aggregate counters describing the Kafka broker
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// How many topics the broker holds
    pub topics: usize,
    /// Partition count summed over all topics
    pub partitions: usize,
    /// How many consumer groups exist
    pub consumer_groups: usize,
    /// Running total of produced messages
    pub messages_produced: u64,
    /// Running total of consumed messages
    pub messages_consumed: u64,
}
2549
#[cfg(feature = "kafka")]
/// Summary of a single Kafka topic as returned by the topic listing endpoint
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name
    pub name: String,
    /// Number of partitions the topic has
    pub partitions: usize,
    /// Replication factor taken from the topic's configuration
    pub replication_factor: i32,
}
2558
#[cfg(feature = "kafka")]
/// Summary of a single Kafka consumer group
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group identifier
    pub group_id: String,
    /// Member count for the group (presumably the consumers currently
    /// joined; the populating handler is outside this section — verify)
    pub members: usize,
    /// Group state rendered as a string (possible values are decided by the
    /// handler that builds this struct — verify)
    pub state: String,
}
2567
#[cfg(feature = "kafka")]
/// Report aggregate Kafka broker statistics as JSON.
///
/// Responds with a serialized [`KafkaBrokerStats`]; when no Kafka broker is
/// attached to the management state the reply is `503 Service Unavailable`
/// with an explanatory JSON body.
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        let body = serde_json::json!({
            "error": "Kafka broker not available",
            "message": "Kafka broker is not enabled or not available."
        });
        return (StatusCode::SERVICE_UNAVAILABLE, Json(body)).into_response();
    };

    let topics = broker.topics.read().await;
    let consumer_groups = broker.consumer_groups.read().await;

    // Message totals come from the broker's metrics snapshot.
    let snapshot = broker.metrics().snapshot();

    let mut partition_total: usize = 0;
    for topic in topics.values() {
        partition_total += topic.partitions.len();
    }

    Json(KafkaBrokerStats {
        topics: topics.len(),
        partitions: partition_total,
        consumer_groups: consumer_groups.groups().len(),
        messages_produced: snapshot.messages_produced_total,
        messages_consumed: snapshot.messages_consumed_total,
    })
    .into_response()
}
2600
/// List every topic known to the mock Kafka broker.
///
/// Responds with `{ "topics": [KafkaTopicInfo, ...] }`, or
/// `503 Service Unavailable` when no broker is attached.
#[cfg(feature = "kafka")]
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topic_map = broker.topics.read().await;

    let mut topic_list: Vec<KafkaTopicInfo> = Vec::with_capacity(topic_map.len());
    for (name, topic) in topic_map.iter() {
        topic_list.push(KafkaTopicInfo {
            name: name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor as i32,
        });
    }

    Json(serde_json::json!({
        "topics": topic_list
    }))
    .into_response()
}
2630
/// Describe a single Kafka topic, including a per-partition breakdown.
///
/// Responds with `404 Not Found` when the topic does not exist and
/// `503 Service Unavailable` when no broker is attached.
#[cfg(feature = "kafka")]
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topic_map = broker.topics.read().await;
    let Some(topic) = topic_map.get(&topic_name) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Topic not found",
                "topic": topic_name
            })),
        )
            .into_response();
    };

    // Leader and replica ids are fixed at 0 in the reported detail.
    let partitions_detail: Vec<serde_json::Value> = topic
        .partitions
        .iter()
        .enumerate()
        .map(|(idx, partition)| {
            serde_json::json!({
                "id": idx as i32,
                "leader": 0,
                "replicas": vec![0],
                "message_count": partition.messages.len()
            })
        })
        .collect();

    Json(serde_json::json!({
        "name": topic_name,
        "partitions": topic.partitions.len(),
        "replication_factor": topic.config.replication_factor,
        "partitions_detail": partitions_detail
    }))
    .into_response()
}
2672
/// List every consumer group registered with the mock Kafka broker.
///
/// Responds with `{ "groups": [KafkaConsumerGroupInfo, ...] }`, or
/// `503 Service Unavailable` when no broker is attached.
#[cfg(feature = "kafka")]
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let registry = broker.consumer_groups.read().await;

    let mut groups: Vec<KafkaConsumerGroupInfo> = Vec::new();
    for (group_id, group) in registry.groups().iter() {
        groups.push(KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            // Simplified: the state is not tracked in more detail here
            state: "Stable".to_string(),
        });
    }

    Json(serde_json::json!({
        "groups": groups
    }))
    .into_response()
}
2703
/// Describe a single consumer group: members, their assignments and committed offsets.
///
/// Responds with `404 Not Found` when the group does not exist and
/// `503 Service Unavailable` when no broker is attached.
#[cfg(feature = "kafka")]
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let registry = broker.consumer_groups.read().await;
    let all_groups = registry.groups();
    let Some(group) = all_groups.get(&group_id) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Consumer group not found",
                "group_id": group_id
            })),
        )
            .into_response();
    };

    // One entry per member, each carrying its topic/partition assignments
    let members_detail: Vec<serde_json::Value> = group
        .members
        .iter()
        .map(|(member_id, member)| {
            let assignments: Vec<serde_json::Value> = member
                .assignment
                .iter()
                .map(|a| {
                    serde_json::json!({
                        "topic": a.topic,
                        "partitions": a.partitions
                    })
                })
                .collect();

            serde_json::json!({
                "member_id": member_id,
                "client_id": member.client_id,
                "assignments": assignments
            })
        })
        .collect();

    // One entry per (topic, partition) offset recorded for the group
    let offsets: Vec<serde_json::Value> = group
        .offsets
        .iter()
        .map(|((topic, partition), offset)| {
            serde_json::json!({
                "topic": topic,
                "partition": partition,
                "offset": offset
            })
        })
        .collect();

    Json(serde_json::json!({
        "group_id": group_id,
        "members": group.members.len(),
        "state": "Stable",
        "members_detail": members_detail,
        "offsets": offsets
    }))
    .into_response()
}
2752
2753// ========== Kafka Produce Handler ==========
2754
/// JSON payload accepted by the Kafka produce endpoints.
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Name of the destination topic
    pub topic: String,
    /// Optional message key
    #[serde(default)]
    pub key: Option<String>,
    /// Message value, sent as a string (JSON text or plain text)
    pub value: String,
    /// Optional target partition; the topic assigns one when omitted
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional message headers as string key-value pairs
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2773
/// Produce a single message to a Kafka topic on the mock broker.
///
/// The topic is created with the default configuration when it does not exist
/// yet. When the request names no partition, the topic assigns one from the
/// message key. On success the produced-message metric is incremented and a
/// [`MessageEvent::Kafka`] is broadcast for real-time monitoring.
///
/// Responses:
/// - `200 OK` with topic, partition and offset on success
/// - `400 Bad Request` when the partition is out of range for the topic
/// - `500 Internal Server Error` when producing to the partition fails
/// - `503 Service Unavailable` when no Kafka broker is attached
#[cfg(feature = "kafka")]
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    // Take the request apart once so each field can be moved where it is needed
    // instead of being cloned repeatedly.
    let KafkaProduceRequest {
        topic,
        key,
        value,
        partition,
        headers,
    } = request;

    let mut topics = broker.topics.write().await;

    // Get or create the topic
    let topic_entry = topics.entry(topic.clone()).or_insert_with(|| {
        mockforge_kafka::topics::Topic::new(
            topic.clone(),
            mockforge_kafka::topics::TopicConfig::default(),
        )
    });

    // Determine partition: explicit request value wins, otherwise derive it from the key
    let partition_id = match partition {
        Some(requested) => requested,
        None => topic_entry.assign_partition(key.as_ref().map(|k| k.as_bytes())),
    };

    // Validate partition exists
    if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            })),
        )
            .into_response();
    }

    // Create the message. Key and headers are still needed for the monitoring
    // event below, so only their byte representations are built here.
    let message = mockforge_kafka::partitions::KafkaMessage {
        offset: 0, // Will be set by partition.append
        timestamp: chrono::Utc::now().timestamp_millis(),
        key: key.as_ref().map(|k| k.as_bytes().to_vec()),
        value: value.as_bytes().to_vec(),
        headers: headers
            .clone()
            .unwrap_or_default()
            .into_iter()
            .map(|(k, v)| (k, v.into_bytes()))
            .collect(),
    };

    // Produce to partition
    match topic_entry.produce(partition_id, message).await {
        Ok(offset) => {
            // Record metrics for successful message production
            broker.metrics().record_messages_produced(1);

            // Emit message event for real-time monitoring
            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: topic.clone(),
                key,
                value,
                partition: partition_id,
                offset,
                headers,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            // Best effort: the send result is deliberately ignored.
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", topic),
                "topic": topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2878
/// JSON payload accepted by the Kafka batch produce endpoint.
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// Messages to produce, processed in the order given
    pub messages: Vec<KafkaProduceRequest>,
    /// Pause between consecutive messages, in milliseconds
    /// (filled in by `default_delay` when omitted)
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2889
/// Produce multiple messages to Kafka topics, one after another.
///
/// Each message is handled independently: missing topics are created with the
/// default configuration, the partition is taken from the request or assigned
/// from the key, and a per-message result is recorded. A failing message does
/// not abort the batch. When `delay_ms` is non-zero, the handler sleeps between
/// messages (not after the last one, and not after a message rejected for an
/// invalid partition).
///
/// The topics write lock is taken per message and released before the delay,
/// so other handlers are not blocked while the batch is pacing itself.
///
/// Responses:
/// - `200 OK` with `total`, `succeeded`, `failed` and per-message `results`
/// - `400 Bad Request` when the batch is empty
/// - `503 Service Unavailable` when no Kafka broker is attached
#[cfg(feature = "kafka")]
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    if request.messages.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        )
            .into_response();
    }

    let total = request.messages.len();
    let mut results = Vec::with_capacity(total);

    for (index, msg_request) in request.messages.iter().enumerate() {
        let mut topics = broker.topics.write().await;

        // Get or create the topic
        let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
            mockforge_kafka::topics::Topic::new(
                msg_request.topic.clone(),
                mockforge_kafka::topics::TopicConfig::default(),
            )
        });

        // Determine partition
        let partition_id = match msg_request.partition {
            Some(requested) => requested,
            None => topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes())),
        };

        // Validate partition exists
        if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
            results.push(serde_json::json!({
                "index": index,
                "success": false,
                "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            }));
            continue;
        }

        // Create the message
        let message = mockforge_kafka::partitions::KafkaMessage {
            offset: 0,
            timestamp: chrono::Utc::now().timestamp_millis(),
            key: msg_request.key.as_ref().map(|k| k.as_bytes().to_vec()),
            value: msg_request.value.as_bytes().to_vec(),
            headers: msg_request
                .headers
                .clone()
                .unwrap_or_default()
                .into_iter()
                .map(|(k, v)| (k, v.into_bytes()))
                .collect(),
        };

        // Produce to partition
        match topic_entry.produce(partition_id, message).await {
            Ok(offset) => {
                // Record metrics for successful message production
                broker.metrics().record_messages_produced(1);

                // Emit message event
                let event = MessageEvent::Kafka(KafkaMessageEvent {
                    topic: msg_request.topic.clone(),
                    key: msg_request.key.clone(),
                    value: msg_request.value.clone(),
                    partition: partition_id,
                    offset,
                    headers: msg_request.headers.clone(),
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                // Best effort: the send result is deliberately ignored.
                let _ = state.message_events.send(event);

                results.push(serde_json::json!({
                    "index": index,
                    "success": true,
                    "topic": msg_request.topic,
                    "partition": partition_id,
                    "offset": offset
                }));
            }
            Err(e) => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": e.to_string()
                }));
            }
        }

        // Release the topics write lock before pausing; holding it across the
        // sleep would stall every other topic reader and writer for the delay.
        drop(topics);

        // Add delay between messages (except for the last one)
        if index + 1 < total && request.delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
        }
    }

    let success_count = results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

    Json(serde_json::json!({
        "success": true,
        "total": total,
        "succeeded": success_count,
        "failed": total - success_count,
        "results": results
    }))
    .into_response()
}
3018
3019// ========== Real-time Message Streaming (SSE) ==========
3020
/// Stream MQTT message events to the client as Server-Sent Events.
///
/// Subscribes to the shared message broadcast channel and forwards each MQTT
/// event as an `mqtt_message` SSE event. The optional `topic` query parameter
/// keeps only events whose topic contains the given text. The stream ends when
/// the broadcast channel is closed; a lagging receiver logs a warning and keeps
/// going. A keep-alive is sent every 15 seconds.
#[cfg(feature = "mqtt")]
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let receiver = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    // The filter travels in the unfold state together with the receiver.
    let events = stream::unfold(
        (receiver, topic_filter),
        |(mut receiver, topic_filter)| async move {
            loop {
                let event = match receiver.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => event,
                    // Skip Kafka events in MQTT stream
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => continue,
                    Err(broadcast::error::RecvError::Closed) => return None,
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                };

                // Apply topic filter if specified
                let filtered_out = topic_filter
                    .as_ref()
                    .map_or(false, |filter| !event.topic.contains(filter));
                if filtered_out {
                    continue;
                }

                let event_json = serde_json::json!({
                    "protocol": "mqtt",
                    "topic": event.topic,
                    "payload": event.payload,
                    "qos": event.qos,
                    "retain": event.retain,
                    "timestamp": event.timestamp,
                });

                // An event that fails to serialize is skipped.
                match serde_json::to_string(&event_json) {
                    Ok(event_data) => {
                        let sse_event = Event::default().event("mqtt_message").data(event_data);
                        return Some((Ok(sse_event), (receiver, topic_filter)));
                    }
                    Err(_) => continue,
                }
            }
        },
    );

    let keep_alive = axum::response::sse::KeepAlive::new()
        .interval(std::time::Duration::from_secs(15))
        .text("keep-alive-text");

    Sse::new(events).keep_alive(keep_alive)
}
3081
/// Stream Kafka message events to the client as Server-Sent Events.
///
/// Subscribes to the shared message broadcast channel and forwards each Kafka
/// event as a `kafka_message` SSE event. The optional `topic` query parameter
/// keeps only events whose topic contains the given text. The stream ends when
/// the broadcast channel is closed; a lagging receiver logs a warning and keeps
/// going. A keep-alive is sent every 15 seconds.
#[cfg(feature = "kafka")]
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let receiver = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    // The filter travels in the unfold state together with the receiver.
    let events = stream::unfold(
        (receiver, topic_filter),
        |(mut receiver, topic_filter)| async move {
            loop {
                let event = match receiver.recv().await {
                    // Skip MQTT events in Kafka stream
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => continue,
                    Ok(MessageEvent::Kafka(event)) => event,
                    Err(broadcast::error::RecvError::Closed) => return None,
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                };

                // Apply topic filter if specified
                let filtered_out = topic_filter
                    .as_ref()
                    .map_or(false, |filter| !event.topic.contains(filter));
                if filtered_out {
                    continue;
                }

                let event_json = serde_json::json!({
                    "protocol": "kafka",
                    "topic": event.topic,
                    "key": event.key,
                    "value": event.value,
                    "partition": event.partition,
                    "offset": event.offset,
                    "headers": event.headers,
                    "timestamp": event.timestamp,
                });

                // An event that fails to serialize is skipped.
                match serde_json::to_string(&event_json) {
                    Ok(event_data) => {
                        let sse_event = Event::default().event("kafka_message").data(event_data);
                        return Some((Ok(sse_event), (receiver, topic_filter)));
                    }
                    Err(_) => continue,
                }
            }
        },
    );

    let keep_alive = axum::response::sse::KeepAlive::new()
        .interval(std::time::Duration::from_secs(15))
        .text("keep-alive-text");

    Sse::new(events).keep_alive(keep_alive)
}
3145
3146// ========== AI-Powered Features ==========
3147
3148/// Request for AI-powered API specification generation
3149#[derive(Debug, Deserialize)]
3150pub struct GenerateSpecRequest {
3151    /// Natural language description of the API to generate
3152    pub query: String,
3153    /// Type of specification to generate: "openapi", "graphql", or "asyncapi"
3154    pub spec_type: String,
3155    /// Optional API version (e.g., "3.0.0" for OpenAPI)
3156    pub api_version: Option<String>,
3157}
3158
3159/// Request for OpenAPI generation from recorded traffic
3160#[derive(Debug, Deserialize)]
3161pub struct GenerateOpenApiFromTrafficRequest {
3162    /// Path to recorder database (optional, defaults to ./recordings.db)
3163    #[serde(default)]
3164    pub database_path: Option<String>,
3165    /// Start time for filtering (ISO 8601 format, e.g., 2025-01-01T00:00:00Z)
3166    #[serde(default)]
3167    pub since: Option<String>,
3168    /// End time for filtering (ISO 8601 format)
3169    #[serde(default)]
3170    pub until: Option<String>,
3171    /// Path pattern filter (supports wildcards, e.g., /api/*)
3172    #[serde(default)]
3173    pub path_pattern: Option<String>,
3174    /// Minimum confidence score for including paths (0.0 to 1.0)
3175    #[serde(default = "default_min_confidence")]
3176    pub min_confidence: f64,
3177}
3178
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`.
fn default_min_confidence() -> f64 {
    const DEFAULT_MIN_CONFIDENCE: f64 = 0.7;
    DEFAULT_MIN_CONFIDENCE
}
3182
/// Generate API specification from natural language using AI
///
/// Builds a RAG configuration from environment variables, asks the configured
/// LLM for a specification matching `request.query`, and returns the extracted
/// spec text.
///
/// Environment variables read:
/// - `MOCKFORGE_RAG_API_KEY` (falls back to `OPENAI_API_KEY`) — required
/// - `MOCKFORGE_RAG_PROVIDER` — `openai` (default), `anthropic`, `ollama`,
///   `openai-compatible` / `openai_compatible`; unknown values fall back to OpenAI
/// - `MOCKFORGE_RAG_API_ENDPOINT`, `MOCKFORGE_RAG_MODEL` — provider-specific defaults
/// - `MOCKFORGE_RAG_MAX_TOKENS` (4096), `MOCKFORGE_RAG_TEMPERATURE` (0.3),
///   `MOCKFORGE_RAG_TIMEOUT` (60), `MOCKFORGE_RAG_CONTEXT_WINDOW` (4000)
///
/// Responses:
/// - `200 OK` with `success`, `spec` and `spec_type`
/// - `503 Service Unavailable` when no API key is configured
/// - `500 Internal Server Error` when the RAG engine cannot be created or
///   generation fails
#[cfg(feature = "data-faker")]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::{DocumentStorage, InMemoryStorage},
    };

    /// Read and parse an environment variable, using `default` when the
    /// variable is unset or its value does not parse.
    fn env_or<T: std::str::FromStr>(name: &str, default: T) -> T {
        std::env::var(name).ok().and_then(|raw| raw.parse().ok()).unwrap_or(default)
    }

    // Build RAG config from environment variables
    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    // Check if RAG is configured - require API key
    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    }

    // Build RAG configuration
    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        // Unknown provider names fall back to OpenAI
        _ => LlmProvider::OpenAI,
    };

    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    // Build RagConfig using struct literal with defaults
    let rag_config = RagConfig {
        provider,
        api_endpoint,
        api_key,
        model,
        max_tokens: env_or("MOCKFORGE_RAG_MAX_TOKENS", 4096),
        // Lower temperature for more structured output
        temperature: env_or("MOCKFORGE_RAG_TEMPERATURE", 0.3),
        timeout_secs: env_or("MOCKFORGE_RAG_TIMEOUT", 60),
        max_context_length: env_or("MOCKFORGE_RAG_CONTEXT_WINDOW", 4000),
        ..Default::default()
    };

    // Build the prompt for spec generation; unrecognized spec types are
    // treated as OpenAPI.
    let spec_type_label = match request.spec_type.as_str() {
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        if request.spec_type == "graphql" {
            "GraphQL SDL"
        } else {
            "YAML"
        }
    );

    // Create in-memory storage for RAG engine
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    // Create RAG engine (the config is not needed afterwards, so it is moved in)
    let mut rag_engine = match RagEngine::new(rag_config, storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    // Generate using RAG engine
    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Try to extract just the YAML/JSON/SDL content if LLM added explanation
            let spec = if request.spec_type == "graphql" {
                // For GraphQL, extract SDL
                extract_graphql_schema(&generated_text)
            } else {
                // For OpenAPI/AsyncAPI, extract YAML
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
3355
3356#[cfg(not(feature = "data-faker"))]
3357async fn generate_ai_spec(
3358    State(_state): State<ManagementState>,
3359    Json(_request): Json<GenerateSpecRequest>,
3360) -> impl IntoResponse {
3361    (
3362        StatusCode::NOT_IMPLEMENTED,
3363        Json(serde_json::json!({
3364            "error": "AI features not enabled",
3365            "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3366        })),
3367    )
3368        .into_response()
3369}
3370
/// Generate an OpenAPI specification from recorded HTTP traffic.
///
/// Processing order:
/// 1. Resolve the recorder database path (`request.database_path`, or
///    `recordings.db` under the current working directory) and open it.
/// 2. Parse the optional `since` / `until` RFC 3339 timestamps.
/// 3. Query recorded HTTP exchanges (at most 1000, optionally limited by
///    `request.path_pattern`).
/// 4. Run `OpenApiSpecGenerator` over the exchanges and return the spec
///    together with generation metadata.
///
/// Responses:
/// - `200` with `{"spec": ..., "metadata": {...}}` on success
/// - `400` if the database cannot be opened or a timestamp is malformed
/// - `404` if no exchange matches the filters
/// - `500` if querying, generation, or serialization fails
#[cfg(feature = "behavioral-cloning")]
async fn generate_openapi_from_traffic(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
) -> impl IntoResponse {
    use chrono::{DateTime, Utc};
    use mockforge_core::intelligent_behavior::{
        openapi_generator::{
            HttpExchange as LocalHttpExchange, OpenApiGenerationConfig, OpenApiSpecGenerator,
        },
        IntelligentBehaviorConfig,
    };
    use mockforge_recorder::{
        database::RecorderDatabase,
        openapi_export::{QueryFilters, RecordingsToOpenApi},
    };
    use std::path::PathBuf;

    // Every failure path answers with the same `{error, message}` JSON shape.
    let failure = |status: StatusCode, error: &str, message: String| {
        (
            status,
            Json(serde_json::json!({
                "error": error,
                "message": message
            })),
        )
            .into_response()
    };

    // Optional RFC 3339 text -> optional UTC instant; a parse error is handed
    // back to the caller so it can build the field-specific message.
    let parse_bound = |raw: Option<&str>| {
        raw.map(|text| DateTime::parse_from_rfc3339(text).map(|dt| dt.with_timezone(&Utc)))
            .transpose()
    };

    // Step 1: locate and open the recorder database.
    let db_path = match request.database_path.as_ref() {
        Some(path) => PathBuf::from(path),
        None => std::env::current_dir()
            .unwrap_or_else(|_| PathBuf::from("."))
            .join("recordings.db"),
    };

    let db = match RecorderDatabase::new(&db_path).await {
        Ok(db) => db,
        Err(e) => {
            return failure(
                StatusCode::BAD_REQUEST,
                "Database error",
                format!("Failed to open recorder database: {}", e),
            );
        }
    };

    // Step 2: time window. `since` is validated before `until`.
    let since_dt = match parse_bound(request.since.as_deref()) {
        Ok(bound) => bound,
        Err(e) => {
            return failure(
                StatusCode::BAD_REQUEST,
                "Invalid date format",
                format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e),
            );
        }
    };

    let until_dt = match parse_bound(request.until.as_deref()) {
        Ok(bound) => bound,
        Err(e) => {
            return failure(
                StatusCode::BAD_REQUEST,
                "Invalid date format",
                format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e),
            );
        }
    };

    // Step 3: fetch the recorded exchanges.
    let query_filters = QueryFilters {
        since: since_dt,
        until: until_dt,
        path_pattern: request.path_pattern.clone(),
        min_status_code: None,
        max_requests: Some(1000),
    };

    let recorded = match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await
    {
        Ok(found) => found,
        Err(e) => {
            return failure(
                StatusCode::INTERNAL_SERVER_ERROR,
                "Query error",
                format!("Failed to query HTTP exchanges: {}", e),
            );
        }
    };

    if recorded.is_empty() {
        return failure(
            StatusCode::NOT_FOUND,
            "No exchanges found",
            "No HTTP exchanges found matching the specified filters".to_string(),
        );
    }

    // The recorder returns its own `HttpExchange` type; copy field by field into
    // the mockforge-core type the generator expects, to avoid a version mismatch
    // between the two crates.
    let mut exchanges: Vec<LocalHttpExchange> = Vec::with_capacity(recorded.len());
    for item in recorded {
        exchanges.push(LocalHttpExchange {
            method: item.method,
            path: item.path,
            query_params: item.query_params,
            headers: item.headers,
            body: item.body,
            body_encoding: item.body_encoding,
            status_code: item.status_code,
            response_headers: item.response_headers,
            response_body: item.response_body,
            response_body_encoding: item.response_body_encoding,
            timestamp: item.timestamp,
        });
    }

    // Step 4: generate the specification.
    let gen_config = OpenApiGenerationConfig {
        min_confidence: request.min_confidence,
        behavior_model: Some(IntelligentBehaviorConfig::default().behavior_model),
    };

    let generated = match OpenApiSpecGenerator::new(gen_config)
        .generate_from_exchanges(exchanges)
        .await
    {
        Ok(generated) => generated,
        Err(e) => {
            return failure(
                StatusCode::INTERNAL_SERVER_ERROR,
                "Generation error",
                format!("Failed to generate OpenAPI spec: {}", e),
            );
        }
    };

    // Prefer the raw document when the generator kept one; otherwise serialize
    // the parsed spec.
    let spec_json = match generated.spec.raw_document.as_ref() {
        Some(raw) => raw.clone(),
        None => match serde_json::to_value(&generated.spec.spec) {
            Ok(json) => json,
            Err(e) => {
                return failure(
                    StatusCode::INTERNAL_SERVER_ERROR,
                    "Serialization error",
                    format!("Failed to serialize OpenAPI spec: {}", e),
                );
            }
        },
    };

    let metadata = &generated.metadata;
    Json(serde_json::json!({
        "spec": spec_json,
        "metadata": {
            "requests_analyzed": metadata.requests_analyzed,
            "paths_inferred": metadata.paths_inferred,
            "path_confidence": metadata.path_confidence,
            "generated_at": metadata.generated_at.to_rfc3339(),
            "duration_ms": metadata.duration_ms,
        }
    }))
    .into_response()
}
3563
3564/// List all rule explanations
3565async fn list_rule_explanations(
3566    State(state): State<ManagementState>,
3567    Query(params): Query<std::collections::HashMap<String, String>>,
3568) -> impl IntoResponse {
3569    use mockforge_core::intelligent_behavior::RuleType;
3570
3571    let explanations = state.rule_explanations.read().await;
3572    let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3573
3574    // Filter by rule type if provided
3575    if let Some(rule_type_str) = params.get("rule_type") {
3576        if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3577            explanations_vec.retain(|e| e.rule_type == rule_type);
3578        }
3579    }
3580
3581    // Filter by minimum confidence if provided
3582    if let Some(min_confidence_str) = params.get("min_confidence") {
3583        if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3584            explanations_vec.retain(|e| e.confidence >= min_confidence);
3585        }
3586    }
3587
3588    // Sort by confidence (descending) and then by generated_at (descending)
3589    explanations_vec.sort_by(|a, b| {
3590        b.confidence
3591            .partial_cmp(&a.confidence)
3592            .unwrap_or(std::cmp::Ordering::Equal)
3593            .then_with(|| b.generated_at.cmp(&a.generated_at))
3594    });
3595
3596    Json(serde_json::json!({
3597        "explanations": explanations_vec,
3598        "total": explanations_vec.len(),
3599    }))
3600    .into_response()
3601}
3602
3603/// Get a specific rule explanation by ID
3604async fn get_rule_explanation(
3605    State(state): State<ManagementState>,
3606    Path(rule_id): Path<String>,
3607) -> impl IntoResponse {
3608    let explanations = state.rule_explanations.read().await;
3609
3610    match explanations.get(&rule_id) {
3611        Some(explanation) => Json(serde_json::json!({
3612            "explanation": explanation,
3613        }))
3614        .into_response(),
3615        None => (
3616            StatusCode::NOT_FOUND,
3617            Json(serde_json::json!({
3618                "error": "Rule explanation not found",
3619                "message": format!("No explanation found for rule ID: {}", rule_id)
3620            })),
3621        )
3622            .into_response(),
3623    }
3624}
3625
3626/// Request for learning from examples
3627#[derive(Debug, Deserialize)]
3628pub struct LearnFromExamplesRequest {
3629    /// Example request/response pairs to learn from
3630    pub examples: Vec<ExamplePairRequest>,
3631    /// Optional configuration override
3632    #[serde(default)]
3633    pub config: Option<serde_json::Value>,
3634}
3635
3636/// Example pair request format
3637#[derive(Debug, Deserialize)]
3638pub struct ExamplePairRequest {
3639    /// Request data (method, path, body, etc.)
3640    pub request: serde_json::Value,
3641    /// Response data (status_code, body, etc.)
3642    pub response: serde_json::Value,
3643}
3644
3645/// Learn behavioral rules from example pairs
3646///
3647/// This endpoint accepts example request/response pairs, generates behavioral rules
3648/// with explanations, and stores the explanations for later retrieval.
3649async fn learn_from_examples(
3650    State(state): State<ManagementState>,
3651    Json(request): Json<LearnFromExamplesRequest>,
3652) -> impl IntoResponse {
3653    use mockforge_core::intelligent_behavior::{
3654        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3655        rule_generator::{ExamplePair, RuleGenerator},
3656    };
3657
3658    if request.examples.is_empty() {
3659        return (
3660            StatusCode::BAD_REQUEST,
3661            Json(serde_json::json!({
3662                "error": "No examples provided",
3663                "message": "At least one example pair is required"
3664            })),
3665        )
3666            .into_response();
3667    }
3668
3669    // Convert request examples to ExamplePair format
3670    let example_pairs: Result<Vec<ExamplePair>, String> = request
3671        .examples
3672        .into_iter()
3673        .enumerate()
3674        .map(|(idx, ex)| {
3675            // Parse request JSON to extract method, path, body, etc.
3676            let method = ex
3677                .request
3678                .get("method")
3679                .and_then(|v| v.as_str())
3680                .map(|s| s.to_string())
3681                .unwrap_or_else(|| "GET".to_string());
3682            let path = ex
3683                .request
3684                .get("path")
3685                .and_then(|v| v.as_str())
3686                .map(|s| s.to_string())
3687                .unwrap_or_else(|| "/".to_string());
3688            let request_body = ex.request.get("body").cloned();
3689            let query_params = ex
3690                .request
3691                .get("query_params")
3692                .and_then(|v| v.as_object())
3693                .map(|obj| {
3694                    obj.iter()
3695                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3696                        .collect()
3697                })
3698                .unwrap_or_default();
3699            let headers = ex
3700                .request
3701                .get("headers")
3702                .and_then(|v| v.as_object())
3703                .map(|obj| {
3704                    obj.iter()
3705                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3706                        .collect()
3707                })
3708                .unwrap_or_default();
3709
3710            // Parse response JSON to extract status, body, etc.
3711            let status = ex
3712                .response
3713                .get("status_code")
3714                .or_else(|| ex.response.get("status"))
3715                .and_then(|v| v.as_u64())
3716                .map(|n| n as u16)
3717                .unwrap_or(200);
3718            let response_body = ex.response.get("body").cloned();
3719
3720            Ok(ExamplePair {
3721                method,
3722                path,
3723                request: request_body,
3724                status,
3725                response: response_body,
3726                query_params,
3727                headers,
3728                metadata: {
3729                    let mut meta = std::collections::HashMap::new();
3730                    meta.insert("source".to_string(), "api".to_string());
3731                    meta.insert("example_index".to_string(), idx.to_string());
3732                    meta
3733                },
3734            })
3735        })
3736        .collect();
3737
3738    let example_pairs = match example_pairs {
3739        Ok(pairs) => pairs,
3740        Err(e) => {
3741            return (
3742                StatusCode::BAD_REQUEST,
3743                Json(serde_json::json!({
3744                    "error": "Invalid examples",
3745                    "message": e
3746                })),
3747            )
3748                .into_response();
3749        }
3750    };
3751
3752    // Create behavior config (use provided config or default)
3753    let behavior_config = if let Some(config_json) = request.config {
3754        // Try to deserialize custom config, fall back to default
3755        serde_json::from_value(config_json)
3756            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3757            .behavior_model
3758    } else {
3759        BehaviorModelConfig::default()
3760    };
3761
3762    // Create rule generator
3763    let generator = RuleGenerator::new(behavior_config);
3764
3765    // Generate rules with explanations
3766    let (rules, explanations) =
3767        match generator.generate_rules_with_explanations(example_pairs).await {
3768            Ok(result) => result,
3769            Err(e) => {
3770                return (
3771                    StatusCode::INTERNAL_SERVER_ERROR,
3772                    Json(serde_json::json!({
3773                        "error": "Rule generation failed",
3774                        "message": format!("Failed to generate rules: {}", e)
3775                    })),
3776                )
3777                    .into_response();
3778            }
3779        };
3780
3781    // Store explanations in ManagementState
3782    {
3783        let mut stored_explanations = state.rule_explanations.write().await;
3784        for explanation in &explanations {
3785            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3786        }
3787    }
3788
3789    // Prepare response
3790    let response = serde_json::json!({
3791        "success": true,
3792        "rules_generated": {
3793            "consistency_rules": rules.consistency_rules.len(),
3794            "schemas": rules.schemas.len(),
3795            "state_machines": rules.state_transitions.len(),
3796            "system_prompt": !rules.system_prompt.is_empty(),
3797        },
3798        "explanations": explanations.iter().map(|e| serde_json::json!({
3799            "rule_id": e.rule_id,
3800            "rule_type": e.rule_type,
3801            "confidence": e.confidence,
3802            "reasoning": e.reasoning,
3803        })).collect::<Vec<_>>(),
3804        "total_explanations": explanations.len(),
3805    });
3806
3807    Json(response).into_response()
3808}
3809
3810#[cfg(feature = "data-faker")]
/// Extract an OpenAPI/AsyncAPI document from free-form LLM output.
///
/// Selection order:
/// 1. the body of the first fenced block opened with ```` ```yaml ````;
/// 2. otherwise the body of the first fenced block of any kind, with a leading
///    language tag (for example `yml` or `json`) removed so it does not end up
///    inside the extracted document;
/// 3. otherwise the whole text.
///
/// A fence without a matching closing fence is not treated as a block.
/// The result is always trimmed of surrounding whitespace.
fn extract_yaml_spec(text: &str) -> String {
    /// Body between the first occurrence of `opener` and the next closing fence.
    fn fenced_body<'a>(text: &'a str, opener: &str) -> Option<&'a str> {
        let body_start = text.find(opener)? + opener.len();
        let rest = &text[body_start..];
        rest.find("```").map(|end| &rest[..end])
    }

    /// Removes a language tag that sits on the opening fence line.
    ///
    /// Only a first line made of a single identifier-like token counts as a
    /// tag; real content placed on the fence line (e.g. `openapi: 3.0.0`) is kept.
    fn strip_language_tag(body: &str) -> &str {
        match body.split_once('\n') {
            Some((first_line, remainder)) => {
                let tag = first_line.trim();
                let looks_like_tag = !tag.is_empty()
                    && tag
                        .chars()
                        .all(|c| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '+'));
                if looks_like_tag {
                    remainder
                } else {
                    body
                }
            }
            None => body,
        }
    }

    fenced_body(text, "```yaml")
        .or_else(|| fenced_body(text, "```").map(strip_language_tag))
        .unwrap_or(text)
        .trim()
        .to_string()
}
3834
3835/// Extract GraphQL schema from text content
3836#[cfg(feature = "data-faker")]
///
/// Selection order:
/// 1. the body of the first fenced block opened with ```` ```graphql ````;
/// 2. otherwise the body of the first fenced block of any kind, with a leading
///    language tag (for example `gql`) removed so it does not end up inside
///    the extracted schema;
/// 3. otherwise the whole text.
///
/// A fence without a matching closing fence is not treated as a block.
/// The result is always trimmed of surrounding whitespace.
fn extract_graphql_schema(text: &str) -> String {
    /// Body between the first occurrence of `opener` and the next closing fence.
    fn fenced_body<'a>(text: &'a str, opener: &str) -> Option<&'a str> {
        let body_start = text.find(opener)? + opener.len();
        let rest = &text[body_start..];
        rest.find("```").map(|end| &rest[..end])
    }

    /// Removes a language tag that sits on the opening fence line.
    ///
    /// Only a first line made of a single identifier-like token counts as a
    /// tag; real content placed on the fence line is kept.
    fn strip_language_tag(body: &str) -> &str {
        match body.split_once('\n') {
            Some((first_line, remainder)) => {
                let tag = first_line.trim();
                let looks_like_tag = !tag.is_empty()
                    && tag
                        .chars()
                        .all(|c| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '+'));
                if looks_like_tag {
                    remainder
                } else {
                    body
                }
            }
            None => body,
        }
    }

    fenced_body(text, "```graphql")
        .or_else(|| fenced_body(text, "```").map(strip_language_tag))
        .unwrap_or(text)
        .trim()
        .to_string()
}
3859
3860// ========== Chaos Engineering Management ==========
3861
3862/// Get current chaos engineering configuration
3863async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
3864    #[cfg(feature = "chaos")]
3865    {
3866        if let Some(chaos_state) = &_state.chaos_api_state {
3867            let config = chaos_state.config.read().await;
3868            // Convert ChaosConfig to JSON response format
3869            Json(serde_json::json!({
3870                "enabled": config.enabled,
3871                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
3872                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
3873                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
3874                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
3875            }))
3876            .into_response()
3877        } else {
3878            // Chaos API not available, return default
3879            Json(serde_json::json!({
3880                "enabled": false,
3881                "latency": null,
3882                "fault_injection": null,
3883                "rate_limit": null,
3884                "traffic_shaping": null,
3885            }))
3886            .into_response()
3887        }
3888    }
3889    #[cfg(not(feature = "chaos"))]
3890    {
3891        // Chaos feature not enabled
3892        Json(serde_json::json!({
3893            "enabled": false,
3894            "latency": null,
3895            "fault_injection": null,
3896            "rate_limit": null,
3897            "traffic_shaping": null,
3898        }))
3899        .into_response()
3900    }
3901}
3902
3903/// Request to update chaos configuration
3904#[derive(Debug, Deserialize)]
3905pub struct ChaosConfigUpdate {
3906    /// Whether to enable chaos engineering
3907    pub enabled: Option<bool>,
3908    /// Latency configuration
3909    pub latency: Option<serde_json::Value>,
3910    /// Fault injection configuration
3911    pub fault_injection: Option<serde_json::Value>,
3912    /// Rate limiting configuration
3913    pub rate_limit: Option<serde_json::Value>,
3914    /// Traffic shaping configuration
3915    pub traffic_shaping: Option<serde_json::Value>,
3916}
3917
3918/// Update chaos engineering configuration
3919async fn update_chaos_config(
3920    State(_state): State<ManagementState>,
3921    Json(_config_update): Json<ChaosConfigUpdate>,
3922) -> impl IntoResponse {
3923    #[cfg(feature = "chaos")]
3924    {
3925        if let Some(chaos_state) = &_state.chaos_api_state {
3926            use mockforge_chaos::config::{
3927                FaultInjectionConfig, LatencyConfig, RateLimitConfig, TrafficShapingConfig,
3928            };
3929
3930            let mut config = chaos_state.config.write().await;
3931
3932            // Update enabled flag if provided
3933            if let Some(enabled) = _config_update.enabled {
3934                config.enabled = enabled;
3935            }
3936
3937            // Update latency config if provided
3938            if let Some(latency_json) = _config_update.latency {
3939                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3940                    config.latency = Some(latency);
3941                }
3942            }
3943
3944            // Update fault injection config if provided
3945            if let Some(fault_json) = _config_update.fault_injection {
3946                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3947                    config.fault_injection = Some(fault);
3948                }
3949            }
3950
3951            // Update rate limit config if provided
3952            if let Some(rate_json) = _config_update.rate_limit {
3953                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3954                    config.rate_limit = Some(rate);
3955                }
3956            }
3957
3958            // Update traffic shaping config if provided
3959            if let Some(traffic_json) = _config_update.traffic_shaping {
3960                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3961                    config.traffic_shaping = Some(traffic);
3962                }
3963            }
3964
3965            // Reinitialize middleware injectors with new config
3966            // The middleware will pick up the changes on the next request
3967            drop(config);
3968
3969            info!("Chaos configuration updated successfully");
3970            Json(serde_json::json!({
3971                "success": true,
3972                "message": "Chaos configuration updated and applied"
3973            }))
3974            .into_response()
3975        } else {
3976            (
3977                StatusCode::SERVICE_UNAVAILABLE,
3978                Json(serde_json::json!({
3979                    "success": false,
3980                    "error": "Chaos API not available",
3981                    "message": "Chaos engineering is not enabled or configured"
3982                })),
3983            )
3984                .into_response()
3985        }
3986    }
3987    #[cfg(not(feature = "chaos"))]
3988    {
3989        (
3990            StatusCode::NOT_IMPLEMENTED,
3991            Json(serde_json::json!({
3992                "success": false,
3993                "error": "Chaos feature not enabled",
3994                "message": "Chaos engineering feature is not compiled into this build"
3995            })),
3996        )
3997            .into_response()
3998    }
3999}
4000
4001// ========== Network Profile Management ==========
4002
4003/// List available network profiles
4004async fn list_network_profiles() -> impl IntoResponse {
4005    use mockforge_core::network_profiles::NetworkProfileCatalog;
4006
4007    let catalog = NetworkProfileCatalog::default();
4008    let profiles: Vec<serde_json::Value> = catalog
4009        .list_profiles_with_description()
4010        .iter()
4011        .map(|(name, description)| {
4012            serde_json::json!({
4013                "name": name,
4014                "description": description,
4015            })
4016        })
4017        .collect();
4018
4019    Json(serde_json::json!({
4020        "profiles": profiles
4021    }))
4022    .into_response()
4023}
4024
4025#[derive(Debug, Deserialize)]
4026/// Request to apply a network profile
4027pub struct ApplyNetworkProfileRequest {
4028    /// Name of the network profile to apply
4029    pub profile_name: String,
4030}
4031
4032/// Apply a network profile
4033async fn apply_network_profile(
4034    State(state): State<ManagementState>,
4035    Json(request): Json<ApplyNetworkProfileRequest>,
4036) -> impl IntoResponse {
4037    use mockforge_core::network_profiles::NetworkProfileCatalog;
4038
4039    let catalog = NetworkProfileCatalog::default();
4040    if let Some(profile) = catalog.get(&request.profile_name) {
4041        // Apply profile to server configuration if available
4042        // NetworkProfile contains latency and traffic_shaping configs
4043        if let Some(server_config) = &state.server_config {
4044            let mut config = server_config.write().await;
4045
4046            // Apply network profile's traffic shaping to core config
4047            use mockforge_core::config::NetworkShapingConfig;
4048
4049            // Convert NetworkProfile's TrafficShapingConfig to NetworkShapingConfig
4050            // NetworkProfile uses mockforge_core::traffic_shaping::TrafficShapingConfig
4051            // which has bandwidth and burst_loss fields
4052            let network_shaping = NetworkShapingConfig {
4053                enabled: profile.traffic_shaping.bandwidth.enabled
4054                    || profile.traffic_shaping.burst_loss.enabled,
4055                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
4056                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
4057                max_connections: 1000, // Default value
4058            };
4059
4060            // Update chaos config if it exists, or create it
4061            // Chaos config is in observability.chaos, not core.chaos
4062            if let Some(ref mut chaos) = config.observability.chaos {
4063                chaos.traffic_shaping = Some(network_shaping);
4064            } else {
4065                // Create minimal chaos config with traffic shaping
4066                use mockforge_core::config::ChaosEngConfig;
4067                config.observability.chaos = Some(ChaosEngConfig {
4068                    enabled: true,
4069                    latency: None,
4070                    fault_injection: None,
4071                    rate_limit: None,
4072                    traffic_shaping: Some(network_shaping),
4073                    scenario: None,
4074                });
4075            }
4076
4077            info!("Network profile '{}' applied to server configuration", request.profile_name);
4078        } else {
4079            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
4080        }
4081
4082        // Also update chaos API state if available
4083        #[cfg(feature = "chaos")]
4084        {
4085            if let Some(chaos_state) = &state.chaos_api_state {
4086                use mockforge_chaos::config::TrafficShapingConfig;
4087
4088                let mut chaos_config = chaos_state.config.write().await;
4089                // Apply profile's traffic shaping to chaos API state
4090                let chaos_traffic_shaping = TrafficShapingConfig {
4091                    enabled: profile.traffic_shaping.bandwidth.enabled
4092                        || profile.traffic_shaping.burst_loss.enabled,
4093                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
4094                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
4095                    max_connections: 0,
4096                    connection_timeout_ms: 30000,
4097                };
4098                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
4099                chaos_config.enabled = true; // Enable chaos when applying a profile
4100                drop(chaos_config);
4101                info!("Network profile '{}' applied to chaos API state", request.profile_name);
4102            }
4103        }
4104
4105        Json(serde_json::json!({
4106            "success": true,
4107            "message": format!("Network profile '{}' applied", request.profile_name),
4108            "profile": {
4109                "name": profile.name,
4110                "description": profile.description,
4111            }
4112        }))
4113        .into_response()
4114    } else {
4115        (
4116            StatusCode::NOT_FOUND,
4117            Json(serde_json::json!({
4118                "error": "Profile not found",
4119                "message": format!("Network profile '{}' not found", request.profile_name)
4120            })),
4121        )
4122            .into_response()
4123    }
4124}
4125
4126/// Build the management API router with UI Builder support
4127pub fn management_router_with_ui_builder(
4128    state: ManagementState,
4129    server_config: mockforge_core::config::ServerConfig,
4130) -> Router {
4131    use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
4132
4133    // Create the base management router
4134    let management = management_router(state);
4135
4136    // Create UI Builder state and router
4137    let ui_builder_state = UIBuilderState::new(server_config);
4138    let ui_builder = create_ui_builder_router(ui_builder_state);
4139
4140    // Nest UI Builder under /ui-builder
4141    management.nest("/ui-builder", ui_builder)
4142}
4143
4144/// Build management router with spec import API
4145pub fn management_router_with_spec_import(state: ManagementState) -> Router {
4146    use crate::spec_import::{spec_import_router, SpecImportState};
4147
4148    // Create base management router
4149    let management = management_router(state);
4150
4151    // Merge with spec import router
4152    Router::new()
4153        .merge(management)
4154        .merge(spec_import_router(SpecImportState::new()))
4155}
4156
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an enabled mock that answers `200` with `body` and leaves every optional
    /// field (`headers`, `latency_ms`, `request_match`, `priority`, scenario fields) unset.
    ///
    /// Tests adjust individual fields on the returned value so each case only spells out
    /// what is relevant to it.
    fn mock_fixture(
        id: &str,
        name: &str,
        method: &str,
        path: &str,
        body: serde_json::Value,
    ) -> MockConfig {
        MockConfig {
            id: id.to_string(),
            name: name.to_string(),
            method: method.to_string(),
            path: path.to_string(),
            response: MockResponse {
                body,
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        }
    }

    /// Builds a `POST /xml` mock whose only match criterion is the given XPath expression.
    fn xpath_mock(id: &str, name: &str, xpath: &str) -> MockConfig {
        let mut mock = mock_fixture(id, name, "POST", "/xml", serde_json::json!({"ok": true}));
        mock.request_match = Some(RequestMatchCriteria {
            xpath: Some(xpath.to_string()),
            ..Default::default()
        });
        mock
    }

    /// Runs `mock_matches_request` for a `POST /xml` request with no headers, no query
    /// parameters, and a fixed XML body containing a single order with id `123`.
    fn matches_sample_order_xml(mock: &MockConfig) -> bool {
        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        mock_matches_request(mock, "POST", "/xml", &headers, &query, Some(body))
    }

    /// A mock pushed into `state.mocks` can be found again by its id.
    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        let mock = mock_fixture(
            "test-1",
            "Test Mock",
            "GET",
            "/test",
            serde_json::json!({"message": "test"}),
        );

        // Create mock; the write guard is dropped at the end of this scope so the read
        // lock below can be acquired.
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(mock);
        }

        // Get mock
        let mocks = state.mocks.read().await;
        let found = mocks
            .iter()
            .find(|m| m.id == "test-1")
            .expect("mock 'test-1' was pushed above and should be found");
        assert_eq!(found.name, "Test Mock");
    }

    /// Total and enabled counts over the stored mocks.
    ///
    /// This inspects `state.mocks` directly; it does not go through a stats handler.
    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        // Add some mocks: one enabled, one disabled.
        {
            let mut mocks = state.mocks.write().await;

            mocks.push(mock_fixture("1", "Mock 1", "GET", "/test1", serde_json::json!({})));

            let mut disabled =
                mock_fixture("2", "Mock 2", "POST", "/test2", serde_json::json!({}));
            disabled.enabled = false;
            disabled.status_code = Some(201);
            mocks.push(disabled);
        }

        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }

    /// An absolute XPath that exists in the request body matches.
    #[test]
    fn test_mock_matches_request_with_xpath_absolute_path() {
        let mock = xpath_mock("xpath-1", "XPath Match", "/root/order/id");

        assert!(matches_sample_order_xml(&mock));
    }

    /// A `text()` predicate equal to the element's text matches.
    #[test]
    fn test_mock_matches_request_with_xpath_text_predicate() {
        let mock = xpath_mock("xpath-2", "XPath Predicate Match", "//order/id[text()='123']");

        assert!(matches_sample_order_xml(&mock));
    }

    /// A `text()` predicate that differs from the element's text does not match.
    #[test]
    fn test_mock_matches_request_with_xpath_no_match() {
        let mock = xpath_mock("xpath-3", "XPath No Match", "//order/id[text()='456']");

        assert!(!matches_sample_order_xml(&mock));
    }
}