Skip to main content

mockforge_http/
management.rs

1#[cfg(any(feature = "mqtt", feature = "kafka"))]
2use axum::response::sse::{Event, Sse};
3/// Management API for MockForge
4///
5/// Provides REST endpoints for controlling mocks, server configuration,
6/// and integration with developer tools (VS Code extension, CI/CD, etc.)
7use axum::{
8    extract::{Path, Query, State},
9    http::StatusCode,
10    response::{IntoResponse, Json},
11    routing::{delete, get, post, put},
12    Router,
13};
14#[cfg(any(feature = "mqtt", feature = "kafka"))]
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18    BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23#[cfg(any(feature = "mqtt", feature = "kafka"))]
24use std::convert::Infallible;
25use std::sync::Arc;
26use tokio::sync::{broadcast, RwLock};
27use tracing::*;
28
29/// Default broadcast channel capacity for message events
30#[cfg(any(feature = "mqtt", feature = "kafka"))]
31const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;
32
33/// Get the broadcast channel capacity from environment or use default
34#[cfg(any(feature = "mqtt", feature = "kafka"))]
35fn get_message_broadcast_capacity() -> usize {
36    std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY")
37        .ok()
38        .and_then(|s| s.parse().ok())
39        .unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY)
40}
41
42/// Message event types for real-time monitoring
43#[derive(Debug, Clone, Serialize, Deserialize)]
44#[serde(tag = "protocol", content = "data")]
45#[serde(rename_all = "lowercase")]
46pub enum MessageEvent {
47    #[cfg(feature = "mqtt")]
48    /// MQTT message event
49    Mqtt(MqttMessageEvent),
50    #[cfg(feature = "kafka")]
51    /// Kafka message event
52    Kafka(KafkaMessageEvent),
53}
54
55#[cfg(feature = "mqtt")]
56/// MQTT message event for real-time monitoring
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct MqttMessageEvent {
59    /// MQTT topic name
60    pub topic: String,
61    /// Message payload content
62    pub payload: String,
63    /// Quality of Service level (0, 1, or 2)
64    pub qos: u8,
65    /// Whether the message is retained
66    pub retain: bool,
67    /// RFC3339 formatted timestamp
68    pub timestamp: String,
69}
70
71#[cfg(feature = "kafka")]
72#[allow(missing_docs)]
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct KafkaMessageEvent {
75    pub topic: String,
76    pub key: Option<String>,
77    pub value: String,
78    pub partition: i32,
79    pub offset: i64,
80    pub headers: Option<std::collections::HashMap<String, String>>,
81    pub timestamp: String,
82}
83
84/// Mock configuration representation
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct MockConfig {
87    /// Unique identifier for the mock
88    #[serde(skip_serializing_if = "String::is_empty")]
89    pub id: String,
90    /// Human-readable name for the mock
91    pub name: String,
92    /// HTTP method (GET, POST, etc.)
93    pub method: String,
94    /// API path pattern to match
95    pub path: String,
96    /// Response configuration
97    pub response: MockResponse,
98    /// Whether this mock is currently enabled
99    #[serde(default = "default_true")]
100    pub enabled: bool,
101    /// Optional latency to inject in milliseconds
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub latency_ms: Option<u64>,
104    /// Optional HTTP status code override
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub status_code: Option<u16>,
107    /// Request matching criteria (headers, query params, body patterns)
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub request_match: Option<RequestMatchCriteria>,
110    /// Priority for mock ordering (higher priority mocks are matched first)
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub priority: Option<i32>,
113    /// Scenario name for stateful mocking
114    #[serde(skip_serializing_if = "Option::is_none")]
115    pub scenario: Option<String>,
116    /// Required scenario state for this mock to be active
117    #[serde(skip_serializing_if = "Option::is_none")]
118    pub required_scenario_state: Option<String>,
119    /// New scenario state after this mock is matched
120    #[serde(skip_serializing_if = "Option::is_none")]
121    pub new_scenario_state: Option<String>,
122}
123
124fn default_true() -> bool {
125    true
126}
127
128/// Mock response configuration
129#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct MockResponse {
131    /// Response body as JSON
132    pub body: serde_json::Value,
133    /// Optional custom response headers
134    #[serde(skip_serializing_if = "Option::is_none")]
135    pub headers: Option<std::collections::HashMap<String, String>>,
136}
137
138/// Request matching criteria for advanced request matching
139#[derive(Debug, Clone, Serialize, Deserialize, Default)]
140pub struct RequestMatchCriteria {
141    /// Headers that must be present and match (case-insensitive header names)
142    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
143    pub headers: std::collections::HashMap<String, String>,
144    /// Query parameters that must be present and match
145    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
146    pub query_params: std::collections::HashMap<String, String>,
147    /// Request body pattern (supports exact match or regex)
148    #[serde(skip_serializing_if = "Option::is_none")]
149    pub body_pattern: Option<String>,
150    /// JSONPath expression for JSON body matching
151    #[serde(skip_serializing_if = "Option::is_none")]
152    pub json_path: Option<String>,
153    /// XPath expression for XML body matching
154    #[serde(skip_serializing_if = "Option::is_none")]
155    pub xpath: Option<String>,
156    /// Custom matcher expression (e.g., "headers.content-type == \"application/json\"")
157    #[serde(skip_serializing_if = "Option::is_none")]
158    pub custom_matcher: Option<String>,
159}
160
161/// Check if a request matches the given mock configuration
162///
163/// This function implements comprehensive request matching including:
164/// - Method and path matching
165/// - Header matching (with regex support)
166/// - Query parameter matching
167/// - Body pattern matching (exact, regex, JSONPath, XPath)
168/// - Custom matcher expressions
169pub fn mock_matches_request(
170    mock: &MockConfig,
171    method: &str,
172    path: &str,
173    headers: &std::collections::HashMap<String, String>,
174    query_params: &std::collections::HashMap<String, String>,
175    body: Option<&[u8]>,
176) -> bool {
177    use regex::Regex;
178
179    // Check if mock is enabled
180    if !mock.enabled {
181        return false;
182    }
183
184    // Check method (case-insensitive)
185    if mock.method.to_uppercase() != method.to_uppercase() {
186        return false;
187    }
188
189    // Check path pattern (supports wildcards and path parameters)
190    if !path_matches_pattern(&mock.path, path) {
191        return false;
192    }
193
194    // Check request matching criteria if present
195    if let Some(criteria) = &mock.request_match {
196        // Check headers
197        for (key, expected_value) in &criteria.headers {
198            let header_key_lower = key.to_lowercase();
199            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
200
201            if let Some((_, actual_value)) = found {
202                // Try regex match first, then exact match
203                if let Ok(re) = Regex::new(expected_value) {
204                    if !re.is_match(actual_value) {
205                        return false;
206                    }
207                } else if actual_value != expected_value {
208                    return false;
209                }
210            } else {
211                return false; // Header not found
212            }
213        }
214
215        // Check query parameters
216        for (key, expected_value) in &criteria.query_params {
217            if let Some(actual_value) = query_params.get(key) {
218                if actual_value != expected_value {
219                    return false;
220                }
221            } else {
222                return false; // Query param not found
223            }
224        }
225
226        // Check body pattern
227        if let Some(pattern) = &criteria.body_pattern {
228            if let Some(body_bytes) = body {
229                let body_str = String::from_utf8_lossy(body_bytes);
230                // Try regex first, then exact match
231                if let Ok(re) = Regex::new(pattern) {
232                    if !re.is_match(&body_str) {
233                        return false;
234                    }
235                } else if body_str.as_ref() != pattern {
236                    return false;
237                }
238            } else {
239                return false; // Body required but not present
240            }
241        }
242
243        // Check JSONPath (simplified implementation)
244        if let Some(json_path) = &criteria.json_path {
245            if let Some(body_bytes) = body {
246                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
247                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
248                        // Simple JSONPath check
249                        if !json_path_exists(&json_value, json_path) {
250                            return false;
251                        }
252                    }
253                }
254            }
255        }
256
257        // Check XPath (supports a focused subset)
258        if let Some(xpath) = &criteria.xpath {
259            if let Some(body_bytes) = body {
260                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
261                    if !xml_xpath_exists(body_str, xpath) {
262                        return false;
263                    }
264                } else {
265                    return false;
266                }
267            } else {
268                return false; // Body required but not present
269            }
270        }
271
272        // Check custom matcher
273        if let Some(custom) = &criteria.custom_matcher {
274            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
275                return false;
276            }
277        }
278    }
279
280    true
281}
282
283/// Check if a path matches a pattern (supports wildcards and path parameters)
284fn path_matches_pattern(pattern: &str, path: &str) -> bool {
285    // Exact match
286    if pattern == path {
287        return true;
288    }
289
290    // Wildcard match
291    if pattern == "*" {
292        return true;
293    }
294
295    // Path parameter matching (e.g., /users/{id} matches /users/123)
296    let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
297    let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
298
299    if pattern_parts.len() != path_parts.len() {
300        // Check for wildcard patterns
301        if pattern.contains('*') {
302            return matches_wildcard_pattern(pattern, path);
303        }
304        return false;
305    }
306
307    for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
308        // Check for path parameters {param}
309        if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
310            continue; // Matches any value
311        }
312
313        if pattern_part != path_part {
314            return false;
315        }
316    }
317
318    true
319}
320
321/// Check if path matches a wildcard pattern
322fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
323    use regex::Regex;
324
325    // Convert pattern to regex
326    let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
327
328    if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
329        return re.is_match(path);
330    }
331
332    false
333}
334
335/// Check if a JSONPath exists in a JSON value
336///
337/// Supports:
338/// - `$` — root element
339/// - `$.field.subfield` — nested object access
340/// - `$.items[0].name` — array index access
341/// - `$.items[*]` — array wildcard (checks array is non-empty)
342fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
343    let path = if json_path == "$" {
344        return true;
345    } else if let Some(p) = json_path.strip_prefix("$.") {
346        p
347    } else if let Some(p) = json_path.strip_prefix('$') {
348        p.strip_prefix('.').unwrap_or(p)
349    } else {
350        json_path
351    };
352
353    let mut current = json;
354    for segment in split_json_path_segments(path) {
355        match segment {
356            JsonPathSegment::Field(name) => {
357                if let Some(obj) = current.as_object() {
358                    if let Some(value) = obj.get(name) {
359                        current = value;
360                    } else {
361                        return false;
362                    }
363                } else {
364                    return false;
365                }
366            }
367            JsonPathSegment::Index(idx) => {
368                if let Some(arr) = current.as_array() {
369                    if let Some(value) = arr.get(idx) {
370                        current = value;
371                    } else {
372                        return false;
373                    }
374                } else {
375                    return false;
376                }
377            }
378            JsonPathSegment::Wildcard => {
379                if let Some(arr) = current.as_array() {
380                    return !arr.is_empty();
381                }
382                return false;
383            }
384        }
385    }
386    true
387}
388
389enum JsonPathSegment<'a> {
390    Field(&'a str),
391    Index(usize),
392    Wildcard,
393}
394
395/// Split a JSONPath (without the leading `$`) into segments
396fn split_json_path_segments(path: &str) -> Vec<JsonPathSegment<'_>> {
397    let mut segments = Vec::new();
398    for part in path.split('.') {
399        if part.is_empty() {
400            continue;
401        }
402        if let Some(bracket_start) = part.find('[') {
403            let field_name = &part[..bracket_start];
404            if !field_name.is_empty() {
405                segments.push(JsonPathSegment::Field(field_name));
406            }
407            let bracket_content = &part[bracket_start + 1..part.len() - 1];
408            if bracket_content == "*" {
409                segments.push(JsonPathSegment::Wildcard);
410            } else if let Ok(idx) = bracket_content.parse::<usize>() {
411                segments.push(JsonPathSegment::Index(idx));
412            }
413        } else {
414            segments.push(JsonPathSegment::Field(part));
415        }
416    }
417    segments
418}
419
420#[derive(Debug, Clone, PartialEq, Eq)]
421struct XPathSegment {
422    name: String,
423    text_equals: Option<String>,
424}
425
426fn parse_xpath_segment(segment: &str) -> Option<XPathSegment> {
427    if segment.is_empty() {
428        return None;
429    }
430
431    let trimmed = segment.trim();
432    if let Some(bracket_start) = trimmed.find('[') {
433        if !trimmed.ends_with(']') {
434            return None;
435        }
436
437        let name = trimmed[..bracket_start].trim();
438        let predicate = &trimmed[bracket_start + 1..trimmed.len() - 1];
439        let predicate = predicate.trim();
440
441        // Support simple predicate: [text()="value"] or [text()='value']
442        if let Some(raw) = predicate.strip_prefix("text()=") {
443            let raw = raw.trim();
444            if raw.len() >= 2
445                && ((raw.starts_with('"') && raw.ends_with('"'))
446                    || (raw.starts_with('\'') && raw.ends_with('\'')))
447            {
448                let text = raw[1..raw.len() - 1].to_string();
449                if !name.is_empty() {
450                    return Some(XPathSegment {
451                        name: name.to_string(),
452                        text_equals: Some(text),
453                    });
454                }
455            }
456        }
457
458        None
459    } else {
460        Some(XPathSegment {
461            name: trimmed.to_string(),
462            text_equals: None,
463        })
464    }
465}
466
467fn segment_matches(node: roxmltree::Node<'_, '_>, segment: &XPathSegment) -> bool {
468    if !node.is_element() {
469        return false;
470    }
471    if node.tag_name().name() != segment.name {
472        return false;
473    }
474    match &segment.text_equals {
475        Some(expected) => node.text().map(str::trim).unwrap_or_default() == expected,
476        None => true,
477    }
478}
479
480/// Check if an XPath expression matches an XML body.
481///
482/// Supported subset:
483/// - Absolute paths: `/root/child/item`
484/// - Descendant search: `//item` and `//parent/child`
485/// - Optional text predicate per segment: `item[text()="value"]`
486fn xml_xpath_exists(xml_body: &str, xpath: &str) -> bool {
487    let doc = match roxmltree::Document::parse(xml_body) {
488        Ok(doc) => doc,
489        Err(err) => {
490            tracing::warn!("Failed to parse XML for XPath matching: {}", err);
491            return false;
492        }
493    };
494
495    let expr = xpath.trim();
496    if expr.is_empty() {
497        return false;
498    }
499
500    let (is_descendant, path_str) = if let Some(rest) = expr.strip_prefix("//") {
501        (true, rest)
502    } else if let Some(rest) = expr.strip_prefix('/') {
503        (false, rest)
504    } else {
505        tracing::warn!("Unsupported XPath expression (must start with / or //): {}", expr);
506        return false;
507    };
508
509    let segments: Vec<XPathSegment> = path_str
510        .split('/')
511        .filter(|s| !s.trim().is_empty())
512        .filter_map(parse_xpath_segment)
513        .collect();
514
515    if segments.is_empty() {
516        return false;
517    }
518
519    if is_descendant {
520        let first = &segments[0];
521        for node in doc.descendants().filter(|n| segment_matches(*n, first)) {
522            let mut frontier = vec![node];
523            for segment in &segments[1..] {
524                let mut next_frontier = Vec::new();
525                for parent in &frontier {
526                    for child in parent.children().filter(|n| segment_matches(*n, segment)) {
527                        next_frontier.push(child);
528                    }
529                }
530                if next_frontier.is_empty() {
531                    frontier.clear();
532                    break;
533                }
534                frontier = next_frontier;
535            }
536            if !frontier.is_empty() {
537                return true;
538            }
539        }
540        false
541    } else {
542        let mut frontier = vec![doc.root_element()];
543        for (index, segment) in segments.iter().enumerate() {
544            let mut next_frontier = Vec::new();
545            for parent in &frontier {
546                if index == 0 {
547                    if segment_matches(*parent, segment) {
548                        next_frontier.push(*parent);
549                    }
550                    continue;
551                }
552                for child in parent.children().filter(|n| segment_matches(*n, segment)) {
553                    next_frontier.push(child);
554                }
555            }
556            if next_frontier.is_empty() {
557                return false;
558            }
559            frontier = next_frontier;
560        }
561        !frontier.is_empty()
562    }
563}
564
565/// Evaluate a custom matcher expression
566fn evaluate_custom_matcher(
567    expression: &str,
568    method: &str,
569    path: &str,
570    headers: &std::collections::HashMap<String, String>,
571    query_params: &std::collections::HashMap<String, String>,
572    body: Option<&[u8]>,
573) -> bool {
574    use regex::Regex;
575
576    let expr = expression.trim();
577
578    // Handle equality expressions (field == "value")
579    if expr.contains("==") {
580        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
581        if parts.len() != 2 {
582            return false;
583        }
584
585        let field = parts[0];
586        let expected_value = parts[1].trim_matches('"').trim_matches('\'');
587
588        match field {
589            "method" => method == expected_value,
590            "path" => path == expected_value,
591            _ if field.starts_with("headers.") => {
592                let header_name = &field[8..];
593                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
594            }
595            _ if field.starts_with("query.") => {
596                let param_name = &field[6..];
597                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
598            }
599            _ => false,
600        }
601    }
602    // Handle regex match expressions (field =~ "pattern")
603    else if expr.contains("=~") {
604        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
605        if parts.len() != 2 {
606            return false;
607        }
608
609        let field = parts[0];
610        let pattern = parts[1].trim_matches('"').trim_matches('\'');
611
612        if let Ok(re) = Regex::new(pattern) {
613            match field {
614                "method" => re.is_match(method),
615                "path" => re.is_match(path),
616                _ if field.starts_with("headers.") => {
617                    let header_name = &field[8..];
618                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
619                }
620                _ if field.starts_with("query.") => {
621                    let param_name = &field[6..];
622                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
623                }
624                _ => false,
625            }
626        } else {
627            false
628        }
629    }
630    // Handle contains expressions (field contains "value")
631    else if expr.contains("contains") {
632        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
633        if parts.len() != 2 {
634            return false;
635        }
636
637        let field = parts[0];
638        let search_value = parts[1].trim_matches('"').trim_matches('\'');
639
640        match field {
641            "path" => path.contains(search_value),
642            _ if field.starts_with("headers.") => {
643                let header_name = &field[8..];
644                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
645            }
646            _ if field.starts_with("body") => {
647                if let Some(body_bytes) = body {
648                    let body_str = String::from_utf8_lossy(body_bytes);
649                    body_str.contains(search_value)
650                } else {
651                    false
652                }
653            }
654            _ => false,
655        }
656    } else {
657        // Unknown expression format
658        tracing::warn!("Unknown custom matcher expression format: {}", expr);
659        false
660    }
661}
662
663/// Server statistics
664#[derive(Debug, Clone, Serialize, Deserialize)]
665pub struct ServerStats {
666    /// Server uptime in seconds
667    pub uptime_seconds: u64,
668    /// Total number of requests processed
669    pub total_requests: u64,
670    /// Number of active mock configurations
671    pub active_mocks: usize,
672    /// Number of currently enabled mocks
673    pub enabled_mocks: usize,
674    /// Number of registered API routes
675    pub registered_routes: usize,
676}
677
678/// Server configuration info
679#[derive(Debug, Clone, Serialize, Deserialize)]
680pub struct ServerConfig {
681    /// MockForge version string
682    pub version: String,
683    /// Server port number
684    pub port: u16,
685    /// Whether an OpenAPI spec is loaded
686    pub has_openapi_spec: bool,
687    /// Optional path to the OpenAPI spec file
688    #[serde(skip_serializing_if = "Option::is_none")]
689    pub spec_path: Option<String>,
690}
691
692/// Shared state for the management API
693#[derive(Clone)]
694pub struct ManagementState {
695    /// Collection of mock configurations
696    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
697    /// Optional OpenAPI specification
698    pub spec: Option<Arc<OpenApiSpec>>,
699    /// Optional path to the OpenAPI spec file
700    pub spec_path: Option<String>,
701    /// Server port number
702    pub port: u16,
703    /// Server start time for uptime calculation
704    pub start_time: std::time::Instant,
705    /// Counter for total requests processed
706    pub request_counter: Arc<RwLock<u64>>,
707    /// Optional proxy configuration for migration pipeline
708    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
709    /// Optional SMTP registry for email mocking
710    #[cfg(feature = "smtp")]
711    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
712    /// Optional MQTT broker for message mocking
713    #[cfg(feature = "mqtt")]
714    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
715    /// Optional Kafka broker for event streaming
716    #[cfg(feature = "kafka")]
717    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
718    /// Broadcast channel for message events (MQTT & Kafka)
719    #[cfg(any(feature = "mqtt", feature = "kafka"))]
720    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
721    /// State machine manager for scenario state machines
722    pub state_machine_manager:
723        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
724    /// Optional WebSocket broadcast channel for real-time updates
725    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
726    /// Lifecycle hook registry for extensibility
727    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
728    /// Rule explanations storage (in-memory for now)
729    pub rule_explanations: Arc<
730        RwLock<
731            std::collections::HashMap<
732                String,
733                mockforge_core::intelligent_behavior::RuleExplanation,
734            >,
735        >,
736    >,
737    /// Optional chaos API state for chaos config management
738    #[cfg(feature = "chaos")]
739    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
740    /// Optional server configuration for profile application
741    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
742    /// Conformance testing state
743    #[cfg(feature = "conformance")]
744    pub conformance_state: crate::handlers::conformance::ConformanceState,
745}
746
747impl ManagementState {
748    /// Create a new management state
749    ///
750    /// # Arguments
751    /// * `spec` - Optional OpenAPI specification
752    /// * `spec_path` - Optional path to the OpenAPI spec file
753    /// * `port` - Server port number
754    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
755        Self {
756            mocks: Arc::new(RwLock::new(Vec::new())),
757            spec,
758            spec_path,
759            port,
760            start_time: std::time::Instant::now(),
761            request_counter: Arc::new(RwLock::new(0)),
762            proxy_config: None,
763            #[cfg(feature = "smtp")]
764            smtp_registry: None,
765            #[cfg(feature = "mqtt")]
766            mqtt_broker: None,
767            #[cfg(feature = "kafka")]
768            kafka_broker: None,
769            #[cfg(any(feature = "mqtt", feature = "kafka"))]
770            message_events: {
771                let capacity = get_message_broadcast_capacity();
772                let (tx, _) = broadcast::channel(capacity);
773                Arc::new(tx)
774            },
775            state_machine_manager: Arc::new(RwLock::new(
776                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
777            )),
778            ws_broadcast: None,
779            lifecycle_hooks: None,
780            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
781            #[cfg(feature = "chaos")]
782            chaos_api_state: None,
783            server_config: None,
784            #[cfg(feature = "conformance")]
785            conformance_state: crate::handlers::conformance::ConformanceState::new(),
786        }
787    }
788
789    /// Add lifecycle hook registry to management state
790    pub fn with_lifecycle_hooks(
791        mut self,
792        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
793    ) -> Self {
794        self.lifecycle_hooks = Some(hooks);
795        self
796    }
797
798    /// Add WebSocket broadcast channel to management state
799    pub fn with_ws_broadcast(
800        mut self,
801        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
802    ) -> Self {
803        self.ws_broadcast = Some(ws_broadcast);
804        self
805    }
806
807    /// Add proxy configuration to management state
808    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
809        self.proxy_config = Some(proxy_config);
810        self
811    }
812
813    #[cfg(feature = "smtp")]
814    /// Add SMTP registry to management state
815    pub fn with_smtp_registry(
816        mut self,
817        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
818    ) -> Self {
819        self.smtp_registry = Some(smtp_registry);
820        self
821    }
822
823    #[cfg(feature = "mqtt")]
824    /// Add MQTT broker to management state
825    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
826        self.mqtt_broker = Some(mqtt_broker);
827        self
828    }
829
830    #[cfg(feature = "kafka")]
831    /// Add Kafka broker to management state
832    pub fn with_kafka_broker(
833        mut self,
834        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
835    ) -> Self {
836        self.kafka_broker = Some(kafka_broker);
837        self
838    }
839
840    #[cfg(feature = "chaos")]
841    /// Add chaos API state to management state
842    pub fn with_chaos_api_state(
843        mut self,
844        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
845    ) -> Self {
846        self.chaos_api_state = Some(chaos_api_state);
847        self
848    }
849
850    /// Add server configuration to management state
851    pub fn with_server_config(
852        mut self,
853        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
854    ) -> Self {
855        self.server_config = Some(server_config);
856        self
857    }
858}
859
860/// List all mocks
861async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
862    let mocks = state.mocks.read().await;
863    Json(serde_json::json!({
864        "mocks": *mocks,
865        "total": mocks.len(),
866        "enabled": mocks.iter().filter(|m| m.enabled).count()
867    }))
868}
869
870/// Get a specific mock by ID
871async fn get_mock(
872    State(state): State<ManagementState>,
873    Path(id): Path<String>,
874) -> Result<Json<MockConfig>, StatusCode> {
875    let mocks = state.mocks.read().await;
876    mocks
877        .iter()
878        .find(|m| m.id == id)
879        .cloned()
880        .map(Json)
881        .ok_or(StatusCode::NOT_FOUND)
882}
883
884/// Create a new mock
885async fn create_mock(
886    State(state): State<ManagementState>,
887    Json(mut mock): Json<MockConfig>,
888) -> Result<Json<MockConfig>, StatusCode> {
889    let mut mocks = state.mocks.write().await;
890
891    // Generate ID if not provided
892    if mock.id.is_empty() {
893        mock.id = uuid::Uuid::new_v4().to_string();
894    }
895
896    // Check for duplicate ID
897    if mocks.iter().any(|m| m.id == mock.id) {
898        return Err(StatusCode::CONFLICT);
899    }
900
901    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
902
903    // Invoke lifecycle hooks
904    if let Some(hooks) = &state.lifecycle_hooks {
905        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
906            id: mock.id.clone(),
907            name: mock.name.clone(),
908            config: serde_json::to_value(&mock).unwrap_or_default(),
909        };
910        hooks.invoke_mock_created(&event).await;
911    }
912
913    mocks.push(mock.clone());
914
915    // Broadcast WebSocket event
916    if let Some(tx) = &state.ws_broadcast {
917        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
918    }
919
920    Ok(Json(mock))
921}
922
923/// Update an existing mock
924async fn update_mock(
925    State(state): State<ManagementState>,
926    Path(id): Path<String>,
927    Json(updated_mock): Json<MockConfig>,
928) -> Result<Json<MockConfig>, StatusCode> {
929    let mut mocks = state.mocks.write().await;
930
931    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
932
933    // Get old mock for comparison
934    let old_mock = mocks[position].clone();
935
936    info!("Updating mock: {}", id);
937    mocks[position] = updated_mock.clone();
938
939    // Invoke lifecycle hooks
940    if let Some(hooks) = &state.lifecycle_hooks {
941        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
942            id: updated_mock.id.clone(),
943            name: updated_mock.name.clone(),
944            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
945        };
946        hooks.invoke_mock_updated(&event).await;
947
948        // Check if enabled state changed
949        if old_mock.enabled != updated_mock.enabled {
950            let state_event = if updated_mock.enabled {
951                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
952                    id: updated_mock.id.clone(),
953                }
954            } else {
955                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
956                    id: updated_mock.id.clone(),
957                }
958            };
959            hooks.invoke_mock_state_changed(&state_event).await;
960        }
961    }
962
963    // Broadcast WebSocket event
964    if let Some(tx) = &state.ws_broadcast {
965        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
966    }
967
968    Ok(Json(updated_mock))
969}
970
971/// Delete a mock
972async fn delete_mock(
973    State(state): State<ManagementState>,
974    Path(id): Path<String>,
975) -> Result<StatusCode, StatusCode> {
976    let mut mocks = state.mocks.write().await;
977
978    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
979
980    // Get mock info before deletion for lifecycle hooks
981    let deleted_mock = mocks[position].clone();
982
983    info!("Deleting mock: {}", id);
984    mocks.remove(position);
985
986    // Invoke lifecycle hooks
987    if let Some(hooks) = &state.lifecycle_hooks {
988        let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
989            id: deleted_mock.id.clone(),
990            name: deleted_mock.name.clone(),
991        };
992        hooks.invoke_mock_deleted(&event).await;
993    }
994
995    // Broadcast WebSocket event
996    if let Some(tx) = &state.ws_broadcast {
997        let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
998    }
999
1000    Ok(StatusCode::NO_CONTENT)
1001}
1002
1003/// Request to validate configuration
1004#[derive(Debug, Deserialize)]
1005pub struct ValidateConfigRequest {
1006    /// Configuration to validate (as JSON)
1007    pub config: serde_json::Value,
1008    /// Format of the configuration ("json" or "yaml")
1009    #[serde(default = "default_format")]
1010    pub format: String,
1011}
1012
1013fn default_format() -> String {
1014    "json".to_string()
1015}
1016
1017/// Validate configuration without applying it
1018async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
1019    use mockforge_core::config::ServerConfig;
1020
1021    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
1022        "yaml" | "yml" => {
1023            let yaml_str = match serde_json::to_string(&request.config) {
1024                Ok(s) => s,
1025                Err(e) => {
1026                    return (
1027                        StatusCode::BAD_REQUEST,
1028                        Json(serde_json::json!({
1029                            "valid": false,
1030                            "error": format!("Failed to convert to string: {}", e),
1031                            "message": "Configuration validation failed"
1032                        })),
1033                    )
1034                        .into_response();
1035                }
1036            };
1037            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
1038        }
1039        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
1040    };
1041
1042    match config_result {
1043        Ok(_) => Json(serde_json::json!({
1044            "valid": true,
1045            "message": "Configuration is valid"
1046        }))
1047        .into_response(),
1048        Err(e) => (
1049            StatusCode::BAD_REQUEST,
1050            Json(serde_json::json!({
1051                "valid": false,
1052                "error": format!("Invalid configuration: {}", e),
1053                "message": "Configuration validation failed"
1054            })),
1055        )
1056            .into_response(),
1057    }
1058}
1059
1060/// Request for bulk configuration update
1061#[derive(Debug, Deserialize)]
1062pub struct BulkConfigUpdateRequest {
1063    /// Partial configuration updates (only specified fields will be updated)
1064    pub updates: serde_json::Value,
1065}
1066
1067/// Bulk update configuration
1068///
1069/// This endpoint allows updating multiple configuration options at once.
1070/// Only the specified fields in the updates object will be modified.
1071///
1072/// Configuration updates are applied to the server configuration if available
1073/// in ManagementState. Changes take effect immediately for supported settings.
1074async fn bulk_update_config(
1075    State(state): State<ManagementState>,
1076    Json(request): Json<BulkConfigUpdateRequest>,
1077) -> impl IntoResponse {
1078    // Validate the updates structure
1079    if !request.updates.is_object() {
1080        return (
1081            StatusCode::BAD_REQUEST,
1082            Json(serde_json::json!({
1083                "error": "Invalid request",
1084                "message": "Updates must be a JSON object"
1085            })),
1086        )
1087            .into_response();
1088    }
1089
1090    // Try to validate as partial ServerConfig
1091    use mockforge_core::config::ServerConfig;
1092
1093    // Create a minimal valid config and try to merge updates
1094    let base_config = ServerConfig::default();
1095    let base_json = match serde_json::to_value(&base_config) {
1096        Ok(v) => v,
1097        Err(e) => {
1098            return (
1099                StatusCode::INTERNAL_SERVER_ERROR,
1100                Json(serde_json::json!({
1101                    "error": "Internal error",
1102                    "message": format!("Failed to serialize base config: {}", e)
1103                })),
1104            )
1105                .into_response();
1106        }
1107    };
1108
1109    // Merge updates into base config (simplified merge)
1110    let mut merged = base_json.clone();
1111    if let (Some(merged_obj), Some(updates_obj)) =
1112        (merged.as_object_mut(), request.updates.as_object())
1113    {
1114        for (key, value) in updates_obj {
1115            merged_obj.insert(key.clone(), value.clone());
1116        }
1117    }
1118
1119    // Validate the merged config
1120    match serde_json::from_value::<ServerConfig>(merged) {
1121        Ok(validated_config) => {
1122            // Apply config if server_config is available in ManagementState
1123            if let Some(ref config_lock) = state.server_config {
1124                let mut config = config_lock.write().await;
1125                *config = validated_config;
1126                Json(serde_json::json!({
1127                    "success": true,
1128                    "message": "Bulk configuration update applied successfully",
1129                    "updates_received": request.updates,
1130                    "validated": true,
1131                    "applied": true
1132                }))
1133                .into_response()
1134            } else {
1135                Json(serde_json::json!({
1136                    "success": true,
1137                    "message": "Bulk configuration update validated but not applied (no server config in state). Use .with_server_config() when building ManagementState.",
1138                    "updates_received": request.updates,
1139                    "validated": true,
1140                    "applied": false
1141                }))
1142                .into_response()
1143            }
1144        }
1145        Err(e) => (
1146            StatusCode::BAD_REQUEST,
1147            Json(serde_json::json!({
1148                "error": "Invalid configuration",
1149                "message": format!("Configuration validation failed: {}", e),
1150                "validated": false
1151            })),
1152        )
1153            .into_response(),
1154    }
1155}
1156
1157/// Get server statistics
1158async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
1159    let mocks = state.mocks.read().await;
1160    let request_count = *state.request_counter.read().await;
1161
1162    Json(ServerStats {
1163        uptime_seconds: state.start_time.elapsed().as_secs(),
1164        total_requests: request_count,
1165        active_mocks: mocks.len(),
1166        enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
1167        registered_routes: mocks.len(), // This could be enhanced with actual route registry info
1168    })
1169}
1170
1171/// Get server configuration
1172async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
1173    Json(ServerConfig {
1174        version: env!("CARGO_PKG_VERSION").to_string(),
1175        port: state.port,
1176        has_openapi_spec: state.spec.is_some(),
1177        spec_path: state.spec_path.clone(),
1178    })
1179}
1180
1181/// Serve the loaded OpenAPI spec as JSON
1182async fn get_openapi_spec(
1183    State(state): State<ManagementState>,
1184) -> Result<Json<serde_json::Value>, StatusCode> {
1185    match &state.spec {
1186        Some(spec) => match &spec.raw_document {
1187            Some(doc) => Ok(Json(doc.clone())),
1188            None => {
1189                // Fall back to serializing the parsed spec
1190                match serde_json::to_value(&spec.spec) {
1191                    Ok(val) => Ok(Json(val)),
1192                    Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
1193                }
1194            }
1195        },
1196        None => Err(StatusCode::NOT_FOUND),
1197    }
1198}
1199
1200/// Health check endpoint
1201async fn health_check() -> Json<serde_json::Value> {
1202    Json(serde_json::json!({
1203        "status": "healthy",
1204        "service": "mockforge-management",
1205        "timestamp": chrono::Utc::now().to_rfc3339()
1206    }))
1207}
1208
1209/// Export format for mock configurations
1210#[derive(Debug, Clone, Serialize, Deserialize)]
1211#[serde(rename_all = "lowercase")]
1212pub enum ExportFormat {
1213    /// JSON format
1214    Json,
1215    /// YAML format
1216    Yaml,
1217}
1218
1219/// Export mocks in specified format
1220async fn export_mocks(
1221    State(state): State<ManagementState>,
1222    Query(params): Query<std::collections::HashMap<String, String>>,
1223) -> Result<(StatusCode, String), StatusCode> {
1224    let mocks = state.mocks.read().await;
1225
1226    let format = params
1227        .get("format")
1228        .map(|f| match f.as_str() {
1229            "yaml" | "yml" => ExportFormat::Yaml,
1230            _ => ExportFormat::Json,
1231        })
1232        .unwrap_or(ExportFormat::Json);
1233
1234    match format {
1235        ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
1236            .map(|json| (StatusCode::OK, json))
1237            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1238        ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
1239            .map(|yaml| (StatusCode::OK, yaml))
1240            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1241    }
1242}
1243
1244/// Import mocks from JSON/YAML
1245async fn import_mocks(
1246    State(state): State<ManagementState>,
1247    Json(mocks): Json<Vec<MockConfig>>,
1248) -> impl IntoResponse {
1249    let mut current_mocks = state.mocks.write().await;
1250    current_mocks.clear();
1251    current_mocks.extend(mocks);
1252    Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1253}
1254
1255#[cfg(feature = "smtp")]
1256/// List SMTP emails in mailbox
1257async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
1258    if let Some(ref smtp_registry) = state.smtp_registry {
1259        match smtp_registry.get_emails() {
1260            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
1261            Err(e) => (
1262                StatusCode::INTERNAL_SERVER_ERROR,
1263                Json(serde_json::json!({
1264                    "error": "Failed to retrieve emails",
1265                    "message": e.to_string()
1266                })),
1267            ),
1268        }
1269    } else {
1270        (
1271            StatusCode::NOT_IMPLEMENTED,
1272            Json(serde_json::json!({
1273                "error": "SMTP mailbox management not available",
1274                "message": "SMTP server is not enabled or registry not available."
1275            })),
1276        )
1277    }
1278}
1279
1280/// Get specific SMTP email
1281#[cfg(feature = "smtp")]
1282async fn get_smtp_email(
1283    State(state): State<ManagementState>,
1284    Path(id): Path<String>,
1285) -> impl IntoResponse {
1286    if let Some(ref smtp_registry) = state.smtp_registry {
1287        match smtp_registry.get_email_by_id(&id) {
1288            Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
1289            Ok(None) => (
1290                StatusCode::NOT_FOUND,
1291                Json(serde_json::json!({
1292                    "error": "Email not found",
1293                    "id": id
1294                })),
1295            ),
1296            Err(e) => (
1297                StatusCode::INTERNAL_SERVER_ERROR,
1298                Json(serde_json::json!({
1299                    "error": "Failed to retrieve email",
1300                    "message": e.to_string()
1301                })),
1302            ),
1303        }
1304    } else {
1305        (
1306            StatusCode::NOT_IMPLEMENTED,
1307            Json(serde_json::json!({
1308                "error": "SMTP mailbox management not available",
1309                "message": "SMTP server is not enabled or registry not available."
1310            })),
1311        )
1312    }
1313}
1314
1315/// Clear SMTP mailbox
1316#[cfg(feature = "smtp")]
1317async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
1318    if let Some(ref smtp_registry) = state.smtp_registry {
1319        match smtp_registry.clear_mailbox() {
1320            Ok(()) => (
1321                StatusCode::OK,
1322                Json(serde_json::json!({
1323                    "message": "Mailbox cleared successfully"
1324                })),
1325            ),
1326            Err(e) => (
1327                StatusCode::INTERNAL_SERVER_ERROR,
1328                Json(serde_json::json!({
1329                    "error": "Failed to clear mailbox",
1330                    "message": e.to_string()
1331                })),
1332            ),
1333        }
1334    } else {
1335        (
1336            StatusCode::NOT_IMPLEMENTED,
1337            Json(serde_json::json!({
1338                "error": "SMTP mailbox management not available",
1339                "message": "SMTP server is not enabled or registry not available."
1340            })),
1341        )
1342    }
1343}
1344
1345/// Export SMTP mailbox
1346#[cfg(feature = "smtp")]
1347async fn export_smtp_mailbox(
1348    Query(params): Query<std::collections::HashMap<String, String>>,
1349) -> impl IntoResponse {
1350    let format = params.get("format").unwrap_or(&"json".to_string()).clone();
1351    (
1352        StatusCode::NOT_IMPLEMENTED,
1353        Json(serde_json::json!({
1354            "error": "SMTP mailbox management not available via HTTP API",
1355            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
1356            "requested_format": format
1357        })),
1358    )
1359}
1360
1361/// Search SMTP emails
1362#[cfg(feature = "smtp")]
1363async fn search_smtp_emails(
1364    State(state): State<ManagementState>,
1365    Query(params): Query<std::collections::HashMap<String, String>>,
1366) -> impl IntoResponse {
1367    if let Some(ref smtp_registry) = state.smtp_registry {
1368        let filters = EmailSearchFilters {
1369            sender: params.get("sender").cloned(),
1370            recipient: params.get("recipient").cloned(),
1371            subject: params.get("subject").cloned(),
1372            body: params.get("body").cloned(),
1373            since: params
1374                .get("since")
1375                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
1376                .map(|dt| dt.with_timezone(&chrono::Utc)),
1377            until: params
1378                .get("until")
1379                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
1380                .map(|dt| dt.with_timezone(&chrono::Utc)),
1381            use_regex: params.get("regex").map(|s| s == "true").unwrap_or(false),
1382            case_sensitive: params.get("case_sensitive").map(|s| s == "true").unwrap_or(false),
1383        };
1384
1385        match smtp_registry.search_emails(filters) {
1386            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
1387            Err(e) => (
1388                StatusCode::INTERNAL_SERVER_ERROR,
1389                Json(serde_json::json!({
1390                    "error": "Failed to search emails",
1391                    "message": e.to_string()
1392                })),
1393            ),
1394        }
1395    } else {
1396        (
1397            StatusCode::NOT_IMPLEMENTED,
1398            Json(serde_json::json!({
1399                "error": "SMTP mailbox management not available",
1400                "message": "SMTP server is not enabled or registry not available."
1401            })),
1402        )
1403    }
1404}
1405
1406/// MQTT broker statistics
1407#[cfg(feature = "mqtt")]
1408#[derive(Debug, Clone, Serialize, Deserialize)]
1409pub struct MqttBrokerStats {
1410    /// Number of connected MQTT clients
1411    pub connected_clients: usize,
1412    /// Number of active MQTT topics
1413    pub active_topics: usize,
1414    /// Number of retained messages
1415    pub retained_messages: usize,
1416    /// Total number of subscriptions
1417    pub total_subscriptions: usize,
1418}
1419
1420/// MQTT management handlers
1421#[cfg(feature = "mqtt")]
1422async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
1423    if let Some(broker) = &state.mqtt_broker {
1424        let connected_clients = broker.get_connected_clients().await.len();
1425        let active_topics = broker.get_active_topics().await.len();
1426        let stats = broker.get_topic_stats().await;
1427
1428        let broker_stats = MqttBrokerStats {
1429            connected_clients,
1430            active_topics,
1431            retained_messages: stats.retained_messages,
1432            total_subscriptions: stats.total_subscriptions,
1433        };
1434
1435        Json(broker_stats).into_response()
1436    } else {
1437        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1438    }
1439}
1440
1441#[cfg(feature = "mqtt")]
1442async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
1443    if let Some(broker) = &state.mqtt_broker {
1444        let clients = broker.get_connected_clients().await;
1445        Json(serde_json::json!({
1446            "clients": clients
1447        }))
1448        .into_response()
1449    } else {
1450        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1451    }
1452}
1453
1454#[cfg(feature = "mqtt")]
1455async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
1456    if let Some(broker) = &state.mqtt_broker {
1457        let topics = broker.get_active_topics().await;
1458        Json(serde_json::json!({
1459            "topics": topics
1460        }))
1461        .into_response()
1462    } else {
1463        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1464    }
1465}
1466
1467#[cfg(feature = "mqtt")]
1468async fn disconnect_mqtt_client(
1469    State(state): State<ManagementState>,
1470    Path(client_id): Path<String>,
1471) -> impl IntoResponse {
1472    if let Some(broker) = &state.mqtt_broker {
1473        match broker.disconnect_client(&client_id).await {
1474            Ok(_) => {
1475                (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response()
1476            }
1477            Err(e) => {
1478                (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
1479                    .into_response()
1480            }
1481        }
1482    } else {
1483        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
1484    }
1485}
1486
1487// ========== MQTT Publish Handler ==========
1488
1489#[cfg(feature = "mqtt")]
1490/// Request to publish a single MQTT message
1491#[derive(Debug, Deserialize)]
1492pub struct MqttPublishRequest {
1493    /// Topic to publish to
1494    pub topic: String,
1495    /// Message payload (string or JSON)
1496    pub payload: String,
1497    /// QoS level (0, 1, or 2)
1498    #[serde(default = "default_qos")]
1499    pub qos: u8,
1500    /// Whether to retain the message
1501    #[serde(default)]
1502    pub retain: bool,
1503}
1504
1505#[cfg(feature = "mqtt")]
1506fn default_qos() -> u8 {
1507    0
1508}
1509
1510#[cfg(feature = "mqtt")]
1511/// Publish a message to an MQTT topic (only compiled when mqtt feature is enabled)
1512async fn publish_mqtt_message_handler(
1513    State(state): State<ManagementState>,
1514    Json(request): Json<serde_json::Value>,
1515) -> impl IntoResponse {
1516    // Extract fields from JSON manually
1517    let topic = request.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
1518    let payload = request.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
1519    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
1520    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);
1521
1522    if topic.is_none() || payload.is_none() {
1523        return (
1524            StatusCode::BAD_REQUEST,
1525            Json(serde_json::json!({
1526                "error": "Invalid request",
1527                "message": "Missing required fields: topic and payload"
1528            })),
1529        );
1530    }
1531
1532    let topic = topic.unwrap();
1533    let payload = payload.unwrap();
1534
1535    if let Some(broker) = &state.mqtt_broker {
1536        // Validate QoS
1537        if qos > 2 {
1538            return (
1539                StatusCode::BAD_REQUEST,
1540                Json(serde_json::json!({
1541                    "error": "Invalid QoS",
1542                    "message": "QoS must be 0, 1, or 2"
1543                })),
1544            );
1545        }
1546
1547        // Convert payload to bytes
1548        let payload_bytes = payload.as_bytes().to_vec();
1549        let client_id = "mockforge-management-api".to_string();
1550
1551        let publish_result = broker
1552            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
1553            .await
1554            .map_err(|e| format!("{}", e));
1555
1556        match publish_result {
1557            Ok(_) => {
1558                // Emit message event for real-time monitoring
1559                let event = MessageEvent::Mqtt(MqttMessageEvent {
1560                    topic: topic.clone(),
1561                    payload: payload.clone(),
1562                    qos,
1563                    retain,
1564                    timestamp: chrono::Utc::now().to_rfc3339(),
1565                });
1566                let _ = state.message_events.send(event);
1567
1568                (
1569                    StatusCode::OK,
1570                    Json(serde_json::json!({
1571                        "success": true,
1572                        "message": format!("Message published to topic '{}'", topic),
1573                        "topic": topic,
1574                        "qos": qos,
1575                        "retain": retain
1576                    })),
1577                )
1578            }
1579            Err(error_msg) => (
1580                StatusCode::INTERNAL_SERVER_ERROR,
1581                Json(serde_json::json!({
1582                    "error": "Failed to publish message",
1583                    "message": error_msg
1584                })),
1585            ),
1586        }
1587    } else {
1588        (
1589            StatusCode::SERVICE_UNAVAILABLE,
1590            Json(serde_json::json!({
1591                "error": "MQTT broker not available",
1592                "message": "MQTT broker is not enabled or not available."
1593            })),
1594        )
1595    }
1596}
1597
1598#[cfg(not(feature = "mqtt"))]
1599/// Publish a message to an MQTT topic (stub when mqtt feature is disabled)
1600async fn publish_mqtt_message_handler(
1601    State(_state): State<ManagementState>,
1602    Json(_request): Json<serde_json::Value>,
1603) -> impl IntoResponse {
1604    (
1605        StatusCode::SERVICE_UNAVAILABLE,
1606        Json(serde_json::json!({
1607            "error": "MQTT feature not enabled",
1608            "message": "MQTT support is not compiled into this build"
1609        })),
1610    )
1611}
1612
1613#[cfg(feature = "mqtt")]
1614/// Request to publish multiple MQTT messages
1615#[derive(Debug, Deserialize)]
1616pub struct MqttBatchPublishRequest {
1617    /// List of messages to publish
1618    pub messages: Vec<MqttPublishRequest>,
1619    /// Delay between messages in milliseconds
1620    #[serde(default = "default_delay")]
1621    pub delay_ms: u64,
1622}
1623
1624#[cfg(feature = "mqtt")]
1625fn default_delay() -> u64 {
1626    100
1627}
1628
1629#[cfg(feature = "mqtt")]
1630/// Publish multiple messages to MQTT topics (only compiled when mqtt feature is enabled)
1631async fn publish_mqtt_batch_handler(
1632    State(state): State<ManagementState>,
1633    Json(request): Json<serde_json::Value>,
1634) -> impl IntoResponse {
1635    // Extract fields from JSON manually
1636    let messages_json = request.get("messages").and_then(|v| v.as_array());
1637    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);
1638
1639    if messages_json.is_none() {
1640        return (
1641            StatusCode::BAD_REQUEST,
1642            Json(serde_json::json!({
1643                "error": "Invalid request",
1644                "message": "Missing required field: messages"
1645            })),
1646        );
1647    }
1648
1649    let messages_json = messages_json.unwrap();
1650
1651    if let Some(broker) = &state.mqtt_broker {
1652        if messages_json.is_empty() {
1653            return (
1654                StatusCode::BAD_REQUEST,
1655                Json(serde_json::json!({
1656                    "error": "Empty batch",
1657                    "message": "At least one message is required"
1658                })),
1659            );
1660        }
1661
1662        let mut results = Vec::new();
1663        let client_id = "mockforge-management-api".to_string();
1664
1665        for (index, msg_json) in messages_json.iter().enumerate() {
1666            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
1667            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
1668            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
1669            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);
1670
1671            if topic.is_none() || payload.is_none() {
1672                results.push(serde_json::json!({
1673                    "index": index,
1674                    "success": false,
1675                    "error": "Missing required fields: topic and payload"
1676                }));
1677                continue;
1678            }
1679
1680            let topic = topic.unwrap();
1681            let payload = payload.unwrap();
1682
1683            // Validate QoS
1684            if qos > 2 {
1685                results.push(serde_json::json!({
1686                    "index": index,
1687                    "success": false,
1688                    "error": "Invalid QoS (must be 0, 1, or 2)"
1689                }));
1690                continue;
1691            }
1692
1693            // Convert payload to bytes
1694            let payload_bytes = payload.as_bytes().to_vec();
1695
1696            let publish_result = broker
1697                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
1698                .await
1699                .map_err(|e| format!("{}", e));
1700
1701            match publish_result {
1702                Ok(_) => {
1703                    // Emit message event
1704                    let event = MessageEvent::Mqtt(MqttMessageEvent {
1705                        topic: topic.clone(),
1706                        payload: payload.clone(),
1707                        qos,
1708                        retain,
1709                        timestamp: chrono::Utc::now().to_rfc3339(),
1710                    });
1711                    let _ = state.message_events.send(event);
1712
1713                    results.push(serde_json::json!({
1714                        "index": index,
1715                        "success": true,
1716                        "topic": topic,
1717                        "qos": qos
1718                    }));
1719                }
1720                Err(error_msg) => {
1721                    results.push(serde_json::json!({
1722                        "index": index,
1723                        "success": false,
1724                        "error": error_msg
1725                    }));
1726                }
1727            }
1728
1729            // Add delay between messages (except for the last one)
1730            if index < messages_json.len() - 1 && delay_ms > 0 {
1731                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1732            }
1733        }
1734
1735        let success_count =
1736            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();
1737
1738        (
1739            StatusCode::OK,
1740            Json(serde_json::json!({
1741                "success": true,
1742                "total": messages_json.len(),
1743                "succeeded": success_count,
1744                "failed": messages_json.len() - success_count,
1745                "results": results
1746            })),
1747        )
1748    } else {
1749        (
1750            StatusCode::SERVICE_UNAVAILABLE,
1751            Json(serde_json::json!({
1752                "error": "MQTT broker not available",
1753                "message": "MQTT broker is not enabled or not available."
1754            })),
1755        )
1756    }
1757}
1758
1759#[cfg(not(feature = "mqtt"))]
1760/// Publish multiple messages to MQTT topics (stub when mqtt feature is disabled)
1761async fn publish_mqtt_batch_handler(
1762    State(_state): State<ManagementState>,
1763    Json(_request): Json<serde_json::Value>,
1764) -> impl IntoResponse {
1765    (
1766        StatusCode::SERVICE_UNAVAILABLE,
1767        Json(serde_json::json!({
1768            "error": "MQTT feature not enabled",
1769            "message": "MQTT support is not compiled into this build"
1770        })),
1771    )
1772}
1773
1774// Migration pipeline handlers
1775
1776/// Request to set migration mode
1777#[derive(Debug, Deserialize)]
1778struct SetMigrationModeRequest {
1779    mode: String,
1780}
1781
1782/// Get all migration routes
1783async fn get_migration_routes(
1784    State(state): State<ManagementState>,
1785) -> Result<Json<serde_json::Value>, StatusCode> {
1786    let proxy_config = match &state.proxy_config {
1787        Some(config) => config,
1788        None => {
1789            return Ok(Json(serde_json::json!({
1790                "error": "Migration not configured. Proxy config not available."
1791            })));
1792        }
1793    };
1794
1795    let config = proxy_config.read().await;
1796    let routes = config.get_migration_routes();
1797
1798    Ok(Json(serde_json::json!({
1799        "routes": routes
1800    })))
1801}
1802
1803/// Toggle a route's migration mode
1804async fn toggle_route_migration(
1805    State(state): State<ManagementState>,
1806    Path(pattern): Path<String>,
1807) -> Result<Json<serde_json::Value>, StatusCode> {
1808    let proxy_config = match &state.proxy_config {
1809        Some(config) => config,
1810        None => {
1811            return Ok(Json(serde_json::json!({
1812                "error": "Migration not configured. Proxy config not available."
1813            })));
1814        }
1815    };
1816
1817    let mut config = proxy_config.write().await;
1818    let new_mode = match config.toggle_route_migration(&pattern) {
1819        Some(mode) => mode,
1820        None => {
1821            return Ok(Json(serde_json::json!({
1822                "error": format!("Route pattern not found: {}", pattern)
1823            })));
1824        }
1825    };
1826
1827    Ok(Json(serde_json::json!({
1828        "pattern": pattern,
1829        "mode": format!("{:?}", new_mode).to_lowercase()
1830    })))
1831}
1832
1833/// Set a route's migration mode explicitly
1834async fn set_route_migration_mode(
1835    State(state): State<ManagementState>,
1836    Path(pattern): Path<String>,
1837    Json(request): Json<SetMigrationModeRequest>,
1838) -> Result<Json<serde_json::Value>, StatusCode> {
1839    let proxy_config = match &state.proxy_config {
1840        Some(config) => config,
1841        None => {
1842            return Ok(Json(serde_json::json!({
1843                "error": "Migration not configured. Proxy config not available."
1844            })));
1845        }
1846    };
1847
1848    use mockforge_core::proxy::config::MigrationMode;
1849    let mode = match request.mode.to_lowercase().as_str() {
1850        "mock" => MigrationMode::Mock,
1851        "shadow" => MigrationMode::Shadow,
1852        "real" => MigrationMode::Real,
1853        "auto" => MigrationMode::Auto,
1854        _ => {
1855            return Ok(Json(serde_json::json!({
1856                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1857            })));
1858        }
1859    };
1860
1861    let mut config = proxy_config.write().await;
1862    let updated = config.update_rule_migration_mode(&pattern, mode);
1863
1864    if !updated {
1865        return Ok(Json(serde_json::json!({
1866            "error": format!("Route pattern not found: {}", pattern)
1867        })));
1868    }
1869
1870    Ok(Json(serde_json::json!({
1871        "pattern": pattern,
1872        "mode": format!("{:?}", mode).to_lowercase()
1873    })))
1874}
1875
1876/// Toggle a group's migration mode
1877async fn toggle_group_migration(
1878    State(state): State<ManagementState>,
1879    Path(group): Path<String>,
1880) -> Result<Json<serde_json::Value>, StatusCode> {
1881    let proxy_config = match &state.proxy_config {
1882        Some(config) => config,
1883        None => {
1884            return Ok(Json(serde_json::json!({
1885                "error": "Migration not configured. Proxy config not available."
1886            })));
1887        }
1888    };
1889
1890    let mut config = proxy_config.write().await;
1891    let new_mode = config.toggle_group_migration(&group);
1892
1893    Ok(Json(serde_json::json!({
1894        "group": group,
1895        "mode": format!("{:?}", new_mode).to_lowercase()
1896    })))
1897}
1898
1899/// Set a group's migration mode explicitly
1900async fn set_group_migration_mode(
1901    State(state): State<ManagementState>,
1902    Path(group): Path<String>,
1903    Json(request): Json<SetMigrationModeRequest>,
1904) -> Result<Json<serde_json::Value>, StatusCode> {
1905    let proxy_config = match &state.proxy_config {
1906        Some(config) => config,
1907        None => {
1908            return Ok(Json(serde_json::json!({
1909                "error": "Migration not configured. Proxy config not available."
1910            })));
1911        }
1912    };
1913
1914    use mockforge_core::proxy::config::MigrationMode;
1915    let mode = match request.mode.to_lowercase().as_str() {
1916        "mock" => MigrationMode::Mock,
1917        "shadow" => MigrationMode::Shadow,
1918        "real" => MigrationMode::Real,
1919        "auto" => MigrationMode::Auto,
1920        _ => {
1921            return Ok(Json(serde_json::json!({
1922                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1923            })));
1924        }
1925    };
1926
1927    let mut config = proxy_config.write().await;
1928    config.update_group_migration_mode(&group, mode);
1929
1930    Ok(Json(serde_json::json!({
1931        "group": group,
1932        "mode": format!("{:?}", mode).to_lowercase()
1933    })))
1934}
1935
1936/// Get all migration groups
1937async fn get_migration_groups(
1938    State(state): State<ManagementState>,
1939) -> Result<Json<serde_json::Value>, StatusCode> {
1940    let proxy_config = match &state.proxy_config {
1941        Some(config) => config,
1942        None => {
1943            return Ok(Json(serde_json::json!({
1944                "error": "Migration not configured. Proxy config not available."
1945            })));
1946        }
1947    };
1948
1949    let config = proxy_config.read().await;
1950    let groups = config.get_migration_groups();
1951
1952    // Convert to JSON-serializable format
1953    let groups_json: serde_json::Map<String, serde_json::Value> = groups
1954        .into_iter()
1955        .map(|(name, info)| {
1956            (
1957                name,
1958                serde_json::json!({
1959                    "name": info.name,
1960                    "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1961                    "route_count": info.route_count
1962                }),
1963            )
1964        })
1965        .collect();
1966
1967    Ok(Json(serde_json::json!(groups_json)))
1968}
1969
1970/// Get overall migration status
1971async fn get_migration_status(
1972    State(state): State<ManagementState>,
1973) -> Result<Json<serde_json::Value>, StatusCode> {
1974    let proxy_config = match &state.proxy_config {
1975        Some(config) => config,
1976        None => {
1977            return Ok(Json(serde_json::json!({
1978                "error": "Migration not configured. Proxy config not available."
1979            })));
1980        }
1981    };
1982
1983    let config = proxy_config.read().await;
1984    let routes = config.get_migration_routes();
1985    let groups = config.get_migration_groups();
1986
1987    let mut mock_count = 0;
1988    let mut shadow_count = 0;
1989    let mut real_count = 0;
1990    let mut auto_count = 0;
1991
1992    for route in &routes {
1993        match route.migration_mode {
1994            mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1995            mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1996            mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1997            mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1998        }
1999    }
2000
2001    Ok(Json(serde_json::json!({
2002        "total_routes": routes.len(),
2003        "mock_routes": mock_count,
2004        "shadow_routes": shadow_count,
2005        "real_routes": real_count,
2006        "auto_routes": auto_count,
2007        "total_groups": groups.len(),
2008        "migration_enabled": config.migration_enabled
2009    })))
2010}
2011
2012// ========== Proxy Replacement Rules Management ==========
2013
2014/// Request body for creating/updating proxy replacement rules
2015#[derive(Debug, Deserialize, Serialize)]
2016pub struct ProxyRuleRequest {
2017    /// URL pattern to match (supports wildcards like "/api/users/*")
2018    pub pattern: String,
2019    /// Rule type: "request" or "response"
2020    #[serde(rename = "type")]
2021    pub rule_type: String,
2022    /// Optional status code filter for response rules
2023    #[serde(default)]
2024    pub status_codes: Vec<u16>,
2025    /// Body transformations to apply
2026    pub body_transforms: Vec<BodyTransformRequest>,
2027    /// Whether this rule is enabled
2028    #[serde(default = "default_true")]
2029    pub enabled: bool,
2030}
2031
2032/// Request body for individual body transformations
2033#[derive(Debug, Deserialize, Serialize)]
2034pub struct BodyTransformRequest {
2035    /// JSONPath expression to target (e.g., "$.userId", "$.email")
2036    pub path: String,
2037    /// Replacement value (supports template expansion like "{{uuid}}", "{{faker.email}}")
2038    pub replace: String,
2039    /// Operation to perform: "replace", "add", or "remove"
2040    #[serde(default)]
2041    pub operation: String,
2042}
2043
2044/// Response format for proxy rules
2045#[derive(Debug, Serialize)]
2046pub struct ProxyRuleResponse {
2047    /// Rule ID (index in the array)
2048    pub id: usize,
2049    /// URL pattern
2050    pub pattern: String,
2051    /// Rule type
2052    #[serde(rename = "type")]
2053    pub rule_type: String,
2054    /// Status codes (for response rules)
2055    pub status_codes: Vec<u16>,
2056    /// Body transformations
2057    pub body_transforms: Vec<BodyTransformRequest>,
2058    /// Whether enabled
2059    pub enabled: bool,
2060}
2061
2062/// List all proxy replacement rules
2063async fn list_proxy_rules(
2064    State(state): State<ManagementState>,
2065) -> Result<Json<serde_json::Value>, StatusCode> {
2066    let proxy_config = match &state.proxy_config {
2067        Some(config) => config,
2068        None => {
2069            return Ok(Json(serde_json::json!({
2070                "error": "Proxy not configured. Proxy config not available."
2071            })));
2072        }
2073    };
2074
2075    let config = proxy_config.read().await;
2076
2077    let mut rules: Vec<ProxyRuleResponse> = Vec::new();
2078
2079    // Add request replacement rules
2080    for (idx, rule) in config.request_replacements.iter().enumerate() {
2081        rules.push(ProxyRuleResponse {
2082            id: idx,
2083            pattern: rule.pattern.clone(),
2084            rule_type: "request".to_string(),
2085            status_codes: Vec::new(),
2086            body_transforms: rule
2087                .body_transforms
2088                .iter()
2089                .map(|t| BodyTransformRequest {
2090                    path: t.path.clone(),
2091                    replace: t.replace.clone(),
2092                    operation: format!("{:?}", t.operation).to_lowercase(),
2093                })
2094                .collect(),
2095            enabled: rule.enabled,
2096        });
2097    }
2098
2099    // Add response replacement rules
2100    let request_count = config.request_replacements.len();
2101    for (idx, rule) in config.response_replacements.iter().enumerate() {
2102        rules.push(ProxyRuleResponse {
2103            id: request_count + idx,
2104            pattern: rule.pattern.clone(),
2105            rule_type: "response".to_string(),
2106            status_codes: rule.status_codes.clone(),
2107            body_transforms: rule
2108                .body_transforms
2109                .iter()
2110                .map(|t| BodyTransformRequest {
2111                    path: t.path.clone(),
2112                    replace: t.replace.clone(),
2113                    operation: format!("{:?}", t.operation).to_lowercase(),
2114                })
2115                .collect(),
2116            enabled: rule.enabled,
2117        });
2118    }
2119
2120    Ok(Json(serde_json::json!({
2121        "rules": rules
2122    })))
2123}
2124
2125/// Create a new proxy replacement rule
2126async fn create_proxy_rule(
2127    State(state): State<ManagementState>,
2128    Json(request): Json<ProxyRuleRequest>,
2129) -> Result<Json<serde_json::Value>, StatusCode> {
2130    let proxy_config = match &state.proxy_config {
2131        Some(config) => config,
2132        None => {
2133            return Ok(Json(serde_json::json!({
2134                "error": "Proxy not configured. Proxy config not available."
2135            })));
2136        }
2137    };
2138
2139    // Validate request
2140    if request.body_transforms.is_empty() {
2141        return Ok(Json(serde_json::json!({
2142            "error": "At least one body transform is required"
2143        })));
2144    }
2145
2146    let body_transforms: Vec<BodyTransform> = request
2147        .body_transforms
2148        .iter()
2149        .map(|t| {
2150            let op = match t.operation.as_str() {
2151                "replace" => TransformOperation::Replace,
2152                "add" => TransformOperation::Add,
2153                "remove" => TransformOperation::Remove,
2154                _ => TransformOperation::Replace,
2155            };
2156            BodyTransform {
2157                path: t.path.clone(),
2158                replace: t.replace.clone(),
2159                operation: op,
2160            }
2161        })
2162        .collect();
2163
2164    let new_rule = BodyTransformRule {
2165        pattern: request.pattern.clone(),
2166        status_codes: request.status_codes.clone(),
2167        body_transforms,
2168        enabled: request.enabled,
2169    };
2170
2171    let mut config = proxy_config.write().await;
2172
2173    let rule_id = if request.rule_type == "request" {
2174        config.request_replacements.push(new_rule);
2175        config.request_replacements.len() - 1
2176    } else if request.rule_type == "response" {
2177        config.response_replacements.push(new_rule);
2178        config.request_replacements.len() + config.response_replacements.len() - 1
2179    } else {
2180        return Ok(Json(serde_json::json!({
2181            "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
2182        })));
2183    };
2184
2185    Ok(Json(serde_json::json!({
2186        "id": rule_id,
2187        "message": "Rule created successfully"
2188    })))
2189}
2190
2191/// Get a specific proxy replacement rule
2192async fn get_proxy_rule(
2193    State(state): State<ManagementState>,
2194    Path(id): Path<String>,
2195) -> Result<Json<serde_json::Value>, StatusCode> {
2196    let proxy_config = match &state.proxy_config {
2197        Some(config) => config,
2198        None => {
2199            return Ok(Json(serde_json::json!({
2200                "error": "Proxy not configured. Proxy config not available."
2201            })));
2202        }
2203    };
2204
2205    let config = proxy_config.read().await;
2206    let rule_id: usize = match id.parse() {
2207        Ok(id) => id,
2208        Err(_) => {
2209            return Ok(Json(serde_json::json!({
2210                "error": format!("Invalid rule ID: {}", id)
2211            })));
2212        }
2213    };
2214
2215    let request_count = config.request_replacements.len();
2216
2217    if rule_id < request_count {
2218        // Request rule
2219        let rule = &config.request_replacements[rule_id];
2220        Ok(Json(serde_json::json!({
2221            "id": rule_id,
2222            "pattern": rule.pattern,
2223            "type": "request",
2224            "status_codes": [],
2225            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2226                "path": t.path,
2227                "replace": t.replace,
2228                "operation": format!("{:?}", t.operation).to_lowercase()
2229            })).collect::<Vec<_>>(),
2230            "enabled": rule.enabled
2231        })))
2232    } else if rule_id < request_count + config.response_replacements.len() {
2233        // Response rule
2234        let response_idx = rule_id - request_count;
2235        let rule = &config.response_replacements[response_idx];
2236        Ok(Json(serde_json::json!({
2237            "id": rule_id,
2238            "pattern": rule.pattern,
2239            "type": "response",
2240            "status_codes": rule.status_codes,
2241            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2242                "path": t.path,
2243                "replace": t.replace,
2244                "operation": format!("{:?}", t.operation).to_lowercase()
2245            })).collect::<Vec<_>>(),
2246            "enabled": rule.enabled
2247        })))
2248    } else {
2249        Ok(Json(serde_json::json!({
2250            "error": format!("Rule ID {} not found", rule_id)
2251        })))
2252    }
2253}
2254
2255/// Update a proxy replacement rule
2256async fn update_proxy_rule(
2257    State(state): State<ManagementState>,
2258    Path(id): Path<String>,
2259    Json(request): Json<ProxyRuleRequest>,
2260) -> Result<Json<serde_json::Value>, StatusCode> {
2261    let proxy_config = match &state.proxy_config {
2262        Some(config) => config,
2263        None => {
2264            return Ok(Json(serde_json::json!({
2265                "error": "Proxy not configured. Proxy config not available."
2266            })));
2267        }
2268    };
2269
2270    let mut config = proxy_config.write().await;
2271    let rule_id: usize = match id.parse() {
2272        Ok(id) => id,
2273        Err(_) => {
2274            return Ok(Json(serde_json::json!({
2275                "error": format!("Invalid rule ID: {}", id)
2276            })));
2277        }
2278    };
2279
2280    let body_transforms: Vec<BodyTransform> = request
2281        .body_transforms
2282        .iter()
2283        .map(|t| {
2284            let op = match t.operation.as_str() {
2285                "replace" => TransformOperation::Replace,
2286                "add" => TransformOperation::Add,
2287                "remove" => TransformOperation::Remove,
2288                _ => TransformOperation::Replace,
2289            };
2290            BodyTransform {
2291                path: t.path.clone(),
2292                replace: t.replace.clone(),
2293                operation: op,
2294            }
2295        })
2296        .collect();
2297
2298    let updated_rule = BodyTransformRule {
2299        pattern: request.pattern.clone(),
2300        status_codes: request.status_codes.clone(),
2301        body_transforms,
2302        enabled: request.enabled,
2303    };
2304
2305    let request_count = config.request_replacements.len();
2306
2307    if rule_id < request_count {
2308        // Update request rule
2309        config.request_replacements[rule_id] = updated_rule;
2310    } else if rule_id < request_count + config.response_replacements.len() {
2311        // Update response rule
2312        let response_idx = rule_id - request_count;
2313        config.response_replacements[response_idx] = updated_rule;
2314    } else {
2315        return Ok(Json(serde_json::json!({
2316            "error": format!("Rule ID {} not found", rule_id)
2317        })));
2318    }
2319
2320    Ok(Json(serde_json::json!({
2321        "id": rule_id,
2322        "message": "Rule updated successfully"
2323    })))
2324}
2325
2326/// Delete a proxy replacement rule
2327async fn delete_proxy_rule(
2328    State(state): State<ManagementState>,
2329    Path(id): Path<String>,
2330) -> Result<Json<serde_json::Value>, StatusCode> {
2331    let proxy_config = match &state.proxy_config {
2332        Some(config) => config,
2333        None => {
2334            return Ok(Json(serde_json::json!({
2335                "error": "Proxy not configured. Proxy config not available."
2336            })));
2337        }
2338    };
2339
2340    let mut config = proxy_config.write().await;
2341    let rule_id: usize = match id.parse() {
2342        Ok(id) => id,
2343        Err(_) => {
2344            return Ok(Json(serde_json::json!({
2345                "error": format!("Invalid rule ID: {}", id)
2346            })));
2347        }
2348    };
2349
2350    let request_count = config.request_replacements.len();
2351
2352    if rule_id < request_count {
2353        // Delete request rule
2354        config.request_replacements.remove(rule_id);
2355    } else if rule_id < request_count + config.response_replacements.len() {
2356        // Delete response rule
2357        let response_idx = rule_id - request_count;
2358        config.response_replacements.remove(response_idx);
2359    } else {
2360        return Ok(Json(serde_json::json!({
2361            "error": format!("Rule ID {} not found", rule_id)
2362        })));
2363    }
2364
2365    Ok(Json(serde_json::json!({
2366        "id": rule_id,
2367        "message": "Rule deleted successfully"
2368    })))
2369}
2370
2371/// Get proxy rules and transformation configuration for inspection
2372async fn get_proxy_inspect(
2373    State(state): State<ManagementState>,
2374    Query(params): Query<std::collections::HashMap<String, String>>,
2375) -> Result<Json<serde_json::Value>, StatusCode> {
2376    let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2377    let offset: usize = params.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);
2378
2379    let proxy_config = match &state.proxy_config {
2380        Some(config) => config.read().await,
2381        None => {
2382            return Ok(Json(serde_json::json!({
2383                "error": "Proxy not configured. Proxy config not available."
2384            })));
2385        }
2386    };
2387
2388    let mut rules = Vec::new();
2389    for (idx, rule) in proxy_config.request_replacements.iter().enumerate() {
2390        rules.push(serde_json::json!({
2391            "id": idx,
2392            "kind": "request",
2393            "pattern": rule.pattern,
2394            "enabled": rule.enabled,
2395            "status_codes": rule.status_codes,
2396            "transform_count": rule.body_transforms.len(),
2397            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2398                "path": t.path,
2399                "operation": t.operation,
2400                "replace": t.replace
2401            })).collect::<Vec<_>>()
2402        }));
2403    }
2404    let request_rule_count = rules.len();
2405    for (idx, rule) in proxy_config.response_replacements.iter().enumerate() {
2406        rules.push(serde_json::json!({
2407            "id": request_rule_count + idx,
2408            "kind": "response",
2409            "pattern": rule.pattern,
2410            "enabled": rule.enabled,
2411            "status_codes": rule.status_codes,
2412            "transform_count": rule.body_transforms.len(),
2413            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2414                "path": t.path,
2415                "operation": t.operation,
2416                "replace": t.replace
2417            })).collect::<Vec<_>>()
2418        }));
2419    }
2420
2421    let total = rules.len();
2422    let paged_rules: Vec<_> = rules.into_iter().skip(offset).take(limit).collect();
2423
2424    Ok(Json(serde_json::json!({
2425        "enabled": proxy_config.enabled,
2426        "target_url": proxy_config.target_url,
2427        "prefix": proxy_config.prefix,
2428        "timeout_seconds": proxy_config.timeout_seconds,
2429        "follow_redirects": proxy_config.follow_redirects,
2430        "passthrough_by_default": proxy_config.passthrough_by_default,
2431        "rules": paged_rules,
2432        "request_rule_count": request_rule_count,
2433        "response_rule_count": total.saturating_sub(request_rule_count),
2434        "limit": limit,
2435        "offset": offset,
2436        "total": total
2437    })))
2438}
2439
2440/// Build the management API router
2441pub fn management_router(state: ManagementState) -> Router {
2442    let router = Router::new()
2443        .route("/health", get(health_check))
2444        .route("/stats", get(get_stats))
2445        .route("/config", get(get_config))
2446        .route("/config/validate", post(validate_config))
2447        .route("/config/bulk", post(bulk_update_config))
2448        .route("/mocks", get(list_mocks))
2449        .route("/mocks", post(create_mock))
2450        .route("/mocks/{id}", get(get_mock))
2451        .route("/mocks/{id}", put(update_mock))
2452        .route("/mocks/{id}", delete(delete_mock))
2453        .route("/export", get(export_mocks))
2454        .route("/import", post(import_mocks))
2455        .route("/spec", get(get_openapi_spec));
2456
2457    #[cfg(feature = "smtp")]
2458    let router = router
2459        .route("/smtp/mailbox", get(list_smtp_emails))
2460        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
2461        .route("/smtp/mailbox/{id}", get(get_smtp_email))
2462        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
2463        .route("/smtp/mailbox/search", get(search_smtp_emails));
2464
2465    #[cfg(not(feature = "smtp"))]
2466    let router = router;
2467
2468    // MQTT routes
2469    #[cfg(feature = "mqtt")]
2470    let router = router
2471        .route("/mqtt/stats", get(get_mqtt_stats))
2472        .route("/mqtt/clients", get(get_mqtt_clients))
2473        .route("/mqtt/topics", get(get_mqtt_topics))
2474        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
2475        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
2476        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2477        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2478
2479    #[cfg(not(feature = "mqtt"))]
2480    let router = router
2481        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2482        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2483
2484    #[cfg(feature = "kafka")]
2485    let router = router
2486        .route("/kafka/stats", get(get_kafka_stats))
2487        .route("/kafka/topics", get(get_kafka_topics))
2488        .route("/kafka/topics/{topic}", get(get_kafka_topic))
2489        .route("/kafka/groups", get(get_kafka_groups))
2490        .route("/kafka/groups/{group_id}", get(get_kafka_group))
2491        .route("/kafka/produce", post(produce_kafka_message))
2492        .route("/kafka/produce/batch", post(produce_kafka_batch))
2493        .route("/kafka/messages/stream", get(kafka_messages_stream));
2494
2495    #[cfg(not(feature = "kafka"))]
2496    let router = router;
2497
2498    // Migration pipeline routes
2499    let router = router
2500        .route("/migration/routes", get(get_migration_routes))
2501        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
2502        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
2503        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
2504        .route("/migration/groups/{group}", put(set_group_migration_mode))
2505        .route("/migration/groups", get(get_migration_groups))
2506        .route("/migration/status", get(get_migration_status));
2507
2508    // Proxy replacement rules routes
2509    let router = router
2510        .route("/proxy/rules", get(list_proxy_rules))
2511        .route("/proxy/rules", post(create_proxy_rule))
2512        .route("/proxy/rules/{id}", get(get_proxy_rule))
2513        .route("/proxy/rules/{id}", put(update_proxy_rule))
2514        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
2515        .route("/proxy/inspect", get(get_proxy_inspect));
2516
2517    // AI-powered features
2518    let router = router.route("/ai/generate-spec", post(generate_ai_spec));
2519
2520    // Snapshot diff endpoints
2521    let router = router.nest(
2522        "/snapshot-diff",
2523        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
2524    );
2525
2526    #[cfg(feature = "behavioral-cloning")]
2527    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));
2528
2529    let router = router
2530        .route("/mockai/learn", post(learn_from_examples))
2531        .route("/mockai/rules/explanations", get(list_rule_explanations))
2532        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
2533        .route("/chaos/config", get(get_chaos_config))
2534        .route("/chaos/config", post(update_chaos_config))
2535        .route("/network/profiles", get(list_network_profiles))
2536        .route("/network/profile/apply", post(apply_network_profile));
2537
2538    // State machine API routes
2539    let router =
2540        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());
2541
2542    // Conformance testing API routes
2543    #[cfg(feature = "conformance")]
2544    let router = router.nest_service(
2545        "/conformance",
2546        crate::handlers::conformance::conformance_router(state.conformance_state.clone()),
2547    );
2548    #[cfg(not(feature = "conformance"))]
2549    let router = router;
2550
2551    router.with_state(state)
2552}
2553
2554#[cfg(feature = "kafka")]
2555/// Kafka broker statistics
2556#[derive(Debug, Clone, Serialize, Deserialize)]
2557pub struct KafkaBrokerStats {
2558    /// Number of topics
2559    pub topics: usize,
2560    /// Total number of partitions
2561    pub partitions: usize,
2562    /// Number of consumer groups
2563    pub consumer_groups: usize,
2564    /// Total messages produced
2565    pub messages_produced: u64,
2566    /// Total messages consumed
2567    pub messages_consumed: u64,
2568}
2569
2570#[cfg(feature = "kafka")]
2571#[allow(missing_docs)]
2572#[derive(Debug, Clone, Serialize, Deserialize)]
2573pub struct KafkaTopicInfo {
2574    pub name: String,
2575    pub partitions: usize,
2576    pub replication_factor: i32,
2577}
2578
2579#[cfg(feature = "kafka")]
2580#[allow(missing_docs)]
2581#[derive(Debug, Clone, Serialize, Deserialize)]
2582pub struct KafkaConsumerGroupInfo {
2583    pub group_id: String,
2584    pub members: usize,
2585    pub state: String,
2586}
2587
2588#[cfg(feature = "kafka")]
2589/// Get Kafka broker statistics
2590async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
2591    if let Some(broker) = &state.kafka_broker {
2592        let topics = broker.topics.read().await;
2593        let consumer_groups = broker.consumer_groups.read().await;
2594
2595        let total_partitions: usize = topics.values().map(|t| t.partitions.len()).sum();
2596
2597        // Get metrics snapshot for message counts
2598        let metrics_snapshot = broker.metrics().snapshot();
2599
2600        let stats = KafkaBrokerStats {
2601            topics: topics.len(),
2602            partitions: total_partitions,
2603            consumer_groups: consumer_groups.groups().len(),
2604            messages_produced: metrics_snapshot.messages_produced_total,
2605            messages_consumed: metrics_snapshot.messages_consumed_total,
2606        };
2607
2608        Json(stats).into_response()
2609    } else {
2610        (
2611            StatusCode::SERVICE_UNAVAILABLE,
2612            Json(serde_json::json!({
2613                "error": "Kafka broker not available",
2614                "message": "Kafka broker is not enabled or not available."
2615            })),
2616        )
2617            .into_response()
2618    }
2619}
2620
2621#[cfg(feature = "kafka")]
2622/// List Kafka topics
2623async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
2624    if let Some(broker) = &state.kafka_broker {
2625        let topics = broker.topics.read().await;
2626        let topic_list: Vec<KafkaTopicInfo> = topics
2627            .iter()
2628            .map(|(name, topic)| KafkaTopicInfo {
2629                name: name.clone(),
2630                partitions: topic.partitions.len(),
2631                replication_factor: topic.config.replication_factor as i32,
2632            })
2633            .collect();
2634
2635        Json(serde_json::json!({
2636            "topics": topic_list
2637        }))
2638        .into_response()
2639    } else {
2640        (
2641            StatusCode::SERVICE_UNAVAILABLE,
2642            Json(serde_json::json!({
2643                "error": "Kafka broker not available",
2644                "message": "Kafka broker is not enabled or not available."
2645            })),
2646        )
2647            .into_response()
2648    }
2649}
2650
2651#[cfg(feature = "kafka")]
2652/// Get Kafka topic details
2653async fn get_kafka_topic(
2654    State(state): State<ManagementState>,
2655    Path(topic_name): Path<String>,
2656) -> impl IntoResponse {
2657    if let Some(broker) = &state.kafka_broker {
2658        let topics = broker.topics.read().await;
2659        if let Some(topic) = topics.get(&topic_name) {
2660            Json(serde_json::json!({
2661                "name": topic_name,
2662                "partitions": topic.partitions.len(),
2663                "replication_factor": topic.config.replication_factor,
2664                "partitions_detail": topic.partitions.iter().enumerate().map(|(idx, partition)| serde_json::json!({
2665                    "id": idx as i32,
2666                    "leader": 0,
2667                    "replicas": vec![0],
2668                    "message_count": partition.messages.len()
2669                })).collect::<Vec<_>>()
2670            })).into_response()
2671        } else {
2672            (
2673                StatusCode::NOT_FOUND,
2674                Json(serde_json::json!({
2675                    "error": "Topic not found",
2676                    "topic": topic_name
2677                })),
2678            )
2679                .into_response()
2680        }
2681    } else {
2682        (
2683            StatusCode::SERVICE_UNAVAILABLE,
2684            Json(serde_json::json!({
2685                "error": "Kafka broker not available",
2686                "message": "Kafka broker is not enabled or not available."
2687            })),
2688        )
2689            .into_response()
2690    }
2691}
2692
2693#[cfg(feature = "kafka")]
2694/// List Kafka consumer groups
2695async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
2696    if let Some(broker) = &state.kafka_broker {
2697        let consumer_groups = broker.consumer_groups.read().await;
2698        let groups: Vec<KafkaConsumerGroupInfo> = consumer_groups
2699            .groups()
2700            .iter()
2701            .map(|(group_id, group)| KafkaConsumerGroupInfo {
2702                group_id: group_id.clone(),
2703                members: group.members.len(),
2704                state: "Stable".to_string(), // Simplified - could be more detailed
2705            })
2706            .collect();
2707
2708        Json(serde_json::json!({
2709            "groups": groups
2710        }))
2711        .into_response()
2712    } else {
2713        (
2714            StatusCode::SERVICE_UNAVAILABLE,
2715            Json(serde_json::json!({
2716                "error": "Kafka broker not available",
2717                "message": "Kafka broker is not enabled or not available."
2718            })),
2719        )
2720            .into_response()
2721    }
2722}
2723
2724#[cfg(feature = "kafka")]
2725/// Get Kafka consumer group details
2726async fn get_kafka_group(
2727    State(state): State<ManagementState>,
2728    Path(group_id): Path<String>,
2729) -> impl IntoResponse {
2730    if let Some(broker) = &state.kafka_broker {
2731        let consumer_groups = broker.consumer_groups.read().await;
2732        if let Some(group) = consumer_groups.groups().get(&group_id) {
2733            Json(serde_json::json!({
2734                "group_id": group_id,
2735                "members": group.members.len(),
2736                "state": "Stable",
2737                "members_detail": group.members.iter().map(|(member_id, member)| serde_json::json!({
2738                    "member_id": member_id,
2739                    "client_id": member.client_id,
2740                    "assignments": member.assignment.iter().map(|a| serde_json::json!({
2741                        "topic": a.topic,
2742                        "partitions": a.partitions
2743                    })).collect::<Vec<_>>()
2744                })).collect::<Vec<_>>(),
2745                "offsets": group.offsets.iter().map(|((topic, partition), offset)| serde_json::json!({
2746                    "topic": topic,
2747                    "partition": partition,
2748                    "offset": offset
2749                })).collect::<Vec<_>>()
2750            })).into_response()
2751        } else {
2752            (
2753                StatusCode::NOT_FOUND,
2754                Json(serde_json::json!({
2755                    "error": "Consumer group not found",
2756                    "group_id": group_id
2757                })),
2758            )
2759                .into_response()
2760        }
2761    } else {
2762        (
2763            StatusCode::SERVICE_UNAVAILABLE,
2764            Json(serde_json::json!({
2765                "error": "Kafka broker not available",
2766                "message": "Kafka broker is not enabled or not available."
2767            })),
2768        )
2769            .into_response()
2770    }
2771}
2772
2773// ========== Kafka Produce Handler ==========
2774
2775#[cfg(feature = "kafka")]
2776/// Request body for producing a Kafka message
2777#[derive(Debug, Deserialize)]
2778pub struct KafkaProduceRequest {
2779    /// Topic to produce to
2780    pub topic: String,
2781    /// Message key (optional)
2782    #[serde(default)]
2783    pub key: Option<String>,
2784    /// Message value (JSON string or plain string)
2785    pub value: String,
2786    /// Partition ID (optional, auto-assigned if not provided)
2787    #[serde(default)]
2788    pub partition: Option<i32>,
2789    /// Message headers (optional, key-value pairs)
2790    #[serde(default)]
2791    pub headers: Option<std::collections::HashMap<String, String>>,
2792}
2793
2794#[cfg(feature = "kafka")]
2795/// Produce a message to a Kafka topic
2796async fn produce_kafka_message(
2797    State(state): State<ManagementState>,
2798    Json(request): Json<KafkaProduceRequest>,
2799) -> impl IntoResponse {
2800    if let Some(broker) = &state.kafka_broker {
2801        let mut topics = broker.topics.write().await;
2802
2803        // Get or create the topic
2804        let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
2805            mockforge_kafka::topics::Topic::new(
2806                request.topic.clone(),
2807                mockforge_kafka::topics::TopicConfig::default(),
2808            )
2809        });
2810
2811        // Determine partition
2812        let partition_id = if let Some(partition) = request.partition {
2813            partition
2814        } else {
2815            topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes()))
2816        };
2817
2818        // Validate partition exists
2819        if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
2820            return (
2821                StatusCode::BAD_REQUEST,
2822                Json(serde_json::json!({
2823                    "error": "Invalid partition",
2824                    "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
2825                })),
2826            )
2827                .into_response();
2828        }
2829
2830        // Create the message
2831        let key_clone = request.key.clone();
2832        let headers_clone = request.headers.clone();
2833        let message = mockforge_kafka::partitions::KafkaMessage {
2834            offset: 0, // Will be set by partition.append
2835            timestamp: chrono::Utc::now().timestamp_millis(),
2836            key: key_clone.clone().map(|k| k.as_bytes().to_vec()),
2837            value: request.value.as_bytes().to_vec(),
2838            headers: headers_clone
2839                .clone()
2840                .unwrap_or_default()
2841                .into_iter()
2842                .map(|(k, v)| (k, v.as_bytes().to_vec()))
2843                .collect(),
2844        };
2845
2846        // Produce to partition
2847        match topic_entry.produce(partition_id, message).await {
2848            Ok(offset) => {
2849                // Record metrics for successful message production
2850                if let Some(broker) = &state.kafka_broker {
2851                    broker.metrics().record_messages_produced(1);
2852                }
2853
2854                // Emit message event for real-time monitoring
2855                #[cfg(feature = "kafka")]
2856                {
2857                    let event = MessageEvent::Kafka(KafkaMessageEvent {
2858                        topic: request.topic.clone(),
2859                        key: key_clone,
2860                        value: request.value.clone(),
2861                        partition: partition_id,
2862                        offset,
2863                        headers: headers_clone,
2864                        timestamp: chrono::Utc::now().to_rfc3339(),
2865                    });
2866                    let _ = state.message_events.send(event);
2867                }
2868
2869                Json(serde_json::json!({
2870                    "success": true,
2871                    "message": format!("Message produced to topic '{}'", request.topic),
2872                    "topic": request.topic,
2873                    "partition": partition_id,
2874                    "offset": offset
2875                }))
2876                .into_response()
2877            }
2878            Err(e) => (
2879                StatusCode::INTERNAL_SERVER_ERROR,
2880                Json(serde_json::json!({
2881                    "error": "Failed to produce message",
2882                    "message": e.to_string()
2883                })),
2884            )
2885                .into_response(),
2886        }
2887    } else {
2888        (
2889            StatusCode::SERVICE_UNAVAILABLE,
2890            Json(serde_json::json!({
2891                "error": "Kafka broker not available",
2892                "message": "Kafka broker is not enabled or not available."
2893            })),
2894        )
2895            .into_response()
2896    }
2897}
2898
2899#[cfg(feature = "kafka")]
2900/// Request body for producing a batch of Kafka messages
2901#[derive(Debug, Deserialize)]
2902pub struct KafkaBatchProduceRequest {
2903    /// List of messages to produce
2904    pub messages: Vec<KafkaProduceRequest>,
2905    /// Delay between messages in milliseconds
2906    #[serde(default = "default_delay")]
2907    pub delay_ms: u64,
2908}
2909
2910#[cfg(feature = "kafka")]
2911/// Produce multiple messages to Kafka topics
2912async fn produce_kafka_batch(
2913    State(state): State<ManagementState>,
2914    Json(request): Json<KafkaBatchProduceRequest>,
2915) -> impl IntoResponse {
2916    if let Some(broker) = &state.kafka_broker {
2917        if request.messages.is_empty() {
2918            return (
2919                StatusCode::BAD_REQUEST,
2920                Json(serde_json::json!({
2921                    "error": "Empty batch",
2922                    "message": "At least one message is required"
2923                })),
2924            )
2925                .into_response();
2926        }
2927
2928        let mut results = Vec::new();
2929
2930        for (index, msg_request) in request.messages.iter().enumerate() {
2931            let mut topics = broker.topics.write().await;
2932
2933            // Get or create the topic
2934            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
2935                mockforge_kafka::topics::Topic::new(
2936                    msg_request.topic.clone(),
2937                    mockforge_kafka::topics::TopicConfig::default(),
2938                )
2939            });
2940
2941            // Determine partition
2942            let partition_id = if let Some(partition) = msg_request.partition {
2943                partition
2944            } else {
2945                topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
2946            };
2947
2948            // Validate partition exists
2949            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
2950                results.push(serde_json::json!({
2951                    "index": index,
2952                    "success": false,
2953                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
2954                }));
2955                continue;
2956            }
2957
2958            // Create the message
2959            let message = mockforge_kafka::partitions::KafkaMessage {
2960                offset: 0,
2961                timestamp: chrono::Utc::now().timestamp_millis(),
2962                key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
2963                value: msg_request.value.as_bytes().to_vec(),
2964                headers: msg_request
2965                    .headers
2966                    .clone()
2967                    .unwrap_or_default()
2968                    .into_iter()
2969                    .map(|(k, v)| (k, v.as_bytes().to_vec()))
2970                    .collect(),
2971            };
2972
2973            // Produce to partition
2974            match topic_entry.produce(partition_id, message).await {
2975                Ok(offset) => {
2976                    // Record metrics for successful message production
2977                    if let Some(broker) = &state.kafka_broker {
2978                        broker.metrics().record_messages_produced(1);
2979                    }
2980
2981                    // Emit message event
2982                    let event = MessageEvent::Kafka(KafkaMessageEvent {
2983                        topic: msg_request.topic.clone(),
2984                        key: msg_request.key.clone(),
2985                        value: msg_request.value.clone(),
2986                        partition: partition_id,
2987                        offset,
2988                        headers: msg_request.headers.clone(),
2989                        timestamp: chrono::Utc::now().to_rfc3339(),
2990                    });
2991                    let _ = state.message_events.send(event);
2992
2993                    results.push(serde_json::json!({
2994                        "index": index,
2995                        "success": true,
2996                        "topic": msg_request.topic,
2997                        "partition": partition_id,
2998                        "offset": offset
2999                    }));
3000                }
3001                Err(e) => {
3002                    results.push(serde_json::json!({
3003                        "index": index,
3004                        "success": false,
3005                        "error": e.to_string()
3006                    }));
3007                }
3008            }
3009
3010            // Add delay between messages (except for the last one)
3011            if index < request.messages.len() - 1 && request.delay_ms > 0 {
3012                tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
3013            }
3014        }
3015
3016        let success_count =
3017            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();
3018
3019        Json(serde_json::json!({
3020            "success": true,
3021            "total": request.messages.len(),
3022            "succeeded": success_count,
3023            "failed": request.messages.len() - success_count,
3024            "results": results
3025        }))
3026        .into_response()
3027    } else {
3028        (
3029            StatusCode::SERVICE_UNAVAILABLE,
3030            Json(serde_json::json!({
3031                "error": "Kafka broker not available",
3032                "message": "Kafka broker is not enabled or not available."
3033            })),
3034        )
3035            .into_response()
3036    }
3037}
3038
3039// ========== Real-time Message Streaming (SSE) ==========
3040
3041#[cfg(feature = "mqtt")]
3042/// SSE stream for MQTT messages
3043async fn mqtt_messages_stream(
3044    State(state): State<ManagementState>,
3045    Query(params): Query<std::collections::HashMap<String, String>>,
3046) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
3047    let rx = state.message_events.subscribe();
3048    let topic_filter = params.get("topic").cloned();
3049
3050    let stream = stream::unfold(rx, move |mut rx| {
3051        let topic_filter = topic_filter.clone();
3052
3053        async move {
3054            loop {
3055                match rx.recv().await {
3056                    Ok(MessageEvent::Mqtt(event)) => {
3057                        // Apply topic filter if specified
3058                        if let Some(filter) = &topic_filter {
3059                            if !event.topic.contains(filter) {
3060                                continue;
3061                            }
3062                        }
3063
3064                        let event_json = serde_json::json!({
3065                            "protocol": "mqtt",
3066                            "topic": event.topic,
3067                            "payload": event.payload,
3068                            "qos": event.qos,
3069                            "retain": event.retain,
3070                            "timestamp": event.timestamp,
3071                        });
3072
3073                        if let Ok(event_data) = serde_json::to_string(&event_json) {
3074                            let sse_event = Event::default().event("mqtt_message").data(event_data);
3075                            return Some((Ok(sse_event), rx));
3076                        }
3077                    }
3078                    #[cfg(feature = "kafka")]
3079                    Ok(MessageEvent::Kafka(_)) => {
3080                        // Skip Kafka events in MQTT stream
3081                        continue;
3082                    }
3083                    Err(broadcast::error::RecvError::Closed) => {
3084                        return None;
3085                    }
3086                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
3087                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
3088                        continue;
3089                    }
3090                }
3091            }
3092        }
3093    });
3094
3095    Sse::new(stream).keep_alive(
3096        axum::response::sse::KeepAlive::new()
3097            .interval(std::time::Duration::from_secs(15))
3098            .text("keep-alive-text"),
3099    )
3100}
3101
3102#[cfg(feature = "kafka")]
3103/// SSE stream for Kafka messages
3104async fn kafka_messages_stream(
3105    State(state): State<ManagementState>,
3106    Query(params): Query<std::collections::HashMap<String, String>>,
3107) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
3108    let rx = state.message_events.subscribe();
3109    let topic_filter = params.get("topic").cloned();
3110
3111    let stream = stream::unfold(rx, move |mut rx| {
3112        let topic_filter = topic_filter.clone();
3113
3114        async move {
3115            loop {
3116                match rx.recv().await {
3117                    #[cfg(feature = "mqtt")]
3118                    Ok(MessageEvent::Mqtt(_)) => {
3119                        // Skip MQTT events in Kafka stream
3120                        continue;
3121                    }
3122                    Ok(MessageEvent::Kafka(event)) => {
3123                        // Apply topic filter if specified
3124                        if let Some(filter) = &topic_filter {
3125                            if !event.topic.contains(filter) {
3126                                continue;
3127                            }
3128                        }
3129
3130                        let event_json = serde_json::json!({
3131                            "protocol": "kafka",
3132                            "topic": event.topic,
3133                            "key": event.key,
3134                            "value": event.value,
3135                            "partition": event.partition,
3136                            "offset": event.offset,
3137                            "headers": event.headers,
3138                            "timestamp": event.timestamp,
3139                        });
3140
3141                        if let Ok(event_data) = serde_json::to_string(&event_json) {
3142                            let sse_event =
3143                                Event::default().event("kafka_message").data(event_data);
3144                            return Some((Ok(sse_event), rx));
3145                        }
3146                    }
3147                    Err(broadcast::error::RecvError::Closed) => {
3148                        return None;
3149                    }
3150                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
3151                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
3152                        continue;
3153                    }
3154                }
3155            }
3156        }
3157    });
3158
3159    Sse::new(stream).keep_alive(
3160        axum::response::sse::KeepAlive::new()
3161            .interval(std::time::Duration::from_secs(15))
3162            .text("keep-alive-text"),
3163    )
3164}
3165
3166// ========== AI-Powered Features ==========
3167
3168/// Request for AI-powered API specification generation
3169#[derive(Debug, Deserialize)]
3170pub struct GenerateSpecRequest {
3171    /// Natural language description of the API to generate
3172    pub query: String,
3173    /// Type of specification to generate: "openapi", "graphql", or "asyncapi"
3174    pub spec_type: String,
3175    /// Optional API version (e.g., "3.0.0" for OpenAPI)
3176    pub api_version: Option<String>,
3177}
3178
3179/// Request for OpenAPI generation from recorded traffic
3180#[derive(Debug, Deserialize)]
3181pub struct GenerateOpenApiFromTrafficRequest {
3182    /// Path to recorder database (optional, defaults to ./recordings.db)
3183    #[serde(default)]
3184    pub database_path: Option<String>,
3185    /// Start time for filtering (ISO 8601 format, e.g., 2025-01-01T00:00:00Z)
3186    #[serde(default)]
3187    pub since: Option<String>,
3188    /// End time for filtering (ISO 8601 format)
3189    #[serde(default)]
3190    pub until: Option<String>,
3191    /// Path pattern filter (supports wildcards, e.g., /api/*)
3192    #[serde(default)]
3193    pub path_pattern: Option<String>,
3194    /// Minimum confidence score for including paths (0.0 to 1.0)
3195    #[serde(default = "default_min_confidence")]
3196    pub min_confidence: f64,
3197}
3198
3199fn default_min_confidence() -> f64 {
3200    0.7
3201}
3202
3203/// Generate API specification from natural language using AI
3204#[cfg(feature = "data-faker")]
3205async fn generate_ai_spec(
3206    State(_state): State<ManagementState>,
3207    Json(request): Json<GenerateSpecRequest>,
3208) -> impl IntoResponse {
3209    use mockforge_data::rag::{
3210        config::{LlmProvider, RagConfig},
3211        engine::RagEngine,
3212        storage::DocumentStorage,
3213    };
3214    use std::sync::Arc;
3215
3216    // Build RAG config from environment variables
3217    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
3218        .ok()
3219        .or_else(|| std::env::var("OPENAI_API_KEY").ok());
3220
3221    // Check if RAG is configured - require API key
3222    if api_key.is_none() {
3223        return (
3224            StatusCode::SERVICE_UNAVAILABLE,
3225            Json(serde_json::json!({
3226                "error": "AI service not configured",
3227                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
3228            })),
3229        )
3230            .into_response();
3231    }
3232
3233    // Build RAG configuration
3234    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
3235        .unwrap_or_else(|_| "openai".to_string())
3236        .to_lowercase();
3237
3238    let provider = match provider_str.as_str() {
3239        "openai" => LlmProvider::OpenAI,
3240        "anthropic" => LlmProvider::Anthropic,
3241        "ollama" => LlmProvider::Ollama,
3242        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
3243        _ => LlmProvider::OpenAI,
3244    };
3245
3246    let api_endpoint =
3247        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
3248            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
3249            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
3250            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
3251            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
3252        });
3253
3254    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
3255        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
3256        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
3257        LlmProvider::Ollama => "llama2".to_string(),
3258        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
3259    });
3260
3261    // Build RagConfig using struct literal with defaults
3262    let rag_config = RagConfig {
3263        provider,
3264        api_endpoint,
3265        api_key,
3266        model,
3267        max_tokens: std::env::var("MOCKFORGE_RAG_MAX_TOKENS")
3268            .unwrap_or_else(|_| "4096".to_string())
3269            .parse()
3270            .unwrap_or(4096),
3271        temperature: std::env::var("MOCKFORGE_RAG_TEMPERATURE")
3272            .unwrap_or_else(|_| "0.3".to_string())
3273            .parse()
3274            .unwrap_or(0.3), // Lower temperature for more structured output
3275        timeout_secs: std::env::var("MOCKFORGE_RAG_TIMEOUT")
3276            .unwrap_or_else(|_| "60".to_string())
3277            .parse()
3278            .unwrap_or(60),
3279        max_context_length: std::env::var("MOCKFORGE_RAG_CONTEXT_WINDOW")
3280            .unwrap_or_else(|_| "4000".to_string())
3281            .parse()
3282            .unwrap_or(4000),
3283        ..Default::default()
3284    };
3285
3286    // Build the prompt for spec generation
3287    let spec_type_label = match request.spec_type.as_str() {
3288        "openapi" => "OpenAPI 3.0",
3289        "graphql" => "GraphQL",
3290        "asyncapi" => "AsyncAPI",
3291        _ => "OpenAPI 3.0",
3292    };
3293
3294    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");
3295
3296    let prompt = format!(
3297        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.
3298
3299User Requirements:
3300{}
3301
3302Instructions:
33031. Generate a complete, valid {} specification
33042. Include all paths, operations, request/response schemas, and components
33053. Use realistic field names and data types
33064. Include proper descriptions and examples
33075. Follow {} best practices
33086. Return ONLY the specification, no additional explanation
33097. For OpenAPI, use version {}
3310
3311Return the specification in {} format."#,
3312        spec_type_label,
3313        request.query,
3314        spec_type_label,
3315        spec_type_label,
3316        api_version,
3317        if request.spec_type == "graphql" {
3318            "GraphQL SDL"
3319        } else {
3320            "YAML"
3321        }
3322    );
3323
3324    // Create in-memory storage for RAG engine
3325    // Note: StorageFactory::create_memory() returns Box<dyn DocumentStorage>
3326    // We need to use unsafe transmute or create a wrapper, but for now we'll use
3327    // a simpler approach: create InMemoryStorage directly
3328    use mockforge_data::rag::storage::InMemoryStorage;
3329    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());
3330
3331    // Create RAG engine
3332    let mut rag_engine = match RagEngine::new(rag_config.clone(), storage) {
3333        Ok(engine) => engine,
3334        Err(e) => {
3335            return (
3336                StatusCode::INTERNAL_SERVER_ERROR,
3337                Json(serde_json::json!({
3338                    "error": "Failed to initialize RAG engine",
3339                    "message": e.to_string()
3340                })),
3341            )
3342                .into_response();
3343        }
3344    };
3345
3346    // Generate using RAG engine
3347    match rag_engine.generate(&prompt, None).await {
3348        Ok(generated_text) => {
3349            // Try to extract just the YAML/JSON/SDL content if LLM added explanation
3350            let spec = if request.spec_type == "graphql" {
3351                // For GraphQL, extract SDL
3352                extract_graphql_schema(&generated_text)
3353            } else {
3354                // For OpenAPI/AsyncAPI, extract YAML
3355                extract_yaml_spec(&generated_text)
3356            };
3357
3358            Json(serde_json::json!({
3359                "success": true,
3360                "spec": spec,
3361                "spec_type": request.spec_type,
3362            }))
3363            .into_response()
3364        }
3365        Err(e) => (
3366            StatusCode::INTERNAL_SERVER_ERROR,
3367            Json(serde_json::json!({
3368                "error": "AI generation failed",
3369                "message": e.to_string()
3370            })),
3371        )
3372            .into_response(),
3373    }
3374}
3375
3376#[cfg(not(feature = "data-faker"))]
3377async fn generate_ai_spec(
3378    State(_state): State<ManagementState>,
3379    Json(_request): Json<GenerateSpecRequest>,
3380) -> impl IntoResponse {
3381    (
3382        StatusCode::NOT_IMPLEMENTED,
3383        Json(serde_json::json!({
3384            "error": "AI features not enabled",
3385            "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3386        })),
3387    )
3388        .into_response()
3389}
3390
3391/// Generate OpenAPI specification from recorded traffic
3392#[cfg(feature = "behavioral-cloning")]
3393async fn generate_openapi_from_traffic(
3394    State(_state): State<ManagementState>,
3395    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
3396) -> impl IntoResponse {
3397    use chrono::{DateTime, Utc};
3398    use mockforge_core::intelligent_behavior::{
3399        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
3400        IntelligentBehaviorConfig,
3401    };
3402    use mockforge_recorder::{
3403        database::RecorderDatabase,
3404        openapi_export::{QueryFilters, RecordingsToOpenApi},
3405    };
3406    use std::path::PathBuf;
3407
3408    // Determine database path
3409    let db_path = if let Some(ref path) = request.database_path {
3410        PathBuf::from(path)
3411    } else {
3412        std::env::current_dir()
3413            .unwrap_or_else(|_| PathBuf::from("."))
3414            .join("recordings.db")
3415    };
3416
3417    // Open database
3418    let db = match RecorderDatabase::new(&db_path).await {
3419        Ok(db) => db,
3420        Err(e) => {
3421            return (
3422                StatusCode::BAD_REQUEST,
3423                Json(serde_json::json!({
3424                    "error": "Database error",
3425                    "message": format!("Failed to open recorder database: {}", e)
3426                })),
3427            )
3428                .into_response();
3429        }
3430    };
3431
3432    // Parse time filters
3433    let since_dt = if let Some(ref since_str) = request.since {
3434        match DateTime::parse_from_rfc3339(since_str) {
3435            Ok(dt) => Some(dt.with_timezone(&Utc)),
3436            Err(e) => {
3437                return (
3438                    StatusCode::BAD_REQUEST,
3439                    Json(serde_json::json!({
3440                        "error": "Invalid date format",
3441                        "message": format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3442                    })),
3443                )
3444                    .into_response();
3445            }
3446        }
3447    } else {
3448        None
3449    };
3450
3451    let until_dt = if let Some(ref until_str) = request.until {
3452        match DateTime::parse_from_rfc3339(until_str) {
3453            Ok(dt) => Some(dt.with_timezone(&Utc)),
3454            Err(e) => {
3455                return (
3456                    StatusCode::BAD_REQUEST,
3457                    Json(serde_json::json!({
3458                        "error": "Invalid date format",
3459                        "message": format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3460                    })),
3461                )
3462                    .into_response();
3463            }
3464        }
3465    } else {
3466        None
3467    };
3468
3469    // Build query filters
3470    let query_filters = QueryFilters {
3471        since: since_dt,
3472        until: until_dt,
3473        path_pattern: request.path_pattern.clone(),
3474        min_status_code: None,
3475        max_requests: Some(1000),
3476    };
3477
3478    // Query HTTP exchanges
3479    // Note: We need to convert from mockforge-recorder's HttpExchange to mockforge-core's HttpExchange
3480    // to avoid version mismatch issues. The converter returns the version from mockforge-recorder's
3481    // dependency, so we need to manually convert to the local version.
3482    let exchanges_from_recorder =
3483        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
3484            Ok(exchanges) => exchanges,
3485            Err(e) => {
3486                return (
3487                    StatusCode::INTERNAL_SERVER_ERROR,
3488                    Json(serde_json::json!({
3489                        "error": "Query error",
3490                        "message": format!("Failed to query HTTP exchanges: {}", e)
3491                    })),
3492                )
3493                    .into_response();
3494            }
3495        };
3496
3497    if exchanges_from_recorder.is_empty() {
3498        return (
3499            StatusCode::NOT_FOUND,
3500            Json(serde_json::json!({
3501                "error": "No exchanges found",
3502                "message": "No HTTP exchanges found matching the specified filters"
3503            })),
3504        )
3505            .into_response();
3506    }
3507
3508    // Convert to local HttpExchange type to avoid version mismatch
3509    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
3510    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
3511        .into_iter()
3512        .map(|e| LocalHttpExchange {
3513            method: e.method,
3514            path: e.path,
3515            query_params: e.query_params,
3516            headers: e.headers,
3517            body: e.body,
3518            body_encoding: e.body_encoding,
3519            status_code: e.status_code,
3520            response_headers: e.response_headers,
3521            response_body: e.response_body,
3522            response_body_encoding: e.response_body_encoding,
3523            timestamp: e.timestamp,
3524        })
3525        .collect();
3526
3527    // Create OpenAPI generator config
3528    let behavior_config = IntelligentBehaviorConfig::default();
3529    let gen_config = OpenApiGenerationConfig {
3530        min_confidence: request.min_confidence,
3531        behavior_model: Some(behavior_config.behavior_model),
3532    };
3533
3534    // Generate OpenAPI spec
3535    let generator = OpenApiSpecGenerator::new(gen_config);
3536    let result = match generator.generate_from_exchanges(exchanges).await {
3537        Ok(result) => result,
3538        Err(e) => {
3539            return (
3540                StatusCode::INTERNAL_SERVER_ERROR,
3541                Json(serde_json::json!({
3542                    "error": "Generation error",
3543                    "message": format!("Failed to generate OpenAPI spec: {}", e)
3544                })),
3545            )
3546                .into_response();
3547        }
3548    };
3549
3550    // Prepare response
3551    let spec_json = if let Some(ref raw) = result.spec.raw_document {
3552        raw.clone()
3553    } else {
3554        match serde_json::to_value(&result.spec.spec) {
3555            Ok(json) => json,
3556            Err(e) => {
3557                return (
3558                    StatusCode::INTERNAL_SERVER_ERROR,
3559                    Json(serde_json::json!({
3560                        "error": "Serialization error",
3561                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
3562                    })),
3563                )
3564                    .into_response();
3565            }
3566        }
3567    };
3568
3569    // Build response with metadata
3570    let response = serde_json::json!({
3571        "spec": spec_json,
3572        "metadata": {
3573            "requests_analyzed": result.metadata.requests_analyzed,
3574            "paths_inferred": result.metadata.paths_inferred,
3575            "path_confidence": result.metadata.path_confidence,
3576            "generated_at": result.metadata.generated_at.to_rfc3339(),
3577            "duration_ms": result.metadata.duration_ms,
3578        }
3579    });
3580
3581    Json(response).into_response()
3582}
3583
3584/// List all rule explanations
3585async fn list_rule_explanations(
3586    State(state): State<ManagementState>,
3587    Query(params): Query<std::collections::HashMap<String, String>>,
3588) -> impl IntoResponse {
3589    use mockforge_core::intelligent_behavior::RuleType;
3590
3591    let explanations = state.rule_explanations.read().await;
3592    let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3593
3594    // Filter by rule type if provided
3595    if let Some(rule_type_str) = params.get("rule_type") {
3596        if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3597            explanations_vec.retain(|e| e.rule_type == rule_type);
3598        }
3599    }
3600
3601    // Filter by minimum confidence if provided
3602    if let Some(min_confidence_str) = params.get("min_confidence") {
3603        if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3604            explanations_vec.retain(|e| e.confidence >= min_confidence);
3605        }
3606    }
3607
3608    // Sort by confidence (descending) and then by generated_at (descending)
3609    explanations_vec.sort_by(|a, b| {
3610        b.confidence
3611            .partial_cmp(&a.confidence)
3612            .unwrap_or(std::cmp::Ordering::Equal)
3613            .then_with(|| b.generated_at.cmp(&a.generated_at))
3614    });
3615
3616    Json(serde_json::json!({
3617        "explanations": explanations_vec,
3618        "total": explanations_vec.len(),
3619    }))
3620    .into_response()
3621}
3622
3623/// Get a specific rule explanation by ID
3624async fn get_rule_explanation(
3625    State(state): State<ManagementState>,
3626    Path(rule_id): Path<String>,
3627) -> impl IntoResponse {
3628    let explanations = state.rule_explanations.read().await;
3629
3630    match explanations.get(&rule_id) {
3631        Some(explanation) => Json(serde_json::json!({
3632            "explanation": explanation,
3633        }))
3634        .into_response(),
3635        None => (
3636            StatusCode::NOT_FOUND,
3637            Json(serde_json::json!({
3638                "error": "Rule explanation not found",
3639                "message": format!("No explanation found for rule ID: {}", rule_id)
3640            })),
3641        )
3642            .into_response(),
3643    }
3644}
3645
3646/// Request for learning from examples
3647#[derive(Debug, Deserialize)]
3648pub struct LearnFromExamplesRequest {
3649    /// Example request/response pairs to learn from
3650    pub examples: Vec<ExamplePairRequest>,
3651    /// Optional configuration override
3652    #[serde(default)]
3653    pub config: Option<serde_json::Value>,
3654}
3655
3656/// Example pair request format
3657#[derive(Debug, Deserialize)]
3658pub struct ExamplePairRequest {
3659    /// Request data (method, path, body, etc.)
3660    pub request: serde_json::Value,
3661    /// Response data (status_code, body, etc.)
3662    pub response: serde_json::Value,
3663}
3664
3665/// Learn behavioral rules from example pairs
3666///
3667/// This endpoint accepts example request/response pairs, generates behavioral rules
3668/// with explanations, and stores the explanations for later retrieval.
3669async fn learn_from_examples(
3670    State(state): State<ManagementState>,
3671    Json(request): Json<LearnFromExamplesRequest>,
3672) -> impl IntoResponse {
3673    use mockforge_core::intelligent_behavior::{
3674        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3675        rule_generator::{ExamplePair, RuleGenerator},
3676    };
3677
3678    if request.examples.is_empty() {
3679        return (
3680            StatusCode::BAD_REQUEST,
3681            Json(serde_json::json!({
3682                "error": "No examples provided",
3683                "message": "At least one example pair is required"
3684            })),
3685        )
3686            .into_response();
3687    }
3688
3689    // Convert request examples to ExamplePair format
3690    let example_pairs: Result<Vec<ExamplePair>, String> = request
3691        .examples
3692        .into_iter()
3693        .enumerate()
3694        .map(|(idx, ex)| {
3695            // Parse request JSON to extract method, path, body, etc.
3696            let method = ex
3697                .request
3698                .get("method")
3699                .and_then(|v| v.as_str())
3700                .map(|s| s.to_string())
3701                .unwrap_or_else(|| "GET".to_string());
3702            let path = ex
3703                .request
3704                .get("path")
3705                .and_then(|v| v.as_str())
3706                .map(|s| s.to_string())
3707                .unwrap_or_else(|| "/".to_string());
3708            let request_body = ex.request.get("body").cloned();
3709            let query_params = ex
3710                .request
3711                .get("query_params")
3712                .and_then(|v| v.as_object())
3713                .map(|obj| {
3714                    obj.iter()
3715                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3716                        .collect()
3717                })
3718                .unwrap_or_default();
3719            let headers = ex
3720                .request
3721                .get("headers")
3722                .and_then(|v| v.as_object())
3723                .map(|obj| {
3724                    obj.iter()
3725                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3726                        .collect()
3727                })
3728                .unwrap_or_default();
3729
3730            // Parse response JSON to extract status, body, etc.
3731            let status = ex
3732                .response
3733                .get("status_code")
3734                .or_else(|| ex.response.get("status"))
3735                .and_then(|v| v.as_u64())
3736                .map(|n| n as u16)
3737                .unwrap_or(200);
3738            let response_body = ex.response.get("body").cloned();
3739
3740            Ok(ExamplePair {
3741                method,
3742                path,
3743                request: request_body,
3744                status,
3745                response: response_body,
3746                query_params,
3747                headers,
3748                metadata: {
3749                    let mut meta = std::collections::HashMap::new();
3750                    meta.insert("source".to_string(), "api".to_string());
3751                    meta.insert("example_index".to_string(), idx.to_string());
3752                    meta
3753                },
3754            })
3755        })
3756        .collect();
3757
3758    let example_pairs = match example_pairs {
3759        Ok(pairs) => pairs,
3760        Err(e) => {
3761            return (
3762                StatusCode::BAD_REQUEST,
3763                Json(serde_json::json!({
3764                    "error": "Invalid examples",
3765                    "message": e
3766                })),
3767            )
3768                .into_response();
3769        }
3770    };
3771
3772    // Create behavior config (use provided config or default)
3773    let behavior_config = if let Some(config_json) = request.config {
3774        // Try to deserialize custom config, fall back to default
3775        serde_json::from_value(config_json)
3776            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3777            .behavior_model
3778    } else {
3779        BehaviorModelConfig::default()
3780    };
3781
3782    // Create rule generator
3783    let generator = RuleGenerator::new(behavior_config);
3784
3785    // Generate rules with explanations
3786    let (rules, explanations) =
3787        match generator.generate_rules_with_explanations(example_pairs).await {
3788            Ok(result) => result,
3789            Err(e) => {
3790                return (
3791                    StatusCode::INTERNAL_SERVER_ERROR,
3792                    Json(serde_json::json!({
3793                        "error": "Rule generation failed",
3794                        "message": format!("Failed to generate rules: {}", e)
3795                    })),
3796                )
3797                    .into_response();
3798            }
3799        };
3800
3801    // Store explanations in ManagementState
3802    {
3803        let mut stored_explanations = state.rule_explanations.write().await;
3804        for explanation in &explanations {
3805            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3806        }
3807    }
3808
3809    // Prepare response
3810    let response = serde_json::json!({
3811        "success": true,
3812        "rules_generated": {
3813            "consistency_rules": rules.consistency_rules.len(),
3814            "schemas": rules.schemas.len(),
3815            "state_machines": rules.state_transitions.len(),
3816            "system_prompt": !rules.system_prompt.is_empty(),
3817        },
3818        "explanations": explanations.iter().map(|e| serde_json::json!({
3819            "rule_id": e.rule_id,
3820            "rule_type": e.rule_type,
3821            "confidence": e.confidence,
3822            "reasoning": e.reasoning,
3823        })).collect::<Vec<_>>(),
3824        "total_explanations": explanations.len(),
3825    });
3826
3827    Json(response).into_response()
3828}
3829
3830#[cfg(feature = "data-faker")]
3831fn extract_yaml_spec(text: &str) -> String {
3832    // Try to find YAML code blocks
3833    if let Some(start) = text.find("```yaml") {
3834        let yaml_start = text[start + 7..].trim_start();
3835        if let Some(end) = yaml_start.find("```") {
3836            return yaml_start[..end].trim().to_string();
3837        }
3838    }
3839    if let Some(start) = text.find("```") {
3840        let content_start = text[start + 3..].trim_start();
3841        if let Some(end) = content_start.find("```") {
3842            return content_start[..end].trim().to_string();
3843        }
3844    }
3845
3846    // Check if it starts with openapi: or asyncapi:
3847    if text.trim_start().starts_with("openapi:") || text.trim_start().starts_with("asyncapi:") {
3848        return text.trim().to_string();
3849    }
3850
3851    // Return as-is if no code blocks found
3852    text.trim().to_string()
3853}
3854
3855/// Extract GraphQL schema from text content
3856#[cfg(feature = "data-faker")]
3857fn extract_graphql_schema(text: &str) -> String {
3858    // Try to find GraphQL code blocks
3859    if let Some(start) = text.find("```graphql") {
3860        let schema_start = text[start + 10..].trim_start();
3861        if let Some(end) = schema_start.find("```") {
3862            return schema_start[..end].trim().to_string();
3863        }
3864    }
3865    if let Some(start) = text.find("```") {
3866        let content_start = text[start + 3..].trim_start();
3867        if let Some(end) = content_start.find("```") {
3868            return content_start[..end].trim().to_string();
3869        }
3870    }
3871
3872    // Check if it looks like GraphQL SDL (starts with type, schema, etc.)
3873    if text.trim_start().starts_with("type ") || text.trim_start().starts_with("schema ") {
3874        return text.trim().to_string();
3875    }
3876
3877    text.trim().to_string()
3878}
3879
3880// ========== Chaos Engineering Management ==========
3881
3882/// Get current chaos engineering configuration
3883async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
3884    #[cfg(feature = "chaos")]
3885    {
3886        if let Some(chaos_state) = &_state.chaos_api_state {
3887            let config = chaos_state.config.read().await;
3888            // Convert ChaosConfig to JSON response format
3889            Json(serde_json::json!({
3890                "enabled": config.enabled,
3891                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
3892                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
3893                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
3894                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
3895            }))
3896            .into_response()
3897        } else {
3898            // Chaos API not available, return default
3899            Json(serde_json::json!({
3900                "enabled": false,
3901                "latency": null,
3902                "fault_injection": null,
3903                "rate_limit": null,
3904                "traffic_shaping": null,
3905            }))
3906            .into_response()
3907        }
3908    }
3909    #[cfg(not(feature = "chaos"))]
3910    {
3911        // Chaos feature not enabled
3912        Json(serde_json::json!({
3913            "enabled": false,
3914            "latency": null,
3915            "fault_injection": null,
3916            "rate_limit": null,
3917            "traffic_shaping": null,
3918        }))
3919        .into_response()
3920    }
3921}
3922
3923/// Request to update chaos configuration
3924#[derive(Debug, Deserialize)]
3925pub struct ChaosConfigUpdate {
3926    /// Whether to enable chaos engineering
3927    pub enabled: Option<bool>,
3928    /// Latency configuration
3929    pub latency: Option<serde_json::Value>,
3930    /// Fault injection configuration
3931    pub fault_injection: Option<serde_json::Value>,
3932    /// Rate limiting configuration
3933    pub rate_limit: Option<serde_json::Value>,
3934    /// Traffic shaping configuration
3935    pub traffic_shaping: Option<serde_json::Value>,
3936}
3937
3938/// Update chaos engineering configuration
3939async fn update_chaos_config(
3940    State(_state): State<ManagementState>,
3941    Json(_config_update): Json<ChaosConfigUpdate>,
3942) -> impl IntoResponse {
3943    #[cfg(feature = "chaos")]
3944    {
3945        if let Some(chaos_state) = &_state.chaos_api_state {
3946            use mockforge_chaos::config::{
3947                FaultInjectionConfig, LatencyConfig, RateLimitConfig, TrafficShapingConfig,
3948            };
3949
3950            let mut config = chaos_state.config.write().await;
3951
3952            // Update enabled flag if provided
3953            if let Some(enabled) = _config_update.enabled {
3954                config.enabled = enabled;
3955            }
3956
3957            // Update latency config if provided
3958            if let Some(latency_json) = _config_update.latency {
3959                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3960                    config.latency = Some(latency);
3961                }
3962            }
3963
3964            // Update fault injection config if provided
3965            if let Some(fault_json) = _config_update.fault_injection {
3966                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3967                    config.fault_injection = Some(fault);
3968                }
3969            }
3970
3971            // Update rate limit config if provided
3972            if let Some(rate_json) = _config_update.rate_limit {
3973                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3974                    config.rate_limit = Some(rate);
3975                }
3976            }
3977
3978            // Update traffic shaping config if provided
3979            if let Some(traffic_json) = _config_update.traffic_shaping {
3980                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3981                    config.traffic_shaping = Some(traffic);
3982                }
3983            }
3984
3985            // Reinitialize middleware injectors with new config
3986            // The middleware will pick up the changes on the next request
3987            drop(config);
3988
3989            info!("Chaos configuration updated successfully");
3990            Json(serde_json::json!({
3991                "success": true,
3992                "message": "Chaos configuration updated and applied"
3993            }))
3994            .into_response()
3995        } else {
3996            (
3997                StatusCode::SERVICE_UNAVAILABLE,
3998                Json(serde_json::json!({
3999                    "success": false,
4000                    "error": "Chaos API not available",
4001                    "message": "Chaos engineering is not enabled or configured"
4002                })),
4003            )
4004                .into_response()
4005        }
4006    }
4007    #[cfg(not(feature = "chaos"))]
4008    {
4009        (
4010            StatusCode::NOT_IMPLEMENTED,
4011            Json(serde_json::json!({
4012                "success": false,
4013                "error": "Chaos feature not enabled",
4014                "message": "Chaos engineering feature is not compiled into this build"
4015            })),
4016        )
4017            .into_response()
4018    }
4019}
4020
4021// ========== Network Profile Management ==========
4022
4023/// List available network profiles
4024async fn list_network_profiles() -> impl IntoResponse {
4025    use mockforge_core::network_profiles::NetworkProfileCatalog;
4026
4027    let catalog = NetworkProfileCatalog::default();
4028    let profiles: Vec<serde_json::Value> = catalog
4029        .list_profiles_with_description()
4030        .iter()
4031        .map(|(name, description)| {
4032            serde_json::json!({
4033                "name": name,
4034                "description": description,
4035            })
4036        })
4037        .collect();
4038
4039    Json(serde_json::json!({
4040        "profiles": profiles
4041    }))
4042    .into_response()
4043}
4044
4045#[derive(Debug, Deserialize)]
4046/// Request to apply a network profile
4047pub struct ApplyNetworkProfileRequest {
4048    /// Name of the network profile to apply
4049    pub profile_name: String,
4050}
4051
4052/// Apply a network profile
4053async fn apply_network_profile(
4054    State(state): State<ManagementState>,
4055    Json(request): Json<ApplyNetworkProfileRequest>,
4056) -> impl IntoResponse {
4057    use mockforge_core::network_profiles::NetworkProfileCatalog;
4058
4059    let catalog = NetworkProfileCatalog::default();
4060    if let Some(profile) = catalog.get(&request.profile_name) {
4061        // Apply profile to server configuration if available
4062        // NetworkProfile contains latency and traffic_shaping configs
4063        if let Some(server_config) = &state.server_config {
4064            let mut config = server_config.write().await;
4065
4066            // Apply network profile's traffic shaping to core config
4067            use mockforge_core::config::NetworkShapingConfig;
4068
4069            // Convert NetworkProfile's TrafficShapingConfig to NetworkShapingConfig
4070            // NetworkProfile uses mockforge_core::traffic_shaping::TrafficShapingConfig
4071            // which has bandwidth and burst_loss fields
4072            let network_shaping = NetworkShapingConfig {
4073                enabled: profile.traffic_shaping.bandwidth.enabled
4074                    || profile.traffic_shaping.burst_loss.enabled,
4075                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
4076                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
4077                max_connections: 1000, // Default value
4078            };
4079
4080            // Update chaos config if it exists, or create it
4081            // Chaos config is in observability.chaos, not core.chaos
4082            if let Some(ref mut chaos) = config.observability.chaos {
4083                chaos.traffic_shaping = Some(network_shaping);
4084            } else {
4085                // Create minimal chaos config with traffic shaping
4086                use mockforge_core::config::ChaosEngConfig;
4087                config.observability.chaos = Some(ChaosEngConfig {
4088                    enabled: true,
4089                    latency: None,
4090                    fault_injection: None,
4091                    rate_limit: None,
4092                    traffic_shaping: Some(network_shaping),
4093                    scenario: None,
4094                });
4095            }
4096
4097            info!("Network profile '{}' applied to server configuration", request.profile_name);
4098        } else {
4099            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
4100        }
4101
4102        // Also update chaos API state if available
4103        #[cfg(feature = "chaos")]
4104        {
4105            if let Some(chaos_state) = &state.chaos_api_state {
4106                use mockforge_chaos::config::TrafficShapingConfig;
4107
4108                let mut chaos_config = chaos_state.config.write().await;
4109                // Apply profile's traffic shaping to chaos API state
4110                let chaos_traffic_shaping = TrafficShapingConfig {
4111                    enabled: profile.traffic_shaping.bandwidth.enabled
4112                        || profile.traffic_shaping.burst_loss.enabled,
4113                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
4114                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
4115                    max_connections: 0,
4116                    connection_timeout_ms: 30000,
4117                };
4118                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
4119                chaos_config.enabled = true; // Enable chaos when applying a profile
4120                drop(chaos_config);
4121                info!("Network profile '{}' applied to chaos API state", request.profile_name);
4122            }
4123        }
4124
4125        Json(serde_json::json!({
4126            "success": true,
4127            "message": format!("Network profile '{}' applied", request.profile_name),
4128            "profile": {
4129                "name": profile.name,
4130                "description": profile.description,
4131            }
4132        }))
4133        .into_response()
4134    } else {
4135        (
4136            StatusCode::NOT_FOUND,
4137            Json(serde_json::json!({
4138                "error": "Profile not found",
4139                "message": format!("Network profile '{}' not found", request.profile_name)
4140            })),
4141        )
4142            .into_response()
4143    }
4144}
4145
4146/// Build the management API router with UI Builder support
4147pub fn management_router_with_ui_builder(
4148    state: ManagementState,
4149    server_config: mockforge_core::config::ServerConfig,
4150) -> Router {
4151    use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
4152
4153    // Create the base management router
4154    let management = management_router(state);
4155
4156    // Create UI Builder state and router
4157    let ui_builder_state = UIBuilderState::new(server_config);
4158    let ui_builder = create_ui_builder_router(ui_builder_state);
4159
4160    // Nest UI Builder under /ui-builder
4161    management.nest("/ui-builder", ui_builder)
4162}
4163
4164/// Build management router with spec import API
4165pub fn management_router_with_spec_import(state: ManagementState) -> Router {
4166    use crate::spec_import::{spec_import_router, SpecImportState};
4167
4168    // Create base management router
4169    let management = management_router(state);
4170
4171    // Merge with spec import router
4172    Router::new()
4173        .merge(management)
4174        .merge(spec_import_router(SpecImportState::new()))
4175}
4176
4177#[cfg(test)]
4178mod tests {
4179    use super::*;
4180
4181    #[tokio::test]
4182    async fn test_create_and_get_mock() {
4183        let state = ManagementState::new(None, None, 3000);
4184
4185        let mock = MockConfig {
4186            id: "test-1".to_string(),
4187            name: "Test Mock".to_string(),
4188            method: "GET".to_string(),
4189            path: "/test".to_string(),
4190            response: MockResponse {
4191                body: serde_json::json!({"message": "test"}),
4192                headers: None,
4193            },
4194            enabled: true,
4195            latency_ms: None,
4196            status_code: Some(200),
4197            request_match: None,
4198            priority: None,
4199            scenario: None,
4200            required_scenario_state: None,
4201            new_scenario_state: None,
4202        };
4203
4204        // Create mock
4205        {
4206            let mut mocks = state.mocks.write().await;
4207            mocks.push(mock.clone());
4208        }
4209
4210        // Get mock
4211        let mocks = state.mocks.read().await;
4212        let found = mocks.iter().find(|m| m.id == "test-1");
4213        assert!(found.is_some());
4214        assert_eq!(found.unwrap().name, "Test Mock");
4215    }
4216
4217    #[tokio::test]
4218    async fn test_server_stats() {
4219        let state = ManagementState::new(None, None, 3000);
4220
4221        // Add some mocks
4222        {
4223            let mut mocks = state.mocks.write().await;
4224            mocks.push(MockConfig {
4225                id: "1".to_string(),
4226                name: "Mock 1".to_string(),
4227                method: "GET".to_string(),
4228                path: "/test1".to_string(),
4229                response: MockResponse {
4230                    body: serde_json::json!({}),
4231                    headers: None,
4232                },
4233                enabled: true,
4234                latency_ms: None,
4235                status_code: Some(200),
4236                request_match: None,
4237                priority: None,
4238                scenario: None,
4239                required_scenario_state: None,
4240                new_scenario_state: None,
4241            });
4242            mocks.push(MockConfig {
4243                id: "2".to_string(),
4244                name: "Mock 2".to_string(),
4245                method: "POST".to_string(),
4246                path: "/test2".to_string(),
4247                response: MockResponse {
4248                    body: serde_json::json!({}),
4249                    headers: None,
4250                },
4251                enabled: false,
4252                latency_ms: None,
4253                status_code: Some(201),
4254                request_match: None,
4255                priority: None,
4256                scenario: None,
4257                required_scenario_state: None,
4258                new_scenario_state: None,
4259            });
4260        }
4261
4262        let mocks = state.mocks.read().await;
4263        assert_eq!(mocks.len(), 2);
4264        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
4265    }
4266
4267    #[test]
4268    fn test_mock_matches_request_with_xpath_absolute_path() {
4269        let mock = MockConfig {
4270            id: "xpath-1".to_string(),
4271            name: "XPath Match".to_string(),
4272            method: "POST".to_string(),
4273            path: "/xml".to_string(),
4274            response: MockResponse {
4275                body: serde_json::json!({"ok": true}),
4276                headers: None,
4277            },
4278            enabled: true,
4279            latency_ms: None,
4280            status_code: Some(200),
4281            request_match: Some(RequestMatchCriteria {
4282                xpath: Some("/root/order/id".to_string()),
4283                ..Default::default()
4284            }),
4285            priority: None,
4286            scenario: None,
4287            required_scenario_state: None,
4288            new_scenario_state: None,
4289        };
4290
4291        let body = br#"<root><order><id>123</id></order></root>"#;
4292        let headers = std::collections::HashMap::new();
4293        let query = std::collections::HashMap::new();
4294
4295        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4296    }
4297
4298    #[test]
4299    fn test_mock_matches_request_with_xpath_text_predicate() {
4300        let mock = MockConfig {
4301            id: "xpath-2".to_string(),
4302            name: "XPath Predicate Match".to_string(),
4303            method: "POST".to_string(),
4304            path: "/xml".to_string(),
4305            response: MockResponse {
4306                body: serde_json::json!({"ok": true}),
4307                headers: None,
4308            },
4309            enabled: true,
4310            latency_ms: None,
4311            status_code: Some(200),
4312            request_match: Some(RequestMatchCriteria {
4313                xpath: Some("//order/id[text()='123']".to_string()),
4314                ..Default::default()
4315            }),
4316            priority: None,
4317            scenario: None,
4318            required_scenario_state: None,
4319            new_scenario_state: None,
4320        };
4321
4322        let body = br#"<root><order><id>123</id></order></root>"#;
4323        let headers = std::collections::HashMap::new();
4324        let query = std::collections::HashMap::new();
4325
4326        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4327    }
4328
4329    #[test]
4330    fn test_mock_matches_request_with_xpath_no_match() {
4331        let mock = MockConfig {
4332            id: "xpath-3".to_string(),
4333            name: "XPath No Match".to_string(),
4334            method: "POST".to_string(),
4335            path: "/xml".to_string(),
4336            response: MockResponse {
4337                body: serde_json::json!({"ok": true}),
4338                headers: None,
4339            },
4340            enabled: true,
4341            latency_ms: None,
4342            status_code: Some(200),
4343            request_match: Some(RequestMatchCriteria {
4344                xpath: Some("//order/id[text()='456']".to_string()),
4345                ..Default::default()
4346            }),
4347            priority: None,
4348            scenario: None,
4349            required_scenario_state: None,
4350            new_scenario_state: None,
4351        };
4352
4353        let body = br#"<root><order><id>123</id></order></root>"#;
4354        let headers = std::collections::HashMap::new();
4355        let query = std::collections::HashMap::new();
4356
4357        assert!(!mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
4358    }
4359}