Skip to main content

mockforge_http/
management.rs

1#[cfg(any(feature = "mqtt", feature = "kafka"))]
2use axum::response::sse::{Event, Sse};
3/// Management API for MockForge
4///
5/// Provides REST endpoints for controlling mocks, server configuration,
6/// and integration with developer tools (VS Code extension, CI/CD, etc.)
7use axum::{
8    extract::{Path, Query, State},
9    http::StatusCode,
10    response::{IntoResponse, Json},
11    routing::{delete, get, post, put},
12    Router,
13};
14#[cfg(any(feature = "mqtt", feature = "kafka"))]
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18    BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23#[cfg(any(feature = "mqtt", feature = "kafka"))]
24use std::convert::Infallible;
25use std::sync::Arc;
26use tokio::sync::{broadcast, RwLock};
27use tracing::*;
28
29/// Default broadcast channel capacity for message events
30#[cfg(any(feature = "mqtt", feature = "kafka"))]
31const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;
32
33/// Get the broadcast channel capacity from environment or use default
34#[cfg(any(feature = "mqtt", feature = "kafka"))]
35fn get_message_broadcast_capacity() -> usize {
36    std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY")
37        .ok()
38        .and_then(|s| s.parse().ok())
39        .unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY)
40}
41
42/// Message event types for real-time monitoring
43#[derive(Debug, Clone, Serialize, Deserialize)]
44#[serde(tag = "protocol", content = "data")]
45#[serde(rename_all = "lowercase")]
46pub enum MessageEvent {
47    #[cfg(feature = "mqtt")]
48    /// MQTT message event
49    Mqtt(MqttMessageEvent),
50    #[cfg(feature = "kafka")]
51    /// Kafka message event
52    Kafka(KafkaMessageEvent),
53}
54
55#[cfg(feature = "mqtt")]
56/// MQTT message event for real-time monitoring
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct MqttMessageEvent {
59    /// MQTT topic name
60    pub topic: String,
61    /// Message payload content
62    pub payload: String,
63    /// Quality of Service level (0, 1, or 2)
64    pub qos: u8,
65    /// Whether the message is retained
66    pub retain: bool,
67    /// RFC3339 formatted timestamp
68    pub timestamp: String,
69}
70
71#[cfg(feature = "kafka")]
72#[allow(missing_docs)]
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct KafkaMessageEvent {
75    pub topic: String,
76    pub key: Option<String>,
77    pub value: String,
78    pub partition: i32,
79    pub offset: i64,
80    pub headers: Option<std::collections::HashMap<String, String>>,
81    pub timestamp: String,
82}
83
84/// Mock configuration representation
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct MockConfig {
87    /// Unique identifier for the mock
88    #[serde(skip_serializing_if = "String::is_empty")]
89    pub id: String,
90    /// Human-readable name for the mock
91    pub name: String,
92    /// HTTP method (GET, POST, etc.)
93    pub method: String,
94    /// API path pattern to match
95    pub path: String,
96    /// Response configuration
97    pub response: MockResponse,
98    /// Whether this mock is currently enabled
99    #[serde(default = "default_true")]
100    pub enabled: bool,
101    /// Optional latency to inject in milliseconds
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub latency_ms: Option<u64>,
104    /// Optional HTTP status code override
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub status_code: Option<u16>,
107    /// Request matching criteria (headers, query params, body patterns)
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub request_match: Option<RequestMatchCriteria>,
110    /// Priority for mock ordering (higher priority mocks are matched first)
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub priority: Option<i32>,
113    /// Scenario name for stateful mocking
114    #[serde(skip_serializing_if = "Option::is_none")]
115    pub scenario: Option<String>,
116    /// Required scenario state for this mock to be active
117    #[serde(skip_serializing_if = "Option::is_none")]
118    pub required_scenario_state: Option<String>,
119    /// New scenario state after this mock is matched
120    #[serde(skip_serializing_if = "Option::is_none")]
121    pub new_scenario_state: Option<String>,
122}
123
124fn default_true() -> bool {
125    true
126}
127
128/// Mock response configuration
129#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct MockResponse {
131    /// Response body as JSON
132    pub body: serde_json::Value,
133    /// Optional custom response headers
134    #[serde(skip_serializing_if = "Option::is_none")]
135    pub headers: Option<std::collections::HashMap<String, String>>,
136}
137
138/// Request matching criteria for advanced request matching
139#[derive(Debug, Clone, Serialize, Deserialize, Default)]
140pub struct RequestMatchCriteria {
141    /// Headers that must be present and match (case-insensitive header names)
142    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
143    pub headers: std::collections::HashMap<String, String>,
144    /// Query parameters that must be present and match
145    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
146    pub query_params: std::collections::HashMap<String, String>,
147    /// Request body pattern (supports exact match or regex)
148    #[serde(skip_serializing_if = "Option::is_none")]
149    pub body_pattern: Option<String>,
150    /// JSONPath expression for JSON body matching
151    #[serde(skip_serializing_if = "Option::is_none")]
152    pub json_path: Option<String>,
153    /// XPath expression for XML body matching
154    #[serde(skip_serializing_if = "Option::is_none")]
155    pub xpath: Option<String>,
156    /// Custom matcher expression (e.g., "headers.content-type == \"application/json\"")
157    #[serde(skip_serializing_if = "Option::is_none")]
158    pub custom_matcher: Option<String>,
159}
160
161/// Check if a request matches the given mock configuration
162///
163/// This function implements comprehensive request matching including:
164/// - Method and path matching
165/// - Header matching (with regex support)
166/// - Query parameter matching
167/// - Body pattern matching (exact, regex, JSONPath, XPath)
168/// - Custom matcher expressions
169pub fn mock_matches_request(
170    mock: &MockConfig,
171    method: &str,
172    path: &str,
173    headers: &std::collections::HashMap<String, String>,
174    query_params: &std::collections::HashMap<String, String>,
175    body: Option<&[u8]>,
176) -> bool {
177    use regex::Regex;
178
179    // Check if mock is enabled
180    if !mock.enabled {
181        return false;
182    }
183
184    // Check method (case-insensitive)
185    if mock.method.to_uppercase() != method.to_uppercase() {
186        return false;
187    }
188
189    // Check path pattern (supports wildcards and path parameters)
190    if !path_matches_pattern(&mock.path, path) {
191        return false;
192    }
193
194    // Check request matching criteria if present
195    if let Some(criteria) = &mock.request_match {
196        // Check headers
197        for (key, expected_value) in &criteria.headers {
198            let header_key_lower = key.to_lowercase();
199            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
200
201            if let Some((_, actual_value)) = found {
202                // Try regex match first, then exact match
203                if let Ok(re) = Regex::new(expected_value) {
204                    if !re.is_match(actual_value) {
205                        return false;
206                    }
207                } else if actual_value != expected_value {
208                    return false;
209                }
210            } else {
211                return false; // Header not found
212            }
213        }
214
215        // Check query parameters
216        for (key, expected_value) in &criteria.query_params {
217            if let Some(actual_value) = query_params.get(key) {
218                if actual_value != expected_value {
219                    return false;
220                }
221            } else {
222                return false; // Query param not found
223            }
224        }
225
226        // Check body pattern
227        if let Some(pattern) = &criteria.body_pattern {
228            if let Some(body_bytes) = body {
229                let body_str = String::from_utf8_lossy(body_bytes);
230                // Try regex first, then exact match
231                if let Ok(re) = Regex::new(pattern) {
232                    if !re.is_match(&body_str) {
233                        return false;
234                    }
235                } else if body_str.as_ref() != pattern {
236                    return false;
237                }
238            } else {
239                return false; // Body required but not present
240            }
241        }
242
243        // Check JSONPath (simplified implementation)
244        if let Some(json_path) = &criteria.json_path {
245            if let Some(body_bytes) = body {
246                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
247                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
248                        // Simple JSONPath check
249                        if !json_path_exists(&json_value, json_path) {
250                            return false;
251                        }
252                    }
253                }
254            }
255        }
256
257        // Check XPath (supports a focused subset)
258        if let Some(xpath) = &criteria.xpath {
259            if let Some(body_bytes) = body {
260                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
261                    if !xml_xpath_exists(body_str, xpath) {
262                        return false;
263                    }
264                } else {
265                    return false;
266                }
267            } else {
268                return false; // Body required but not present
269            }
270        }
271
272        // Check custom matcher
273        if let Some(custom) = &criteria.custom_matcher {
274            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
275                return false;
276            }
277        }
278    }
279
280    true
281}
282
283/// Check if a path matches a pattern (supports wildcards and path parameters)
284fn path_matches_pattern(pattern: &str, path: &str) -> bool {
285    // Exact match
286    if pattern == path {
287        return true;
288    }
289
290    // Wildcard match
291    if pattern == "*" {
292        return true;
293    }
294
295    // Path parameter matching (e.g., /users/{id} matches /users/123)
296    let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
297    let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
298
299    if pattern_parts.len() != path_parts.len() {
300        // Check for wildcard patterns
301        if pattern.contains('*') {
302            return matches_wildcard_pattern(pattern, path);
303        }
304        return false;
305    }
306
307    for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
308        // Check for path parameters {param}
309        if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
310            continue; // Matches any value
311        }
312
313        if pattern_part != path_part {
314            return false;
315        }
316    }
317
318    true
319}
320
321/// Check if path matches a wildcard pattern
322fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
323    use regex::Regex;
324
325    // Convert pattern to regex
326    let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
327
328    if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
329        return re.is_match(path);
330    }
331
332    false
333}
334
335/// Check if a JSONPath exists in a JSON value
336///
337/// Supports:
338/// - `$` — root element
339/// - `$.field.subfield` — nested object access
340/// - `$.items[0].name` — array index access
341/// - `$.items[*]` — array wildcard (checks array is non-empty)
342fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
343    let path = if json_path == "$" {
344        return true;
345    } else if let Some(p) = json_path.strip_prefix("$.") {
346        p
347    } else if let Some(p) = json_path.strip_prefix('$') {
348        p.strip_prefix('.').unwrap_or(p)
349    } else {
350        json_path
351    };
352
353    let mut current = json;
354    for segment in split_json_path_segments(path) {
355        match segment {
356            JsonPathSegment::Field(name) => {
357                if let Some(obj) = current.as_object() {
358                    if let Some(value) = obj.get(name) {
359                        current = value;
360                    } else {
361                        return false;
362                    }
363                } else {
364                    return false;
365                }
366            }
367            JsonPathSegment::Index(idx) => {
368                if let Some(arr) = current.as_array() {
369                    if let Some(value) = arr.get(idx) {
370                        current = value;
371                    } else {
372                        return false;
373                    }
374                } else {
375                    return false;
376                }
377            }
378            JsonPathSegment::Wildcard => {
379                if let Some(arr) = current.as_array() {
380                    return !arr.is_empty();
381                }
382                return false;
383            }
384        }
385    }
386    true
387}
388
389enum JsonPathSegment<'a> {
390    Field(&'a str),
391    Index(usize),
392    Wildcard,
393}
394
395/// Split a JSONPath (without the leading `$`) into segments
396fn split_json_path_segments(path: &str) -> Vec<JsonPathSegment<'_>> {
397    let mut segments = Vec::new();
398    for part in path.split('.') {
399        if part.is_empty() {
400            continue;
401        }
402        if let Some(bracket_start) = part.find('[') {
403            let field_name = &part[..bracket_start];
404            if !field_name.is_empty() {
405                segments.push(JsonPathSegment::Field(field_name));
406            }
407            let bracket_content = &part[bracket_start + 1..part.len() - 1];
408            if bracket_content == "*" {
409                segments.push(JsonPathSegment::Wildcard);
410            } else if let Ok(idx) = bracket_content.parse::<usize>() {
411                segments.push(JsonPathSegment::Index(idx));
412            }
413        } else {
414            segments.push(JsonPathSegment::Field(part));
415        }
416    }
417    segments
418}
419
420#[derive(Debug, Clone, PartialEq, Eq)]
421struct XPathSegment {
422    name: String,
423    text_equals: Option<String>,
424}
425
426fn parse_xpath_segment(segment: &str) -> Option<XPathSegment> {
427    if segment.is_empty() {
428        return None;
429    }
430
431    let trimmed = segment.trim();
432    if let Some(bracket_start) = trimmed.find('[') {
433        if !trimmed.ends_with(']') {
434            return None;
435        }
436
437        let name = trimmed[..bracket_start].trim();
438        let predicate = &trimmed[bracket_start + 1..trimmed.len() - 1];
439        let predicate = predicate.trim();
440
441        // Support simple predicate: [text()="value"] or [text()='value']
442        if let Some(raw) = predicate.strip_prefix("text()=") {
443            let raw = raw.trim();
444            if raw.len() >= 2
445                && ((raw.starts_with('"') && raw.ends_with('"'))
446                    || (raw.starts_with('\'') && raw.ends_with('\'')))
447            {
448                let text = raw[1..raw.len() - 1].to_string();
449                if !name.is_empty() {
450                    return Some(XPathSegment {
451                        name: name.to_string(),
452                        text_equals: Some(text),
453                    });
454                }
455            }
456        }
457
458        None
459    } else {
460        Some(XPathSegment {
461            name: trimmed.to_string(),
462            text_equals: None,
463        })
464    }
465}
466
467fn segment_matches(node: roxmltree::Node<'_, '_>, segment: &XPathSegment) -> bool {
468    if !node.is_element() {
469        return false;
470    }
471    if node.tag_name().name() != segment.name {
472        return false;
473    }
474    match &segment.text_equals {
475        Some(expected) => node.text().map(str::trim).unwrap_or_default() == expected,
476        None => true,
477    }
478}
479
480/// Check if an XPath expression matches an XML body.
481///
482/// Supported subset:
483/// - Absolute paths: `/root/child/item`
484/// - Descendant search: `//item` and `//parent/child`
485/// - Optional text predicate per segment: `item[text()="value"]`
486fn xml_xpath_exists(xml_body: &str, xpath: &str) -> bool {
487    let doc = match roxmltree::Document::parse(xml_body) {
488        Ok(doc) => doc,
489        Err(err) => {
490            tracing::warn!("Failed to parse XML for XPath matching: {}", err);
491            return false;
492        }
493    };
494
495    let expr = xpath.trim();
496    if expr.is_empty() {
497        return false;
498    }
499
500    let (is_descendant, path_str) = if let Some(rest) = expr.strip_prefix("//") {
501        (true, rest)
502    } else if let Some(rest) = expr.strip_prefix('/') {
503        (false, rest)
504    } else {
505        tracing::warn!("Unsupported XPath expression (must start with / or //): {}", expr);
506        return false;
507    };
508
509    let segments: Vec<XPathSegment> = path_str
510        .split('/')
511        .filter(|s| !s.trim().is_empty())
512        .filter_map(parse_xpath_segment)
513        .collect();
514
515    if segments.is_empty() {
516        return false;
517    }
518
519    if is_descendant {
520        let first = &segments[0];
521        for node in doc.descendants().filter(|n| segment_matches(*n, first)) {
522            let mut frontier = vec![node];
523            for segment in &segments[1..] {
524                let mut next_frontier = Vec::new();
525                for parent in &frontier {
526                    for child in parent.children().filter(|n| segment_matches(*n, segment)) {
527                        next_frontier.push(child);
528                    }
529                }
530                if next_frontier.is_empty() {
531                    frontier.clear();
532                    break;
533                }
534                frontier = next_frontier;
535            }
536            if !frontier.is_empty() {
537                return true;
538            }
539        }
540        false
541    } else {
542        let mut frontier = vec![doc.root_element()];
543        for (index, segment) in segments.iter().enumerate() {
544            let mut next_frontier = Vec::new();
545            for parent in &frontier {
546                if index == 0 {
547                    if segment_matches(*parent, segment) {
548                        next_frontier.push(*parent);
549                    }
550                    continue;
551                }
552                for child in parent.children().filter(|n| segment_matches(*n, segment)) {
553                    next_frontier.push(child);
554                }
555            }
556            if next_frontier.is_empty() {
557                return false;
558            }
559            frontier = next_frontier;
560        }
561        !frontier.is_empty()
562    }
563}
564
565/// Evaluate a custom matcher expression
566fn evaluate_custom_matcher(
567    expression: &str,
568    method: &str,
569    path: &str,
570    headers: &std::collections::HashMap<String, String>,
571    query_params: &std::collections::HashMap<String, String>,
572    body: Option<&[u8]>,
573) -> bool {
574    use regex::Regex;
575
576    let expr = expression.trim();
577
578    // Handle equality expressions (field == "value")
579    if expr.contains("==") {
580        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
581        if parts.len() != 2 {
582            return false;
583        }
584
585        let field = parts[0];
586        let expected_value = parts[1].trim_matches('"').trim_matches('\'');
587
588        match field {
589            "method" => method == expected_value,
590            "path" => path == expected_value,
591            _ if field.starts_with("headers.") => {
592                let header_name = &field[8..];
593                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
594            }
595            _ if field.starts_with("query.") => {
596                let param_name = &field[6..];
597                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
598            }
599            _ => false,
600        }
601    }
602    // Handle regex match expressions (field =~ "pattern")
603    else if expr.contains("=~") {
604        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
605        if parts.len() != 2 {
606            return false;
607        }
608
609        let field = parts[0];
610        let pattern = parts[1].trim_matches('"').trim_matches('\'');
611
612        if let Ok(re) = Regex::new(pattern) {
613            match field {
614                "method" => re.is_match(method),
615                "path" => re.is_match(path),
616                _ if field.starts_with("headers.") => {
617                    let header_name = &field[8..];
618                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
619                }
620                _ if field.starts_with("query.") => {
621                    let param_name = &field[6..];
622                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
623                }
624                _ => false,
625            }
626        } else {
627            false
628        }
629    }
630    // Handle contains expressions (field contains "value")
631    else if expr.contains("contains") {
632        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
633        if parts.len() != 2 {
634            return false;
635        }
636
637        let field = parts[0];
638        let search_value = parts[1].trim_matches('"').trim_matches('\'');
639
640        match field {
641            "path" => path.contains(search_value),
642            _ if field.starts_with("headers.") => {
643                let header_name = &field[8..];
644                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
645            }
646            _ if field.starts_with("body") => {
647                if let Some(body_bytes) = body {
648                    let body_str = String::from_utf8_lossy(body_bytes);
649                    body_str.contains(search_value)
650                } else {
651                    false
652                }
653            }
654            _ => false,
655        }
656    } else {
657        // Unknown expression format
658        tracing::warn!("Unknown custom matcher expression format: {}", expr);
659        false
660    }
661}
662
663/// Server statistics
664#[derive(Debug, Clone, Serialize, Deserialize)]
665pub struct ServerStats {
666    /// Server uptime in seconds
667    pub uptime_seconds: u64,
668    /// Total number of requests processed
669    pub total_requests: u64,
670    /// Number of active mock configurations
671    pub active_mocks: usize,
672    /// Number of currently enabled mocks
673    pub enabled_mocks: usize,
674    /// Number of registered API routes
675    pub registered_routes: usize,
676}
677
678/// Server configuration info
679#[derive(Debug, Clone, Serialize, Deserialize)]
680pub struct ServerConfig {
681    /// MockForge version string
682    pub version: String,
683    /// Server port number
684    pub port: u16,
685    /// Whether an OpenAPI spec is loaded
686    pub has_openapi_spec: bool,
687    /// Optional path to the OpenAPI spec file
688    #[serde(skip_serializing_if = "Option::is_none")]
689    pub spec_path: Option<String>,
690}
691
692/// Shared state for the management API
693#[derive(Clone)]
694pub struct ManagementState {
695    /// Collection of mock configurations
696    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
697    /// Optional OpenAPI specification
698    pub spec: Option<Arc<OpenApiSpec>>,
699    /// Optional path to the OpenAPI spec file
700    pub spec_path: Option<String>,
701    /// Server port number
702    pub port: u16,
703    /// Server start time for uptime calculation
704    pub start_time: std::time::Instant,
705    /// Counter for total requests processed
706    pub request_counter: Arc<RwLock<u64>>,
707    /// Optional proxy configuration for migration pipeline
708    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
709    /// Optional SMTP registry for email mocking
710    #[cfg(feature = "smtp")]
711    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
712    /// Optional MQTT broker for message mocking
713    #[cfg(feature = "mqtt")]
714    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
715    /// Optional Kafka broker for event streaming
716    #[cfg(feature = "kafka")]
717    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
718    /// Broadcast channel for message events (MQTT & Kafka)
719    #[cfg(any(feature = "mqtt", feature = "kafka"))]
720    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
721    /// State machine manager for scenario state machines
722    pub state_machine_manager:
723        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
724    /// Optional WebSocket broadcast channel for real-time updates
725    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
726    /// Lifecycle hook registry for extensibility
727    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
728    /// Rule explanations storage (in-memory for now)
729    pub rule_explanations: Arc<
730        RwLock<
731            std::collections::HashMap<
732                String,
733                mockforge_core::intelligent_behavior::RuleExplanation,
734            >,
735        >,
736    >,
737    /// Optional chaos API state for chaos config management
738    #[cfg(feature = "chaos")]
739    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
740    /// Optional server configuration for profile application
741    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
742    /// Conformance testing state
743    #[cfg(feature = "conformance")]
744    pub conformance_state: crate::handlers::conformance::ConformanceState,
745}
746
747impl ManagementState {
748    /// Create a new management state
749    ///
750    /// # Arguments
751    /// * `spec` - Optional OpenAPI specification
752    /// * `spec_path` - Optional path to the OpenAPI spec file
753    /// * `port` - Server port number
754    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
755        Self {
756            mocks: Arc::new(RwLock::new(Vec::new())),
757            spec,
758            spec_path,
759            port,
760            start_time: std::time::Instant::now(),
761            request_counter: Arc::new(RwLock::new(0)),
762            proxy_config: None,
763            #[cfg(feature = "smtp")]
764            smtp_registry: None,
765            #[cfg(feature = "mqtt")]
766            mqtt_broker: None,
767            #[cfg(feature = "kafka")]
768            kafka_broker: None,
769            #[cfg(any(feature = "mqtt", feature = "kafka"))]
770            message_events: {
771                let capacity = get_message_broadcast_capacity();
772                let (tx, _) = broadcast::channel(capacity);
773                Arc::new(tx)
774            },
775            state_machine_manager: Arc::new(RwLock::new(
776                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
777            )),
778            ws_broadcast: None,
779            lifecycle_hooks: None,
780            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
781            #[cfg(feature = "chaos")]
782            chaos_api_state: None,
783            server_config: None,
784            #[cfg(feature = "conformance")]
785            conformance_state: crate::handlers::conformance::ConformanceState::new(),
786        }
787    }
788
789    /// Add lifecycle hook registry to management state
790    pub fn with_lifecycle_hooks(
791        mut self,
792        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
793    ) -> Self {
794        self.lifecycle_hooks = Some(hooks);
795        self
796    }
797
798    /// Add WebSocket broadcast channel to management state
799    pub fn with_ws_broadcast(
800        mut self,
801        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
802    ) -> Self {
803        self.ws_broadcast = Some(ws_broadcast);
804        self
805    }
806
807    /// Add proxy configuration to management state
808    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
809        self.proxy_config = Some(proxy_config);
810        self
811    }
812
813    #[cfg(feature = "smtp")]
814    /// Add SMTP registry to management state
815    pub fn with_smtp_registry(
816        mut self,
817        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
818    ) -> Self {
819        self.smtp_registry = Some(smtp_registry);
820        self
821    }
822
823    #[cfg(feature = "mqtt")]
824    /// Add MQTT broker to management state
825    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
826        self.mqtt_broker = Some(mqtt_broker);
827        self
828    }
829
830    #[cfg(feature = "kafka")]
831    /// Add Kafka broker to management state
832    pub fn with_kafka_broker(
833        mut self,
834        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
835    ) -> Self {
836        self.kafka_broker = Some(kafka_broker);
837        self
838    }
839
840    #[cfg(feature = "chaos")]
841    /// Add chaos API state to management state
842    pub fn with_chaos_api_state(
843        mut self,
844        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
845    ) -> Self {
846        self.chaos_api_state = Some(chaos_api_state);
847        self
848    }
849
850    /// Add server configuration to management state
851    pub fn with_server_config(
852        mut self,
853        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
854    ) -> Self {
855        self.server_config = Some(server_config);
856        self
857    }
858}
859
860/// List all mocks
861async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
862    let mocks = state.mocks.read().await;
863    Json(serde_json::json!({
864        "mocks": *mocks,
865        "total": mocks.len(),
866        "enabled": mocks.iter().filter(|m| m.enabled).count()
867    }))
868}
869
870/// Get a specific mock by ID
871async fn get_mock(
872    State(state): State<ManagementState>,
873    Path(id): Path<String>,
874) -> Result<Json<MockConfig>, StatusCode> {
875    let mocks = state.mocks.read().await;
876    mocks
877        .iter()
878        .find(|m| m.id == id)
879        .cloned()
880        .map(Json)
881        .ok_or(StatusCode::NOT_FOUND)
882}
883
884/// Create a new mock
885async fn create_mock(
886    State(state): State<ManagementState>,
887    Json(mut mock): Json<MockConfig>,
888) -> Result<Json<MockConfig>, StatusCode> {
889    let mut mocks = state.mocks.write().await;
890
891    // Generate ID if not provided
892    if mock.id.is_empty() {
893        mock.id = uuid::Uuid::new_v4().to_string();
894    }
895
896    // Check for duplicate ID
897    if mocks.iter().any(|m| m.id == mock.id) {
898        return Err(StatusCode::CONFLICT);
899    }
900
901    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
902
903    // Invoke lifecycle hooks
904    if let Some(hooks) = &state.lifecycle_hooks {
905        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
906            id: mock.id.clone(),
907            name: mock.name.clone(),
908            config: serde_json::to_value(&mock).unwrap_or_default(),
909        };
910        hooks.invoke_mock_created(&event).await;
911    }
912
913    mocks.push(mock.clone());
914
915    // Broadcast WebSocket event
916    if let Some(tx) = &state.ws_broadcast {
917        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
918    }
919
920    Ok(Json(mock))
921}
922
923/// Update an existing mock
924async fn update_mock(
925    State(state): State<ManagementState>,
926    Path(id): Path<String>,
927    Json(updated_mock): Json<MockConfig>,
928) -> Result<Json<MockConfig>, StatusCode> {
929    let mut mocks = state.mocks.write().await;
930
931    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
932
933    // Get old mock for comparison
934    let old_mock = mocks[position].clone();
935
936    info!("Updating mock: {}", id);
937    mocks[position] = updated_mock.clone();
938
939    // Invoke lifecycle hooks
940    if let Some(hooks) = &state.lifecycle_hooks {
941        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
942            id: updated_mock.id.clone(),
943            name: updated_mock.name.clone(),
944            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
945        };
946        hooks.invoke_mock_updated(&event).await;
947
948        // Check if enabled state changed
949        if old_mock.enabled != updated_mock.enabled {
950            let state_event = if updated_mock.enabled {
951                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
952                    id: updated_mock.id.clone(),
953                }
954            } else {
955                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
956                    id: updated_mock.id.clone(),
957                }
958            };
959            hooks.invoke_mock_state_changed(&state_event).await;
960        }
961    }
962
963    // Broadcast WebSocket event
964    if let Some(tx) = &state.ws_broadcast {
965        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
966    }
967
968    Ok(Json(updated_mock))
969}
970
971/// Delete a mock
972async fn delete_mock(
973    State(state): State<ManagementState>,
974    Path(id): Path<String>,
975) -> Result<StatusCode, StatusCode> {
976    let mut mocks = state.mocks.write().await;
977
978    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
979
980    // Get mock info before deletion for lifecycle hooks
981    let deleted_mock = mocks[position].clone();
982
983    info!("Deleting mock: {}", id);
984    mocks.remove(position);
985
986    // Invoke lifecycle hooks
987    if let Some(hooks) = &state.lifecycle_hooks {
988        let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
989            id: deleted_mock.id.clone(),
990            name: deleted_mock.name.clone(),
991        };
992        hooks.invoke_mock_deleted(&event).await;
993    }
994
995    // Broadcast WebSocket event
996    if let Some(tx) = &state.ws_broadcast {
997        let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
998    }
999
1000    Ok(StatusCode::NO_CONTENT)
1001}
1002
1003/// Request to validate configuration
1004#[derive(Debug, Deserialize)]
1005pub struct ValidateConfigRequest {
1006    /// Configuration to validate (as JSON)
1007    pub config: serde_json::Value,
1008    /// Format of the configuration ("json" or "yaml")
1009    #[serde(default = "default_format")]
1010    pub format: String,
1011}
1012
1013fn default_format() -> String {
1014    "json".to_string()
1015}
1016
1017/// Validate configuration without applying it
1018async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
1019    use mockforge_core::config::ServerConfig;
1020
1021    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
1022        "yaml" | "yml" => {
1023            let yaml_str = match serde_json::to_string(&request.config) {
1024                Ok(s) => s,
1025                Err(e) => {
1026                    return (
1027                        StatusCode::BAD_REQUEST,
1028                        Json(serde_json::json!({
1029                            "valid": false,
1030                            "error": format!("Failed to convert to string: {}", e),
1031                            "message": "Configuration validation failed"
1032                        })),
1033                    )
1034                        .into_response();
1035                }
1036            };
1037            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
1038        }
1039        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
1040    };
1041
1042    match config_result {
1043        Ok(_) => Json(serde_json::json!({
1044            "valid": true,
1045            "message": "Configuration is valid"
1046        }))
1047        .into_response(),
1048        Err(e) => (
1049            StatusCode::BAD_REQUEST,
1050            Json(serde_json::json!({
1051                "valid": false,
1052                "error": format!("Invalid configuration: {}", e),
1053                "message": "Configuration validation failed"
1054            })),
1055        )
1056            .into_response(),
1057    }
1058}
1059
1060/// Request for bulk configuration update
1061#[derive(Debug, Deserialize)]
1062pub struct BulkConfigUpdateRequest {
1063    /// Partial configuration updates (only specified fields will be updated)
1064    pub updates: serde_json::Value,
1065}
1066
1067/// Bulk update configuration
1068///
1069/// This endpoint allows updating multiple configuration options at once.
1070/// Only the specified fields in the updates object will be modified.
1071///
1072/// Configuration updates are applied to the server configuration if available
1073/// in ManagementState. Changes take effect immediately for supported settings.
1074async fn bulk_update_config(
1075    State(state): State<ManagementState>,
1076    Json(request): Json<BulkConfigUpdateRequest>,
1077) -> impl IntoResponse {
1078    // Validate the updates structure
1079    if !request.updates.is_object() {
1080        return (
1081            StatusCode::BAD_REQUEST,
1082            Json(serde_json::json!({
1083                "error": "Invalid request",
1084                "message": "Updates must be a JSON object"
1085            })),
1086        )
1087            .into_response();
1088    }
1089
1090    // Try to validate as partial ServerConfig
1091    use mockforge_core::config::ServerConfig;
1092
1093    // Create a minimal valid config and try to merge updates
1094    let base_config = ServerConfig::default();
1095    let base_json = match serde_json::to_value(&base_config) {
1096        Ok(v) => v,
1097        Err(e) => {
1098            return (
1099                StatusCode::INTERNAL_SERVER_ERROR,
1100                Json(serde_json::json!({
1101                    "error": "Internal error",
1102                    "message": format!("Failed to serialize base config: {}", e)
1103                })),
1104            )
1105                .into_response();
1106        }
1107    };
1108
1109    // Merge updates into base config (simplified merge)
1110    let mut merged = base_json.clone();
1111    if let (Some(merged_obj), Some(updates_obj)) =
1112        (merged.as_object_mut(), request.updates.as_object())
1113    {
1114        for (key, value) in updates_obj {
1115            merged_obj.insert(key.clone(), value.clone());
1116        }
1117    }
1118
1119    // Validate the merged config
1120    match serde_json::from_value::<ServerConfig>(merged) {
1121        Ok(validated_config) => {
1122            // Apply config if server_config is available in ManagementState
1123            if let Some(ref config_lock) = state.server_config {
1124                let mut config = config_lock.write().await;
1125                *config = validated_config;
1126                Json(serde_json::json!({
1127                    "success": true,
1128                    "message": "Bulk configuration update applied successfully",
1129                    "updates_received": request.updates,
1130                    "validated": true,
1131                    "applied": true
1132                }))
1133                .into_response()
1134            } else {
1135                Json(serde_json::json!({
1136                    "success": true,
1137                    "message": "Bulk configuration update validated but not applied (no server config in state). Use .with_server_config() when building ManagementState.",
1138                    "updates_received": request.updates,
1139                    "validated": true,
1140                    "applied": false
1141                }))
1142                .into_response()
1143            }
1144        }
1145        Err(e) => (
1146            StatusCode::BAD_REQUEST,
1147            Json(serde_json::json!({
1148                "error": "Invalid configuration",
1149                "message": format!("Configuration validation failed: {}", e),
1150                "validated": false
1151            })),
1152        )
1153            .into_response(),
1154    }
1155}
1156
1157/// Get server statistics
1158async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
1159    let mocks = state.mocks.read().await;
1160    let request_count = *state.request_counter.read().await;
1161
1162    Json(ServerStats {
1163        uptime_seconds: state.start_time.elapsed().as_secs(),
1164        total_requests: request_count,
1165        active_mocks: mocks.len(),
1166        enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
1167        registered_routes: mocks.len(), // This could be enhanced with actual route registry info
1168    })
1169}
1170
1171/// Get server configuration
1172async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
1173    Json(ServerConfig {
1174        version: env!("CARGO_PKG_VERSION").to_string(),
1175        port: state.port,
1176        has_openapi_spec: state.spec.is_some(),
1177        spec_path: state.spec_path.clone(),
1178    })
1179}
1180
1181/// Serve the loaded OpenAPI spec as JSON
1182async fn get_openapi_spec(
1183    State(state): State<ManagementState>,
1184) -> Result<Json<serde_json::Value>, StatusCode> {
1185    match &state.spec {
1186        Some(spec) => match &spec.raw_document {
1187            Some(doc) => Ok(Json(doc.clone())),
1188            None => {
1189                // Fall back to serializing the parsed spec
1190                match serde_json::to_value(&spec.spec) {
1191                    Ok(val) => Ok(Json(val)),
1192                    Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
1193                }
1194            }
1195        },
1196        None => Err(StatusCode::NOT_FOUND),
1197    }
1198}
1199
1200/// Health check endpoint
1201async fn health_check() -> Json<serde_json::Value> {
1202    Json(serde_json::json!({
1203        "status": "healthy",
1204        "service": "mockforge-management",
1205        "timestamp": chrono::Utc::now().to_rfc3339()
1206    }))
1207}
1208
1209/// Report which features and protocols are available in this build.
1210///
1211/// The UI queries this on startup to auto-stub any missing features
1212/// instead of maintaining hardcoded prefix lists.
1213async fn get_capabilities() -> Json<serde_json::Value> {
1214    let mut features = vec!["core", "http", "management", "mocks", "proxy", "ai"];
1215
1216    #[cfg(feature = "smtp")]
1217    features.push("smtp");
1218    #[cfg(feature = "mqtt")]
1219    features.push("mqtt");
1220    #[cfg(feature = "kafka")]
1221    features.push("kafka");
1222    #[cfg(feature = "conformance")]
1223    features.push("conformance");
1224    #[cfg(feature = "behavioral-cloning")]
1225    features.push("behavioral-cloning");
1226
1227    // Always-available subsystems (registered unconditionally in the router)
1228    features.extend_from_slice(&[
1229        "chaos",
1230        "network-profiles",
1231        "state-machines",
1232        "migration",
1233        "snapshot-diff",
1234        "mockai",
1235    ]);
1236
1237    Json(serde_json::json!({
1238        "features": features,
1239        "version": env!("CARGO_PKG_VERSION"),
1240    }))
1241}
1242
1243/// Export format for mock configurations
1244#[derive(Debug, Clone, Serialize, Deserialize)]
1245#[serde(rename_all = "lowercase")]
1246pub enum ExportFormat {
1247    /// JSON format
1248    Json,
1249    /// YAML format
1250    Yaml,
1251}
1252
1253/// Export mocks in specified format
1254async fn export_mocks(
1255    State(state): State<ManagementState>,
1256    Query(params): Query<std::collections::HashMap<String, String>>,
1257) -> Result<(StatusCode, String), StatusCode> {
1258    let mocks = state.mocks.read().await;
1259
1260    let format = params
1261        .get("format")
1262        .map(|f| match f.as_str() {
1263            "yaml" | "yml" => ExportFormat::Yaml,
1264            _ => ExportFormat::Json,
1265        })
1266        .unwrap_or(ExportFormat::Json);
1267
1268    match format {
1269        ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
1270            .map(|json| (StatusCode::OK, json))
1271            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1272        ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
1273            .map(|yaml| (StatusCode::OK, yaml))
1274            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1275    }
1276}
1277
1278/// Import mocks from JSON/YAML
1279async fn import_mocks(
1280    State(state): State<ManagementState>,
1281    Json(mocks): Json<Vec<MockConfig>>,
1282) -> impl IntoResponse {
1283    let mut current_mocks = state.mocks.write().await;
1284    current_mocks.clear();
1285    current_mocks.extend(mocks);
1286    Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1287}
1288
1289#[cfg(feature = "smtp")]
1290/// List SMTP emails in mailbox
1291async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
1292    if let Some(ref smtp_registry) = state.smtp_registry {
1293        match smtp_registry.get_emails() {
1294            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
1295            Err(e) => (
1296                StatusCode::INTERNAL_SERVER_ERROR,
1297                Json(serde_json::json!({
1298                    "error": "Failed to retrieve emails",
1299                    "message": e.to_string()
1300                })),
1301            ),
1302        }
1303    } else {
1304        (
1305            StatusCode::NOT_IMPLEMENTED,
1306            Json(serde_json::json!({
1307                "error": "SMTP mailbox management not available",
1308                "message": "SMTP server is not enabled or registry not available."
1309            })),
1310        )
1311    }
1312}
1313
1314/// Get specific SMTP email
1315#[cfg(feature = "smtp")]
1316async fn get_smtp_email(
1317    State(state): State<ManagementState>,
1318    Path(id): Path<String>,
1319) -> impl IntoResponse {
1320    if let Some(ref smtp_registry) = state.smtp_registry {
1321        match smtp_registry.get_email_by_id(&id) {
1322            Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
1323            Ok(None) => (
1324                StatusCode::NOT_FOUND,
1325                Json(serde_json::json!({
1326                    "error": "Email not found",
1327                    "id": id
1328                })),
1329            ),
1330            Err(e) => (
1331                StatusCode::INTERNAL_SERVER_ERROR,
1332                Json(serde_json::json!({
1333                    "error": "Failed to retrieve email",
1334                    "message": e.to_string()
1335                })),
1336            ),
1337        }
1338    } else {
1339        (
1340            StatusCode::NOT_IMPLEMENTED,
1341            Json(serde_json::json!({
1342                "error": "SMTP mailbox management not available",
1343                "message": "SMTP server is not enabled or registry not available."
1344            })),
1345        )
1346    }
1347}
1348
1349/// Clear SMTP mailbox
1350#[cfg(feature = "smtp")]
1351async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
1352    if let Some(ref smtp_registry) = state.smtp_registry {
1353        match smtp_registry.clear_mailbox() {
1354            Ok(()) => (
1355                StatusCode::OK,
1356                Json(serde_json::json!({
1357                    "message": "Mailbox cleared successfully"
1358                })),
1359            ),
1360            Err(e) => (
1361                StatusCode::INTERNAL_SERVER_ERROR,
1362                Json(serde_json::json!({
1363                    "error": "Failed to clear mailbox",
1364                    "message": e.to_string()
1365                })),
1366            ),
1367        }
1368    } else {
1369        (
1370            StatusCode::NOT_IMPLEMENTED,
1371            Json(serde_json::json!({
1372                "error": "SMTP mailbox management not available",
1373                "message": "SMTP server is not enabled or registry not available."
1374            })),
1375        )
1376    }
1377}
1378
1379/// Export SMTP mailbox
1380#[cfg(feature = "smtp")]
1381async fn export_smtp_mailbox(
1382    Query(params): Query<std::collections::HashMap<String, String>>,
1383) -> impl IntoResponse {
1384    let format = params.get("format").unwrap_or(&"json".to_string()).clone();
1385    (
1386        StatusCode::NOT_IMPLEMENTED,
1387        Json(serde_json::json!({
1388            "error": "SMTP mailbox management not available via HTTP API",
1389            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
1390            "requested_format": format
1391        })),
1392    )
1393}
1394
1395/// Search SMTP emails
1396#[cfg(feature = "smtp")]
1397async fn search_smtp_emails(
1398    State(state): State<ManagementState>,
1399    Query(params): Query<std::collections::HashMap<String, String>>,
1400) -> impl IntoResponse {
1401    if let Some(ref smtp_registry) = state.smtp_registry {
1402        let filters = EmailSearchFilters {
1403            sender: params.get("sender").cloned(),
1404            recipient: params.get("recipient").cloned(),
1405            subject: params.get("subject").cloned(),
1406            body: params.get("body").cloned(),
1407            since: params
1408                .get("since")
1409                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
1410                .map(|dt| dt.with_timezone(&chrono::Utc)),
1411            until: params
1412                .get("until")
1413                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
1414                .map(|dt| dt.with_timezone(&chrono::Utc)),
1415            use_regex: params.get("regex").map(|s| s == "true").unwrap_or(false),
1416            case_sensitive: params.get("case_sensitive").map(|s| s == "true").unwrap_or(false),
1417        };
1418
1419        match smtp_registry.search_emails(filters) {
1420            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
1421            Err(e) => (
1422                StatusCode::INTERNAL_SERVER_ERROR,
1423                Json(serde_json::json!({
1424                    "error": "Failed to search emails",
1425                    "message": e.to_string()
1426                })),
1427            ),
1428        }
1429    } else {
1430        (
1431            StatusCode::NOT_IMPLEMENTED,
1432            Json(serde_json::json!({
1433                "error": "SMTP mailbox management not available",
1434                "message": "SMTP server is not enabled or registry not available."
1435            })),
1436        )
1437    }
1438}
1439
1440/// MQTT broker statistics
1441#[cfg(feature = "mqtt")]
1442#[derive(Debug, Clone, Serialize, Deserialize)]
1443pub struct MqttBrokerStats {
1444    /// Number of connected MQTT clients
1445    pub connected_clients: usize,
1446    /// Number of active MQTT topics
1447    pub active_topics: usize,
1448    /// Number of retained messages
1449    pub retained_messages: usize,
1450    /// Total number of subscriptions
1451    pub total_subscriptions: usize,
1452}
1453
1454/// MQTT management handlers
1455#[cfg(feature = "mqtt")]
1456async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
1457    if let Some(broker) = &state.mqtt_broker {
1458        let connected_clients = broker.get_connected_clients().await.len();
1459        let active_topics = broker.get_active_topics().await.len();
1460        let stats = broker.get_topic_stats().await;
1461
1462        let broker_stats = MqttBrokerStats {
1463            connected_clients,
1464            active_topics,
1465            retained_messages: stats.retained_messages,
1466            total_subscriptions: stats.total_subscriptions,
1467        };
1468
1469        Json(broker_stats).into_response()
1470    } else {
1471        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1472    }
1473}
1474
1475#[cfg(feature = "mqtt")]
1476async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
1477    if let Some(broker) = &state.mqtt_broker {
1478        let clients = broker.get_connected_clients().await;
1479        Json(serde_json::json!({
1480            "clients": clients
1481        }))
1482        .into_response()
1483    } else {
1484        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1485    }
1486}
1487
1488#[cfg(feature = "mqtt")]
1489async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
1490    if let Some(broker) = &state.mqtt_broker {
1491        let topics = broker.get_active_topics().await;
1492        Json(serde_json::json!({
1493            "topics": topics
1494        }))
1495        .into_response()
1496    } else {
1497        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1498    }
1499}
1500
1501#[cfg(feature = "mqtt")]
1502async fn disconnect_mqtt_client(
1503    State(state): State<ManagementState>,
1504    Path(client_id): Path<String>,
1505) -> impl IntoResponse {
1506    if let Some(broker) = &state.mqtt_broker {
1507        match broker.disconnect_client(&client_id).await {
1508            Ok(_) => {
1509                (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response()
1510            }
1511            Err(e) => {
1512                (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
1513                    .into_response()
1514            }
1515        }
1516    } else {
1517        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1518    }
1519}
1520
1521// ========== MQTT Publish Handler ==========
1522
1523#[cfg(feature = "mqtt")]
1524/// Request to publish a single MQTT message
1525#[derive(Debug, Deserialize)]
1526pub struct MqttPublishRequest {
1527    /// Topic to publish to
1528    pub topic: String,
1529    /// Message payload (string or JSON)
1530    pub payload: String,
1531    /// QoS level (0, 1, or 2)
1532    #[serde(default = "default_qos")]
1533    pub qos: u8,
1534    /// Whether to retain the message
1535    #[serde(default)]
1536    pub retain: bool,
1537}
1538
1539#[cfg(feature = "mqtt")]
1540fn default_qos() -> u8 {
1541    0
1542}
1543
1544#[cfg(feature = "mqtt")]
1545/// Publish a message to an MQTT topic (only compiled when mqtt feature is enabled)
1546async fn publish_mqtt_message_handler(
1547    State(state): State<ManagementState>,
1548    Json(request): Json<serde_json::Value>,
1549) -> impl IntoResponse {
1550    // Extract fields from JSON manually
1551    let topic = request.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
1552    let payload = request.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
1553    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
1554    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);
1555
1556    if topic.is_none() || payload.is_none() {
1557        return (
1558            StatusCode::BAD_REQUEST,
1559            Json(serde_json::json!({
1560                "error": "Invalid request",
1561                "message": "Missing required fields: topic and payload"
1562            })),
1563        );
1564    }
1565
1566    let topic = topic.unwrap();
1567    let payload = payload.unwrap();
1568
1569    if let Some(broker) = &state.mqtt_broker {
1570        // Validate QoS
1571        if qos > 2 {
1572            return (
1573                StatusCode::BAD_REQUEST,
1574                Json(serde_json::json!({
1575                    "error": "Invalid QoS",
1576                    "message": "QoS must be 0, 1, or 2"
1577                })),
1578            );
1579        }
1580
1581        // Convert payload to bytes
1582        let payload_bytes = payload.as_bytes().to_vec();
1583        let client_id = "mockforge-management-api".to_string();
1584
1585        let publish_result = broker
1586            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
1587            .await
1588            .map_err(|e| format!("{}", e));
1589
1590        match publish_result {
1591            Ok(_) => {
1592                // Emit message event for real-time monitoring
1593                let event = MessageEvent::Mqtt(MqttMessageEvent {
1594                    topic: topic.clone(),
1595                    payload: payload.clone(),
1596                    qos,
1597                    retain,
1598                    timestamp: chrono::Utc::now().to_rfc3339(),
1599                });
1600                let _ = state.message_events.send(event);
1601
1602                (
1603                    StatusCode::OK,
1604                    Json(serde_json::json!({
1605                        "success": true,
1606                        "message": format!("Message published to topic '{}'", topic),
1607                        "topic": topic,
1608                        "qos": qos,
1609                        "retain": retain
1610                    })),
1611                )
1612            }
1613            Err(error_msg) => (
1614                StatusCode::INTERNAL_SERVER_ERROR,
1615                Json(serde_json::json!({
1616                    "error": "Failed to publish message",
1617                    "message": error_msg
1618                })),
1619            ),
1620        }
1621    } else {
1622        (
1623            StatusCode::SERVICE_UNAVAILABLE,
1624            Json(serde_json::json!({
1625                "error": "MQTT broker not available",
1626                "message": "MQTT broker is not enabled or not available."
1627            })),
1628        )
1629    }
1630}
1631
1632#[cfg(not(feature = "mqtt"))]
1633/// Publish a message to an MQTT topic (stub when mqtt feature is disabled)
1634async fn publish_mqtt_message_handler(
1635    State(_state): State<ManagementState>,
1636    Json(_request): Json<serde_json::Value>,
1637) -> impl IntoResponse {
1638    (
1639        StatusCode::SERVICE_UNAVAILABLE,
1640        Json(serde_json::json!({
1641            "error": "MQTT feature not enabled",
1642            "message": "MQTT support is not compiled into this build"
1643        })),
1644    )
1645}
1646
1647#[cfg(feature = "mqtt")]
1648/// Request to publish multiple MQTT messages
1649#[derive(Debug, Deserialize)]
1650pub struct MqttBatchPublishRequest {
1651    /// List of messages to publish
1652    pub messages: Vec<MqttPublishRequest>,
1653    /// Delay between messages in milliseconds
1654    #[serde(default = "default_delay")]
1655    pub delay_ms: u64,
1656}
1657
1658#[cfg(feature = "mqtt")]
1659fn default_delay() -> u64 {
1660    100
1661}
1662
1663#[cfg(feature = "mqtt")]
1664/// Publish multiple messages to MQTT topics (only compiled when mqtt feature is enabled)
1665async fn publish_mqtt_batch_handler(
1666    State(state): State<ManagementState>,
1667    Json(request): Json<serde_json::Value>,
1668) -> impl IntoResponse {
1669    // Extract fields from JSON manually
1670    let messages_json = request.get("messages").and_then(|v| v.as_array());
1671    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);
1672
1673    if messages_json.is_none() {
1674        return (
1675            StatusCode::BAD_REQUEST,
1676            Json(serde_json::json!({
1677                "error": "Invalid request",
1678                "message": "Missing required field: messages"
1679            })),
1680        );
1681    }
1682
1683    let messages_json = messages_json.unwrap();
1684
1685    if let Some(broker) = &state.mqtt_broker {
1686        if messages_json.is_empty() {
1687            return (
1688                StatusCode::BAD_REQUEST,
1689                Json(serde_json::json!({
1690                    "error": "Empty batch",
1691                    "message": "At least one message is required"
1692                })),
1693            );
1694        }
1695
1696        let mut results = Vec::new();
1697        let client_id = "mockforge-management-api".to_string();
1698
1699        for (index, msg_json) in messages_json.iter().enumerate() {
1700            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
1701            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
1702            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
1703            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);
1704
1705            if topic.is_none() || payload.is_none() {
1706                results.push(serde_json::json!({
1707                    "index": index,
1708                    "success": false,
1709                    "error": "Missing required fields: topic and payload"
1710                }));
1711                continue;
1712            }
1713
1714            let topic = topic.unwrap();
1715            let payload = payload.unwrap();
1716
1717            // Validate QoS
1718            if qos > 2 {
1719                results.push(serde_json::json!({
1720                    "index": index,
1721                    "success": false,
1722                    "error": "Invalid QoS (must be 0, 1, or 2)"
1723                }));
1724                continue;
1725            }
1726
1727            // Convert payload to bytes
1728            let payload_bytes = payload.as_bytes().to_vec();
1729
1730            let publish_result = broker
1731                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
1732                .await
1733                .map_err(|e| format!("{}", e));
1734
1735            match publish_result {
1736                Ok(_) => {
1737                    // Emit message event
1738                    let event = MessageEvent::Mqtt(MqttMessageEvent {
1739                        topic: topic.clone(),
1740                        payload: payload.clone(),
1741                        qos,
1742                        retain,
1743                        timestamp: chrono::Utc::now().to_rfc3339(),
1744                    });
1745                    let _ = state.message_events.send(event);
1746
1747                    results.push(serde_json::json!({
1748                        "index": index,
1749                        "success": true,
1750                        "topic": topic,
1751                        "qos": qos
1752                    }));
1753                }
1754                Err(error_msg) => {
1755                    results.push(serde_json::json!({
1756                        "index": index,
1757                        "success": false,
1758                        "error": error_msg
1759                    }));
1760                }
1761            }
1762
1763            // Add delay between messages (except for the last one)
1764            if index < messages_json.len() - 1 && delay_ms > 0 {
1765                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1766            }
1767        }
1768
1769        let success_count =
1770            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();
1771
1772        (
1773            StatusCode::OK,
1774            Json(serde_json::json!({
1775                "success": true,
1776                "total": messages_json.len(),
1777                "succeeded": success_count,
1778                "failed": messages_json.len() - success_count,
1779                "results": results
1780            })),
1781        )
1782    } else {
1783        (
1784            StatusCode::SERVICE_UNAVAILABLE,
1785            Json(serde_json::json!({
1786                "error": "MQTT broker not available",
1787                "message": "MQTT broker is not enabled or not available."
1788            })),
1789        )
1790    }
1791}
1792
1793#[cfg(not(feature = "mqtt"))]
1794/// Publish multiple messages to MQTT topics (stub when mqtt feature is disabled)
1795async fn publish_mqtt_batch_handler(
1796    State(_state): State<ManagementState>,
1797    Json(_request): Json<serde_json::Value>,
1798) -> impl IntoResponse {
1799    (
1800        StatusCode::SERVICE_UNAVAILABLE,
1801        Json(serde_json::json!({
1802            "error": "MQTT feature not enabled",
1803            "message": "MQTT support is not compiled into this build"
1804        })),
1805    )
1806}
1807
1808// Migration pipeline handlers
1809
1810/// Request to set migration mode
1811#[derive(Debug, Deserialize)]
1812struct SetMigrationModeRequest {
1813    mode: String,
1814}
1815
1816/// Get all migration routes
1817async fn get_migration_routes(
1818    State(state): State<ManagementState>,
1819) -> Result<Json<serde_json::Value>, StatusCode> {
1820    let proxy_config = match &state.proxy_config {
1821        Some(config) => config,
1822        None => {
1823            return Ok(Json(serde_json::json!({
1824                "error": "Migration not configured. Proxy config not available."
1825            })));
1826        }
1827    };
1828
1829    let config = proxy_config.read().await;
1830    let routes = config.get_migration_routes();
1831
1832    Ok(Json(serde_json::json!({
1833        "routes": routes
1834    })))
1835}
1836
1837/// Toggle a route's migration mode
1838async fn toggle_route_migration(
1839    State(state): State<ManagementState>,
1840    Path(pattern): Path<String>,
1841) -> Result<Json<serde_json::Value>, StatusCode> {
1842    let proxy_config = match &state.proxy_config {
1843        Some(config) => config,
1844        None => {
1845            return Ok(Json(serde_json::json!({
1846                "error": "Migration not configured. Proxy config not available."
1847            })));
1848        }
1849    };
1850
1851    let mut config = proxy_config.write().await;
1852    let new_mode = match config.toggle_route_migration(&pattern) {
1853        Some(mode) => mode,
1854        None => {
1855            return Ok(Json(serde_json::json!({
1856                "error": format!("Route pattern not found: {}", pattern)
1857            })));
1858        }
1859    };
1860
1861    Ok(Json(serde_json::json!({
1862        "pattern": pattern,
1863        "mode": format!("{:?}", new_mode).to_lowercase()
1864    })))
1865}
1866
1867/// Set a route's migration mode explicitly
1868async fn set_route_migration_mode(
1869    State(state): State<ManagementState>,
1870    Path(pattern): Path<String>,
1871    Json(request): Json<SetMigrationModeRequest>,
1872) -> Result<Json<serde_json::Value>, StatusCode> {
1873    let proxy_config = match &state.proxy_config {
1874        Some(config) => config,
1875        None => {
1876            return Ok(Json(serde_json::json!({
1877                "error": "Migration not configured. Proxy config not available."
1878            })));
1879        }
1880    };
1881
1882    use mockforge_core::proxy::config::MigrationMode;
1883    let mode = match request.mode.to_lowercase().as_str() {
1884        "mock" => MigrationMode::Mock,
1885        "shadow" => MigrationMode::Shadow,
1886        "real" => MigrationMode::Real,
1887        "auto" => MigrationMode::Auto,
1888        _ => {
1889            return Ok(Json(serde_json::json!({
1890                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1891            })));
1892        }
1893    };
1894
1895    let mut config = proxy_config.write().await;
1896    let updated = config.update_rule_migration_mode(&pattern, mode);
1897
1898    if !updated {
1899        return Ok(Json(serde_json::json!({
1900            "error": format!("Route pattern not found: {}", pattern)
1901        })));
1902    }
1903
1904    Ok(Json(serde_json::json!({
1905        "pattern": pattern,
1906        "mode": format!("{:?}", mode).to_lowercase()
1907    })))
1908}
1909
1910/// Toggle a group's migration mode
1911async fn toggle_group_migration(
1912    State(state): State<ManagementState>,
1913    Path(group): Path<String>,
1914) -> Result<Json<serde_json::Value>, StatusCode> {
1915    let proxy_config = match &state.proxy_config {
1916        Some(config) => config,
1917        None => {
1918            return Ok(Json(serde_json::json!({
1919                "error": "Migration not configured. Proxy config not available."
1920            })));
1921        }
1922    };
1923
1924    let mut config = proxy_config.write().await;
1925    let new_mode = config.toggle_group_migration(&group);
1926
1927    Ok(Json(serde_json::json!({
1928        "group": group,
1929        "mode": format!("{:?}", new_mode).to_lowercase()
1930    })))
1931}
1932
1933/// Set a group's migration mode explicitly
1934async fn set_group_migration_mode(
1935    State(state): State<ManagementState>,
1936    Path(group): Path<String>,
1937    Json(request): Json<SetMigrationModeRequest>,
1938) -> Result<Json<serde_json::Value>, StatusCode> {
1939    let proxy_config = match &state.proxy_config {
1940        Some(config) => config,
1941        None => {
1942            return Ok(Json(serde_json::json!({
1943                "error": "Migration not configured. Proxy config not available."
1944            })));
1945        }
1946    };
1947
1948    use mockforge_core::proxy::config::MigrationMode;
1949    let mode = match request.mode.to_lowercase().as_str() {
1950        "mock" => MigrationMode::Mock,
1951        "shadow" => MigrationMode::Shadow,
1952        "real" => MigrationMode::Real,
1953        "auto" => MigrationMode::Auto,
1954        _ => {
1955            return Ok(Json(serde_json::json!({
1956                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1957            })));
1958        }
1959    };
1960
1961    let mut config = proxy_config.write().await;
1962    config.update_group_migration_mode(&group, mode);
1963
1964    Ok(Json(serde_json::json!({
1965        "group": group,
1966        "mode": format!("{:?}", mode).to_lowercase()
1967    })))
1968}
1969
1970/// Get all migration groups
1971async fn get_migration_groups(
1972    State(state): State<ManagementState>,
1973) -> Result<Json<serde_json::Value>, StatusCode> {
1974    let proxy_config = match &state.proxy_config {
1975        Some(config) => config,
1976        None => {
1977            return Ok(Json(serde_json::json!({
1978                "error": "Migration not configured. Proxy config not available."
1979            })));
1980        }
1981    };
1982
1983    let config = proxy_config.read().await;
1984    let groups = config.get_migration_groups();
1985
1986    // Convert to JSON-serializable format
1987    let groups_json: serde_json::Map<String, serde_json::Value> = groups
1988        .into_iter()
1989        .map(|(name, info)| {
1990            (
1991                name,
1992                serde_json::json!({
1993                    "name": info.name,
1994                    "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1995                    "route_count": info.route_count
1996                }),
1997            )
1998        })
1999        .collect();
2000
2001    Ok(Json(serde_json::json!(groups_json)))
2002}
2003
2004/// Get overall migration status
2005async fn get_migration_status(
2006    State(state): State<ManagementState>,
2007) -> Result<Json<serde_json::Value>, StatusCode> {
2008    let proxy_config = match &state.proxy_config {
2009        Some(config) => config,
2010        None => {
2011            return Ok(Json(serde_json::json!({
2012                "error": "Migration not configured. Proxy config not available."
2013            })));
2014        }
2015    };
2016
2017    let config = proxy_config.read().await;
2018    let routes = config.get_migration_routes();
2019    let groups = config.get_migration_groups();
2020
2021    let mut mock_count = 0;
2022    let mut shadow_count = 0;
2023    let mut real_count = 0;
2024    let mut auto_count = 0;
2025
2026    for route in &routes {
2027        match route.migration_mode {
2028            mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
2029            mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
2030            mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
2031            mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
2032        }
2033    }
2034
2035    Ok(Json(serde_json::json!({
2036        "total_routes": routes.len(),
2037        "mock_routes": mock_count,
2038        "shadow_routes": shadow_count,
2039        "real_routes": real_count,
2040        "auto_routes": auto_count,
2041        "total_groups": groups.len(),
2042        "migration_enabled": config.migration_enabled
2043    })))
2044}
2045
2046// ========== Proxy Replacement Rules Management ==========
2047
2048/// Request body for creating/updating proxy replacement rules
2049#[derive(Debug, Deserialize, Serialize)]
2050pub struct ProxyRuleRequest {
2051    /// URL pattern to match (supports wildcards like "/api/users/*")
2052    pub pattern: String,
2053    /// Rule type: "request" or "response"
2054    #[serde(rename = "type")]
2055    pub rule_type: String,
2056    /// Optional status code filter for response rules
2057    #[serde(default)]
2058    pub status_codes: Vec<u16>,
2059    /// Body transformations to apply
2060    pub body_transforms: Vec<BodyTransformRequest>,
2061    /// Whether this rule is enabled
2062    #[serde(default = "default_true")]
2063    pub enabled: bool,
2064}
2065
2066/// Request body for individual body transformations
2067#[derive(Debug, Deserialize, Serialize)]
2068pub struct BodyTransformRequest {
2069    /// JSONPath expression to target (e.g., "$.userId", "$.email")
2070    pub path: String,
2071    /// Replacement value (supports template expansion like "{{uuid}}", "{{faker.email}}")
2072    pub replace: String,
2073    /// Operation to perform: "replace", "add", or "remove"
2074    #[serde(default)]
2075    pub operation: String,
2076}
2077
2078/// Response format for proxy rules
2079#[derive(Debug, Serialize)]
2080pub struct ProxyRuleResponse {
2081    /// Rule ID (index in the array)
2082    pub id: usize,
2083    /// URL pattern
2084    pub pattern: String,
2085    /// Rule type
2086    #[serde(rename = "type")]
2087    pub rule_type: String,
2088    /// Status codes (for response rules)
2089    pub status_codes: Vec<u16>,
2090    /// Body transformations
2091    pub body_transforms: Vec<BodyTransformRequest>,
2092    /// Whether enabled
2093    pub enabled: bool,
2094}
2095
2096/// List all proxy replacement rules
2097async fn list_proxy_rules(
2098    State(state): State<ManagementState>,
2099) -> Result<Json<serde_json::Value>, StatusCode> {
2100    let proxy_config = match &state.proxy_config {
2101        Some(config) => config,
2102        None => {
2103            return Ok(Json(serde_json::json!({
2104                "error": "Proxy not configured. Proxy config not available."
2105            })));
2106        }
2107    };
2108
2109    let config = proxy_config.read().await;
2110
2111    let mut rules: Vec<ProxyRuleResponse> = Vec::new();
2112
2113    // Add request replacement rules
2114    for (idx, rule) in config.request_replacements.iter().enumerate() {
2115        rules.push(ProxyRuleResponse {
2116            id: idx,
2117            pattern: rule.pattern.clone(),
2118            rule_type: "request".to_string(),
2119            status_codes: Vec::new(),
2120            body_transforms: rule
2121                .body_transforms
2122                .iter()
2123                .map(|t| BodyTransformRequest {
2124                    path: t.path.clone(),
2125                    replace: t.replace.clone(),
2126                    operation: format!("{:?}", t.operation).to_lowercase(),
2127                })
2128                .collect(),
2129            enabled: rule.enabled,
2130        });
2131    }
2132
2133    // Add response replacement rules
2134    let request_count = config.request_replacements.len();
2135    for (idx, rule) in config.response_replacements.iter().enumerate() {
2136        rules.push(ProxyRuleResponse {
2137            id: request_count + idx,
2138            pattern: rule.pattern.clone(),
2139            rule_type: "response".to_string(),
2140            status_codes: rule.status_codes.clone(),
2141            body_transforms: rule
2142                .body_transforms
2143                .iter()
2144                .map(|t| BodyTransformRequest {
2145                    path: t.path.clone(),
2146                    replace: t.replace.clone(),
2147                    operation: format!("{:?}", t.operation).to_lowercase(),
2148                })
2149                .collect(),
2150            enabled: rule.enabled,
2151        });
2152    }
2153
2154    Ok(Json(serde_json::json!({
2155        "rules": rules
2156    })))
2157}
2158
2159/// Create a new proxy replacement rule
2160async fn create_proxy_rule(
2161    State(state): State<ManagementState>,
2162    Json(request): Json<ProxyRuleRequest>,
2163) -> Result<Json<serde_json::Value>, StatusCode> {
2164    let proxy_config = match &state.proxy_config {
2165        Some(config) => config,
2166        None => {
2167            return Ok(Json(serde_json::json!({
2168                "error": "Proxy not configured. Proxy config not available."
2169            })));
2170        }
2171    };
2172
2173    // Validate request
2174    if request.body_transforms.is_empty() {
2175        return Ok(Json(serde_json::json!({
2176            "error": "At least one body transform is required"
2177        })));
2178    }
2179
2180    let body_transforms: Vec<BodyTransform> = request
2181        .body_transforms
2182        .iter()
2183        .map(|t| {
2184            let op = match t.operation.as_str() {
2185                "replace" => TransformOperation::Replace,
2186                "add" => TransformOperation::Add,
2187                "remove" => TransformOperation::Remove,
2188                _ => TransformOperation::Replace,
2189            };
2190            BodyTransform {
2191                path: t.path.clone(),
2192                replace: t.replace.clone(),
2193                operation: op,
2194            }
2195        })
2196        .collect();
2197
2198    let new_rule = BodyTransformRule {
2199        pattern: request.pattern.clone(),
2200        status_codes: request.status_codes.clone(),
2201        body_transforms,
2202        enabled: request.enabled,
2203    };
2204
2205    let mut config = proxy_config.write().await;
2206
2207    let rule_id = if request.rule_type == "request" {
2208        config.request_replacements.push(new_rule);
2209        config.request_replacements.len() - 1
2210    } else if request.rule_type == "response" {
2211        config.response_replacements.push(new_rule);
2212        config.request_replacements.len() + config.response_replacements.len() - 1
2213    } else {
2214        return Ok(Json(serde_json::json!({
2215            "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
2216        })));
2217    };
2218
2219    Ok(Json(serde_json::json!({
2220        "id": rule_id,
2221        "message": "Rule created successfully"
2222    })))
2223}
2224
2225/// Get a specific proxy replacement rule
2226async fn get_proxy_rule(
2227    State(state): State<ManagementState>,
2228    Path(id): Path<String>,
2229) -> Result<Json<serde_json::Value>, StatusCode> {
2230    let proxy_config = match &state.proxy_config {
2231        Some(config) => config,
2232        None => {
2233            return Ok(Json(serde_json::json!({
2234                "error": "Proxy not configured. Proxy config not available."
2235            })));
2236        }
2237    };
2238
2239    let config = proxy_config.read().await;
2240    let rule_id: usize = match id.parse() {
2241        Ok(id) => id,
2242        Err(_) => {
2243            return Ok(Json(serde_json::json!({
2244                "error": format!("Invalid rule ID: {}", id)
2245            })));
2246        }
2247    };
2248
2249    let request_count = config.request_replacements.len();
2250
2251    if rule_id < request_count {
2252        // Request rule
2253        let rule = &config.request_replacements[rule_id];
2254        Ok(Json(serde_json::json!({
2255            "id": rule_id,
2256            "pattern": rule.pattern,
2257            "type": "request",
2258            "status_codes": [],
2259            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2260                "path": t.path,
2261                "replace": t.replace,
2262                "operation": format!("{:?}", t.operation).to_lowercase()
2263            })).collect::<Vec<_>>(),
2264            "enabled": rule.enabled
2265        })))
2266    } else if rule_id < request_count + config.response_replacements.len() {
2267        // Response rule
2268        let response_idx = rule_id - request_count;
2269        let rule = &config.response_replacements[response_idx];
2270        Ok(Json(serde_json::json!({
2271            "id": rule_id,
2272            "pattern": rule.pattern,
2273            "type": "response",
2274            "status_codes": rule.status_codes,
2275            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2276                "path": t.path,
2277                "replace": t.replace,
2278                "operation": format!("{:?}", t.operation).to_lowercase()
2279            })).collect::<Vec<_>>(),
2280            "enabled": rule.enabled
2281        })))
2282    } else {
2283        Ok(Json(serde_json::json!({
2284            "error": format!("Rule ID {} not found", rule_id)
2285        })))
2286    }
2287}
2288
2289/// Update a proxy replacement rule
2290async fn update_proxy_rule(
2291    State(state): State<ManagementState>,
2292    Path(id): Path<String>,
2293    Json(request): Json<ProxyRuleRequest>,
2294) -> Result<Json<serde_json::Value>, StatusCode> {
2295    let proxy_config = match &state.proxy_config {
2296        Some(config) => config,
2297        None => {
2298            return Ok(Json(serde_json::json!({
2299                "error": "Proxy not configured. Proxy config not available."
2300            })));
2301        }
2302    };
2303
2304    let mut config = proxy_config.write().await;
2305    let rule_id: usize = match id.parse() {
2306        Ok(id) => id,
2307        Err(_) => {
2308            return Ok(Json(serde_json::json!({
2309                "error": format!("Invalid rule ID: {}", id)
2310            })));
2311        }
2312    };
2313
2314    let body_transforms: Vec<BodyTransform> = request
2315        .body_transforms
2316        .iter()
2317        .map(|t| {
2318            let op = match t.operation.as_str() {
2319                "replace" => TransformOperation::Replace,
2320                "add" => TransformOperation::Add,
2321                "remove" => TransformOperation::Remove,
2322                _ => TransformOperation::Replace,
2323            };
2324            BodyTransform {
2325                path: t.path.clone(),
2326                replace: t.replace.clone(),
2327                operation: op,
2328            }
2329        })
2330        .collect();
2331
2332    let updated_rule = BodyTransformRule {
2333        pattern: request.pattern.clone(),
2334        status_codes: request.status_codes.clone(),
2335        body_transforms,
2336        enabled: request.enabled,
2337    };
2338
2339    let request_count = config.request_replacements.len();
2340
2341    if rule_id < request_count {
2342        // Update request rule
2343        config.request_replacements[rule_id] = updated_rule;
2344    } else if rule_id < request_count + config.response_replacements.len() {
2345        // Update response rule
2346        let response_idx = rule_id - request_count;
2347        config.response_replacements[response_idx] = updated_rule;
2348    } else {
2349        return Ok(Json(serde_json::json!({
2350            "error": format!("Rule ID {} not found", rule_id)
2351        })));
2352    }
2353
2354    Ok(Json(serde_json::json!({
2355        "id": rule_id,
2356        "message": "Rule updated successfully"
2357    })))
2358}
2359
2360/// Delete a proxy replacement rule
2361async fn delete_proxy_rule(
2362    State(state): State<ManagementState>,
2363    Path(id): Path<String>,
2364) -> Result<Json<serde_json::Value>, StatusCode> {
2365    let proxy_config = match &state.proxy_config {
2366        Some(config) => config,
2367        None => {
2368            return Ok(Json(serde_json::json!({
2369                "error": "Proxy not configured. Proxy config not available."
2370            })));
2371        }
2372    };
2373
2374    let mut config = proxy_config.write().await;
2375    let rule_id: usize = match id.parse() {
2376        Ok(id) => id,
2377        Err(_) => {
2378            return Ok(Json(serde_json::json!({
2379                "error": format!("Invalid rule ID: {}", id)
2380            })));
2381        }
2382    };
2383
2384    let request_count = config.request_replacements.len();
2385
2386    if rule_id < request_count {
2387        // Delete request rule
2388        config.request_replacements.remove(rule_id);
2389    } else if rule_id < request_count + config.response_replacements.len() {
2390        // Delete response rule
2391        let response_idx = rule_id - request_count;
2392        config.response_replacements.remove(response_idx);
2393    } else {
2394        return Ok(Json(serde_json::json!({
2395            "error": format!("Rule ID {} not found", rule_id)
2396        })));
2397    }
2398
2399    Ok(Json(serde_json::json!({
2400        "id": rule_id,
2401        "message": "Rule deleted successfully"
2402    })))
2403}
2404
2405/// Get proxy rules and transformation configuration for inspection
2406async fn get_proxy_inspect(
2407    State(state): State<ManagementState>,
2408    Query(params): Query<std::collections::HashMap<String, String>>,
2409) -> Result<Json<serde_json::Value>, StatusCode> {
2410    let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2411    let offset: usize = params.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);
2412
2413    let proxy_config = match &state.proxy_config {
2414        Some(config) => config.read().await,
2415        None => {
2416            return Ok(Json(serde_json::json!({
2417                "error": "Proxy not configured. Proxy config not available."
2418            })));
2419        }
2420    };
2421
2422    let mut rules = Vec::new();
2423    for (idx, rule) in proxy_config.request_replacements.iter().enumerate() {
2424        rules.push(serde_json::json!({
2425            "id": idx,
2426            "kind": "request",
2427            "pattern": rule.pattern,
2428            "enabled": rule.enabled,
2429            "status_codes": rule.status_codes,
2430            "transform_count": rule.body_transforms.len(),
2431            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2432                "path": t.path,
2433                "operation": t.operation,
2434                "replace": t.replace
2435            })).collect::<Vec<_>>()
2436        }));
2437    }
2438    let request_rule_count = rules.len();
2439    for (idx, rule) in proxy_config.response_replacements.iter().enumerate() {
2440        rules.push(serde_json::json!({
2441            "id": request_rule_count + idx,
2442            "kind": "response",
2443            "pattern": rule.pattern,
2444            "enabled": rule.enabled,
2445            "status_codes": rule.status_codes,
2446            "transform_count": rule.body_transforms.len(),
2447            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2448                "path": t.path,
2449                "operation": t.operation,
2450                "replace": t.replace
2451            })).collect::<Vec<_>>()
2452        }));
2453    }
2454
2455    let total = rules.len();
2456    let paged_rules: Vec<_> = rules.into_iter().skip(offset).take(limit).collect();
2457
2458    Ok(Json(serde_json::json!({
2459        "enabled": proxy_config.enabled,
2460        "target_url": proxy_config.target_url,
2461        "prefix": proxy_config.prefix,
2462        "timeout_seconds": proxy_config.timeout_seconds,
2463        "follow_redirects": proxy_config.follow_redirects,
2464        "passthrough_by_default": proxy_config.passthrough_by_default,
2465        "rules": paged_rules,
2466        "request_rule_count": request_rule_count,
2467        "response_rule_count": total.saturating_sub(request_rule_count),
2468        "limit": limit,
2469        "offset": offset,
2470        "total": total
2471    })))
2472}
2473
2474/// Build the management API router
2475pub fn management_router(state: ManagementState) -> Router {
2476    let router = Router::new()
2477        .route("/capabilities", get(get_capabilities))
2478        .route("/health", get(health_check))
2479        .route("/stats", get(get_stats))
2480        .route("/config", get(get_config))
2481        .route("/config/validate", post(validate_config))
2482        .route("/config/bulk", post(bulk_update_config))
2483        .route("/mocks", get(list_mocks))
2484        .route("/mocks", post(create_mock))
2485        .route("/mocks/{id}", get(get_mock))
2486        .route("/mocks/{id}", put(update_mock))
2487        .route("/mocks/{id}", delete(delete_mock))
2488        .route("/export", get(export_mocks))
2489        .route("/import", post(import_mocks))
2490        .route("/spec", get(get_openapi_spec));
2491
2492    #[cfg(feature = "smtp")]
2493    let router = router
2494        .route("/smtp/mailbox", get(list_smtp_emails))
2495        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
2496        .route("/smtp/mailbox/{id}", get(get_smtp_email))
2497        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
2498        .route("/smtp/mailbox/search", get(search_smtp_emails));
2499
2500    #[cfg(not(feature = "smtp"))]
2501    let router = router;
2502
2503    // MQTT routes
2504    #[cfg(feature = "mqtt")]
2505    let router = router
2506        .route("/mqtt/stats", get(get_mqtt_stats))
2507        .route("/mqtt/clients", get(get_mqtt_clients))
2508        .route("/mqtt/topics", get(get_mqtt_topics))
2509        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
2510        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
2511        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2512        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2513
2514    #[cfg(not(feature = "mqtt"))]
2515    let router = router
2516        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2517        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2518
2519    #[cfg(feature = "kafka")]
2520    let router = router
2521        .route("/kafka/stats", get(get_kafka_stats))
2522        .route("/kafka/topics", get(get_kafka_topics))
2523        .route("/kafka/topics/{topic}", get(get_kafka_topic))
2524        .route("/kafka/groups", get(get_kafka_groups))
2525        .route("/kafka/groups/{group_id}", get(get_kafka_group))
2526        .route("/kafka/produce", post(produce_kafka_message))
2527        .route("/kafka/produce/batch", post(produce_kafka_batch))
2528        .route("/kafka/messages/stream", get(kafka_messages_stream));
2529
2530    #[cfg(not(feature = "kafka"))]
2531    let router = router;
2532
2533    // Migration pipeline routes
2534    let router = router
2535        .route("/migration/routes", get(get_migration_routes))
2536        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
2537        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
2538        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
2539        .route("/migration/groups/{group}", put(set_group_migration_mode))
2540        .route("/migration/groups", get(get_migration_groups))
2541        .route("/migration/status", get(get_migration_status));
2542
2543    // Proxy replacement rules routes
2544    let router = router
2545        .route("/proxy/rules", get(list_proxy_rules))
2546        .route("/proxy/rules", post(create_proxy_rule))
2547        .route("/proxy/rules/{id}", get(get_proxy_rule))
2548        .route("/proxy/rules/{id}", put(update_proxy_rule))
2549        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
2550        .route("/proxy/inspect", get(get_proxy_inspect));
2551
2552    // AI-powered features
2553    let router = router.route("/ai/generate-spec", post(generate_ai_spec));
2554
2555    // Snapshot diff endpoints
2556    let router = router.nest(
2557        "/snapshot-diff",
2558        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
2559    );
2560
2561    #[cfg(feature = "behavioral-cloning")]
2562    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));
2563
2564    let router = router
2565        .route("/mockai/learn", post(learn_from_examples))
2566        .route("/mockai/rules/explanations", get(list_rule_explanations))
2567        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
2568        .route("/chaos/config", get(get_chaos_config))
2569        .route("/chaos/config", post(update_chaos_config))
2570        .route("/network/profiles", get(list_network_profiles))
2571        .route("/network/profile/apply", post(apply_network_profile));
2572
2573    // State machine API routes
2574    let router =
2575        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());
2576
2577    // Conformance testing API routes
2578    #[cfg(feature = "conformance")]
2579    let router = router.nest_service(
2580        "/conformance",
2581        crate::handlers::conformance::conformance_router(state.conformance_state.clone()),
2582    );
2583    #[cfg(not(feature = "conformance"))]
2584    let router = router;
2585
2586    router.with_state(state)
2587}
2588
2589#[cfg(feature = "kafka")]
2590/// Kafka broker statistics
2591#[derive(Debug, Clone, Serialize, Deserialize)]
2592pub struct KafkaBrokerStats {
2593    /// Number of topics
2594    pub topics: usize,
2595    /// Total number of partitions
2596    pub partitions: usize,
2597    /// Number of consumer groups
2598    pub consumer_groups: usize,
2599    /// Total messages produced
2600    pub messages_produced: u64,
2601    /// Total messages consumed
2602    pub messages_consumed: u64,
2603}
2604
2605#[cfg(feature = "kafka")]
2606#[allow(missing_docs)]
2607#[derive(Debug, Clone, Serialize, Deserialize)]
2608pub struct KafkaTopicInfo {
2609    pub name: String,
2610    pub partitions: usize,
2611    pub replication_factor: i32,
2612}
2613
2614#[cfg(feature = "kafka")]
2615#[allow(missing_docs)]
2616#[derive(Debug, Clone, Serialize, Deserialize)]
2617pub struct KafkaConsumerGroupInfo {
2618    pub group_id: String,
2619    pub members: usize,
2620    pub state: String,
2621}
2622
2623#[cfg(feature = "kafka")]
2624/// Get Kafka broker statistics
2625async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
2626    if let Some(broker) = &state.kafka_broker {
2627        let topics = broker.topics.read().await;
2628        let consumer_groups = broker.consumer_groups.read().await;
2629
2630        let total_partitions: usize = topics.values().map(|t| t.partitions.len()).sum();
2631
2632        // Get metrics snapshot for message counts
2633        let metrics_snapshot = broker.metrics().snapshot();
2634
2635        let stats = KafkaBrokerStats {
2636            topics: topics.len(),
2637            partitions: total_partitions,
2638            consumer_groups: consumer_groups.groups().len(),
2639            messages_produced: metrics_snapshot.messages_produced_total,
2640            messages_consumed: metrics_snapshot.messages_consumed_total,
2641        };
2642
2643        Json(stats).into_response()
2644    } else {
2645        (
2646            StatusCode::SERVICE_UNAVAILABLE,
2647            Json(serde_json::json!({
2648                "error": "Kafka broker not available",
2649                "message": "Kafka broker is not enabled or not available."
2650            })),
2651        )
2652            .into_response()
2653    }
2654}
2655
2656#[cfg(feature = "kafka")]
2657/// List Kafka topics
2658async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
2659    if let Some(broker) = &state.kafka_broker {
2660        let topics = broker.topics.read().await;
2661        let topic_list: Vec<KafkaTopicInfo> = topics
2662            .iter()
2663            .map(|(name, topic)| KafkaTopicInfo {
2664                name: name.clone(),
2665                partitions: topic.partitions.len(),
2666                replication_factor: topic.config.replication_factor as i32,
2667            })
2668            .collect();
2669
2670        Json(serde_json::json!({
2671            "topics": topic_list
2672        }))
2673        .into_response()
2674    } else {
2675        (
2676            StatusCode::SERVICE_UNAVAILABLE,
2677            Json(serde_json::json!({
2678                "error": "Kafka broker not available",
2679                "message": "Kafka broker is not enabled or not available."
2680            })),
2681        )
2682            .into_response()
2683    }
2684}
2685
2686#[cfg(feature = "kafka")]
2687/// Get Kafka topic details
2688async fn get_kafka_topic(
2689    State(state): State<ManagementState>,
2690    Path(topic_name): Path<String>,
2691) -> impl IntoResponse {
2692    if let Some(broker) = &state.kafka_broker {
2693        let topics = broker.topics.read().await;
2694        if let Some(topic) = topics.get(&topic_name) {
2695            Json(serde_json::json!({
2696                "name": topic_name,
2697                "partitions": topic.partitions.len(),
2698                "replication_factor": topic.config.replication_factor,
2699                "partitions_detail": topic.partitions.iter().enumerate().map(|(idx, partition)| serde_json::json!({
2700                    "id": idx as i32,
2701                    "leader": 0,
2702                    "replicas": vec![0],
2703                    "message_count": partition.messages.len()
2704                })).collect::<Vec<_>>()
2705            })).into_response()
2706        } else {
2707            (
2708                StatusCode::NOT_FOUND,
2709                Json(serde_json::json!({
2710                    "error": "Topic not found",
2711                    "topic": topic_name
2712                })),
2713            )
2714                .into_response()
2715        }
2716    } else {
2717        (
2718            StatusCode::SERVICE_UNAVAILABLE,
2719            Json(serde_json::json!({
2720                "error": "Kafka broker not available",
2721                "message": "Kafka broker is not enabled or not available."
2722            })),
2723        )
2724            .into_response()
2725    }
2726}
2727
2728#[cfg(feature = "kafka")]
2729/// List Kafka consumer groups
2730async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
2731    if let Some(broker) = &state.kafka_broker {
2732        let consumer_groups = broker.consumer_groups.read().await;
2733        let groups: Vec<KafkaConsumerGroupInfo> = consumer_groups
2734            .groups()
2735            .iter()
2736            .map(|(group_id, group)| KafkaConsumerGroupInfo {
2737                group_id: group_id.clone(),
2738                members: group.members.len(),
2739                state: "Stable".to_string(), // Simplified - could be more detailed
2740            })
2741            .collect();
2742
2743        Json(serde_json::json!({
2744            "groups": groups
2745        }))
2746        .into_response()
2747    } else {
2748        (
2749            StatusCode::SERVICE_UNAVAILABLE,
2750            Json(serde_json::json!({
2751                "error": "Kafka broker not available",
2752                "message": "Kafka broker is not enabled or not available."
2753            })),
2754        )
2755            .into_response()
2756    }
2757}
2758
2759#[cfg(feature = "kafka")]
2760/// Get Kafka consumer group details
2761async fn get_kafka_group(
2762    State(state): State<ManagementState>,
2763    Path(group_id): Path<String>,
2764) -> impl IntoResponse {
2765    if let Some(broker) = &state.kafka_broker {
2766        let consumer_groups = broker.consumer_groups.read().await;
2767        if let Some(group) = consumer_groups.groups().get(&group_id) {
2768            Json(serde_json::json!({
2769                "group_id": group_id,
2770                "members": group.members.len(),
2771                "state": "Stable",
2772                "members_detail": group.members.iter().map(|(member_id, member)| serde_json::json!({
2773                    "member_id": member_id,
2774                    "client_id": member.client_id,
2775                    "assignments": member.assignment.iter().map(|a| serde_json::json!({
2776                        "topic": a.topic,
2777                        "partitions": a.partitions
2778                    })).collect::<Vec<_>>()
2779                })).collect::<Vec<_>>(),
2780                "offsets": group.offsets.iter().map(|((topic, partition), offset)| serde_json::json!({
2781                    "topic": topic,
2782                    "partition": partition,
2783                    "offset": offset
2784                })).collect::<Vec<_>>()
2785            })).into_response()
2786        } else {
2787            (
2788                StatusCode::NOT_FOUND,
2789                Json(serde_json::json!({
2790                    "error": "Consumer group not found",
2791                    "group_id": group_id
2792                })),
2793            )
2794                .into_response()
2795        }
2796    } else {
2797        (
2798            StatusCode::SERVICE_UNAVAILABLE,
2799            Json(serde_json::json!({
2800                "error": "Kafka broker not available",
2801                "message": "Kafka broker is not enabled or not available."
2802            })),
2803        )
2804            .into_response()
2805    }
2806}
2807
2808// ========== Kafka Produce Handler ==========
2809
2810#[cfg(feature = "kafka")]
2811/// Request body for producing a Kafka message
2812#[derive(Debug, Deserialize)]
2813pub struct KafkaProduceRequest {
2814    /// Topic to produce to
2815    pub topic: String,
2816    /// Message key (optional)
2817    #[serde(default)]
2818    pub key: Option<String>,
2819    /// Message value (JSON string or plain string)
2820    pub value: String,
2821    /// Partition ID (optional, auto-assigned if not provided)
2822    #[serde(default)]
2823    pub partition: Option<i32>,
2824    /// Message headers (optional, key-value pairs)
2825    #[serde(default)]
2826    pub headers: Option<std::collections::HashMap<String, String>>,
2827}
2828
2829#[cfg(feature = "kafka")]
2830/// Produce a message to a Kafka topic
2831async fn produce_kafka_message(
2832    State(state): State<ManagementState>,
2833    Json(request): Json<KafkaProduceRequest>,
2834) -> impl IntoResponse {
2835    if let Some(broker) = &state.kafka_broker {
2836        let mut topics = broker.topics.write().await;
2837
2838        // Get or create the topic
2839        let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
2840            mockforge_kafka::topics::Topic::new(
2841                request.topic.clone(),
2842                mockforge_kafka::topics::TopicConfig::default(),
2843            )
2844        });
2845
2846        // Determine partition
2847        let partition_id = if let Some(partition) = request.partition {
2848            partition
2849        } else {
2850            topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes()))
2851        };
2852
2853        // Validate partition exists
2854        if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
2855            return (
2856                StatusCode::BAD_REQUEST,
2857                Json(serde_json::json!({
2858                    "error": "Invalid partition",
2859                    "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
2860                })),
2861            )
2862                .into_response();
2863        }
2864
2865        // Create the message
2866        let key_clone = request.key.clone();
2867        let headers_clone = request.headers.clone();
2868        let message = mockforge_kafka::partitions::KafkaMessage {
2869            offset: 0, // Will be set by partition.append
2870            timestamp: chrono::Utc::now().timestamp_millis(),
2871            key: key_clone.clone().map(|k| k.as_bytes().to_vec()),
2872            value: request.value.as_bytes().to_vec(),
2873            headers: headers_clone
2874                .clone()
2875                .unwrap_or_default()
2876                .into_iter()
2877                .map(|(k, v)| (k, v.as_bytes().to_vec()))
2878                .collect(),
2879        };
2880
2881        // Produce to partition
2882        match topic_entry.produce(partition_id, message).await {
2883            Ok(offset) => {
2884                // Record metrics for successful message production
2885                if let Some(broker) = &state.kafka_broker {
2886                    broker.metrics().record_messages_produced(1);
2887                }
2888
2889                // Emit message event for real-time monitoring
2890                #[cfg(feature = "kafka")]
2891                {
2892                    let event = MessageEvent::Kafka(KafkaMessageEvent {
2893                        topic: request.topic.clone(),
2894                        key: key_clone,
2895                        value: request.value.clone(),
2896                        partition: partition_id,
2897                        offset,
2898                        headers: headers_clone,
2899                        timestamp: chrono::Utc::now().to_rfc3339(),
2900                    });
2901                    let _ = state.message_events.send(event);
2902                }
2903
2904                Json(serde_json::json!({
2905                    "success": true,
2906                    "message": format!("Message produced to topic '{}'", request.topic),
2907                    "topic": request.topic,
2908                    "partition": partition_id,
2909                    "offset": offset
2910                }))
2911                .into_response()
2912            }
2913            Err(e) => (
2914                StatusCode::INTERNAL_SERVER_ERROR,
2915                Json(serde_json::json!({
2916                    "error": "Failed to produce message",
2917                    "message": e.to_string()
2918                })),
2919            )
2920                .into_response(),
2921        }
2922    } else {
2923        (
2924            StatusCode::SERVICE_UNAVAILABLE,
2925            Json(serde_json::json!({
2926                "error": "Kafka broker not available",
2927                "message": "Kafka broker is not enabled or not available."
2928            })),
2929        )
2930            .into_response()
2931    }
2932}
2933
2934#[cfg(feature = "kafka")]
2935/// Request body for producing a batch of Kafka messages
2936#[derive(Debug, Deserialize)]
2937pub struct KafkaBatchProduceRequest {
2938    /// List of messages to produce
2939    pub messages: Vec<KafkaProduceRequest>,
2940    /// Delay between messages in milliseconds
2941    #[serde(default = "default_delay")]
2942    pub delay_ms: u64,
2943}
2944
2945#[cfg(feature = "kafka")]
2946/// Produce multiple messages to Kafka topics
2947async fn produce_kafka_batch(
2948    State(state): State<ManagementState>,
2949    Json(request): Json<KafkaBatchProduceRequest>,
2950) -> impl IntoResponse {
2951    if let Some(broker) = &state.kafka_broker {
2952        if request.messages.is_empty() {
2953            return (
2954                StatusCode::BAD_REQUEST,
2955                Json(serde_json::json!({
2956                    "error": "Empty batch",
2957                    "message": "At least one message is required"
2958                })),
2959            )
2960                .into_response();
2961        }
2962
2963        let mut results = Vec::new();
2964
2965        for (index, msg_request) in request.messages.iter().enumerate() {
2966            let mut topics = broker.topics.write().await;
2967
2968            // Get or create the topic
2969            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
2970                mockforge_kafka::topics::Topic::new(
2971                    msg_request.topic.clone(),
2972                    mockforge_kafka::topics::TopicConfig::default(),
2973                )
2974            });
2975
2976            // Determine partition
2977            let partition_id = if let Some(partition) = msg_request.partition {
2978                partition
2979            } else {
2980                topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
2981            };
2982
2983            // Validate partition exists
2984            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
2985                results.push(serde_json::json!({
2986                    "index": index,
2987                    "success": false,
2988                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
2989                }));
2990                continue;
2991            }
2992
2993            // Create the message
2994            let message = mockforge_kafka::partitions::KafkaMessage {
2995                offset: 0,
2996                timestamp: chrono::Utc::now().timestamp_millis(),
2997                key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
2998                value: msg_request.value.as_bytes().to_vec(),
2999                headers: msg_request
3000                    .headers
3001                    .clone()
3002                    .unwrap_or_default()
3003                    .into_iter()
3004                    .map(|(k, v)| (k, v.as_bytes().to_vec()))
3005                    .collect(),
3006            };
3007
3008            // Produce to partition
3009            match topic_entry.produce(partition_id, message).await {
3010                Ok(offset) => {
3011                    // Record metrics for successful message production
3012                    if let Some(broker) = &state.kafka_broker {
3013                        broker.metrics().record_messages_produced(1);
3014                    }
3015
3016                    // Emit message event
3017                    let event = MessageEvent::Kafka(KafkaMessageEvent {
3018                        topic: msg_request.topic.clone(),
3019                        key: msg_request.key.clone(),
3020                        value: msg_request.value.clone(),
3021                        partition: partition_id,
3022                        offset,
3023                        headers: msg_request.headers.clone(),
3024                        timestamp: chrono::Utc::now().to_rfc3339(),
3025                    });
3026                    let _ = state.message_events.send(event);
3027
3028                    results.push(serde_json::json!({
3029                        "index": index,
3030                        "success": true,
3031                        "topic": msg_request.topic,
3032                        "partition": partition_id,
3033                        "offset": offset
3034                    }));
3035                }
3036                Err(e) => {
3037                    results.push(serde_json::json!({
3038                        "index": index,
3039                        "success": false,
3040                        "error": e.to_string()
3041                    }));
3042                }
3043            }
3044
3045            // Add delay between messages (except for the last one)
3046            if index < request.messages.len() - 1 && request.delay_ms > 0 {
3047                tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
3048            }
3049        }
3050
3051        let success_count =
3052            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();
3053
3054        Json(serde_json::json!({
3055            "success": true,
3056            "total": request.messages.len(),
3057            "succeeded": success_count,
3058            "failed": request.messages.len() - success_count,
3059            "results": results
3060        }))
3061        .into_response()
3062    } else {
3063        (
3064            StatusCode::SERVICE_UNAVAILABLE,
3065            Json(serde_json::json!({
3066                "error": "Kafka broker not available",
3067                "message": "Kafka broker is not enabled or not available."
3068            })),
3069        )
3070            .into_response()
3071    }
3072}
3073
3074// ========== Real-time Message Streaming (SSE) ==========
3075
3076#[cfg(feature = "mqtt")]
3077/// SSE stream for MQTT messages
3078async fn mqtt_messages_stream(
3079    State(state): State<ManagementState>,
3080    Query(params): Query<std::collections::HashMap<String, String>>,
3081) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
3082    let rx = state.message_events.subscribe();
3083    let topic_filter = params.get("topic").cloned();
3084
3085    let stream = stream::unfold(rx, move |mut rx| {
3086        let topic_filter = topic_filter.clone();
3087
3088        async move {
3089            loop {
3090                match rx.recv().await {
3091                    Ok(MessageEvent::Mqtt(event)) => {
3092                        // Apply topic filter if specified
3093                        if let Some(filter) = &topic_filter {
3094                            if !event.topic.contains(filter) {
3095                                continue;
3096                            }
3097                        }
3098
3099                        let event_json = serde_json::json!({
3100                            "protocol": "mqtt",
3101                            "topic": event.topic,
3102                            "payload": event.payload,
3103                            "qos": event.qos,
3104                            "retain": event.retain,
3105                            "timestamp": event.timestamp,
3106                        });
3107
3108                        if let Ok(event_data) = serde_json::to_string(&event_json) {
3109                            let sse_event = Event::default().event("mqtt_message").data(event_data);
3110                            return Some((Ok(sse_event), rx));
3111                        }
3112                    }
3113                    #[cfg(feature = "kafka")]
3114                    Ok(MessageEvent::Kafka(_)) => {
3115                        // Skip Kafka events in MQTT stream
3116                        continue;
3117                    }
3118                    Err(broadcast::error::RecvError::Closed) => {
3119                        return None;
3120                    }
3121                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
3122                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
3123                        continue;
3124                    }
3125                }
3126            }
3127        }
3128    });
3129
3130    Sse::new(stream).keep_alive(
3131        axum::response::sse::KeepAlive::new()
3132            .interval(std::time::Duration::from_secs(15))
3133            .text("keep-alive-text"),
3134    )
3135}
3136
3137#[cfg(feature = "kafka")]
3138/// SSE stream for Kafka messages
3139async fn kafka_messages_stream(
3140    State(state): State<ManagementState>,
3141    Query(params): Query<std::collections::HashMap<String, String>>,
3142) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
3143    let rx = state.message_events.subscribe();
3144    let topic_filter = params.get("topic").cloned();
3145
3146    let stream = stream::unfold(rx, move |mut rx| {
3147        let topic_filter = topic_filter.clone();
3148
3149        async move {
3150            loop {
3151                match rx.recv().await {
3152                    #[cfg(feature = "mqtt")]
3153                    Ok(MessageEvent::Mqtt(_)) => {
3154                        // Skip MQTT events in Kafka stream
3155                        continue;
3156                    }
3157                    Ok(MessageEvent::Kafka(event)) => {
3158                        // Apply topic filter if specified
3159                        if let Some(filter) = &topic_filter {
3160                            if !event.topic.contains(filter) {
3161                                continue;
3162                            }
3163                        }
3164
3165                        let event_json = serde_json::json!({
3166                            "protocol": "kafka",
3167                            "topic": event.topic,
3168                            "key": event.key,
3169                            "value": event.value,
3170                            "partition": event.partition,
3171                            "offset": event.offset,
3172                            "headers": event.headers,
3173                            "timestamp": event.timestamp,
3174                        });
3175
3176                        if let Ok(event_data) = serde_json::to_string(&event_json) {
3177                            let sse_event =
3178                                Event::default().event("kafka_message").data(event_data);
3179                            return Some((Ok(sse_event), rx));
3180                        }
3181                    }
3182                    Err(broadcast::error::RecvError::Closed) => {
3183                        return None;
3184                    }
3185                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
3186                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
3187                        continue;
3188                    }
3189                }
3190            }
3191        }
3192    });
3193
3194    Sse::new(stream).keep_alive(
3195        axum::response::sse::KeepAlive::new()
3196            .interval(std::time::Duration::from_secs(15))
3197            .text("keep-alive-text"),
3198    )
3199}
3200
3201// ========== AI-Powered Features ==========
3202
3203/// Request for AI-powered API specification generation
3204#[derive(Debug, Deserialize)]
3205pub struct GenerateSpecRequest {
3206    /// Natural language description of the API to generate
3207    pub query: String,
3208    /// Type of specification to generate: "openapi", "graphql", or "asyncapi"
3209    pub spec_type: String,
3210    /// Optional API version (e.g., "3.0.0" for OpenAPI)
3211    pub api_version: Option<String>,
3212}
3213
3214/// Request for OpenAPI generation from recorded traffic
3215#[derive(Debug, Deserialize)]
3216pub struct GenerateOpenApiFromTrafficRequest {
3217    /// Path to recorder database (optional, defaults to ./recordings.db)
3218    #[serde(default)]
3219    pub database_path: Option<String>,
3220    /// Start time for filtering (ISO 8601 format, e.g., 2025-01-01T00:00:00Z)
3221    #[serde(default)]
3222    pub since: Option<String>,
3223    /// End time for filtering (ISO 8601 format)
3224    #[serde(default)]
3225    pub until: Option<String>,
3226    /// Path pattern filter (supports wildcards, e.g., /api/*)
3227    #[serde(default)]
3228    pub path_pattern: Option<String>,
3229    /// Minimum confidence score for including paths (0.0 to 1.0)
3230    #[serde(default = "default_min_confidence")]
3231    pub min_confidence: f64,
3232}
3233
3234fn default_min_confidence() -> f64 {
3235    0.7
3236}
3237
3238/// Generate API specification from natural language using AI
3239#[cfg(feature = "data-faker")]
3240async fn generate_ai_spec(
3241    State(_state): State<ManagementState>,
3242    Json(request): Json<GenerateSpecRequest>,
3243) -> impl IntoResponse {
3244    use mockforge_data::rag::{
3245        config::{LlmProvider, RagConfig},
3246        engine::RagEngine,
3247        storage::DocumentStorage,
3248    };
3249    use std::sync::Arc;
3250
3251    // Build RAG config from environment variables
3252    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
3253        .ok()
3254        .or_else(|| std::env::var("OPENAI_API_KEY").ok());
3255
3256    // Check if RAG is configured - require API key
3257    if api_key.is_none() {
3258        return (
3259            StatusCode::SERVICE_UNAVAILABLE,
3260            Json(serde_json::json!({
3261                "error": "AI service not configured",
3262                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
3263            })),
3264        )
3265            .into_response();
3266    }
3267
3268    // Build RAG configuration
3269    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
3270        .unwrap_or_else(|_| "openai".to_string())
3271        .to_lowercase();
3272
3273    let provider = match provider_str.as_str() {
3274        "openai" => LlmProvider::OpenAI,
3275        "anthropic" => LlmProvider::Anthropic,
3276        "ollama" => LlmProvider::Ollama,
3277        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
3278        _ => LlmProvider::OpenAI,
3279    };
3280
3281    let api_endpoint =
3282        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
3283            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
3284            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
3285            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
3286            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
3287        });
3288
3289    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
3290        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
3291        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
3292        LlmProvider::Ollama => "llama2".to_string(),
3293        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
3294    });
3295
3296    // Build RagConfig using struct literal with defaults
3297    let rag_config = RagConfig {
3298        provider,
3299        api_endpoint,
3300        api_key,
3301        model,
3302        max_tokens: std::env::var("MOCKFORGE_RAG_MAX_TOKENS")
3303            .unwrap_or_else(|_| "4096".to_string())
3304            .parse()
3305            .unwrap_or(4096),
3306        temperature: std::env::var("MOCKFORGE_RAG_TEMPERATURE")
3307            .unwrap_or_else(|_| "0.3".to_string())
3308            .parse()
3309            .unwrap_or(0.3), // Lower temperature for more structured output
3310        timeout_secs: std::env::var("MOCKFORGE_RAG_TIMEOUT")
3311            .unwrap_or_else(|_| "60".to_string())
3312            .parse()
3313            .unwrap_or(60),
3314        max_context_length: std::env::var("MOCKFORGE_RAG_CONTEXT_WINDOW")
3315            .unwrap_or_else(|_| "4000".to_string())
3316            .parse()
3317            .unwrap_or(4000),
3318        ..Default::default()
3319    };
3320
3321    // Build the prompt for spec generation
3322    let spec_type_label = match request.spec_type.as_str() {
3323        "openapi" => "OpenAPI 3.0",
3324        "graphql" => "GraphQL",
3325        "asyncapi" => "AsyncAPI",
3326        _ => "OpenAPI 3.0",
3327    };
3328
3329    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");
3330
3331    let prompt = format!(
3332        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.
3333
3334User Requirements:
3335{}
3336
3337Instructions:
33381. Generate a complete, valid {} specification
33392. Include all paths, operations, request/response schemas, and components
33403. Use realistic field names and data types
33414. Include proper descriptions and examples
33425. Follow {} best practices
33436. Return ONLY the specification, no additional explanation
33447. For OpenAPI, use version {}
3345
3346Return the specification in {} format."#,
3347        spec_type_label,
3348        request.query,
3349        spec_type_label,
3350        spec_type_label,
3351        api_version,
3352        if request.spec_type == "graphql" {
3353            "GraphQL SDL"
3354        } else {
3355            "YAML"
3356        }
3357    );
3358
3359    // Create in-memory storage for RAG engine
3360    // Note: StorageFactory::create_memory() returns Box<dyn DocumentStorage>
3361    // We need to use unsafe transmute or create a wrapper, but for now we'll use
3362    // a simpler approach: create InMemoryStorage directly
3363    use mockforge_data::rag::storage::InMemoryStorage;
3364    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());
3365
3366    // Create RAG engine
3367    let mut rag_engine = match RagEngine::new(rag_config.clone(), storage) {
3368        Ok(engine) => engine,
3369        Err(e) => {
3370            return (
3371                StatusCode::INTERNAL_SERVER_ERROR,
3372                Json(serde_json::json!({
3373                    "error": "Failed to initialize RAG engine",
3374                    "message": e.to_string()
3375                })),
3376            )
3377                .into_response();
3378        }
3379    };
3380
3381    // Generate using RAG engine
3382    match rag_engine.generate(&prompt, None).await {
3383        Ok(generated_text) => {
3384            // Try to extract just the YAML/JSON/SDL content if LLM added explanation
3385            let spec = if request.spec_type == "graphql" {
3386                // For GraphQL, extract SDL
3387                extract_graphql_schema(&generated_text)
3388            } else {
3389                // For OpenAPI/AsyncAPI, extract YAML
3390                extract_yaml_spec(&generated_text)
3391            };
3392
3393            Json(serde_json::json!({
3394                "success": true,
3395                "spec": spec,
3396                "spec_type": request.spec_type,
3397            }))
3398            .into_response()
3399        }
3400        Err(e) => (
3401            StatusCode::INTERNAL_SERVER_ERROR,
3402            Json(serde_json::json!({
3403                "error": "AI generation failed",
3404                "message": e.to_string()
3405            })),
3406        )
3407            .into_response(),
3408    }
3409}
3410
3411#[cfg(not(feature = "data-faker"))]
3412async fn generate_ai_spec(
3413    State(_state): State<ManagementState>,
3414    Json(_request): Json<GenerateSpecRequest>,
3415) -> impl IntoResponse {
3416    (
3417        StatusCode::NOT_IMPLEMENTED,
3418        Json(serde_json::json!({
3419            "error": "AI features not enabled",
3420            "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3421        })),
3422    )
3423        .into_response()
3424}
3425
3426/// Generate OpenAPI specification from recorded traffic
3427#[cfg(feature = "behavioral-cloning")]
3428async fn generate_openapi_from_traffic(
3429    State(_state): State<ManagementState>,
3430    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
3431) -> impl IntoResponse {
3432    use chrono::{DateTime, Utc};
3433    use mockforge_core::intelligent_behavior::{
3434        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
3435        IntelligentBehaviorConfig,
3436    };
3437    use mockforge_recorder::{
3438        database::RecorderDatabase,
3439        openapi_export::{QueryFilters, RecordingsToOpenApi},
3440    };
3441    use std::path::PathBuf;
3442
3443    // Determine database path
3444    let db_path = if let Some(ref path) = request.database_path {
3445        PathBuf::from(path)
3446    } else {
3447        std::env::current_dir()
3448            .unwrap_or_else(|_| PathBuf::from("."))
3449            .join("recordings.db")
3450    };
3451
3452    // Open database
3453    let db = match RecorderDatabase::new(&db_path).await {
3454        Ok(db) => db,
3455        Err(e) => {
3456            return (
3457                StatusCode::BAD_REQUEST,
3458                Json(serde_json::json!({
3459                    "error": "Database error",
3460                    "message": format!("Failed to open recorder database: {}", e)
3461                })),
3462            )
3463                .into_response();
3464        }
3465    };
3466
3467    // Parse time filters
3468    let since_dt = if let Some(ref since_str) = request.since {
3469        match DateTime::parse_from_rfc3339(since_str) {
3470            Ok(dt) => Some(dt.with_timezone(&Utc)),
3471            Err(e) => {
3472                return (
3473                    StatusCode::BAD_REQUEST,
3474                    Json(serde_json::json!({
3475                        "error": "Invalid date format",
3476                        "message": format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3477                    })),
3478                )
3479                    .into_response();
3480            }
3481        }
3482    } else {
3483        None
3484    };
3485
3486    let until_dt = if let Some(ref until_str) = request.until {
3487        match DateTime::parse_from_rfc3339(until_str) {
3488            Ok(dt) => Some(dt.with_timezone(&Utc)),
3489            Err(e) => {
3490                return (
3491                    StatusCode::BAD_REQUEST,
3492                    Json(serde_json::json!({
3493                        "error": "Invalid date format",
3494                        "message": format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3495                    })),
3496                )
3497                    .into_response();
3498            }
3499        }
3500    } else {
3501        None
3502    };
3503
3504    // Build query filters
3505    let query_filters = QueryFilters {
3506        since: since_dt,
3507        until: until_dt,
3508        path_pattern: request.path_pattern.clone(),
3509        min_status_code: None,
3510        max_requests: Some(1000),
3511    };
3512
3513    // Query HTTP exchanges
3514    // Note: We need to convert from mockforge-recorder's HttpExchange to mockforge-core's HttpExchange
3515    // to avoid version mismatch issues. The converter returns the version from mockforge-recorder's
3516    // dependency, so we need to manually convert to the local version.
3517    let exchanges_from_recorder =
3518        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
3519            Ok(exchanges) => exchanges,
3520            Err(e) => {
3521                return (
3522                    StatusCode::INTERNAL_SERVER_ERROR,
3523                    Json(serde_json::json!({
3524                        "error": "Query error",
3525                        "message": format!("Failed to query HTTP exchanges: {}", e)
3526                    })),
3527                )
3528                    .into_response();
3529            }
3530        };
3531
3532    if exchanges_from_recorder.is_empty() {
3533        return (
3534            StatusCode::NOT_FOUND,
3535            Json(serde_json::json!({
3536                "error": "No exchanges found",
3537                "message": "No HTTP exchanges found matching the specified filters"
3538            })),
3539        )
3540            .into_response();
3541    }
3542
3543    // Convert to local HttpExchange type to avoid version mismatch
3544    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
3545    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
3546        .into_iter()
3547        .map(|e| LocalHttpExchange {
3548            method: e.method,
3549            path: e.path,
3550            query_params: e.query_params,
3551            headers: e.headers,
3552            body: e.body,
3553            body_encoding: e.body_encoding,
3554            status_code: e.status_code,
3555            response_headers: e.response_headers,
3556            response_body: e.response_body,
3557            response_body_encoding: e.response_body_encoding,
3558            timestamp: e.timestamp,
3559        })
3560        .collect();
3561
3562    // Create OpenAPI generator config
3563    let behavior_config = IntelligentBehaviorConfig::default();
3564    let gen_config = OpenApiGenerationConfig {
3565        min_confidence: request.min_confidence,
3566        behavior_model: Some(behavior_config.behavior_model),
3567    };
3568
3569    // Generate OpenAPI spec
3570    let generator = OpenApiSpecGenerator::new(gen_config);
3571    let result = match generator.generate_from_exchanges(exchanges).await {
3572        Ok(result) => result,
3573        Err(e) => {
3574            return (
3575                StatusCode::INTERNAL_SERVER_ERROR,
3576                Json(serde_json::json!({
3577                    "error": "Generation error",
3578                    "message": format!("Failed to generate OpenAPI spec: {}", e)
3579                })),
3580            )
3581                .into_response();
3582        }
3583    };
3584
3585    // Prepare response
3586    let spec_json = if let Some(ref raw) = result.spec.raw_document {
3587        raw.clone()
3588    } else {
3589        match serde_json::to_value(&result.spec.spec) {
3590            Ok(json) => json,
3591            Err(e) => {
3592                return (
3593                    StatusCode::INTERNAL_SERVER_ERROR,
3594                    Json(serde_json::json!({
3595                        "error": "Serialization error",
3596                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
3597                    })),
3598                )
3599                    .into_response();
3600            }
3601        }
3602    };
3603
3604    // Build response with metadata
3605    let response = serde_json::json!({
3606        "spec": spec_json,
3607        "metadata": {
3608            "requests_analyzed": result.metadata.requests_analyzed,
3609            "paths_inferred": result.metadata.paths_inferred,
3610            "path_confidence": result.metadata.path_confidence,
3611            "generated_at": result.metadata.generated_at.to_rfc3339(),
3612            "duration_ms": result.metadata.duration_ms,
3613        }
3614    });
3615
3616    Json(response).into_response()
3617}
3618
3619/// List all rule explanations
3620async fn list_rule_explanations(
3621    State(state): State<ManagementState>,
3622    Query(params): Query<std::collections::HashMap<String, String>>,
3623) -> impl IntoResponse {
3624    use mockforge_core::intelligent_behavior::RuleType;
3625
3626    let explanations = state.rule_explanations.read().await;
3627    let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3628
3629    // Filter by rule type if provided
3630    if let Some(rule_type_str) = params.get("rule_type") {
3631        if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3632            explanations_vec.retain(|e| e.rule_type == rule_type);
3633        }
3634    }
3635
3636    // Filter by minimum confidence if provided
3637    if let Some(min_confidence_str) = params.get("min_confidence") {
3638        if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3639            explanations_vec.retain(|e| e.confidence >= min_confidence);
3640        }
3641    }
3642
3643    // Sort by confidence (descending) and then by generated_at (descending)
3644    explanations_vec.sort_by(|a, b| {
3645        b.confidence
3646            .partial_cmp(&a.confidence)
3647            .unwrap_or(std::cmp::Ordering::Equal)
3648            .then_with(|| b.generated_at.cmp(&a.generated_at))
3649    });
3650
3651    Json(serde_json::json!({
3652        "explanations": explanations_vec,
3653        "total": explanations_vec.len(),
3654    }))
3655    .into_response()
3656}
3657
3658/// Get a specific rule explanation by ID
3659async fn get_rule_explanation(
3660    State(state): State<ManagementState>,
3661    Path(rule_id): Path<String>,
3662) -> impl IntoResponse {
3663    let explanations = state.rule_explanations.read().await;
3664
3665    match explanations.get(&rule_id) {
3666        Some(explanation) => Json(serde_json::json!({
3667            "explanation": explanation,
3668        }))
3669        .into_response(),
3670        None => (
3671            StatusCode::NOT_FOUND,
3672            Json(serde_json::json!({
3673                "error": "Rule explanation not found",
3674                "message": format!("No explanation found for rule ID: {}", rule_id)
3675            })),
3676        )
3677            .into_response(),
3678    }
3679}
3680
3681/// Request for learning from examples
3682#[derive(Debug, Deserialize)]
3683pub struct LearnFromExamplesRequest {
3684    /// Example request/response pairs to learn from
3685    pub examples: Vec<ExamplePairRequest>,
3686    /// Optional configuration override
3687    #[serde(default)]
3688    pub config: Option<serde_json::Value>,
3689}
3690
3691/// Example pair request format
3692#[derive(Debug, Deserialize)]
3693pub struct ExamplePairRequest {
3694    /// Request data (method, path, body, etc.)
3695    pub request: serde_json::Value,
3696    /// Response data (status_code, body, etc.)
3697    pub response: serde_json::Value,
3698}
3699
3700/// Learn behavioral rules from example pairs
3701///
3702/// This endpoint accepts example request/response pairs, generates behavioral rules
3703/// with explanations, and stores the explanations for later retrieval.
3704async fn learn_from_examples(
3705    State(state): State<ManagementState>,
3706    Json(request): Json<LearnFromExamplesRequest>,
3707) -> impl IntoResponse {
3708    use mockforge_core::intelligent_behavior::{
3709        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3710        rule_generator::{ExamplePair, RuleGenerator},
3711    };
3712
3713    if request.examples.is_empty() {
3714        return (
3715            StatusCode::BAD_REQUEST,
3716            Json(serde_json::json!({
3717                "error": "No examples provided",
3718                "message": "At least one example pair is required"
3719            })),
3720        )
3721            .into_response();
3722    }
3723
3724    // Convert request examples to ExamplePair format
3725    let example_pairs: Result<Vec<ExamplePair>, String> = request
3726        .examples
3727        .into_iter()
3728        .enumerate()
3729        .map(|(idx, ex)| {
3730            // Parse request JSON to extract method, path, body, etc.
3731            let method = ex
3732                .request
3733                .get("method")
3734                .and_then(|v| v.as_str())
3735                .map(|s| s.to_string())
3736                .unwrap_or_else(|| "GET".to_string());
3737            let path = ex
3738                .request
3739                .get("path")
3740                .and_then(|v| v.as_str())
3741                .map(|s| s.to_string())
3742                .unwrap_or_else(|| "/".to_string());
3743            let request_body = ex.request.get("body").cloned();
3744            let query_params = ex
3745                .request
3746                .get("query_params")
3747                .and_then(|v| v.as_object())
3748                .map(|obj| {
3749                    obj.iter()
3750                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3751                        .collect()
3752                })
3753                .unwrap_or_default();
3754            let headers = ex
3755                .request
3756                .get("headers")
3757                .and_then(|v| v.as_object())
3758                .map(|obj| {
3759                    obj.iter()
3760                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3761                        .collect()
3762                })
3763                .unwrap_or_default();
3764
3765            // Parse response JSON to extract status, body, etc.
3766            let status = ex
3767                .response
3768                .get("status_code")
3769                .or_else(|| ex.response.get("status"))
3770                .and_then(|v| v.as_u64())
3771                .map(|n| n as u16)
3772                .unwrap_or(200);
3773            let response_body = ex.response.get("body").cloned();
3774
3775            Ok(ExamplePair {
3776                method,
3777                path,
3778                request: request_body,
3779                status,
3780                response: response_body,
3781                query_params,
3782                headers,
3783                metadata: {
3784                    let mut meta = std::collections::HashMap::new();
3785                    meta.insert("source".to_string(), "api".to_string());
3786                    meta.insert("example_index".to_string(), idx.to_string());
3787                    meta
3788                },
3789            })
3790        })
3791        .collect();
3792
3793    let example_pairs = match example_pairs {
3794        Ok(pairs) => pairs,
3795        Err(e) => {
3796            return (
3797                StatusCode::BAD_REQUEST,
3798                Json(serde_json::json!({
3799                    "error": "Invalid examples",
3800                    "message": e
3801                })),
3802            )
3803                .into_response();
3804        }
3805    };
3806
3807    // Create behavior config (use provided config or default)
3808    let behavior_config = if let Some(config_json) = request.config {
3809        // Try to deserialize custom config, fall back to default
3810        serde_json::from_value(config_json)
3811            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3812            .behavior_model
3813    } else {
3814        BehaviorModelConfig::default()
3815    };
3816
3817    // Create rule generator
3818    let generator = RuleGenerator::new(behavior_config);
3819
3820    // Generate rules with explanations
3821    let (rules, explanations) =
3822        match generator.generate_rules_with_explanations(example_pairs).await {
3823            Ok(result) => result,
3824            Err(e) => {
3825                return (
3826                    StatusCode::INTERNAL_SERVER_ERROR,
3827                    Json(serde_json::json!({
3828                        "error": "Rule generation failed",
3829                        "message": format!("Failed to generate rules: {}", e)
3830                    })),
3831                )
3832                    .into_response();
3833            }
3834        };
3835
3836    // Store explanations in ManagementState
3837    {
3838        let mut stored_explanations = state.rule_explanations.write().await;
3839        for explanation in &explanations {
3840            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3841        }
3842    }
3843
3844    // Prepare response
3845    let response = serde_json::json!({
3846        "success": true,
3847        "rules_generated": {
3848            "consistency_rules": rules.consistency_rules.len(),
3849            "schemas": rules.schemas.len(),
3850            "state_machines": rules.state_transitions.len(),
3851            "system_prompt": !rules.system_prompt.is_empty(),
3852        },
3853        "explanations": explanations.iter().map(|e| serde_json::json!({
3854            "rule_id": e.rule_id,
3855            "rule_type": e.rule_type,
3856            "confidence": e.confidence,
3857            "reasoning": e.reasoning,
3858        })).collect::<Vec<_>>(),
3859        "total_explanations": explanations.len(),
3860    });
3861
3862    Json(response).into_response()
3863}
3864
3865#[cfg(feature = "data-faker")]
3866fn extract_yaml_spec(text: &str) -> String {
3867    // Try to find YAML code blocks
3868    if let Some(start) = text.find("```yaml") {
3869        let yaml_start = text[start + 7..].trim_start();
3870        if let Some(end) = yaml_start.find("```") {
3871            return yaml_start[..end].trim().to_string();
3872        }
3873    }
3874    if let Some(start) = text.find("```") {
3875        let content_start = text[start + 3..].trim_start();
3876        if let Some(end) = content_start.find("```") {
3877            return content_start[..end].trim().to_string();
3878        }
3879    }
3880
3881    // Check if it starts with openapi: or asyncapi:
3882    if text.trim_start().starts_with("openapi:") || text.trim_start().starts_with("asyncapi:") {
3883        return text.trim().to_string();
3884    }
3885
3886    // Return as-is if no code blocks found
3887    text.trim().to_string()
3888}
3889
3890/// Extract GraphQL schema from text content
3891#[cfg(feature = "data-faker")]
3892fn extract_graphql_schema(text: &str) -> String {
3893    // Try to find GraphQL code blocks
3894    if let Some(start) = text.find("```graphql") {
3895        let schema_start = text[start + 10..].trim_start();
3896        if let Some(end) = schema_start.find("```") {
3897            return schema_start[..end].trim().to_string();
3898        }
3899    }
3900    if let Some(start) = text.find("```") {
3901        let content_start = text[start + 3..].trim_start();
3902        if let Some(end) = content_start.find("```") {
3903            return content_start[..end].trim().to_string();
3904        }
3905    }
3906
3907    // Check if it looks like GraphQL SDL (starts with type, schema, etc.)
3908    if text.trim_start().starts_with("type ") || text.trim_start().starts_with("schema ") {
3909        return text.trim().to_string();
3910    }
3911
3912    text.trim().to_string()
3913}
3914
3915// ========== Chaos Engineering Management ==========
3916
3917/// Get current chaos engineering configuration
3918async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
3919    #[cfg(feature = "chaos")]
3920    {
3921        if let Some(chaos_state) = &_state.chaos_api_state {
3922            let config = chaos_state.config.read().await;
3923            // Convert ChaosConfig to JSON response format
3924            Json(serde_json::json!({
3925                "enabled": config.enabled,
3926                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
3927                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
3928                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
3929                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
3930            }))
3931            .into_response()
3932        } else {
3933            // Chaos API not available, return default
3934            Json(serde_json::json!({
3935                "enabled": false,
3936                "latency": null,
3937                "fault_injection": null,
3938                "rate_limit": null,
3939                "traffic_shaping": null,
3940            }))
3941            .into_response()
3942        }
3943    }
3944    #[cfg(not(feature = "chaos"))]
3945    {
3946        // Chaos feature not enabled
3947        Json(serde_json::json!({
3948            "enabled": false,
3949            "latency": null,
3950            "fault_injection": null,
3951            "rate_limit": null,
3952            "traffic_shaping": null,
3953        }))
3954        .into_response()
3955    }
3956}
3957
3958/// Request to update chaos configuration
3959#[derive(Debug, Deserialize)]
3960pub struct ChaosConfigUpdate {
3961    /// Whether to enable chaos engineering
3962    pub enabled: Option<bool>,
3963    /// Latency configuration
3964    pub latency: Option<serde_json::Value>,
3965    /// Fault injection configuration
3966    pub fault_injection: Option<serde_json::Value>,
3967    /// Rate limiting configuration
3968    pub rate_limit: Option<serde_json::Value>,
3969    /// Traffic shaping configuration
3970    pub traffic_shaping: Option<serde_json::Value>,
3971}
3972
3973/// Update chaos engineering configuration
3974async fn update_chaos_config(
3975    State(_state): State<ManagementState>,
3976    Json(_config_update): Json<ChaosConfigUpdate>,
3977) -> impl IntoResponse {
3978    #[cfg(feature = "chaos")]
3979    {
3980        if let Some(chaos_state) = &_state.chaos_api_state {
3981            use mockforge_chaos::config::{
3982                FaultInjectionConfig, LatencyConfig, RateLimitConfig, TrafficShapingConfig,
3983            };
3984
3985            let mut config = chaos_state.config.write().await;
3986
3987            // Update enabled flag if provided
3988            if let Some(enabled) = _config_update.enabled {
3989                config.enabled = enabled;
3990            }
3991
3992            // Update latency config if provided
3993            if let Some(latency_json) = _config_update.latency {
3994                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3995                    config.latency = Some(latency);
3996                }
3997            }
3998
3999            // Update fault injection config if provided
4000            if let Some(fault_json) = _config_update.fault_injection {
4001                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
4002                    config.fault_injection = Some(fault);
4003                }
4004            }
4005
4006            // Update rate limit config if provided
4007            if let Some(rate_json) = _config_update.rate_limit {
4008                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
4009                    config.rate_limit = Some(rate);
4010                }
4011            }
4012
4013            // Update traffic shaping config if provided
4014            if let Some(traffic_json) = _config_update.traffic_shaping {
4015                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
4016                    config.traffic_shaping = Some(traffic);
4017                }
4018            }
4019
4020            // Reinitialize middleware injectors with new config
4021            // The middleware will pick up the changes on the next request
4022            drop(config);
4023
4024            info!("Chaos configuration updated successfully");
4025            Json(serde_json::json!({
4026                "success": true,
4027                "message": "Chaos configuration updated and applied"
4028            }))
4029            .into_response()
4030        } else {
4031            (
4032                StatusCode::SERVICE_UNAVAILABLE,
4033                Json(serde_json::json!({
4034                    "success": false,
4035                    "error": "Chaos API not available",
4036                    "message": "Chaos engineering is not enabled or configured"
4037                })),
4038            )
4039                .into_response()
4040        }
4041    }
4042    #[cfg(not(feature = "chaos"))]
4043    {
4044        (
4045            StatusCode::NOT_IMPLEMENTED,
4046            Json(serde_json::json!({
4047                "success": false,
4048                "error": "Chaos feature not enabled",
4049                "message": "Chaos engineering feature is not compiled into this build"
4050            })),
4051        )
4052            .into_response()
4053    }
4054}
4055
4056// ========== Network Profile Management ==========
4057
4058/// List available network profiles
4059async fn list_network_profiles() -> impl IntoResponse {
4060    use mockforge_core::network_profiles::NetworkProfileCatalog;
4061
4062    let catalog = NetworkProfileCatalog::default();
4063    let profiles: Vec<serde_json::Value> = catalog
4064        .list_profiles_with_description()
4065        .iter()
4066        .map(|(name, description)| {
4067            serde_json::json!({
4068                "name": name,
4069                "description": description,
4070            })
4071        })
4072        .collect();
4073
4074    Json(serde_json::json!({
4075        "profiles": profiles
4076    }))
4077    .into_response()
4078}
4079
4080#[derive(Debug, Deserialize)]
4081/// Request to apply a network profile
4082pub struct ApplyNetworkProfileRequest {
4083    /// Name of the network profile to apply
4084    pub profile_name: String,
4085}
4086
4087/// Apply a network profile
4088async fn apply_network_profile(
4089    State(state): State<ManagementState>,
4090    Json(request): Json<ApplyNetworkProfileRequest>,
4091) -> impl IntoResponse {
4092    use mockforge_core::network_profiles::NetworkProfileCatalog;
4093
4094    let catalog = NetworkProfileCatalog::default();
4095    if let Some(profile) = catalog.get(&request.profile_name) {
4096        // Apply profile to server configuration if available
4097        // NetworkProfile contains latency and traffic_shaping configs
4098        if let Some(server_config) = &state.server_config {
4099            let mut config = server_config.write().await;
4100
4101            // Apply network profile's traffic shaping to core config
4102            use mockforge_core::config::NetworkShapingConfig;
4103
4104            // Convert NetworkProfile's TrafficShapingConfig to NetworkShapingConfig
4105            // NetworkProfile uses mockforge_core::traffic_shaping::TrafficShapingConfig
4106            // which has bandwidth and burst_loss fields
4107            let network_shaping = NetworkShapingConfig {
4108                enabled: profile.traffic_shaping.bandwidth.enabled
4109                    || profile.traffic_shaping.burst_loss.enabled,
4110                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
4111                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
4112                max_connections: 1000, // Default value
4113            };
4114
4115            // Update chaos config if it exists, or create it
4116            // Chaos config is in observability.chaos, not core.chaos
4117            if let Some(ref mut chaos) = config.observability.chaos {
4118                chaos.traffic_shaping = Some(network_shaping);
4119            } else {
4120                // Create minimal chaos config with traffic shaping
4121                use mockforge_core::config::ChaosEngConfig;
4122                config.observability.chaos = Some(ChaosEngConfig {
4123                    enabled: true,
4124                    latency: None,
4125                    fault_injection: None,
4126                    rate_limit: None,
4127                    traffic_shaping: Some(network_shaping),
4128                    scenario: None,
4129                });
4130            }
4131
4132            info!("Network profile '{}' applied to server configuration", request.profile_name);
4133        } else {
4134            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
4135        }
4136
4137        // Also update chaos API state if available
4138        #[cfg(feature = "chaos")]
4139        {
4140            if let Some(chaos_state) = &state.chaos_api_state {
4141                use mockforge_chaos::config::TrafficShapingConfig;
4142
4143                let mut chaos_config = chaos_state.config.write().await;
4144                // Apply profile's traffic shaping to chaos API state
4145                let chaos_traffic_shaping = TrafficShapingConfig {
4146                    enabled: profile.traffic_shaping.bandwidth.enabled
4147                        || profile.traffic_shaping.burst_loss.enabled,
4148                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
4149                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
4150                    max_connections: 0,
4151                    connection_timeout_ms: 30000,
4152                };
4153                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
4154                chaos_config.enabled = true; // Enable chaos when applying a profile
4155                drop(chaos_config);
4156                info!("Network profile '{}' applied to chaos API state", request.profile_name);
4157            }
4158        }
4159
4160        Json(serde_json::json!({
4161            "success": true,
4162            "message": format!("Network profile '{}' applied", request.profile_name),
4163            "profile": {
4164                "name": profile.name,
4165                "description": profile.description,
4166            }
4167        }))
4168        .into_response()
4169    } else {
4170        (
4171            StatusCode::NOT_FOUND,
4172            Json(serde_json::json!({
4173                "error": "Profile not found",
4174                "message": format!("Network profile '{}' not found", request.profile_name)
4175            })),
4176        )
4177            .into_response()
4178    }
4179}
4180
4181/// Build the management API router with UI Builder support
4182pub fn management_router_with_ui_builder(
4183    state: ManagementState,
4184    server_config: mockforge_core::config::ServerConfig,
4185) -> Router {
4186    use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
4187
4188    // Create the base management router
4189    let management = management_router(state);
4190
4191    // Create UI Builder state and router
4192    let ui_builder_state = UIBuilderState::new(server_config);
4193    let ui_builder = create_ui_builder_router(ui_builder_state);
4194
4195    // Nest UI Builder under /ui-builder
4196    management.nest("/ui-builder", ui_builder)
4197}
4198
4199/// Build management router with spec import API
4200pub fn management_router_with_spec_import(state: ManagementState) -> Router {
4201    use crate::spec_import::{spec_import_router, SpecImportState};
4202
4203    // Create base management router
4204    let management = management_router(state);
4205
4206    // Merge with spec import router
4207    Router::new()
4208        .merge(management)
4209        .merge(spec_import_router(SpecImportState::new()))
4210}
4211
4212#[cfg(test)]
4213mod tests {
4214    use super::*;
4215
4216    #[tokio::test]
4217    async fn test_create_and_get_mock() {
4218        let state = ManagementState::new(None, None, 3000);
4219
4220        let mock = MockConfig {
4221            id: "test-1".to_string(),
4222            name: "Test Mock".to_string(),
4223            method: "GET".to_string(),
4224            path: "/test".to_string(),
4225            response: MockResponse {
4226                body: serde_json::json!({"message": "test"}),
4227                headers: None,
4228            },
4229            enabled: true,
4230            latency_ms: None,
4231            status_code: Some(200),
4232            request_match: None,
4233            priority: None,
4234            scenario: None,
4235            required_scenario_state: None,
4236            new_scenario_state: None,
4237        };
4238
4239        // Create mock
4240        {
4241            let mut mocks = state.mocks.write().await;
4242            mocks.push(mock.clone());
4243        }
4244
4245        // Get mock
4246        let mocks = state.mocks.read().await;
4247        let found = mocks.iter().find(|m| m.id == "test-1");
4248        assert!(found.is_some());
4249        assert_eq!(found.unwrap().name, "Test Mock");
4250    }
4251
4252    #[tokio::test]
4253    async fn test_server_stats() {
4254        let state = ManagementState::new(None, None, 3000);
4255
4256        // Add some mocks
4257        {
4258            let mut mocks = state.mocks.write().await;
4259            mocks.push(MockConfig {
4260                id: "1".to_string(),
4261                name: "Mock 1".to_string(),
4262                method: "GET".to_string(),
4263                path: "/test1".to_string(),
4264                response: MockResponse {
4265                    body: serde_json::json!({}),
4266                    headers: None,
4267                },
4268                enabled: true,
4269                latency_ms: None,
4270                status_code: Some(200),
4271                request_match: None,
4272                priority: None,
4273                scenario: None,
4274                required_scenario_state: None,
4275                new_scenario_state: None,
4276            });
4277            mocks.push(MockConfig {
4278                id: "2".to_string(),
4279                name: "Mock 2".to_string(),
4280                method: "POST".to_string(),
4281                path: "/test2".to_string(),
4282                response: MockResponse {
4283                    body: serde_json::json!({}),
4284                    headers: None,
4285                },
4286                enabled: false,
4287                latency_ms: None,
4288                status_code: Some(201),
4289                request_match: None,
4290                priority: None,
4291                scenario: None,
4292                required_scenario_state: None,
4293                new_scenario_state: None,
4294            });
4295        }
4296
4297        let mocks = state.mocks.read().await;
4298        assert_eq!(mocks.len(), 2);
4299        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
4300    }
4301
4302    #[test]
4303    fn test_mock_matches_request_with_xpath_absolute_path() {
4304        let mock = MockConfig {
4305            id: "xpath-1".to_string(),
4306            name: "XPath Match".to_string(),
4307            method: "POST".to_string(),
4308            path: "/xml".to_string(),
4309            response: MockResponse {
4310                body: serde_json::json!({"ok": true}),
4311                headers: None,
4312            },
4313            enabled: true,
4314            latency_ms: None,
4315            status_code: Some(200),
4316            request_match: Some(RequestMatchCriteria {
4317                xpath: Some("/root/order/id".to_string()),
4318                ..Default::default()
4319            }),
4320            priority: None,
4321            scenario: None,
4322            required_scenario_state: None,
4323            new_scenario_state: None,
4324        };
4325
4326        let body = br#"<root><order><id>123</id></order></root>"#;
4327        let headers = std::collections::HashMap::new();
4328        let query = std::collections::HashMap::new();
4329
4330        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4331    }
4332
4333    #[test]
4334    fn test_mock_matches_request_with_xpath_text_predicate() {
4335        let mock = MockConfig {
4336            id: "xpath-2".to_string(),
4337            name: "XPath Predicate Match".to_string(),
4338            method: "POST".to_string(),
4339            path: "/xml".to_string(),
4340            response: MockResponse {
4341                body: serde_json::json!({"ok": true}),
4342                headers: None,
4343            },
4344            enabled: true,
4345            latency_ms: None,
4346            status_code: Some(200),
4347            request_match: Some(RequestMatchCriteria {
4348                xpath: Some("//order/id[text()='123']".to_string()),
4349                ..Default::default()
4350            }),
4351            priority: None,
4352            scenario: None,
4353            required_scenario_state: None,
4354            new_scenario_state: None,
4355        };
4356
4357        let body = br#"<root><order><id>123</id></order></root>"#;
4358        let headers = std::collections::HashMap::new();
4359        let query = std::collections::HashMap::new();
4360
4361        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4362    }
4363
4364    #[test]
4365    fn test_mock_matches_request_with_xpath_no_match() {
4366        let mock = MockConfig {
4367            id: "xpath-3".to_string(),
4368            name: "XPath No Match".to_string(),
4369            method: "POST".to_string(),
4370            path: "/xml".to_string(),
4371            response: MockResponse {
4372                body: serde_json::json!({"ok": true}),
4373                headers: None,
4374            },
4375            enabled: true,
4376            latency_ms: None,
4377            status_code: Some(200),
4378            request_match: Some(RequestMatchCriteria {
4379                xpath: Some("//order/id[text()='456']".to_string()),
4380                ..Default::default()
4381            }),
4382            priority: None,
4383            scenario: None,
4384            required_scenario_state: None,
4385            new_scenario_state: None,
4386        };
4387
4388        let body = br#"<root><order><id>123</id></order></root>"#;
4389        let headers = std::collections::HashMap::new();
4390        let query = std::collections::HashMap::new();
4391
4392        assert!(!mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4393    }
4394}