Skip to main content

mockforge_http/
management.rs

1#[cfg(any(feature = "mqtt", feature = "kafka"))]
2use axum::response::sse::{Event, Sse};
3/// Management API for MockForge
4///
5/// Provides REST endpoints for controlling mocks, server configuration,
6/// and integration with developer tools (VS Code extension, CI/CD, etc.)
7use axum::{
8    extract::{Path, Query, State},
9    http::StatusCode,
10    response::{IntoResponse, Json},
11    routing::{delete, get, post, put},
12    Router,
13};
14#[cfg(any(feature = "mqtt", feature = "kafka"))]
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18    BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23#[cfg(any(feature = "mqtt", feature = "kafka"))]
24use std::convert::Infallible;
25use std::sync::Arc;
26use tokio::sync::{broadcast, RwLock};
27use tracing::*;
28
/// Default broadcast channel capacity for message events
#[cfg(any(feature = "mqtt", feature = "kafka"))]
const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;

/// Get the broadcast channel capacity from environment or use default
///
/// Reads `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY`. The default is used when the
/// variable is unset, is not a valid `usize`, or holds a value that
/// `tokio::sync::broadcast::channel` would reject by panicking (zero, or
/// anything above `usize::MAX / 2`).
#[cfg(any(feature = "mqtt", feature = "kafka"))]
fn get_message_broadcast_capacity() -> usize {
    // Upper bound accepted by `tokio::sync::broadcast::channel`.
    const MAX_CAPACITY: usize = usize::MAX / 2;

    std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        // A misconfigured value must not take the server down at start-up.
        .filter(|capacity| (1..=MAX_CAPACITY).contains(capacity))
        .unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY)
}
41
/// Message event types for real-time monitoring
///
/// Serialized in serde's adjacently tagged form: the lowercase variant name
/// goes under `"protocol"` and the variant payload under `"data"`, e.g.
/// `{"protocol": "mqtt", "data": { ... }}`.
///
/// Each variant only exists when its cargo feature is enabled.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    #[cfg(feature = "mqtt")]
    /// MQTT message event
    Mqtt(MqttMessageEvent),
    #[cfg(feature = "kafka")]
    /// Kafka message event
    Kafka(KafkaMessageEvent),
}
54
#[cfg(feature = "mqtt")]
/// MQTT message event for real-time monitoring
///
/// Carried as the payload of [`MessageEvent::Mqtt`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// MQTT topic name
    pub topic: String,
    /// Message payload content
    pub payload: String,
    /// Quality of Service level (0, 1, or 2)
    pub qos: u8,
    /// Whether the message is retained
    pub retain: bool,
    /// RFC3339 formatted timestamp
    pub timestamp: String,
}
70
#[cfg(feature = "kafka")]
/// Kafka message event for real-time monitoring
///
/// Carried as the payload of [`MessageEvent::Kafka`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Kafka topic name
    pub topic: String,
    /// Optional record key
    pub key: Option<String>,
    /// Record value content
    pub value: String,
    /// Partition number of the record
    pub partition: i32,
    /// Offset of the record
    pub offset: i64,
    /// Optional record headers as name/value pairs
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// Timestamp string (presumably RFC3339 like the MQTT event; confirm with the producer)
    pub timestamp: String,
}
83
84/// Mock configuration representation
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct MockConfig {
87    /// Unique identifier for the mock
88    #[serde(skip_serializing_if = "String::is_empty")]
89    pub id: String,
90    /// Human-readable name for the mock
91    pub name: String,
92    /// HTTP method (GET, POST, etc.)
93    pub method: String,
94    /// API path pattern to match
95    pub path: String,
96    /// Response configuration
97    pub response: MockResponse,
98    /// Whether this mock is currently enabled
99    #[serde(default = "default_true")]
100    pub enabled: bool,
101    /// Optional latency to inject in milliseconds
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub latency_ms: Option<u64>,
104    /// Optional HTTP status code override
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub status_code: Option<u16>,
107    /// Request matching criteria (headers, query params, body patterns)
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub request_match: Option<RequestMatchCriteria>,
110    /// Priority for mock ordering (higher priority mocks are matched first)
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub priority: Option<i32>,
113    /// Scenario name for stateful mocking
114    #[serde(skip_serializing_if = "Option::is_none")]
115    pub scenario: Option<String>,
116    /// Required scenario state for this mock to be active
117    #[serde(skip_serializing_if = "Option::is_none")]
118    pub required_scenario_state: Option<String>,
119    /// New scenario state after this mock is matched
120    #[serde(skip_serializing_if = "Option::is_none")]
121    pub new_scenario_state: Option<String>,
122}
123
124fn default_true() -> bool {
125    true
126}
127
/// Mock response configuration
///
/// Describes what a matched mock sends back to the caller.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Response body as JSON
    pub body: serde_json::Value,
    /// Optional custom response headers (omitted from serialized output when `None`)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
137
138/// Request matching criteria for advanced request matching
139#[derive(Debug, Clone, Serialize, Deserialize, Default)]
140pub struct RequestMatchCriteria {
141    /// Headers that must be present and match (case-insensitive header names)
142    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
143    pub headers: std::collections::HashMap<String, String>,
144    /// Query parameters that must be present and match
145    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
146    pub query_params: std::collections::HashMap<String, String>,
147    /// Request body pattern (supports exact match or regex)
148    #[serde(skip_serializing_if = "Option::is_none")]
149    pub body_pattern: Option<String>,
150    /// JSONPath expression for JSON body matching
151    #[serde(skip_serializing_if = "Option::is_none")]
152    pub json_path: Option<String>,
153    /// XPath expression for XML body matching
154    #[serde(skip_serializing_if = "Option::is_none")]
155    pub xpath: Option<String>,
156    /// Custom matcher expression (e.g., "headers.content-type == \"application/json\"")
157    #[serde(skip_serializing_if = "Option::is_none")]
158    pub custom_matcher: Option<String>,
159}
160
161/// Check if a request matches the given mock configuration
162///
163/// This function implements comprehensive request matching including:
164/// - Method and path matching
165/// - Header matching (with regex support)
166/// - Query parameter matching
167/// - Body pattern matching (exact, regex, JSONPath, XPath)
168/// - Custom matcher expressions
169pub fn mock_matches_request(
170    mock: &MockConfig,
171    method: &str,
172    path: &str,
173    headers: &std::collections::HashMap<String, String>,
174    query_params: &std::collections::HashMap<String, String>,
175    body: Option<&[u8]>,
176) -> bool {
177    use regex::Regex;
178
179    // Check if mock is enabled
180    if !mock.enabled {
181        return false;
182    }
183
184    // Check method (case-insensitive)
185    if mock.method.to_uppercase() != method.to_uppercase() {
186        return false;
187    }
188
189    // Check path pattern (supports wildcards and path parameters)
190    if !path_matches_pattern(&mock.path, path) {
191        return false;
192    }
193
194    // Check request matching criteria if present
195    if let Some(criteria) = &mock.request_match {
196        // Check headers
197        for (key, expected_value) in &criteria.headers {
198            let header_key_lower = key.to_lowercase();
199            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
200
201            if let Some((_, actual_value)) = found {
202                // Try regex match first, then exact match
203                if let Ok(re) = Regex::new(expected_value) {
204                    if !re.is_match(actual_value) {
205                        return false;
206                    }
207                } else if actual_value != expected_value {
208                    return false;
209                }
210            } else {
211                return false; // Header not found
212            }
213        }
214
215        // Check query parameters
216        for (key, expected_value) in &criteria.query_params {
217            if let Some(actual_value) = query_params.get(key) {
218                if actual_value != expected_value {
219                    return false;
220                }
221            } else {
222                return false; // Query param not found
223            }
224        }
225
226        // Check body pattern
227        if let Some(pattern) = &criteria.body_pattern {
228            if let Some(body_bytes) = body {
229                let body_str = String::from_utf8_lossy(body_bytes);
230                // Try regex first, then exact match
231                if let Ok(re) = Regex::new(pattern) {
232                    if !re.is_match(&body_str) {
233                        return false;
234                    }
235                } else if body_str.as_ref() != pattern {
236                    return false;
237                }
238            } else {
239                return false; // Body required but not present
240            }
241        }
242
243        // Check JSONPath (simplified implementation)
244        if let Some(json_path) = &criteria.json_path {
245            if let Some(body_bytes) = body {
246                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
247                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
248                        // Simple JSONPath check
249                        if !json_path_exists(&json_value, json_path) {
250                            return false;
251                        }
252                    }
253                }
254            }
255        }
256
257        // Check XPath (supports a focused subset)
258        if let Some(xpath) = &criteria.xpath {
259            if let Some(body_bytes) = body {
260                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
261                    if !xml_xpath_exists(body_str, xpath) {
262                        return false;
263                    }
264                } else {
265                    return false;
266                }
267            } else {
268                return false; // Body required but not present
269            }
270        }
271
272        // Check custom matcher
273        if let Some(custom) = &criteria.custom_matcher {
274            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
275                return false;
276            }
277        }
278    }
279
280    true
281}
282
/// Check if a path matches a pattern (supports wildcards and path parameters)
///
/// A path matches when any of the following holds:
/// - it equals the pattern, or the pattern is the lone wildcard `*`;
/// - both have the same number of non-empty `/`-separated segments and every
///   pattern segment is a `{param}` placeholder, a `*`, or equal to the
///   corresponding path segment (so `/api/*` matches `/api/users`);
/// - the pattern contains `*` and matches the whole path as a glob (so
///   `/api/*` also matches `/api/users/1`).
fn path_matches_pattern(pattern: &str, path: &str) -> bool {
    if pattern == path || pattern == "*" {
        return true;
    }

    let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
    let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();

    // Segment-wise comparison; `{param}` and `*` each stand for one segment.
    let segments_match = pattern_parts.len() == path_parts.len()
        && pattern_parts.iter().zip(path_parts.iter()).all(|(expected, actual)| {
            let is_param = expected.starts_with('{') && expected.ends_with('}');
            is_param || *expected == "*" || expected == actual
        });

    // The glob fallback used to be consulted only when the segment counts
    // differed, which made `/api/*` reject `/api/users`.
    segments_match || (pattern.contains('*') && matches_wildcard_pattern(pattern, path))
}

/// Check if path matches a wildcard pattern
///
/// The whole path must match. `*` matches any run of characters (including
/// `/` and the empty run), `?` matches zero or one character, and every other
/// character matches only itself. Characters such as `.`, `+` or `(` have no
/// special meaning.
fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
    let text: Vec<char> = path.chars().collect();

    // reachable[i] is true when the pattern consumed so far can match text[..i].
    let mut reachable = vec![false; text.len() + 1];
    reachable[0] = true;

    for token in pattern.chars() {
        let mut next = vec![false; text.len() + 1];
        match token {
            '*' => {
                // Once any prefix is reachable, every longer prefix is too.
                let mut seen = false;
                for (slot, &was_reachable) in next.iter_mut().zip(reachable.iter()) {
                    seen |= was_reachable;
                    *slot = seen;
                }
            }
            '?' => {
                for i in 0..reachable.len() {
                    if reachable[i] {
                        next[i] = true;
                        if i < text.len() {
                            next[i + 1] = true;
                        }
                    }
                }
            }
            literal => {
                for (i, &ch) in text.iter().enumerate() {
                    if reachable[i] && ch == literal {
                        next[i + 1] = true;
                    }
                }
            }
        }
        reachable = next;
    }

    reachable[text.len()]
}
334
335/// Check if a JSONPath exists in a JSON value
336///
337/// Supports:
338/// - `$` — root element
339/// - `$.field.subfield` — nested object access
340/// - `$.items[0].name` — array index access
341/// - `$.items[*]` — array wildcard (checks array is non-empty)
342fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
343    let path = if json_path == "$" {
344        return true;
345    } else if let Some(p) = json_path.strip_prefix("$.") {
346        p
347    } else if let Some(p) = json_path.strip_prefix('$') {
348        p.strip_prefix('.').unwrap_or(p)
349    } else {
350        json_path
351    };
352
353    let mut current = json;
354    for segment in split_json_path_segments(path) {
355        match segment {
356            JsonPathSegment::Field(name) => {
357                if let Some(obj) = current.as_object() {
358                    if let Some(value) = obj.get(name) {
359                        current = value;
360                    } else {
361                        return false;
362                    }
363                } else {
364                    return false;
365                }
366            }
367            JsonPathSegment::Index(idx) => {
368                if let Some(arr) = current.as_array() {
369                    if let Some(value) = arr.get(idx) {
370                        current = value;
371                    } else {
372                        return false;
373                    }
374                } else {
375                    return false;
376                }
377            }
378            JsonPathSegment::Wildcard => {
379                if let Some(arr) = current.as_array() {
380                    return !arr.is_empty();
381                }
382                return false;
383            }
384        }
385    }
386    true
387}
388
/// One step of a parsed JSONPath.
#[derive(Debug, PartialEq)]
enum JsonPathSegment<'a> {
    /// Object member access by key.
    Field(&'a str),
    /// Array element access by zero-based index.
    Index(usize),
    /// Array wildcard (`[*]`).
    Wildcard,
}

/// Split a JSONPath (without the leading `$`) into segments
///
/// Parts are separated by `.`; empty parts are skipped. A part may carry any
/// number of bracket selectors: `[N]` (index), `[*]` (wildcard) or a quoted
/// key such as `['name']`.
///
/// A part whose brackets are malformed or unsupported (for example `items[`
/// or `items[abc]`) is kept verbatim as a single field name, so it can only
/// match an object key spelled exactly that way. It never panics and never
/// silently widens the match by dropping the selector.
fn split_json_path_segments(path: &str) -> Vec<JsonPathSegment<'_>> {
    let mut segments = Vec::new();
    for part in path.split('.').filter(|part| !part.is_empty()) {
        match parse_json_path_part(part) {
            Some(parsed) => segments.extend(parsed),
            None => segments.push(JsonPathSegment::Field(part)),
        }
    }
    segments
}

/// Parse one dot-separated part; `None` when its bracket syntax is not understood.
fn parse_json_path_part(part: &str) -> Option<Vec<JsonPathSegment<'_>>> {
    let bracket_start = match part.find('[') {
        Some(position) => position,
        None => return Some(vec![JsonPathSegment::Field(part)]),
    };

    let (field_name, mut selectors) = part.split_at(bracket_start);
    let mut parsed = Vec::new();
    if !field_name.is_empty() {
        parsed.push(JsonPathSegment::Field(field_name));
    }

    // Consume `[...]` groups until the part is exhausted; anything that is
    // not a well-formed group makes the whole part unparsable.
    while !selectors.is_empty() {
        let opened = selectors.strip_prefix('[')?;
        let close = opened.find(']')?;
        let content = &opened[..close];

        let quoted_key = ['"', '\'']
            .iter()
            .find_map(|quote| content.strip_prefix(*quote)?.strip_suffix(*quote));

        let segment = if content == "*" {
            JsonPathSegment::Wildcard
        } else if let Some(key) = quoted_key {
            JsonPathSegment::Field(key)
        } else {
            JsonPathSegment::Index(content.parse::<usize>().ok()?)
        };
        parsed.push(segment);

        selectors = &opened[close + 1..];
    }

    Some(parsed)
}
419
/// One step of a parsed XPath expression.
#[derive(Debug, Clone, PartialEq, Eq)]
struct XPathSegment {
    /// Element name the step selects.
    name: String,
    /// Required text content, when the step has a `text()` predicate.
    text_equals: Option<String>,
}

/// Parse a single XPath step such as `item` or `item[text()="value"]`.
///
/// Surrounding whitespace is ignored, as is whitespace around the `=` inside
/// the predicate. The only supported predicate is a `text()` comparison
/// against a literal wrapped in matching single or double quotes.
///
/// # Returns
/// `None` when the step is empty (or whitespace only), has no element name
/// before the predicate, is missing the closing `]`, or uses an unsupported
/// predicate.
fn parse_xpath_segment(segment: &str) -> Option<XPathSegment> {
    // Trim before the emptiness check so that "   " is rejected instead of
    // producing a segment with an empty element name.
    let trimmed = segment.trim();
    if trimmed.is_empty() {
        return None;
    }

    let bracket_start = match trimmed.find('[') {
        Some(position) => position,
        None => {
            return Some(XPathSegment {
                name: trimmed.to_string(),
                text_equals: None,
            })
        }
    };

    let name = trimmed[..bracket_start].trim();
    if name.is_empty() {
        return None;
    }

    let predicate = trimmed[bracket_start + 1..].strip_suffix(']')?.trim();

    // Support simple predicate: [text()="value"] or [text()='value']
    let literal = predicate
        .strip_prefix("text()")?
        .trim_start()
        .strip_prefix('=')?
        .trim();

    // The literal must open and close with the same quote character.
    let text = ['"', '\'']
        .iter()
        .find_map(|quote| literal.strip_prefix(*quote)?.strip_suffix(*quote))?;

    Some(XPathSegment {
        name: name.to_string(),
        text_equals: Some(text.to_string()),
    })
}
466
467fn segment_matches(node: roxmltree::Node<'_, '_>, segment: &XPathSegment) -> bool {
468    if !node.is_element() {
469        return false;
470    }
471    if node.tag_name().name() != segment.name {
472        return false;
473    }
474    match &segment.text_equals {
475        Some(expected) => node.text().map(str::trim).unwrap_or_default() == expected,
476        None => true,
477    }
478}
479
480/// Check if an XPath expression matches an XML body.
481///
482/// Supported subset:
483/// - Absolute paths: `/root/child/item`
484/// - Descendant search: `//item` and `//parent/child`
485/// - Optional text predicate per segment: `item[text()="value"]`
486fn xml_xpath_exists(xml_body: &str, xpath: &str) -> bool {
487    let doc = match roxmltree::Document::parse(xml_body) {
488        Ok(doc) => doc,
489        Err(err) => {
490            tracing::warn!("Failed to parse XML for XPath matching: {}", err);
491            return false;
492        }
493    };
494
495    let expr = xpath.trim();
496    if expr.is_empty() {
497        return false;
498    }
499
500    let (is_descendant, path_str) = if let Some(rest) = expr.strip_prefix("//") {
501        (true, rest)
502    } else if let Some(rest) = expr.strip_prefix('/') {
503        (false, rest)
504    } else {
505        tracing::warn!("Unsupported XPath expression (must start with / or //): {}", expr);
506        return false;
507    };
508
509    let segments: Vec<XPathSegment> = path_str
510        .split('/')
511        .filter(|s| !s.trim().is_empty())
512        .filter_map(parse_xpath_segment)
513        .collect();
514
515    if segments.is_empty() {
516        return false;
517    }
518
519    if is_descendant {
520        let first = &segments[0];
521        for node in doc.descendants().filter(|n| segment_matches(*n, first)) {
522            let mut frontier = vec![node];
523            for segment in &segments[1..] {
524                let mut next_frontier = Vec::new();
525                for parent in &frontier {
526                    for child in parent.children().filter(|n| segment_matches(*n, segment)) {
527                        next_frontier.push(child);
528                    }
529                }
530                if next_frontier.is_empty() {
531                    frontier.clear();
532                    break;
533                }
534                frontier = next_frontier;
535            }
536            if !frontier.is_empty() {
537                return true;
538            }
539        }
540        false
541    } else {
542        let mut frontier = vec![doc.root_element()];
543        for (index, segment) in segments.iter().enumerate() {
544            let mut next_frontier = Vec::new();
545            for parent in &frontier {
546                if index == 0 {
547                    if segment_matches(*parent, segment) {
548                        next_frontier.push(*parent);
549                    }
550                    continue;
551                }
552                for child in parent.children().filter(|n| segment_matches(*n, segment)) {
553                    next_frontier.push(child);
554                }
555            }
556            if next_frontier.is_empty() {
557                return false;
558            }
559            frontier = next_frontier;
560        }
561        !frontier.is_empty()
562    }
563}
564
565/// Evaluate a custom matcher expression
566fn evaluate_custom_matcher(
567    expression: &str,
568    method: &str,
569    path: &str,
570    headers: &std::collections::HashMap<String, String>,
571    query_params: &std::collections::HashMap<String, String>,
572    body: Option<&[u8]>,
573) -> bool {
574    use regex::Regex;
575
576    let expr = expression.trim();
577
578    // Handle equality expressions (field == "value")
579    if expr.contains("==") {
580        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
581        if parts.len() != 2 {
582            return false;
583        }
584
585        let field = parts[0];
586        let expected_value = parts[1].trim_matches('"').trim_matches('\'');
587
588        match field {
589            "method" => method == expected_value,
590            "path" => path == expected_value,
591            _ if field.starts_with("headers.") => {
592                let header_name = &field[8..];
593                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
594            }
595            _ if field.starts_with("query.") => {
596                let param_name = &field[6..];
597                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
598            }
599            _ => false,
600        }
601    }
602    // Handle regex match expressions (field =~ "pattern")
603    else if expr.contains("=~") {
604        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
605        if parts.len() != 2 {
606            return false;
607        }
608
609        let field = parts[0];
610        let pattern = parts[1].trim_matches('"').trim_matches('\'');
611
612        if let Ok(re) = Regex::new(pattern) {
613            match field {
614                "method" => re.is_match(method),
615                "path" => re.is_match(path),
616                _ if field.starts_with("headers.") => {
617                    let header_name = &field[8..];
618                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
619                }
620                _ if field.starts_with("query.") => {
621                    let param_name = &field[6..];
622                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
623                }
624                _ => false,
625            }
626        } else {
627            false
628        }
629    }
630    // Handle contains expressions (field contains "value")
631    else if expr.contains("contains") {
632        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
633        if parts.len() != 2 {
634            return false;
635        }
636
637        let field = parts[0];
638        let search_value = parts[1].trim_matches('"').trim_matches('\'');
639
640        match field {
641            "path" => path.contains(search_value),
642            _ if field.starts_with("headers.") => {
643                let header_name = &field[8..];
644                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
645            }
646            _ if field.starts_with("body") => {
647                if let Some(body_bytes) = body {
648                    let body_str = String::from_utf8_lossy(body_bytes);
649                    body_str.contains(search_value)
650                } else {
651                    false
652                }
653            }
654            _ => false,
655        }
656    } else {
657        // Unknown expression format
658        tracing::warn!("Unknown custom matcher expression format: {}", expr);
659        false
660    }
661}
662
/// Server statistics
///
/// Plain counters describing the server, serializable to and from JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Server uptime in seconds
    pub uptime_seconds: u64,
    /// Total number of requests processed
    pub total_requests: u64,
    /// Number of active mock configurations
    pub active_mocks: usize,
    /// Number of currently enabled mocks
    pub enabled_mocks: usize,
    /// Number of registered API routes
    pub registered_routes: usize,
}
677
/// Server configuration info
///
/// Distinct from `mockforge_core::config::ServerConfig`, which this file
/// refers to by its full path.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// MockForge version string
    pub version: String,
    /// Server port number
    pub port: u16,
    /// Whether an OpenAPI spec is loaded
    pub has_openapi_spec: bool,
    /// Optional path to the OpenAPI spec file (omitted from serialized output when `None`)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
691
/// Shared state for the management API
///
/// Cloning is cheap: mutable collections are held behind `Arc`, so clones
/// handed to request handlers share the same underlying data. Optional
/// integrations start out as `None` in [`ManagementState::new`] and are
/// attached through the `with_*` builder methods.
#[derive(Clone)]
pub struct ManagementState {
    /// Collection of mock configurations
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// Optional OpenAPI specification
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Optional path to the OpenAPI spec file
    pub spec_path: Option<String>,
    /// Server port number
    pub port: u16,
    /// Server start time for uptime calculation
    pub start_time: std::time::Instant,
    /// Counter for total requests processed
    pub request_counter: Arc<RwLock<u64>>,
    /// Optional proxy configuration for migration pipeline
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// Optional SMTP registry for email mocking
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// Optional MQTT broker for message mocking
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Optional Kafka broker for event streaming
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel for message events (MQTT & Kafka)
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// State machine manager for scenario state machines
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// Optional WebSocket broadcast channel for real-time updates
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Lifecycle hook registry for extensibility
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Rule explanations storage (in-memory for now), keyed by a string
    /// (presumably the rule ID; confirm against the code that fills the map)
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
    /// Optional chaos API state for chaos config management
    #[cfg(feature = "chaos")]
    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
    /// Optional server configuration for profile application
    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
}
743
744impl ManagementState {
745    /// Create a new management state
746    ///
747    /// # Arguments
748    /// * `spec` - Optional OpenAPI specification
749    /// * `spec_path` - Optional path to the OpenAPI spec file
750    /// * `port` - Server port number
751    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
752        Self {
753            mocks: Arc::new(RwLock::new(Vec::new())),
754            spec,
755            spec_path,
756            port,
757            start_time: std::time::Instant::now(),
758            request_counter: Arc::new(RwLock::new(0)),
759            proxy_config: None,
760            #[cfg(feature = "smtp")]
761            smtp_registry: None,
762            #[cfg(feature = "mqtt")]
763            mqtt_broker: None,
764            #[cfg(feature = "kafka")]
765            kafka_broker: None,
766            #[cfg(any(feature = "mqtt", feature = "kafka"))]
767            message_events: {
768                let capacity = get_message_broadcast_capacity();
769                let (tx, _) = broadcast::channel(capacity);
770                Arc::new(tx)
771            },
772            state_machine_manager: Arc::new(RwLock::new(
773                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
774            )),
775            ws_broadcast: None,
776            lifecycle_hooks: None,
777            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
778            #[cfg(feature = "chaos")]
779            chaos_api_state: None,
780            server_config: None,
781        }
782    }
783
784    /// Add lifecycle hook registry to management state
785    pub fn with_lifecycle_hooks(
786        mut self,
787        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
788    ) -> Self {
789        self.lifecycle_hooks = Some(hooks);
790        self
791    }
792
793    /// Add WebSocket broadcast channel to management state
794    pub fn with_ws_broadcast(
795        mut self,
796        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
797    ) -> Self {
798        self.ws_broadcast = Some(ws_broadcast);
799        self
800    }
801
802    /// Add proxy configuration to management state
803    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
804        self.proxy_config = Some(proxy_config);
805        self
806    }
807
808    #[cfg(feature = "smtp")]
809    /// Add SMTP registry to management state
810    pub fn with_smtp_registry(
811        mut self,
812        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
813    ) -> Self {
814        self.smtp_registry = Some(smtp_registry);
815        self
816    }
817
818    #[cfg(feature = "mqtt")]
819    /// Add MQTT broker to management state
820    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
821        self.mqtt_broker = Some(mqtt_broker);
822        self
823    }
824
825    #[cfg(feature = "kafka")]
826    /// Add Kafka broker to management state
827    pub fn with_kafka_broker(
828        mut self,
829        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
830    ) -> Self {
831        self.kafka_broker = Some(kafka_broker);
832        self
833    }
834
835    #[cfg(feature = "chaos")]
836    /// Add chaos API state to management state
837    pub fn with_chaos_api_state(
838        mut self,
839        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
840    ) -> Self {
841        self.chaos_api_state = Some(chaos_api_state);
842        self
843    }
844
845    /// Add server configuration to management state
846    pub fn with_server_config(
847        mut self,
848        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
849    ) -> Self {
850        self.server_config = Some(server_config);
851        self
852    }
853}
854
855/// List all mocks
856async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
857    let mocks = state.mocks.read().await;
858    Json(serde_json::json!({
859        "mocks": *mocks,
860        "total": mocks.len(),
861        "enabled": mocks.iter().filter(|m| m.enabled).count()
862    }))
863}
864
865/// Get a specific mock by ID
866async fn get_mock(
867    State(state): State<ManagementState>,
868    Path(id): Path<String>,
869) -> Result<Json<MockConfig>, StatusCode> {
870    let mocks = state.mocks.read().await;
871    mocks
872        .iter()
873        .find(|m| m.id == id)
874        .cloned()
875        .map(Json)
876        .ok_or(StatusCode::NOT_FOUND)
877}
878
879/// Create a new mock
880async fn create_mock(
881    State(state): State<ManagementState>,
882    Json(mut mock): Json<MockConfig>,
883) -> Result<Json<MockConfig>, StatusCode> {
884    let mut mocks = state.mocks.write().await;
885
886    // Generate ID if not provided
887    if mock.id.is_empty() {
888        mock.id = uuid::Uuid::new_v4().to_string();
889    }
890
891    // Check for duplicate ID
892    if mocks.iter().any(|m| m.id == mock.id) {
893        return Err(StatusCode::CONFLICT);
894    }
895
896    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
897
898    // Invoke lifecycle hooks
899    if let Some(hooks) = &state.lifecycle_hooks {
900        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
901            id: mock.id.clone(),
902            name: mock.name.clone(),
903            config: serde_json::to_value(&mock).unwrap_or_default(),
904        };
905        hooks.invoke_mock_created(&event).await;
906    }
907
908    mocks.push(mock.clone());
909
910    // Broadcast WebSocket event
911    if let Some(tx) = &state.ws_broadcast {
912        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
913    }
914
915    Ok(Json(mock))
916}
917
918/// Update an existing mock
919async fn update_mock(
920    State(state): State<ManagementState>,
921    Path(id): Path<String>,
922    Json(updated_mock): Json<MockConfig>,
923) -> Result<Json<MockConfig>, StatusCode> {
924    let mut mocks = state.mocks.write().await;
925
926    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
927
928    // Get old mock for comparison
929    let old_mock = mocks[position].clone();
930
931    info!("Updating mock: {}", id);
932    mocks[position] = updated_mock.clone();
933
934    // Invoke lifecycle hooks
935    if let Some(hooks) = &state.lifecycle_hooks {
936        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
937            id: updated_mock.id.clone(),
938            name: updated_mock.name.clone(),
939            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
940        };
941        hooks.invoke_mock_updated(&event).await;
942
943        // Check if enabled state changed
944        if old_mock.enabled != updated_mock.enabled {
945            let state_event = if updated_mock.enabled {
946                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
947                    id: updated_mock.id.clone(),
948                }
949            } else {
950                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
951                    id: updated_mock.id.clone(),
952                }
953            };
954            hooks.invoke_mock_state_changed(&state_event).await;
955        }
956    }
957
958    // Broadcast WebSocket event
959    if let Some(tx) = &state.ws_broadcast {
960        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
961    }
962
963    Ok(Json(updated_mock))
964}
965
966/// Delete a mock
967async fn delete_mock(
968    State(state): State<ManagementState>,
969    Path(id): Path<String>,
970) -> Result<StatusCode, StatusCode> {
971    let mut mocks = state.mocks.write().await;
972
973    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
974
975    // Get mock info before deletion for lifecycle hooks
976    let deleted_mock = mocks[position].clone();
977
978    info!("Deleting mock: {}", id);
979    mocks.remove(position);
980
981    // Invoke lifecycle hooks
982    if let Some(hooks) = &state.lifecycle_hooks {
983        let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
984            id: deleted_mock.id.clone(),
985            name: deleted_mock.name.clone(),
986        };
987        hooks.invoke_mock_deleted(&event).await;
988    }
989
990    // Broadcast WebSocket event
991    if let Some(tx) = &state.ws_broadcast {
992        let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
993    }
994
995    Ok(StatusCode::NO_CONTENT)
996}
997
/// Request body for the configuration validation endpoint.
///
/// Consumed by `validate_config`, which checks the configuration without
/// applying it.
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// Configuration to validate, carried as an arbitrary JSON value
    pub config: serde_json::Value,
    /// Format of the configuration ("json" or "yaml"); defaults to "json"
    /// when the field is omitted
    #[serde(default = "default_format")]
    pub format: String,
}
1007
/// Serde default for `ValidateConfigRequest::format`: the string `"json"`.
fn default_format() -> String {
    String::from("json")
}
1011
1012/// Validate configuration without applying it
1013async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
1014    use mockforge_core::config::ServerConfig;
1015
1016    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
1017        "yaml" | "yml" => {
1018            let yaml_str = match serde_json::to_string(&request.config) {
1019                Ok(s) => s,
1020                Err(e) => {
1021                    return (
1022                        StatusCode::BAD_REQUEST,
1023                        Json(serde_json::json!({
1024                            "valid": false,
1025                            "error": format!("Failed to convert to string: {}", e),
1026                            "message": "Configuration validation failed"
1027                        })),
1028                    )
1029                        .into_response();
1030                }
1031            };
1032            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
1033        }
1034        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
1035    };
1036
1037    match config_result {
1038        Ok(_) => Json(serde_json::json!({
1039            "valid": true,
1040            "message": "Configuration is valid"
1041        }))
1042        .into_response(),
1043        Err(e) => (
1044            StatusCode::BAD_REQUEST,
1045            Json(serde_json::json!({
1046                "valid": false,
1047                "error": format!("Invalid configuration: {}", e),
1048                "message": "Configuration validation failed"
1049            })),
1050        )
1051            .into_response(),
1052    }
1053}
1054
/// Request body for the bulk configuration update endpoint.
///
/// Consumed by `bulk_update_config`, which rejects the request unless
/// `updates` is a JSON object.
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    /// Partial configuration updates (only specified fields will be updated)
    pub updates: serde_json::Value,
}
1061
1062/// Bulk update configuration
1063///
1064/// This endpoint allows updating multiple configuration options at once.
1065/// Only the specified fields in the updates object will be modified.
1066///
1067/// Configuration updates are applied to the server configuration if available
1068/// in ManagementState. Changes take effect immediately for supported settings.
1069async fn bulk_update_config(
1070    State(state): State<ManagementState>,
1071    Json(request): Json<BulkConfigUpdateRequest>,
1072) -> impl IntoResponse {
1073    // Validate the updates structure
1074    if !request.updates.is_object() {
1075        return (
1076            StatusCode::BAD_REQUEST,
1077            Json(serde_json::json!({
1078                "error": "Invalid request",
1079                "message": "Updates must be a JSON object"
1080            })),
1081        )
1082            .into_response();
1083    }
1084
1085    // Try to validate as partial ServerConfig
1086    use mockforge_core::config::ServerConfig;
1087
1088    // Create a minimal valid config and try to merge updates
1089    let base_config = ServerConfig::default();
1090    let base_json = match serde_json::to_value(&base_config) {
1091        Ok(v) => v,
1092        Err(e) => {
1093            return (
1094                StatusCode::INTERNAL_SERVER_ERROR,
1095                Json(serde_json::json!({
1096                    "error": "Internal error",
1097                    "message": format!("Failed to serialize base config: {}", e)
1098                })),
1099            )
1100                .into_response();
1101        }
1102    };
1103
1104    // Merge updates into base config (simplified merge)
1105    let mut merged = base_json.clone();
1106    if let (Some(merged_obj), Some(updates_obj)) =
1107        (merged.as_object_mut(), request.updates.as_object())
1108    {
1109        for (key, value) in updates_obj {
1110            merged_obj.insert(key.clone(), value.clone());
1111        }
1112    }
1113
1114    // Validate the merged config
1115    match serde_json::from_value::<ServerConfig>(merged) {
1116        Ok(validated_config) => {
1117            // Apply config if server_config is available in ManagementState
1118            if let Some(ref config_lock) = state.server_config {
1119                let mut config = config_lock.write().await;
1120                *config = validated_config;
1121                Json(serde_json::json!({
1122                    "success": true,
1123                    "message": "Bulk configuration update applied successfully",
1124                    "updates_received": request.updates,
1125                    "validated": true,
1126                    "applied": true
1127                }))
1128                .into_response()
1129            } else {
1130                Json(serde_json::json!({
1131                    "success": true,
1132                    "message": "Bulk configuration update validated but not applied (no server config in state). Use .with_server_config() when building ManagementState.",
1133                    "updates_received": request.updates,
1134                    "validated": true,
1135                    "applied": false
1136                }))
1137                .into_response()
1138            }
1139        }
1140        Err(e) => (
1141            StatusCode::BAD_REQUEST,
1142            Json(serde_json::json!({
1143                "error": "Invalid configuration",
1144                "message": format!("Configuration validation failed: {}", e),
1145                "validated": false
1146            })),
1147        )
1148            .into_response(),
1149    }
1150}
1151
1152/// Get server statistics
1153async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
1154    let mocks = state.mocks.read().await;
1155    let request_count = *state.request_counter.read().await;
1156
1157    Json(ServerStats {
1158        uptime_seconds: state.start_time.elapsed().as_secs(),
1159        total_requests: request_count,
1160        active_mocks: mocks.len(),
1161        enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
1162        registered_routes: mocks.len(), // This could be enhanced with actual route registry info
1163    })
1164}
1165
1166/// Get server configuration
1167async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
1168    Json(ServerConfig {
1169        version: env!("CARGO_PKG_VERSION").to_string(),
1170        port: state.port,
1171        has_openapi_spec: state.spec.is_some(),
1172        spec_path: state.spec_path.clone(),
1173    })
1174}
1175
1176/// Health check endpoint
1177async fn health_check() -> Json<serde_json::Value> {
1178    Json(serde_json::json!({
1179        "status": "healthy",
1180        "service": "mockforge-management",
1181        "timestamp": chrono::Utc::now().to_rfc3339()
1182    }))
1183}
1184
/// Export format for mock configurations
///
/// Variant names are (de)serialized in lowercase (`"json"`, `"yaml"`) because
/// of `rename_all = "lowercase"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// JSON format
    Json,
    /// YAML format
    Yaml,
}
1194
1195/// Export mocks in specified format
1196async fn export_mocks(
1197    State(state): State<ManagementState>,
1198    Query(params): Query<std::collections::HashMap<String, String>>,
1199) -> Result<(StatusCode, String), StatusCode> {
1200    let mocks = state.mocks.read().await;
1201
1202    let format = params
1203        .get("format")
1204        .map(|f| match f.as_str() {
1205            "yaml" | "yml" => ExportFormat::Yaml,
1206            _ => ExportFormat::Json,
1207        })
1208        .unwrap_or(ExportFormat::Json);
1209
1210    match format {
1211        ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
1212            .map(|json| (StatusCode::OK, json))
1213            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1214        ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
1215            .map(|yaml| (StatusCode::OK, yaml))
1216            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1217    }
1218}
1219
1220/// Import mocks from JSON/YAML
1221async fn import_mocks(
1222    State(state): State<ManagementState>,
1223    Json(mocks): Json<Vec<MockConfig>>,
1224) -> impl IntoResponse {
1225    let mut current_mocks = state.mocks.write().await;
1226    current_mocks.clear();
1227    current_mocks.extend(mocks);
1228    Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1229}
1230
/// Lists the emails currently held in the SMTP mailbox.
///
/// Responds with `501 Not Implemented` when no SMTP registry is attached to
/// the state, `500 Internal Server Error` when the registry fails to return
/// the emails, and `200 OK` with the emails as JSON otherwise.
#[cfg(feature = "smtp")]
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    match registry.get_emails() {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to retrieve emails",
                "message": err.to_string()
            })),
        ),
    }
}
1255
/// Fetches a single email from the SMTP mailbox by its ID.
///
/// Responds with `501 Not Implemented` when no SMTP registry is attached,
/// `404 Not Found` when the registry has no email with that ID,
/// `500 Internal Server Error` when the lookup fails, and `200 OK` with the
/// email as JSON otherwise.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    let lookup = registry.get_email_by_id(&id);
    match lookup {
        Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Email not found",
                "id": id
            })),
        ),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to retrieve email",
                "message": err.to_string()
            })),
        ),
    }
}
1290
/// Empties the SMTP mailbox.
///
/// Responds with `501 Not Implemented` when no SMTP registry is attached,
/// `500 Internal Server Error` when clearing fails, and `200 OK` with a
/// confirmation message otherwise.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    match registry.clear_mailbox() {
        Ok(()) => (
            StatusCode::OK,
            Json(serde_json::json!({
                "message": "Mailbox cleared successfully"
            })),
        ),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to clear mailbox",
                "message": err.to_string()
            })),
        ),
    }
}
1320
/// Placeholder for exporting the SMTP mailbox.
///
/// Export is not available over HTTP: the handler always answers
/// `501 Not Implemented` and echoes the requested `format` query parameter
/// (`"json"` when omitted) back to the caller.
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let requested_format = match params.get("format") {
        Some(value) => value.clone(),
        None => "json".to_string(),
    };

    let body = serde_json::json!({
        "error": "SMTP mailbox management not available via HTTP API",
        "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
        "requested_format": requested_format
    });
    (StatusCode::NOT_IMPLEMENTED, Json(body))
}
1336
/// Searches the SMTP mailbox using filters taken from the query string.
///
/// Recognized parameters:
/// - `sender`, `recipient`, `subject`, `body`: text filters, passed through
///   as given;
/// - `since`, `until`: RFC 3339 timestamps converted to UTC; a value that
///   does not parse is ignored;
/// - `regex`, `case_sensitive`: enabled only by the exact value `true`.
///
/// Responds with `501 Not Implemented` when no SMTP registry is attached,
/// `500 Internal Server Error` when the search fails, and `200 OK` with the
/// matching emails as JSON otherwise.
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    // Small accessors, one per kind of query parameter.
    let text = |key: &str| params.get(key).cloned();
    let timestamp = |key: &str| {
        params
            .get(key)
            .and_then(|raw| chrono::DateTime::parse_from_rfc3339(raw).ok())
            .map(|parsed| parsed.with_timezone(&chrono::Utc))
    };
    let flag = |key: &str| params.get(key).map(|raw| raw == "true").unwrap_or(false);

    let filters = EmailSearchFilters {
        sender: text("sender"),
        recipient: text("recipient"),
        subject: text("subject"),
        body: text("body"),
        since: timestamp("since"),
        until: timestamp("until"),
        use_regex: flag("regex"),
        case_sensitive: flag("case_sensitive"),
    };

    match registry.search_emails(filters) {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to search emails",
                "message": err.to_string()
            })),
        ),
    }
}
1381
/// MQTT broker statistics
///
/// Response payload of `get_mqtt_stats`; only compiled with the `mqtt`
/// feature.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of connected MQTT clients
    pub connected_clients: usize,
    /// Number of active MQTT topics
    pub active_topics: usize,
    /// Number of retained messages
    pub retained_messages: usize,
    /// Total number of subscriptions
    pub total_subscriptions: usize,
}
1395
/// Reports MQTT broker statistics.
///
/// Responds with `503 Service Unavailable` when no broker is attached to the
/// state. Otherwise the connected-client count, the active-topic count and
/// the broker's topic statistics are combined into an `MqttBrokerStats`
/// JSON body.
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let broker = match state.mqtt_broker.as_ref() {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let connected_clients = broker.get_connected_clients().await.len();
    let active_topics = broker.get_active_topics().await.len();
    let topic_stats = broker.get_topic_stats().await;

    Json(MqttBrokerStats {
        connected_clients,
        active_topics,
        retained_messages: topic_stats.retained_messages,
        total_subscriptions: topic_stats.total_subscriptions,
    })
    .into_response()
}
1416
/// Lists the clients currently connected to the MQTT broker.
///
/// Responds with `503 Service Unavailable` when no broker is attached to the
/// state; otherwise returns the clients under the `clients` key.
#[cfg(feature = "mqtt")]
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.mqtt_broker.as_ref() {
        Some(broker) => {
            let clients = broker.get_connected_clients().await;
            let body = serde_json::json!({
                "clients": clients
            });
            Json(body).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1429
/// Lists the topics currently active on the MQTT broker.
///
/// Responds with `503 Service Unavailable` when no broker is attached to the
/// state; otherwise returns the topics under the `topics` key.
#[cfg(feature = "mqtt")]
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.mqtt_broker.as_ref() {
        Some(broker) => {
            let topics = broker.get_active_topics().await;
            let body = serde_json::json!({
                "topics": topics
            });
            Json(body).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1442
/// Disconnects the MQTT client named by the path parameter.
///
/// Responds with `503 Service Unavailable` when no broker is attached to the
/// state, `500 Internal Server Error` when the broker reports a failure, and
/// `200 OK` with a plain-text confirmation otherwise.
#[cfg(feature = "mqtt")]
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let broker = match state.mqtt_broker.as_ref() {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let (status, message) = match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)),
        Err(e) => {
            (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
        }
    };
    (status, message).into_response()
}
1462
1463// ========== MQTT Publish Handler ==========
1464
#[cfg(feature = "mqtt")]
/// Request to publish a single MQTT message
///
/// Also used as the element type of `MqttBatchPublishRequest::messages`.
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Topic to publish to
    pub topic: String,
    /// Message payload, carried as a string (may itself contain JSON text)
    pub payload: String,
    /// QoS level (0, 1, or 2); defaults to `default_qos()` when omitted
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Whether to retain the message; defaults to `false` when omitted
    #[serde(default)]
    pub retain: bool,
}
1480
#[cfg(feature = "mqtt")]
/// Serde default for `MqttPublishRequest::qos`: QoS level 0.
fn default_qos() -> u8 {
    0
}
1485
#[cfg(feature = "mqtt")]
/// Publish a message to an MQTT topic (only compiled when mqtt feature is enabled)
///
/// The JSON body is read field by field:
/// - `topic` and `payload` must be strings, otherwise `400 Bad Request`;
/// - `qos` is optional and falls back to `default_qos()` when missing or not
///   an unsigned integer; values above 2 are rejected with `400 Bad Request`;
/// - `retain` is optional and defaults to `false`.
///
/// Responds with `503 Service Unavailable` when no broker is attached to the
/// state and `500 Internal Server Error` when the broker rejects the
/// publish. On success a `MessageEvent::Mqtt` is sent on the message-event
/// channel and `200 OK` is returned.
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Keep the QoS at full width until it has been range-checked. Narrowing to
    // `u8` first would wrap values such as 256 or 257 into the valid range.
    let qos_raw = request
        .get("qos")
        .and_then(|v| v.as_u64())
        .unwrap_or_else(|| u64::from(default_qos()));
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    let topic = request.get("topic").and_then(|v| v.as_str());
    let payload = request.get("payload").and_then(|v| v.as_str());
    let (topic, payload) = match (topic, payload) {
        (Some(topic), Some(payload)) => (topic.to_string(), payload.to_string()),
        _ => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid request",
                    "message": "Missing required fields: topic and payload"
                })),
            );
        }
    };

    let broker = match state.mqtt_broker.as_ref() {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "MQTT broker not available",
                    "message": "MQTT broker is not enabled or not available."
                })),
            );
        }
    };

    // Validate QoS
    if qos_raw > 2 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid QoS",
                "message": "QoS must be 0, 1, or 2"
            })),
        );
    }
    // Lossless: `qos_raw` was just checked to be at most 2.
    let qos = qos_raw as u8;

    let payload_bytes = payload.as_bytes().to_vec();
    let client_id = "mockforge-management-api".to_string();

    let publish_result = broker
        .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
        .await
        .map_err(|e| e.to_string());

    match publish_result {
        Ok(_) => {
            // Emit message event for real-time monitoring; a failed send is
            // deliberately ignored.
            let event = MessageEvent::Mqtt(MqttMessageEvent {
                topic: topic.clone(),
                payload,
                qos,
                retain,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            (
                StatusCode::OK,
                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message published to topic '{}'", topic),
                    "topic": topic,
                    "qos": qos,
                    "retain": retain
                })),
            )
        }
        Err(error_msg) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to publish message",
                "message": error_msg
            })),
        ),
    }
}
1573
1574#[cfg(not(feature = "mqtt"))]
1575/// Publish a message to an MQTT topic (stub when mqtt feature is disabled)
1576async fn publish_mqtt_message_handler(
1577    State(_state): State<ManagementState>,
1578    Json(_request): Json<serde_json::Value>,
1579) -> impl IntoResponse {
1580    (
1581        StatusCode::SERVICE_UNAVAILABLE,
1582        Json(serde_json::json!({
1583            "error": "MQTT feature not enabled",
1584            "message": "MQTT support is not compiled into this build"
1585        })),
1586    )
1587}
1588
#[cfg(feature = "mqtt")]
/// Request to publish multiple MQTT messages
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// List of messages to publish
    pub messages: Vec<MqttPublishRequest>,
    /// Delay between messages in milliseconds; defaults to `default_delay()`
    /// when omitted
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1599
#[cfg(feature = "mqtt")]
/// Serde default for `MqttBatchPublishRequest::delay_ms`: 100 milliseconds.
fn default_delay() -> u64 {
    100
}
1604
#[cfg(feature = "mqtt")]
/// Publish multiple messages to MQTT topics (only compiled when mqtt feature is enabled)
///
/// The JSON body is read field by field:
/// - `messages` must be an array, otherwise `400 Bad Request`; an empty
///   array is also rejected with `400 Bad Request`;
/// - `delay_ms` is optional and falls back to `default_delay()`; it is the
///   pause after each message that was handed to the broker, except the
///   last one.
///
/// Each message is handled like a single publish: `topic` and `payload`
/// must be strings, `qos` falls back to `default_qos()` and must be at most
/// 2, and `retain` defaults to `false`. A message that fails validation or
/// publishing is recorded in `results` and does not abort the batch.
///
/// Responds with `503 Service Unavailable` when no broker is attached to the
/// state; otherwise `200 OK` with per-message results and
/// `total`/`succeeded`/`failed` counts.
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let delay_ms = request
        .get("delay_ms")
        .and_then(|v| v.as_u64())
        .unwrap_or_else(default_delay);

    let messages_json = match request.get("messages").and_then(|v| v.as_array()) {
        Some(messages) => messages,
        None => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid request",
                    "message": "Missing required field: messages"
                })),
            );
        }
    };

    let broker = match state.mqtt_broker.as_ref() {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "MQTT broker not available",
                    "message": "MQTT broker is not enabled or not available."
                })),
            );
        }
    };

    if messages_json.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        );
    }

    let total = messages_json.len();
    let mut results = Vec::with_capacity(total);
    let client_id = "mockforge-management-api".to_string();

    for (index, msg_json) in messages_json.iter().enumerate() {
        // Keep the QoS at full width until it has been range-checked. Narrowing
        // to `u8` first would wrap values such as 256 or 257 into the valid range.
        let qos_raw = msg_json
            .get("qos")
            .and_then(|v| v.as_u64())
            .unwrap_or_else(|| u64::from(default_qos()));
        let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

        let topic = msg_json.get("topic").and_then(|v| v.as_str());
        let payload = msg_json.get("payload").and_then(|v| v.as_str());
        let (topic, payload) = match (topic, payload) {
            (Some(topic), Some(payload)) => (topic.to_string(), payload.to_string()),
            _ => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }
        };

        // Validate QoS
        if qos_raw > 2 {
            results.push(serde_json::json!({
                "index": index,
                "success": false,
                "error": "Invalid QoS (must be 0, 1, or 2)"
            }));
            continue;
        }
        // Lossless: `qos_raw` was just checked to be at most 2.
        let qos = qos_raw as u8;

        let payload_bytes = payload.as_bytes().to_vec();

        let publish_result = broker
            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
            .await
            .map_err(|e| e.to_string());

        match publish_result {
            Ok(_) => {
                // Emit message event; a failed send is deliberately ignored.
                let event = MessageEvent::Mqtt(MqttMessageEvent {
                    topic: topic.clone(),
                    payload,
                    qos,
                    retain,
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                results.push(serde_json::json!({
                    "index": index,
                    "success": true,
                    "topic": topic,
                    "qos": qos
                }));
            }
            Err(error_msg) => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": error_msg
                }));
            }
        }

        // Add delay between messages (except for the last one)
        if index + 1 < total && delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
        }
    }

    let success_count =
        results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

    (
        StatusCode::OK,
        Json(serde_json::json!({
            "success": true,
            "total": total,
            "succeeded": success_count,
            "failed": total - success_count,
            "results": results
        })),
    )
}
1734
1735#[cfg(not(feature = "mqtt"))]
1736/// Publish multiple messages to MQTT topics (stub when mqtt feature is disabled)
1737async fn publish_mqtt_batch_handler(
1738    State(_state): State<ManagementState>,
1739    Json(_request): Json<serde_json::Value>,
1740) -> impl IntoResponse {
1741    (
1742        StatusCode::SERVICE_UNAVAILABLE,
1743        Json(serde_json::json!({
1744            "error": "MQTT feature not enabled",
1745            "message": "MQTT support is not compiled into this build"
1746        })),
1747    )
1748}
1749
1750// Migration pipeline handlers
1751
1752/// Request to set migration mode
1753#[derive(Debug, Deserialize)]
1754struct SetMigrationModeRequest {
1755    mode: String,
1756}
1757
1758/// Get all migration routes
1759async fn get_migration_routes(
1760    State(state): State<ManagementState>,
1761) -> Result<Json<serde_json::Value>, StatusCode> {
1762    let proxy_config = match &state.proxy_config {
1763        Some(config) => config,
1764        None => {
1765            return Ok(Json(serde_json::json!({
1766                "error": "Migration not configured. Proxy config not available."
1767            })));
1768        }
1769    };
1770
1771    let config = proxy_config.read().await;
1772    let routes = config.get_migration_routes();
1773
1774    Ok(Json(serde_json::json!({
1775        "routes": routes
1776    })))
1777}
1778
1779/// Toggle a route's migration mode
1780async fn toggle_route_migration(
1781    State(state): State<ManagementState>,
1782    Path(pattern): Path<String>,
1783) -> Result<Json<serde_json::Value>, StatusCode> {
1784    let proxy_config = match &state.proxy_config {
1785        Some(config) => config,
1786        None => {
1787            return Ok(Json(serde_json::json!({
1788                "error": "Migration not configured. Proxy config not available."
1789            })));
1790        }
1791    };
1792
1793    let mut config = proxy_config.write().await;
1794    let new_mode = match config.toggle_route_migration(&pattern) {
1795        Some(mode) => mode,
1796        None => {
1797            return Ok(Json(serde_json::json!({
1798                "error": format!("Route pattern not found: {}", pattern)
1799            })));
1800        }
1801    };
1802
1803    Ok(Json(serde_json::json!({
1804        "pattern": pattern,
1805        "mode": format!("{:?}", new_mode).to_lowercase()
1806    })))
1807}
1808
1809/// Set a route's migration mode explicitly
1810async fn set_route_migration_mode(
1811    State(state): State<ManagementState>,
1812    Path(pattern): Path<String>,
1813    Json(request): Json<SetMigrationModeRequest>,
1814) -> Result<Json<serde_json::Value>, StatusCode> {
1815    let proxy_config = match &state.proxy_config {
1816        Some(config) => config,
1817        None => {
1818            return Ok(Json(serde_json::json!({
1819                "error": "Migration not configured. Proxy config not available."
1820            })));
1821        }
1822    };
1823
1824    use mockforge_core::proxy::config::MigrationMode;
1825    let mode = match request.mode.to_lowercase().as_str() {
1826        "mock" => MigrationMode::Mock,
1827        "shadow" => MigrationMode::Shadow,
1828        "real" => MigrationMode::Real,
1829        "auto" => MigrationMode::Auto,
1830        _ => {
1831            return Ok(Json(serde_json::json!({
1832                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1833            })));
1834        }
1835    };
1836
1837    let mut config = proxy_config.write().await;
1838    let updated = config.update_rule_migration_mode(&pattern, mode);
1839
1840    if !updated {
1841        return Ok(Json(serde_json::json!({
1842            "error": format!("Route pattern not found: {}", pattern)
1843        })));
1844    }
1845
1846    Ok(Json(serde_json::json!({
1847        "pattern": pattern,
1848        "mode": format!("{:?}", mode).to_lowercase()
1849    })))
1850}
1851
1852/// Toggle a group's migration mode
1853async fn toggle_group_migration(
1854    State(state): State<ManagementState>,
1855    Path(group): Path<String>,
1856) -> Result<Json<serde_json::Value>, StatusCode> {
1857    let proxy_config = match &state.proxy_config {
1858        Some(config) => config,
1859        None => {
1860            return Ok(Json(serde_json::json!({
1861                "error": "Migration not configured. Proxy config not available."
1862            })));
1863        }
1864    };
1865
1866    let mut config = proxy_config.write().await;
1867    let new_mode = config.toggle_group_migration(&group);
1868
1869    Ok(Json(serde_json::json!({
1870        "group": group,
1871        "mode": format!("{:?}", new_mode).to_lowercase()
1872    })))
1873}
1874
1875/// Set a group's migration mode explicitly
1876async fn set_group_migration_mode(
1877    State(state): State<ManagementState>,
1878    Path(group): Path<String>,
1879    Json(request): Json<SetMigrationModeRequest>,
1880) -> Result<Json<serde_json::Value>, StatusCode> {
1881    let proxy_config = match &state.proxy_config {
1882        Some(config) => config,
1883        None => {
1884            return Ok(Json(serde_json::json!({
1885                "error": "Migration not configured. Proxy config not available."
1886            })));
1887        }
1888    };
1889
1890    use mockforge_core::proxy::config::MigrationMode;
1891    let mode = match request.mode.to_lowercase().as_str() {
1892        "mock" => MigrationMode::Mock,
1893        "shadow" => MigrationMode::Shadow,
1894        "real" => MigrationMode::Real,
1895        "auto" => MigrationMode::Auto,
1896        _ => {
1897            return Ok(Json(serde_json::json!({
1898                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1899            })));
1900        }
1901    };
1902
1903    let mut config = proxy_config.write().await;
1904    config.update_group_migration_mode(&group, mode);
1905
1906    Ok(Json(serde_json::json!({
1907        "group": group,
1908        "mode": format!("{:?}", mode).to_lowercase()
1909    })))
1910}
1911
1912/// Get all migration groups
1913async fn get_migration_groups(
1914    State(state): State<ManagementState>,
1915) -> Result<Json<serde_json::Value>, StatusCode> {
1916    let proxy_config = match &state.proxy_config {
1917        Some(config) => config,
1918        None => {
1919            return Ok(Json(serde_json::json!({
1920                "error": "Migration not configured. Proxy config not available."
1921            })));
1922        }
1923    };
1924
1925    let config = proxy_config.read().await;
1926    let groups = config.get_migration_groups();
1927
1928    // Convert to JSON-serializable format
1929    let groups_json: serde_json::Map<String, serde_json::Value> = groups
1930        .into_iter()
1931        .map(|(name, info)| {
1932            (
1933                name,
1934                serde_json::json!({
1935                    "name": info.name,
1936                    "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1937                    "route_count": info.route_count
1938                }),
1939            )
1940        })
1941        .collect();
1942
1943    Ok(Json(serde_json::json!(groups_json)))
1944}
1945
1946/// Get overall migration status
1947async fn get_migration_status(
1948    State(state): State<ManagementState>,
1949) -> Result<Json<serde_json::Value>, StatusCode> {
1950    let proxy_config = match &state.proxy_config {
1951        Some(config) => config,
1952        None => {
1953            return Ok(Json(serde_json::json!({
1954                "error": "Migration not configured. Proxy config not available."
1955            })));
1956        }
1957    };
1958
1959    let config = proxy_config.read().await;
1960    let routes = config.get_migration_routes();
1961    let groups = config.get_migration_groups();
1962
1963    let mut mock_count = 0;
1964    let mut shadow_count = 0;
1965    let mut real_count = 0;
1966    let mut auto_count = 0;
1967
1968    for route in &routes {
1969        match route.migration_mode {
1970            mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1971            mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1972            mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1973            mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1974        }
1975    }
1976
1977    Ok(Json(serde_json::json!({
1978        "total_routes": routes.len(),
1979        "mock_routes": mock_count,
1980        "shadow_routes": shadow_count,
1981        "real_routes": real_count,
1982        "auto_routes": auto_count,
1983        "total_groups": groups.len(),
1984        "migration_enabled": config.migration_enabled
1985    })))
1986}
1987
1988// ========== Proxy Replacement Rules Management ==========
1989
1990/// Request body for creating/updating proxy replacement rules
1991#[derive(Debug, Deserialize, Serialize)]
1992pub struct ProxyRuleRequest {
1993    /// URL pattern to match (supports wildcards like "/api/users/*")
1994    pub pattern: String,
1995    /// Rule type: "request" or "response"
1996    #[serde(rename = "type")]
1997    pub rule_type: String,
1998    /// Optional status code filter for response rules
1999    #[serde(default)]
2000    pub status_codes: Vec<u16>,
2001    /// Body transformations to apply
2002    pub body_transforms: Vec<BodyTransformRequest>,
2003    /// Whether this rule is enabled
2004    #[serde(default = "default_true")]
2005    pub enabled: bool,
2006}
2007
2008/// Request body for individual body transformations
2009#[derive(Debug, Deserialize, Serialize)]
2010pub struct BodyTransformRequest {
2011    /// JSONPath expression to target (e.g., "$.userId", "$.email")
2012    pub path: String,
2013    /// Replacement value (supports template expansion like "{{uuid}}", "{{faker.email}}")
2014    pub replace: String,
2015    /// Operation to perform: "replace", "add", or "remove"
2016    #[serde(default)]
2017    pub operation: String,
2018}
2019
2020/// Response format for proxy rules
2021#[derive(Debug, Serialize)]
2022pub struct ProxyRuleResponse {
2023    /// Rule ID (index in the array)
2024    pub id: usize,
2025    /// URL pattern
2026    pub pattern: String,
2027    /// Rule type
2028    #[serde(rename = "type")]
2029    pub rule_type: String,
2030    /// Status codes (for response rules)
2031    pub status_codes: Vec<u16>,
2032    /// Body transformations
2033    pub body_transforms: Vec<BodyTransformRequest>,
2034    /// Whether enabled
2035    pub enabled: bool,
2036}
2037
2038/// List all proxy replacement rules
2039async fn list_proxy_rules(
2040    State(state): State<ManagementState>,
2041) -> Result<Json<serde_json::Value>, StatusCode> {
2042    let proxy_config = match &state.proxy_config {
2043        Some(config) => config,
2044        None => {
2045            return Ok(Json(serde_json::json!({
2046                "error": "Proxy not configured. Proxy config not available."
2047            })));
2048        }
2049    };
2050
2051    let config = proxy_config.read().await;
2052
2053    let mut rules: Vec<ProxyRuleResponse> = Vec::new();
2054
2055    // Add request replacement rules
2056    for (idx, rule) in config.request_replacements.iter().enumerate() {
2057        rules.push(ProxyRuleResponse {
2058            id: idx,
2059            pattern: rule.pattern.clone(),
2060            rule_type: "request".to_string(),
2061            status_codes: Vec::new(),
2062            body_transforms: rule
2063                .body_transforms
2064                .iter()
2065                .map(|t| BodyTransformRequest {
2066                    path: t.path.clone(),
2067                    replace: t.replace.clone(),
2068                    operation: format!("{:?}", t.operation).to_lowercase(),
2069                })
2070                .collect(),
2071            enabled: rule.enabled,
2072        });
2073    }
2074
2075    // Add response replacement rules
2076    let request_count = config.request_replacements.len();
2077    for (idx, rule) in config.response_replacements.iter().enumerate() {
2078        rules.push(ProxyRuleResponse {
2079            id: request_count + idx,
2080            pattern: rule.pattern.clone(),
2081            rule_type: "response".to_string(),
2082            status_codes: rule.status_codes.clone(),
2083            body_transforms: rule
2084                .body_transforms
2085                .iter()
2086                .map(|t| BodyTransformRequest {
2087                    path: t.path.clone(),
2088                    replace: t.replace.clone(),
2089                    operation: format!("{:?}", t.operation).to_lowercase(),
2090                })
2091                .collect(),
2092            enabled: rule.enabled,
2093        });
2094    }
2095
2096    Ok(Json(serde_json::json!({
2097        "rules": rules
2098    })))
2099}
2100
2101/// Create a new proxy replacement rule
2102async fn create_proxy_rule(
2103    State(state): State<ManagementState>,
2104    Json(request): Json<ProxyRuleRequest>,
2105) -> Result<Json<serde_json::Value>, StatusCode> {
2106    let proxy_config = match &state.proxy_config {
2107        Some(config) => config,
2108        None => {
2109            return Ok(Json(serde_json::json!({
2110                "error": "Proxy not configured. Proxy config not available."
2111            })));
2112        }
2113    };
2114
2115    // Validate request
2116    if request.body_transforms.is_empty() {
2117        return Ok(Json(serde_json::json!({
2118            "error": "At least one body transform is required"
2119        })));
2120    }
2121
2122    let body_transforms: Vec<BodyTransform> = request
2123        .body_transforms
2124        .iter()
2125        .map(|t| {
2126            let op = match t.operation.as_str() {
2127                "replace" => TransformOperation::Replace,
2128                "add" => TransformOperation::Add,
2129                "remove" => TransformOperation::Remove,
2130                _ => TransformOperation::Replace,
2131            };
2132            BodyTransform {
2133                path: t.path.clone(),
2134                replace: t.replace.clone(),
2135                operation: op,
2136            }
2137        })
2138        .collect();
2139
2140    let new_rule = BodyTransformRule {
2141        pattern: request.pattern.clone(),
2142        status_codes: request.status_codes.clone(),
2143        body_transforms,
2144        enabled: request.enabled,
2145    };
2146
2147    let mut config = proxy_config.write().await;
2148
2149    let rule_id = if request.rule_type == "request" {
2150        config.request_replacements.push(new_rule);
2151        config.request_replacements.len() - 1
2152    } else if request.rule_type == "response" {
2153        config.response_replacements.push(new_rule);
2154        config.request_replacements.len() + config.response_replacements.len() - 1
2155    } else {
2156        return Ok(Json(serde_json::json!({
2157            "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
2158        })));
2159    };
2160
2161    Ok(Json(serde_json::json!({
2162        "id": rule_id,
2163        "message": "Rule created successfully"
2164    })))
2165}
2166
2167/// Get a specific proxy replacement rule
2168async fn get_proxy_rule(
2169    State(state): State<ManagementState>,
2170    Path(id): Path<String>,
2171) -> Result<Json<serde_json::Value>, StatusCode> {
2172    let proxy_config = match &state.proxy_config {
2173        Some(config) => config,
2174        None => {
2175            return Ok(Json(serde_json::json!({
2176                "error": "Proxy not configured. Proxy config not available."
2177            })));
2178        }
2179    };
2180
2181    let config = proxy_config.read().await;
2182    let rule_id: usize = match id.parse() {
2183        Ok(id) => id,
2184        Err(_) => {
2185            return Ok(Json(serde_json::json!({
2186                "error": format!("Invalid rule ID: {}", id)
2187            })));
2188        }
2189    };
2190
2191    let request_count = config.request_replacements.len();
2192
2193    if rule_id < request_count {
2194        // Request rule
2195        let rule = &config.request_replacements[rule_id];
2196        Ok(Json(serde_json::json!({
2197            "id": rule_id,
2198            "pattern": rule.pattern,
2199            "type": "request",
2200            "status_codes": [],
2201            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2202                "path": t.path,
2203                "replace": t.replace,
2204                "operation": format!("{:?}", t.operation).to_lowercase()
2205            })).collect::<Vec<_>>(),
2206            "enabled": rule.enabled
2207        })))
2208    } else if rule_id < request_count + config.response_replacements.len() {
2209        // Response rule
2210        let response_idx = rule_id - request_count;
2211        let rule = &config.response_replacements[response_idx];
2212        Ok(Json(serde_json::json!({
2213            "id": rule_id,
2214            "pattern": rule.pattern,
2215            "type": "response",
2216            "status_codes": rule.status_codes,
2217            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2218                "path": t.path,
2219                "replace": t.replace,
2220                "operation": format!("{:?}", t.operation).to_lowercase()
2221            })).collect::<Vec<_>>(),
2222            "enabled": rule.enabled
2223        })))
2224    } else {
2225        Ok(Json(serde_json::json!({
2226            "error": format!("Rule ID {} not found", rule_id)
2227        })))
2228    }
2229}
2230
2231/// Update a proxy replacement rule
2232async fn update_proxy_rule(
2233    State(state): State<ManagementState>,
2234    Path(id): Path<String>,
2235    Json(request): Json<ProxyRuleRequest>,
2236) -> Result<Json<serde_json::Value>, StatusCode> {
2237    let proxy_config = match &state.proxy_config {
2238        Some(config) => config,
2239        None => {
2240            return Ok(Json(serde_json::json!({
2241                "error": "Proxy not configured. Proxy config not available."
2242            })));
2243        }
2244    };
2245
2246    let mut config = proxy_config.write().await;
2247    let rule_id: usize = match id.parse() {
2248        Ok(id) => id,
2249        Err(_) => {
2250            return Ok(Json(serde_json::json!({
2251                "error": format!("Invalid rule ID: {}", id)
2252            })));
2253        }
2254    };
2255
2256    let body_transforms: Vec<BodyTransform> = request
2257        .body_transforms
2258        .iter()
2259        .map(|t| {
2260            let op = match t.operation.as_str() {
2261                "replace" => TransformOperation::Replace,
2262                "add" => TransformOperation::Add,
2263                "remove" => TransformOperation::Remove,
2264                _ => TransformOperation::Replace,
2265            };
2266            BodyTransform {
2267                path: t.path.clone(),
2268                replace: t.replace.clone(),
2269                operation: op,
2270            }
2271        })
2272        .collect();
2273
2274    let updated_rule = BodyTransformRule {
2275        pattern: request.pattern.clone(),
2276        status_codes: request.status_codes.clone(),
2277        body_transforms,
2278        enabled: request.enabled,
2279    };
2280
2281    let request_count = config.request_replacements.len();
2282
2283    if rule_id < request_count {
2284        // Update request rule
2285        config.request_replacements[rule_id] = updated_rule;
2286    } else if rule_id < request_count + config.response_replacements.len() {
2287        // Update response rule
2288        let response_idx = rule_id - request_count;
2289        config.response_replacements[response_idx] = updated_rule;
2290    } else {
2291        return Ok(Json(serde_json::json!({
2292            "error": format!("Rule ID {} not found", rule_id)
2293        })));
2294    }
2295
2296    Ok(Json(serde_json::json!({
2297        "id": rule_id,
2298        "message": "Rule updated successfully"
2299    })))
2300}
2301
2302/// Delete a proxy replacement rule
2303async fn delete_proxy_rule(
2304    State(state): State<ManagementState>,
2305    Path(id): Path<String>,
2306) -> Result<Json<serde_json::Value>, StatusCode> {
2307    let proxy_config = match &state.proxy_config {
2308        Some(config) => config,
2309        None => {
2310            return Ok(Json(serde_json::json!({
2311                "error": "Proxy not configured. Proxy config not available."
2312            })));
2313        }
2314    };
2315
2316    let mut config = proxy_config.write().await;
2317    let rule_id: usize = match id.parse() {
2318        Ok(id) => id,
2319        Err(_) => {
2320            return Ok(Json(serde_json::json!({
2321                "error": format!("Invalid rule ID: {}", id)
2322            })));
2323        }
2324    };
2325
2326    let request_count = config.request_replacements.len();
2327
2328    if rule_id < request_count {
2329        // Delete request rule
2330        config.request_replacements.remove(rule_id);
2331    } else if rule_id < request_count + config.response_replacements.len() {
2332        // Delete response rule
2333        let response_idx = rule_id - request_count;
2334        config.response_replacements.remove(response_idx);
2335    } else {
2336        return Ok(Json(serde_json::json!({
2337            "error": format!("Rule ID {} not found", rule_id)
2338        })));
2339    }
2340
2341    Ok(Json(serde_json::json!({
2342        "id": rule_id,
2343        "message": "Rule deleted successfully"
2344    })))
2345}
2346
2347/// Get proxy rules and transformation configuration for inspection
2348async fn get_proxy_inspect(
2349    State(state): State<ManagementState>,
2350    Query(params): Query<std::collections::HashMap<String, String>>,
2351) -> Result<Json<serde_json::Value>, StatusCode> {
2352    let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2353    let offset: usize = params.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);
2354
2355    let proxy_config = match &state.proxy_config {
2356        Some(config) => config.read().await,
2357        None => {
2358            return Ok(Json(serde_json::json!({
2359                "error": "Proxy not configured. Proxy config not available."
2360            })));
2361        }
2362    };
2363
2364    let mut rules = Vec::new();
2365    for (idx, rule) in proxy_config.request_replacements.iter().enumerate() {
2366        rules.push(serde_json::json!({
2367            "id": idx,
2368            "kind": "request",
2369            "pattern": rule.pattern,
2370            "enabled": rule.enabled,
2371            "status_codes": rule.status_codes,
2372            "transform_count": rule.body_transforms.len(),
2373            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2374                "path": t.path,
2375                "operation": t.operation,
2376                "replace": t.replace
2377            })).collect::<Vec<_>>()
2378        }));
2379    }
2380    let request_rule_count = rules.len();
2381    for (idx, rule) in proxy_config.response_replacements.iter().enumerate() {
2382        rules.push(serde_json::json!({
2383            "id": request_rule_count + idx,
2384            "kind": "response",
2385            "pattern": rule.pattern,
2386            "enabled": rule.enabled,
2387            "status_codes": rule.status_codes,
2388            "transform_count": rule.body_transforms.len(),
2389            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2390                "path": t.path,
2391                "operation": t.operation,
2392                "replace": t.replace
2393            })).collect::<Vec<_>>()
2394        }));
2395    }
2396
2397    let total = rules.len();
2398    let paged_rules: Vec<_> = rules.into_iter().skip(offset).take(limit).collect();
2399
2400    Ok(Json(serde_json::json!({
2401        "enabled": proxy_config.enabled,
2402        "target_url": proxy_config.target_url,
2403        "prefix": proxy_config.prefix,
2404        "timeout_seconds": proxy_config.timeout_seconds,
2405        "follow_redirects": proxy_config.follow_redirects,
2406        "passthrough_by_default": proxy_config.passthrough_by_default,
2407        "rules": paged_rules,
2408        "request_rule_count": request_rule_count,
2409        "response_rule_count": total.saturating_sub(request_rule_count),
2410        "limit": limit,
2411        "offset": offset,
2412        "total": total
2413    })))
2414}
2415
2416/// Build the management API router
2417pub fn management_router(state: ManagementState) -> Router {
2418    let router = Router::new()
2419        .route("/health", get(health_check))
2420        .route("/stats", get(get_stats))
2421        .route("/config", get(get_config))
2422        .route("/config/validate", post(validate_config))
2423        .route("/config/bulk", post(bulk_update_config))
2424        .route("/mocks", get(list_mocks))
2425        .route("/mocks", post(create_mock))
2426        .route("/mocks/{id}", get(get_mock))
2427        .route("/mocks/{id}", put(update_mock))
2428        .route("/mocks/{id}", delete(delete_mock))
2429        .route("/export", get(export_mocks))
2430        .route("/import", post(import_mocks));
2431
2432    #[cfg(feature = "smtp")]
2433    let router = router
2434        .route("/smtp/mailbox", get(list_smtp_emails))
2435        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
2436        .route("/smtp/mailbox/{id}", get(get_smtp_email))
2437        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
2438        .route("/smtp/mailbox/search", get(search_smtp_emails));
2439
2440    #[cfg(not(feature = "smtp"))]
2441    let router = router;
2442
2443    // MQTT routes
2444    #[cfg(feature = "mqtt")]
2445    let router = router
2446        .route("/mqtt/stats", get(get_mqtt_stats))
2447        .route("/mqtt/clients", get(get_mqtt_clients))
2448        .route("/mqtt/topics", get(get_mqtt_topics))
2449        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
2450        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
2451        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2452        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2453
2454    #[cfg(not(feature = "mqtt"))]
2455    let router = router
2456        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2457        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2458
2459    #[cfg(feature = "kafka")]
2460    let router = router
2461        .route("/kafka/stats", get(get_kafka_stats))
2462        .route("/kafka/topics", get(get_kafka_topics))
2463        .route("/kafka/topics/{topic}", get(get_kafka_topic))
2464        .route("/kafka/groups", get(get_kafka_groups))
2465        .route("/kafka/groups/{group_id}", get(get_kafka_group))
2466        .route("/kafka/produce", post(produce_kafka_message))
2467        .route("/kafka/produce/batch", post(produce_kafka_batch))
2468        .route("/kafka/messages/stream", get(kafka_messages_stream));
2469
2470    #[cfg(not(feature = "kafka"))]
2471    let router = router;
2472
2473    // Migration pipeline routes
2474    let router = router
2475        .route("/migration/routes", get(get_migration_routes))
2476        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
2477        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
2478        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
2479        .route("/migration/groups/{group}", put(set_group_migration_mode))
2480        .route("/migration/groups", get(get_migration_groups))
2481        .route("/migration/status", get(get_migration_status));
2482
2483    // Proxy replacement rules routes
2484    let router = router
2485        .route("/proxy/rules", get(list_proxy_rules))
2486        .route("/proxy/rules", post(create_proxy_rule))
2487        .route("/proxy/rules/{id}", get(get_proxy_rule))
2488        .route("/proxy/rules/{id}", put(update_proxy_rule))
2489        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
2490        .route("/proxy/inspect", get(get_proxy_inspect));
2491
2492    // AI-powered features
2493    let router = router.route("/ai/generate-spec", post(generate_ai_spec));
2494
2495    // Snapshot diff endpoints
2496    let router = router.nest(
2497        "/snapshot-diff",
2498        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
2499    );
2500
2501    #[cfg(feature = "behavioral-cloning")]
2502    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));
2503
2504    let router = router
2505        .route("/mockai/learn", post(learn_from_examples))
2506        .route("/mockai/rules/explanations", get(list_rule_explanations))
2507        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
2508        .route("/chaos/config", get(get_chaos_config))
2509        .route("/chaos/config", post(update_chaos_config))
2510        .route("/network/profiles", get(list_network_profiles))
2511        .route("/network/profile/apply", post(apply_network_profile));
2512
2513    // State machine API routes
2514    let router =
2515        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());
2516
2517    router.with_state(state)
2518}
2519
#[cfg(feature = "kafka")]
/// Aggregate counters describing the Kafka broker
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// How many topics exist
    pub topics: usize,
    /// Partition count summed over all topics
    pub partitions: usize,
    /// How many consumer groups exist
    pub consumer_groups: usize,
    /// Running total of produced messages
    pub messages_produced: u64,
    /// Running total of consumed messages
    pub messages_consumed: u64,
}
2535
#[cfg(feature = "kafka")]
#[allow(missing_docs)]
/// Summary of one Kafka topic as returned by the topic listing
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    // Topic name (the key of the broker's topic map in the listing handler)
    pub name: String,
    // Number of partitions the topic has
    pub partitions: usize,
    // Replication factor taken from the topic configuration
    pub replication_factor: i32,
}
2544
#[cfg(feature = "kafka")]
#[allow(missing_docs)]
/// Serializable description of one Kafka consumer group
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    // Identifier of the consumer group
    pub group_id: String,
    // Member count. NOTE(review): presumably the number of consumers currently
    // in the group; confirm against the code that fills this struct.
    pub members: usize,
    // Group state as text. NOTE(review): the set of possible values is not
    // visible here.
    pub state: String,
}
2553
#[cfg(feature = "kafka")]
/// Report aggregate Kafka broker statistics.
///
/// Responds with a serialized [`KafkaBrokerStats`]: topic, partition and
/// consumer-group counts plus the produced/consumed totals from the broker's
/// metrics snapshot. When no broker is attached to the management state, it
/// responds with `503 Service Unavailable` and a JSON error body.
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let consumer_groups = broker.consumer_groups.read().await;

    let partition_total: usize = topics.values().map(|topic| topic.partitions.len()).sum();

    // Message totals come from the broker's metrics snapshot.
    let snapshot = broker.metrics().snapshot();

    Json(KafkaBrokerStats {
        topics: topics.len(),
        partitions: partition_total,
        consumer_groups: consumer_groups.groups().len(),
        messages_produced: snapshot.messages_produced_total,
        messages_consumed: snapshot.messages_consumed_total,
    })
    .into_response()
}
2586
#[cfg(feature = "kafka")]
/// List every topic known to the mock Kafka broker.
///
/// Responds with `{"topics": [...]}` where each entry is a [`KafkaTopicInfo`],
/// or with `503 Service Unavailable` when no broker is attached.
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topic_map = broker.topics.read().await;

    let mut topic_list: Vec<KafkaTopicInfo> = Vec::with_capacity(topic_map.len());
    for (topic_name, topic) in topic_map.iter() {
        topic_list.push(KafkaTopicInfo {
            name: topic_name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor as i32,
        });
    }

    Json(serde_json::json!({
        "topics": topic_list
    }))
    .into_response()
}
2616
#[cfg(feature = "kafka")]
/// Describe a single Kafka topic, including a per-partition breakdown.
///
/// Responds with `404 Not Found` when the topic does not exist and with
/// `503 Service Unavailable` when no broker is attached.
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topic_map = broker.topics.read().await;

    let Some(topic) = topic_map.get(&topic_name) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Topic not found",
                "topic": topic_name
            })),
        )
            .into_response();
    };

    // Leader and replica ids are reported as the fixed value 0.
    let partitions_detail = topic
        .partitions
        .iter()
        .enumerate()
        .map(|(idx, partition)| {
            serde_json::json!({
                "id": idx as i32,
                "leader": 0,
                "replicas": vec![0],
                "message_count": partition.messages.len()
            })
        })
        .collect::<Vec<_>>();

    Json(serde_json::json!({
        "name": topic_name,
        "partitions": topic.partitions.len(),
        "replication_factor": topic.config.replication_factor,
        "partitions_detail": partitions_detail
    }))
    .into_response()
}
2658
#[cfg(feature = "kafka")]
/// List every consumer group registered with the mock Kafka broker.
///
/// Responds with `{"groups": [...]}` where each entry is a
/// [`KafkaConsumerGroupInfo`], or with `503 Service Unavailable` when no broker
/// is attached.
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let group_registry = broker.consumer_groups.read().await;
    let registered = group_registry.groups();

    let mut groups: Vec<KafkaConsumerGroupInfo> = Vec::with_capacity(registered.len());
    for (group_id, group) in registered.iter() {
        groups.push(KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            // Simplified: every group is reported as "Stable".
            state: "Stable".to_string(),
        });
    }

    Json(serde_json::json!({
        "groups": groups
    }))
    .into_response()
}
2689
#[cfg(feature = "kafka")]
/// Describe a single consumer group: its members, their assignments and the
/// committed offsets.
///
/// Responds with `404 Not Found` when the group does not exist and with
/// `503 Service Unavailable` when no broker is attached.
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let group_registry = broker.consumer_groups.read().await;

    let Some(group) = group_registry.groups().get(&group_id) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Consumer group not found",
                "group_id": group_id
            })),
        )
            .into_response();
    };

    // One entry per member, each carrying its topic/partition assignments.
    let members_detail = group
        .members
        .iter()
        .map(|(member_id, member)| {
            let assignments = member
                .assignment
                .iter()
                .map(|a| {
                    serde_json::json!({
                        "topic": a.topic,
                        "partitions": a.partitions
                    })
                })
                .collect::<Vec<_>>();

            serde_json::json!({
                "member_id": member_id,
                "client_id": member.client_id,
                "assignments": assignments
            })
        })
        .collect::<Vec<_>>();

    // One entry per (topic, partition) offset stored on the group.
    let offsets = group
        .offsets
        .iter()
        .map(|((topic, partition), offset)| {
            serde_json::json!({
                "topic": topic,
                "partition": partition,
                "offset": offset
            })
        })
        .collect::<Vec<_>>();

    Json(serde_json::json!({
        "group_id": group_id,
        "members": group.members.len(),
        "state": "Stable",
        "members_detail": members_detail,
        "offsets": offsets
    }))
    .into_response()
}
2738
2739// ========== Kafka Produce Handler ==========
2740
#[cfg(feature = "kafka")]
/// Request body for producing a Kafka message
///
/// Only `topic` and `value` are required; every other field falls back to its
/// serde default (`None`) when omitted.
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Topic to produce to
    pub topic: String,
    /// Message key (optional)
    #[serde(default)]
    pub key: Option<String>,
    /// Message value (JSON string or plain string); stored as its UTF-8 bytes
    pub value: String,
    /// Partition ID (optional, auto-assigned if not provided)
    #[serde(default)]
    pub partition: Option<i32>,
    /// Message headers (optional, key-value pairs)
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2759
#[cfg(feature = "kafka")]
/// Produce a single message to a Kafka topic on the mock broker.
///
/// The target topic is created with `TopicConfig::default()` when it does not
/// exist yet. When the request names no partition, one is chosen by
/// `Topic::assign_partition` from the message key.
///
/// Responses:
/// - success: JSON body with `topic`, `partition` and `offset`
/// - `400 Bad Request`: the partition is outside the topic's partition range
/// - `500 Internal Server Error`: the topic rejected the message
/// - `503 Service Unavailable`: no Kafka broker is attached
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let mut topics = broker.topics.write().await;

    // Get or create the topic.
    // NOTE(review): the topic is created before the partition is validated, so
    // a request rejected below can still leave a new, empty topic behind.
    let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
        mockforge_kafka::topics::Topic::new(
            request.topic.clone(),
            mockforge_kafka::topics::TopicConfig::default(),
        )
    });

    // Use the explicit partition if given, otherwise let the topic pick one
    // based on the key.
    let partition_id = match request.partition {
        Some(partition) => partition,
        None => topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes())),
    };

    // Validate partition exists
    if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            })),
        )
            .into_response();
    }

    // Build the stored message from borrowed request data; the owned strings
    // are kept for the monitoring event below.
    let message = mockforge_kafka::partitions::KafkaMessage {
        offset: 0, // Will be set by partition.append
        timestamp: chrono::Utc::now().timestamp_millis(),
        key: request.key.as_ref().map(|k| k.as_bytes().to_vec()),
        value: request.value.as_bytes().to_vec(),
        headers: request
            .headers
            .clone()
            .unwrap_or_default()
            .into_iter()
            .map(|(k, v)| (k, v.into_bytes()))
            .collect(),
    };

    // Produce to partition
    match topic_entry.produce(partition_id, message).await {
        Ok(offset) => {
            // Record metrics for successful message production
            broker.metrics().record_messages_produced(1);

            // Emit message event for real-time monitoring. The request's key,
            // value and headers are no longer needed, so they are moved rather
            // than cloned. The send result is deliberately ignored.
            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: request.topic.clone(),
                key: request.key,
                value: request.value,
                partition: partition_id,
                offset,
                headers: request.headers,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", request.topic),
                "topic": request.topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2864
#[cfg(feature = "kafka")]
/// Request body for producing a batch of Kafka messages
///
/// Messages are produced one at a time in the order given.
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// List of messages to produce
    pub messages: Vec<KafkaProduceRequest>,
    /// Delay between messages in milliseconds
    /// (falls back to `default_delay()` when omitted)
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2875
#[cfg(feature = "kafka")]
/// Produce multiple messages to Kafka topics, one after another.
///
/// Each message is handled independently: a failure is recorded in the
/// per-message `results` array and does not abort the rest of the batch.
/// `delay_ms` (when non-zero) is awaited between messages. The topics lock is
/// released before that wait so other broker users are not stalled by it.
///
/// Responses:
/// - JSON summary with `total`, `succeeded`, `failed` and `results`
/// - `400 Bad Request`: the batch contains no messages
/// - `503 Service Unavailable`: no Kafka broker is attached
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    if request.messages.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        )
            .into_response();
    }

    let mut results = Vec::with_capacity(request.messages.len());

    for (index, msg_request) in request.messages.iter().enumerate() {
        // The write lock is taken per message and released before any delay.
        let mut topics = broker.topics.write().await;

        // Get or create the topic.
        // NOTE(review): the topic is created before the partition is
        // validated, so a rejected message can still leave an empty topic.
        let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
            mockforge_kafka::topics::Topic::new(
                msg_request.topic.clone(),
                mockforge_kafka::topics::TopicConfig::default(),
            )
        });

        // Determine partition
        let partition_id = if let Some(partition) = msg_request.partition {
            partition
        } else {
            topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
        };

        // Validate partition exists. A rejected message skips the
        // inter-message delay; the lock guard is released by `continue`.
        if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
            results.push(serde_json::json!({
                "index": index,
                "success": false,
                "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            }));
            continue;
        }

        // Create the message
        let message = mockforge_kafka::partitions::KafkaMessage {
            offset: 0,
            timestamp: chrono::Utc::now().timestamp_millis(),
            key: msg_request.key.as_ref().map(|k| k.as_bytes().to_vec()),
            value: msg_request.value.as_bytes().to_vec(),
            headers: msg_request
                .headers
                .clone()
                .unwrap_or_default()
                .into_iter()
                .map(|(k, v)| (k, v.into_bytes()))
                .collect(),
        };

        // Produce to partition
        match topic_entry.produce(partition_id, message).await {
            Ok(offset) => {
                // Record metrics for successful message production
                broker.metrics().record_messages_produced(1);

                // Emit message event; the send result is deliberately ignored.
                let event = MessageEvent::Kafka(KafkaMessageEvent {
                    topic: msg_request.topic.clone(),
                    key: msg_request.key.clone(),
                    value: msg_request.value.clone(),
                    partition: partition_id,
                    offset,
                    headers: msg_request.headers.clone(),
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                results.push(serde_json::json!({
                    "index": index,
                    "success": true,
                    "topic": msg_request.topic,
                    "partition": partition_id,
                    "offset": offset
                }));
            }
            Err(e) => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": e.to_string()
                }));
            }
        }

        // Release the topics write lock BEFORE sleeping. Holding it across the
        // delay would block every other reader and writer of the topic map
        // for `delay_ms` on each iteration.
        drop(topics);

        // Add delay between messages (except for the last one)
        if index < request.messages.len() - 1 && request.delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
        }
    }

    let success_count =
        results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

    Json(serde_json::json!({
        "success": true,
        "total": request.messages.len(),
        "succeeded": success_count,
        "failed": request.messages.len() - success_count,
        "results": results
    }))
    .into_response()
}
3004
3005// ========== Real-time Message Streaming (SSE) ==========
3006
#[cfg(feature = "mqtt")]
/// Server-sent-events stream of MQTT messages seen by the management API.
///
/// An optional `topic` query parameter acts as a substring filter on the
/// message topic. Each matching message is sent as an SSE event named
/// `mqtt_message` with a JSON payload. The stream ends when the broadcast
/// channel closes; lagged receivers log a warning and keep going.
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let receiver = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let events = stream::unfold(receiver, move |mut receiver| {
        // Each step's future needs its own copy of the filter.
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                // Narrow the broadcast item down to an MQTT event, or move on.
                let mqtt_event = match receiver.recv().await {
                    Ok(MessageEvent::Mqtt(inner)) => inner,
                    // Skip Kafka events in MQTT stream
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => continue,
                    Err(broadcast::error::RecvError::Closed) => return None,
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                };

                // Apply topic filter if specified
                match &topic_filter {
                    Some(needle) if !mqtt_event.topic.contains(needle) => continue,
                    _ => {}
                }

                let payload = serde_json::json!({
                    "protocol": "mqtt",
                    "topic": mqtt_event.topic,
                    "payload": mqtt_event.payload,
                    "qos": mqtt_event.qos,
                    "retain": mqtt_event.retain,
                    "timestamp": mqtt_event.timestamp,
                });

                // An event that fails to serialize is dropped.
                let Ok(data) = serde_json::to_string(&payload) else {
                    continue;
                };

                let sse_event = Event::default().event("mqtt_message").data(data);
                return Some((Ok(sse_event), receiver));
            }
        }
    });

    let keep_alive = axum::response::sse::KeepAlive::new()
        .interval(std::time::Duration::from_secs(15))
        .text("keep-alive-text");

    Sse::new(events).keep_alive(keep_alive)
}
3067
#[cfg(feature = "kafka")]
/// Server-sent-events stream of Kafka messages seen by the management API.
///
/// An optional `topic` query parameter acts as a substring filter on the
/// message topic. Each matching message is sent as an SSE event named
/// `kafka_message` with a JSON payload. The stream ends when the broadcast
/// channel closes; lagged receivers log a warning and keep going.
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let receiver = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let events = stream::unfold(receiver, move |mut receiver| {
        // Each step's future needs its own copy of the filter.
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                // Narrow the broadcast item down to a Kafka event, or move on.
                let kafka_event = match receiver.recv().await {
                    // Skip MQTT events in Kafka stream
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => continue,
                    Ok(MessageEvent::Kafka(inner)) => inner,
                    Err(broadcast::error::RecvError::Closed) => return None,
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                };

                // Apply topic filter if specified
                match &topic_filter {
                    Some(needle) if !kafka_event.topic.contains(needle) => continue,
                    _ => {}
                }

                let payload = serde_json::json!({
                    "protocol": "kafka",
                    "topic": kafka_event.topic,
                    "key": kafka_event.key,
                    "value": kafka_event.value,
                    "partition": kafka_event.partition,
                    "offset": kafka_event.offset,
                    "headers": kafka_event.headers,
                    "timestamp": kafka_event.timestamp,
                });

                // An event that fails to serialize is dropped.
                let Ok(data) = serde_json::to_string(&payload) else {
                    continue;
                };

                let sse_event = Event::default().event("kafka_message").data(data);
                return Some((Ok(sse_event), receiver));
            }
        }
    });

    let keep_alive = axum::response::sse::KeepAlive::new()
        .interval(std::time::Duration::from_secs(15))
        .text("keep-alive-text");

    Sse::new(events).keep_alive(keep_alive)
}
3131
3132// ========== AI-Powered Features ==========
3133
/// Request for AI-powered API specification generation
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Natural language description of the API to generate
    pub query: String,
    /// Type of specification to generate: "openapi", "graphql", or "asyncapi".
    /// The AI handler in this file treats any other value as OpenAPI 3.0 and
    /// asks for YAML output for everything except "graphql".
    pub spec_type: String,
    /// Optional API version (e.g., "3.0.0" for OpenAPI); the AI handler uses
    /// "3.0.0" when this is omitted
    pub api_version: Option<String>,
}
3144
/// Request for OpenAPI generation from recorded traffic
///
/// Every field is optional in the JSON body and falls back to its serde default.
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to recorder database (optional, defaults to ./recordings.db,
    /// resolved against the current working directory)
    #[serde(default)]
    pub database_path: Option<String>,
    /// Start time for filtering (ISO 8601 format, e.g., 2025-01-01T00:00:00Z);
    /// parsed as an RFC 3339 timestamp
    #[serde(default)]
    pub since: Option<String>,
    /// End time for filtering (ISO 8601 format); parsed as an RFC 3339 timestamp
    #[serde(default)]
    pub until: Option<String>,
    /// Path pattern filter (supports wildcards, e.g., /api/*)
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence score for including paths (0.0 to 1.0);
    /// defaults to the value of `default_min_confidence()`
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
3164
/// Serde default for the `min_confidence` field of
/// `GenerateOpenApiFromTrafficRequest`.
fn default_min_confidence() -> f64 {
    const DEFAULT_MIN_CONFIDENCE: f64 = 0.7;
    DEFAULT_MIN_CONFIDENCE
}
3168
/// Generate an API specification from a natural-language description using an LLM.
///
/// Configuration is read from `MOCKFORGE_RAG_*` environment variables, with
/// `OPENAI_API_KEY` accepted as a fallback for the API key.
///
/// Responses:
/// - success: JSON body with `spec` (the extracted specification text) and `spec_type`
/// - `503 Service Unavailable`: no API key is configured
/// - `500 Internal Server Error`: the RAG engine could not be created or
///   generation failed
#[cfg(feature = "data-faker")]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::{DocumentStorage, InMemoryStorage},
    };
    use std::sync::Arc;

    /// Read `name` from the environment and parse it, using `fallback` when
    /// the variable is unset or its value does not parse.
    fn env_or<T: std::str::FromStr>(name: &str, fallback: T) -> T {
        std::env::var(name).ok().and_then(|raw| raw.parse().ok()).unwrap_or(fallback)
    }

    // An API key is mandatory; prefer the MockForge-specific variable.
    let Some(api_key) = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok())
    else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    };

    // Resolve the provider; "openai" and any unrecognised name map to OpenAI.
    let provider_name = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();
    let provider = match provider_name.as_str() {
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    // Per-provider defaults, used only when the matching env var is unset.
    let (fallback_endpoint, fallback_model) = match provider {
        LlmProvider::OpenAI => ("https://api.openai.com/v1", "gpt-3.5-turbo"),
        LlmProvider::Anthropic => ("https://api.anthropic.com/v1", "claude-3-sonnet-20240229"),
        LlmProvider::Ollama => ("http://localhost:11434/api", "llama2"),
        LlmProvider::OpenAICompatible => ("http://localhost:8000/v1", "gpt-3.5-turbo"),
    };
    let api_endpoint = std::env::var("MOCKFORGE_RAG_API_ENDPOINT")
        .unwrap_or_else(|_| fallback_endpoint.to_string());
    let model =
        std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| fallback_model.to_string());

    let rag_config = RagConfig {
        provider,
        api_endpoint,
        api_key: Some(api_key),
        model,
        max_tokens: env_or("MOCKFORGE_RAG_MAX_TOKENS", 4096),
        // Lower temperature for more structured output
        temperature: env_or("MOCKFORGE_RAG_TEMPERATURE", 0.3),
        timeout_secs: env_or("MOCKFORGE_RAG_TIMEOUT", 60),
        max_context_length: env_or("MOCKFORGE_RAG_CONTEXT_WINDOW", 4000),
        ..Default::default()
    };

    // Human-readable spec name for the prompt; unknown types fall back to OpenAPI.
    let spec_type_label = match request.spec_type.as_str() {
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };
    let wants_graphql = request.spec_type == "graphql";
    let output_format = if wants_graphql { "GraphQL SDL" } else { "YAML" };
    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        output_format
    );

    // The engine needs a document store; a fresh in-memory one is created
    // for each request.
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    let mut rag_engine = match RagEngine::new(rag_config, storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    let generated_text = match rag_engine.generate(&prompt, None).await {
        Ok(text) => text,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "AI generation failed",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    // Strip any explanation the LLM wrapped around the spec: SDL for GraphQL,
    // YAML for OpenAPI/AsyncAPI.
    let spec = if wants_graphql {
        extract_graphql_schema(&generated_text)
    } else {
        extract_yaml_spec(&generated_text)
    };

    Json(serde_json::json!({
        "success": true,
        "spec": spec,
        "spec_type": request.spec_type,
    }))
    .into_response()
}
3341
3342#[cfg(not(feature = "data-faker"))]
3343async fn generate_ai_spec(
3344    State(_state): State<ManagementState>,
3345    Json(_request): Json<GenerateSpecRequest>,
3346) -> impl IntoResponse {
3347    (
3348        StatusCode::NOT_IMPLEMENTED,
3349        Json(serde_json::json!({
3350            "error": "AI features not enabled",
3351            "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3352        })),
3353    )
3354        .into_response()
3355}
3356
/// Generate an OpenAPI specification from recorded HTTP traffic.
///
/// Opens the recorder database (`database_path` from the request, otherwise
/// `recordings.db` in the current working directory), loads at most 1000
/// exchanges matching the optional `since` / `until` / `path_pattern` filters
/// and hands them to the OpenAPI generator.
///
/// Responses:
/// - `200` with `{ "spec": ..., "metadata": ... }` on success;
/// - `400` when the database cannot be opened or a timestamp is not RFC 3339;
/// - `404` when no recorded exchange matches the filters;
/// - `500` when querying, generating or serializing fails.
#[cfg(feature = "behavioral-cloning")]
async fn generate_openapi_from_traffic(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
) -> impl IntoResponse {
    use chrono::{DateTime, Utc};
    use mockforge_core::intelligent_behavior::{
        openapi_generator::{
            HttpExchange as LocalHttpExchange, OpenApiGenerationConfig, OpenApiSpecGenerator,
        },
        IntelligentBehaviorConfig,
    };
    use mockforge_recorder::{
        database::RecorderDatabase,
        openapi_export::{QueryFilters, RecordingsToOpenApi},
    };
    use std::path::PathBuf;

    // Every failure path answers with the same `{error, message}` JSON shape.
    let failure = |status: StatusCode, error: &str, message: String| {
        (
            status,
            Json(serde_json::json!({
                "error": error,
                "message": message
            })),
        )
            .into_response()
    };

    // Explicit path from the request wins; otherwise look next to the process.
    let db_path = match request.database_path {
        Some(ref path) => PathBuf::from(path),
        None => std::env::current_dir()
            .unwrap_or_else(|_| PathBuf::from("."))
            .join("recordings.db"),
    };

    let db = match RecorderDatabase::new(&db_path).await {
        Ok(db) => db,
        Err(e) => {
            return failure(
                StatusCode::BAD_REQUEST,
                "Database error",
                format!("Failed to open recorder database: {}", e),
            );
        }
    };

    // Both time bounds are optional RFC 3339 timestamps, normalized to UTC.
    let since_parsed = request
        .since
        .as_ref()
        .map(|raw| DateTime::parse_from_rfc3339(raw))
        .transpose();
    let since_dt = match since_parsed {
        Ok(parsed) => parsed.map(|dt| dt.with_timezone(&Utc)),
        Err(e) => {
            return failure(
                StatusCode::BAD_REQUEST,
                "Invalid date format",
                format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e),
            );
        }
    };

    let until_parsed = request
        .until
        .as_ref()
        .map(|raw| DateTime::parse_from_rfc3339(raw))
        .transpose();
    let until_dt = match until_parsed {
        Ok(parsed) => parsed.map(|dt| dt.with_timezone(&Utc)),
        Err(e) => {
            return failure(
                StatusCode::BAD_REQUEST,
                "Invalid date format",
                format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e),
            );
        }
    };

    let query_filters = QueryFilters {
        since: since_dt,
        until: until_dt,
        path_pattern: request.path_pattern.clone(),
        min_status_code: None,
        max_requests: Some(1000),
    };

    // The recorder returns its own `HttpExchange` type; it is converted field by
    // field into the local one further down to avoid version mismatch issues.
    let recorded = match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await
    {
        Ok(found) => found,
        Err(e) => {
            return failure(
                StatusCode::INTERNAL_SERVER_ERROR,
                "Query error",
                format!("Failed to query HTTP exchanges: {}", e),
            );
        }
    };

    if recorded.is_empty() {
        return failure(
            StatusCode::NOT_FOUND,
            "No exchanges found",
            "No HTTP exchanges found matching the specified filters".to_string(),
        );
    }

    let exchanges: Vec<LocalHttpExchange> = recorded
        .into_iter()
        .map(|item| LocalHttpExchange {
            method: item.method,
            path: item.path,
            query_params: item.query_params,
            headers: item.headers,
            body: item.body,
            body_encoding: item.body_encoding,
            status_code: item.status_code,
            response_headers: item.response_headers,
            response_body: item.response_body,
            response_body_encoding: item.response_body_encoding,
            timestamp: item.timestamp,
        })
        .collect();

    // The generator runs with the default behavior model and the caller's
    // confidence threshold.
    let gen_config = OpenApiGenerationConfig {
        min_confidence: request.min_confidence,
        behavior_model: Some(IntelligentBehaviorConfig::default().behavior_model),
    };

    let result = match OpenApiSpecGenerator::new(gen_config)
        .generate_from_exchanges(exchanges)
        .await
    {
        Ok(generated) => generated,
        Err(e) => {
            return failure(
                StatusCode::INTERNAL_SERVER_ERROR,
                "Generation error",
                format!("Failed to generate OpenAPI spec: {}", e),
            );
        }
    };

    // Prefer the raw document when the generator kept one; otherwise serialize
    // the typed spec.
    let spec_json = match result.spec.raw_document {
        Some(ref raw) => raw.clone(),
        None => match serde_json::to_value(&result.spec.spec) {
            Ok(json) => json,
            Err(e) => {
                return failure(
                    StatusCode::INTERNAL_SERVER_ERROR,
                    "Serialization error",
                    format!("Failed to serialize OpenAPI spec: {}", e),
                );
            }
        },
    };

    let metadata = &result.metadata;
    Json(serde_json::json!({
        "spec": spec_json,
        "metadata": {
            "requests_analyzed": metadata.requests_analyzed,
            "paths_inferred": metadata.paths_inferred,
            "path_confidence": metadata.path_confidence,
            "generated_at": metadata.generated_at.to_rfc3339(),
            "duration_ms": metadata.duration_ms,
        }
    }))
    .into_response()
}
3549
3550/// List all rule explanations
3551async fn list_rule_explanations(
3552    State(state): State<ManagementState>,
3553    Query(params): Query<std::collections::HashMap<String, String>>,
3554) -> impl IntoResponse {
3555    use mockforge_core::intelligent_behavior::RuleType;
3556
3557    let explanations = state.rule_explanations.read().await;
3558    let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3559
3560    // Filter by rule type if provided
3561    if let Some(rule_type_str) = params.get("rule_type") {
3562        if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3563            explanations_vec.retain(|e| e.rule_type == rule_type);
3564        }
3565    }
3566
3567    // Filter by minimum confidence if provided
3568    if let Some(min_confidence_str) = params.get("min_confidence") {
3569        if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3570            explanations_vec.retain(|e| e.confidence >= min_confidence);
3571        }
3572    }
3573
3574    // Sort by confidence (descending) and then by generated_at (descending)
3575    explanations_vec.sort_by(|a, b| {
3576        b.confidence
3577            .partial_cmp(&a.confidence)
3578            .unwrap_or(std::cmp::Ordering::Equal)
3579            .then_with(|| b.generated_at.cmp(&a.generated_at))
3580    });
3581
3582    Json(serde_json::json!({
3583        "explanations": explanations_vec,
3584        "total": explanations_vec.len(),
3585    }))
3586    .into_response()
3587}
3588
3589/// Get a specific rule explanation by ID
3590async fn get_rule_explanation(
3591    State(state): State<ManagementState>,
3592    Path(rule_id): Path<String>,
3593) -> impl IntoResponse {
3594    let explanations = state.rule_explanations.read().await;
3595
3596    match explanations.get(&rule_id) {
3597        Some(explanation) => Json(serde_json::json!({
3598            "explanation": explanation,
3599        }))
3600        .into_response(),
3601        None => (
3602            StatusCode::NOT_FOUND,
3603            Json(serde_json::json!({
3604                "error": "Rule explanation not found",
3605                "message": format!("No explanation found for rule ID: {}", rule_id)
3606            })),
3607        )
3608            .into_response(),
3609    }
3610}
3611
/// Request body for learning behavioral rules from examples.
///
/// Consumed by `learn_from_examples`, which rejects an empty `examples` list
/// with `400 Bad Request`.
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example request/response pairs to learn from
    pub examples: Vec<ExamplePairRequest>,
    /// Optional configuration override, kept as raw JSON and allowed to be
    /// absent. `learn_from_examples` tries to deserialize it as an
    /// `IntelligentBehaviorConfig` and uses the default configuration when
    /// that fails.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3621
/// One request/response example, both sides given as free-form JSON.
///
/// The keys listed below are the ones `learn_from_examples` reads.
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request data. Keys read: `method` (defaults to `"GET"`), `path`
    /// (defaults to `"/"`), `body`, `query_params` and `headers`; for the last
    /// two only string-valued entries are kept.
    pub request: serde_json::Value,
    /// Response data. Keys read: `status_code` (or `status` as a fallback,
    /// defaulting to 200) and `body`.
    pub response: serde_json::Value,
}
3630
3631/// Learn behavioral rules from example pairs
3632///
3633/// This endpoint accepts example request/response pairs, generates behavioral rules
3634/// with explanations, and stores the explanations for later retrieval.
3635async fn learn_from_examples(
3636    State(state): State<ManagementState>,
3637    Json(request): Json<LearnFromExamplesRequest>,
3638) -> impl IntoResponse {
3639    use mockforge_core::intelligent_behavior::{
3640        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3641        rule_generator::{ExamplePair, RuleGenerator},
3642    };
3643
3644    if request.examples.is_empty() {
3645        return (
3646            StatusCode::BAD_REQUEST,
3647            Json(serde_json::json!({
3648                "error": "No examples provided",
3649                "message": "At least one example pair is required"
3650            })),
3651        )
3652            .into_response();
3653    }
3654
3655    // Convert request examples to ExamplePair format
3656    let example_pairs: Result<Vec<ExamplePair>, String> = request
3657        .examples
3658        .into_iter()
3659        .enumerate()
3660        .map(|(idx, ex)| {
3661            // Parse request JSON to extract method, path, body, etc.
3662            let method = ex
3663                .request
3664                .get("method")
3665                .and_then(|v| v.as_str())
3666                .map(|s| s.to_string())
3667                .unwrap_or_else(|| "GET".to_string());
3668            let path = ex
3669                .request
3670                .get("path")
3671                .and_then(|v| v.as_str())
3672                .map(|s| s.to_string())
3673                .unwrap_or_else(|| "/".to_string());
3674            let request_body = ex.request.get("body").cloned();
3675            let query_params = ex
3676                .request
3677                .get("query_params")
3678                .and_then(|v| v.as_object())
3679                .map(|obj| {
3680                    obj.iter()
3681                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3682                        .collect()
3683                })
3684                .unwrap_or_default();
3685            let headers = ex
3686                .request
3687                .get("headers")
3688                .and_then(|v| v.as_object())
3689                .map(|obj| {
3690                    obj.iter()
3691                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3692                        .collect()
3693                })
3694                .unwrap_or_default();
3695
3696            // Parse response JSON to extract status, body, etc.
3697            let status = ex
3698                .response
3699                .get("status_code")
3700                .or_else(|| ex.response.get("status"))
3701                .and_then(|v| v.as_u64())
3702                .map(|n| n as u16)
3703                .unwrap_or(200);
3704            let response_body = ex.response.get("body").cloned();
3705
3706            Ok(ExamplePair {
3707                method,
3708                path,
3709                request: request_body,
3710                status,
3711                response: response_body,
3712                query_params,
3713                headers,
3714                metadata: {
3715                    let mut meta = std::collections::HashMap::new();
3716                    meta.insert("source".to_string(), "api".to_string());
3717                    meta.insert("example_index".to_string(), idx.to_string());
3718                    meta
3719                },
3720            })
3721        })
3722        .collect();
3723
3724    let example_pairs = match example_pairs {
3725        Ok(pairs) => pairs,
3726        Err(e) => {
3727            return (
3728                StatusCode::BAD_REQUEST,
3729                Json(serde_json::json!({
3730                    "error": "Invalid examples",
3731                    "message": e
3732                })),
3733            )
3734                .into_response();
3735        }
3736    };
3737
3738    // Create behavior config (use provided config or default)
3739    let behavior_config = if let Some(config_json) = request.config {
3740        // Try to deserialize custom config, fall back to default
3741        serde_json::from_value(config_json)
3742            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3743            .behavior_model
3744    } else {
3745        BehaviorModelConfig::default()
3746    };
3747
3748    // Create rule generator
3749    let generator = RuleGenerator::new(behavior_config);
3750
3751    // Generate rules with explanations
3752    let (rules, explanations) =
3753        match generator.generate_rules_with_explanations(example_pairs).await {
3754            Ok(result) => result,
3755            Err(e) => {
3756                return (
3757                    StatusCode::INTERNAL_SERVER_ERROR,
3758                    Json(serde_json::json!({
3759                        "error": "Rule generation failed",
3760                        "message": format!("Failed to generate rules: {}", e)
3761                    })),
3762                )
3763                    .into_response();
3764            }
3765        };
3766
3767    // Store explanations in ManagementState
3768    {
3769        let mut stored_explanations = state.rule_explanations.write().await;
3770        for explanation in &explanations {
3771            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3772        }
3773    }
3774
3775    // Prepare response
3776    let response = serde_json::json!({
3777        "success": true,
3778        "rules_generated": {
3779            "consistency_rules": rules.consistency_rules.len(),
3780            "schemas": rules.schemas.len(),
3781            "state_machines": rules.state_transitions.len(),
3782            "system_prompt": !rules.system_prompt.is_empty(),
3783        },
3784        "explanations": explanations.iter().map(|e| serde_json::json!({
3785            "rule_id": e.rule_id,
3786            "rule_type": e.rule_type,
3787            "confidence": e.confidence,
3788            "reasoning": e.reasoning,
3789        })).collect::<Vec<_>>(),
3790        "total_explanations": explanations.len(),
3791    });
3792
3793    Json(response).into_response()
3794}
3795
3796#[cfg(feature = "data-faker")]
/// Extract a YAML specification (OpenAPI/AsyncAPI) from generated text.
///
/// Tried in order:
/// 1. the first closed fenced block tagged `yaml`;
/// 2. the first closed fenced block of any kind, minus its language tag
///    (e.g. `yml`, `json`) so the tag never leaks into the spec;
/// 3. an unterminated fenced block (such as truncated output): whatever
///    follows the opening fence, or the text before a stray fence when nothing
///    follows it;
/// 4. the whole text.
///
/// The result is always trimmed of surrounding whitespace.
fn extract_yaml_spec(text: &str) -> String {
    const FENCE: &str = "```";
    const YAML_FENCE: &str = "```yaml";

    // Drops the fence's language tag when the text right after the opening
    // fence starts with a line holding only an identifier-like word. Anything
    // else (empty line, `---`, `openapi: 3.0.0`, ...) is content and is kept.
    fn skip_language_tag(after_fence: &str) -> &str {
        match after_fence.split_once('\n') {
            Some((first_line, rest)) => {
                let tag = first_line.trim();
                let looks_like_tag = tag.starts_with(|c: char| c.is_ascii_alphabetic())
                    && tag
                        .chars()
                        .all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '_' | '#'));
                if looks_like_tag {
                    rest
                } else {
                    after_fence
                }
            }
            None => after_fence,
        }
    }

    // 1. Explicitly tagged YAML block (this also covers inline "```yaml ... ```").
    if let Some(start) = text.find(YAML_FENCE) {
        let body = text[start + YAML_FENCE.len()..].trim_start();
        if let Some(end) = body.find(FENCE) {
            return body[..end].trim().to_string();
        }
    }

    if let Some(start) = text.find(FENCE) {
        let body = skip_language_tag(&text[start + FENCE.len()..]).trim();

        // 2. Generic closed block.
        if let Some(end) = body.find(FENCE) {
            return body[..end].trim().to_string();
        }

        // 3. Unterminated block: never hand the fence itself back to the caller.
        if !body.is_empty() {
            return body.to_string();
        }
        return text[..start].trim().to_string();
    }

    // 4. No code fences at all
    text.trim().to_string()
}
3820
3821/// Extract GraphQL schema from text content
3822#[cfg(feature = "data-faker")]
fn extract_graphql_schema(text: &str) -> String {
    // Tried in order:
    //   1. the first closed fenced block tagged `graphql`;
    //   2. the first closed fenced block of any kind, minus its language tag
    //      (e.g. `gql`) so the tag never leaks into the schema;
    //   3. an unterminated fenced block (such as truncated output): whatever
    //      follows the opening fence, or the text before a stray fence when
    //      nothing follows it;
    //   4. the whole text.
    // The result is always trimmed of surrounding whitespace.
    const FENCE: &str = "```";
    const GRAPHQL_FENCE: &str = "```graphql";

    // Drops the fence's language tag when the text right after the opening
    // fence starts with a line holding only an identifier-like word. Anything
    // else (empty line, `type Query {`, ...) is content and is kept.
    fn skip_language_tag(after_fence: &str) -> &str {
        match after_fence.split_once('\n') {
            Some((first_line, rest)) => {
                let tag = first_line.trim();
                let looks_like_tag = tag.starts_with(|c: char| c.is_ascii_alphabetic())
                    && tag
                        .chars()
                        .all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '_' | '#'));
                if looks_like_tag {
                    rest
                } else {
                    after_fence
                }
            }
            None => after_fence,
        }
    }

    // 1. Explicitly tagged GraphQL block (also covers inline "```graphql ... ```").
    if let Some(start) = text.find(GRAPHQL_FENCE) {
        let body = text[start + GRAPHQL_FENCE.len()..].trim_start();
        if let Some(end) = body.find(FENCE) {
            return body[..end].trim().to_string();
        }
    }

    if let Some(start) = text.find(FENCE) {
        let body = skip_language_tag(&text[start + FENCE.len()..]).trim();

        // 2. Generic closed block.
        if let Some(end) = body.find(FENCE) {
            return body[..end].trim().to_string();
        }

        // 3. Unterminated block: never hand the fence itself back to the caller.
        if !body.is_empty() {
            return body.to_string();
        }
        return text[..start].trim().to_string();
    }

    // 4. No code fences at all
    text.trim().to_string()
}
3845
3846// ========== Chaos Engineering Management ==========
3847
3848/// Get current chaos engineering configuration
3849async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
3850    #[cfg(feature = "chaos")]
3851    {
3852        if let Some(chaos_state) = &_state.chaos_api_state {
3853            let config = chaos_state.config.read().await;
3854            // Convert ChaosConfig to JSON response format
3855            Json(serde_json::json!({
3856                "enabled": config.enabled,
3857                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
3858                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
3859                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
3860                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
3861            }))
3862            .into_response()
3863        } else {
3864            // Chaos API not available, return default
3865            Json(serde_json::json!({
3866                "enabled": false,
3867                "latency": null,
3868                "fault_injection": null,
3869                "rate_limit": null,
3870                "traffic_shaping": null,
3871            }))
3872            .into_response()
3873        }
3874    }
3875    #[cfg(not(feature = "chaos"))]
3876    {
3877        // Chaos feature not enabled
3878        Json(serde_json::json!({
3879            "enabled": false,
3880            "latency": null,
3881            "fault_injection": null,
3882            "rate_limit": null,
3883            "traffic_shaping": null,
3884        }))
3885        .into_response()
3886    }
3887}
3888
/// Request to update chaos configuration
///
/// Every field is optional; `update_chaos_config` only touches the parts of
/// the configuration for which a value is supplied. The four sections are
/// carried as raw JSON and deserialized into the `mockforge_chaos::config`
/// types by that handler.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Whether to enable chaos engineering
    pub enabled: Option<bool>,
    /// Latency configuration (raw JSON for `LatencyConfig`)
    pub latency: Option<serde_json::Value>,
    /// Fault injection configuration (raw JSON for `FaultInjectionConfig`)
    pub fault_injection: Option<serde_json::Value>,
    /// Rate limiting configuration (raw JSON for `RateLimitConfig`)
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic shaping configuration (raw JSON for `TrafficShapingConfig`)
    pub traffic_shaping: Option<serde_json::Value>,
}
3903
3904/// Update chaos engineering configuration
3905async fn update_chaos_config(
3906    State(_state): State<ManagementState>,
3907    Json(_config_update): Json<ChaosConfigUpdate>,
3908) -> impl IntoResponse {
3909    #[cfg(feature = "chaos")]
3910    {
3911        if let Some(chaos_state) = &_state.chaos_api_state {
3912            use mockforge_chaos::config::{
3913                FaultInjectionConfig, LatencyConfig, RateLimitConfig, TrafficShapingConfig,
3914            };
3915
3916            let mut config = chaos_state.config.write().await;
3917
3918            // Update enabled flag if provided
3919            if let Some(enabled) = _config_update.enabled {
3920                config.enabled = enabled;
3921            }
3922
3923            // Update latency config if provided
3924            if let Some(latency_json) = _config_update.latency {
3925                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3926                    config.latency = Some(latency);
3927                }
3928            }
3929
3930            // Update fault injection config if provided
3931            if let Some(fault_json) = _config_update.fault_injection {
3932                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3933                    config.fault_injection = Some(fault);
3934                }
3935            }
3936
3937            // Update rate limit config if provided
3938            if let Some(rate_json) = _config_update.rate_limit {
3939                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3940                    config.rate_limit = Some(rate);
3941                }
3942            }
3943
3944            // Update traffic shaping config if provided
3945            if let Some(traffic_json) = _config_update.traffic_shaping {
3946                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3947                    config.traffic_shaping = Some(traffic);
3948                }
3949            }
3950
3951            // Reinitialize middleware injectors with new config
3952            // The middleware will pick up the changes on the next request
3953            drop(config);
3954
3955            info!("Chaos configuration updated successfully");
3956            Json(serde_json::json!({
3957                "success": true,
3958                "message": "Chaos configuration updated and applied"
3959            }))
3960            .into_response()
3961        } else {
3962            (
3963                StatusCode::SERVICE_UNAVAILABLE,
3964                Json(serde_json::json!({
3965                    "success": false,
3966                    "error": "Chaos API not available",
3967                    "message": "Chaos engineering is not enabled or configured"
3968                })),
3969            )
3970                .into_response()
3971        }
3972    }
3973    #[cfg(not(feature = "chaos"))]
3974    {
3975        (
3976            StatusCode::NOT_IMPLEMENTED,
3977            Json(serde_json::json!({
3978                "success": false,
3979                "error": "Chaos feature not enabled",
3980                "message": "Chaos engineering feature is not compiled into this build"
3981            })),
3982        )
3983            .into_response()
3984    }
3985}
3986
3987// ========== Network Profile Management ==========
3988
3989/// List available network profiles
3990async fn list_network_profiles() -> impl IntoResponse {
3991    use mockforge_core::network_profiles::NetworkProfileCatalog;
3992
3993    let catalog = NetworkProfileCatalog::default();
3994    let profiles: Vec<serde_json::Value> = catalog
3995        .list_profiles_with_description()
3996        .iter()
3997        .map(|(name, description)| {
3998            serde_json::json!({
3999                "name": name,
4000                "description": description,
4001            })
4002        })
4003        .collect();
4004
4005    Json(serde_json::json!({
4006        "profiles": profiles
4007    }))
4008    .into_response()
4009}
4010
#[derive(Debug, Deserialize)]
/// Request to apply a network profile
///
/// Consumed by `apply_network_profile`, which answers `404 Not Found` when
/// the name is not in the default `NetworkProfileCatalog`.
pub struct ApplyNetworkProfileRequest {
    /// Name of the network profile to apply, as looked up in the catalog
    pub profile_name: String,
}
4017
4018/// Apply a network profile
4019async fn apply_network_profile(
4020    State(state): State<ManagementState>,
4021    Json(request): Json<ApplyNetworkProfileRequest>,
4022) -> impl IntoResponse {
4023    use mockforge_core::network_profiles::NetworkProfileCatalog;
4024
4025    let catalog = NetworkProfileCatalog::default();
4026    if let Some(profile) = catalog.get(&request.profile_name) {
4027        // Apply profile to server configuration if available
4028        // NetworkProfile contains latency and traffic_shaping configs
4029        if let Some(server_config) = &state.server_config {
4030            let mut config = server_config.write().await;
4031
4032            // Apply network profile's traffic shaping to core config
4033            use mockforge_core::config::NetworkShapingConfig;
4034
4035            // Convert NetworkProfile's TrafficShapingConfig to NetworkShapingConfig
4036            // NetworkProfile uses mockforge_core::traffic_shaping::TrafficShapingConfig
4037            // which has bandwidth and burst_loss fields
4038            let network_shaping = NetworkShapingConfig {
4039                enabled: profile.traffic_shaping.bandwidth.enabled
4040                    || profile.traffic_shaping.burst_loss.enabled,
4041                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
4042                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
4043                max_connections: 1000, // Default value
4044            };
4045
4046            // Update chaos config if it exists, or create it
4047            // Chaos config is in observability.chaos, not core.chaos
4048            if let Some(ref mut chaos) = config.observability.chaos {
4049                chaos.traffic_shaping = Some(network_shaping);
4050            } else {
4051                // Create minimal chaos config with traffic shaping
4052                use mockforge_core::config::ChaosEngConfig;
4053                config.observability.chaos = Some(ChaosEngConfig {
4054                    enabled: true,
4055                    latency: None,
4056                    fault_injection: None,
4057                    rate_limit: None,
4058                    traffic_shaping: Some(network_shaping),
4059                    scenario: None,
4060                });
4061            }
4062
4063            info!("Network profile '{}' applied to server configuration", request.profile_name);
4064        } else {
4065            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
4066        }
4067
4068        // Also update chaos API state if available
4069        #[cfg(feature = "chaos")]
4070        {
4071            if let Some(chaos_state) = &state.chaos_api_state {
4072                use mockforge_chaos::config::TrafficShapingConfig;
4073
4074                let mut chaos_config = chaos_state.config.write().await;
4075                // Apply profile's traffic shaping to chaos API state
4076                let chaos_traffic_shaping = TrafficShapingConfig {
4077                    enabled: profile.traffic_shaping.bandwidth.enabled
4078                        || profile.traffic_shaping.burst_loss.enabled,
4079                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
4080                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
4081                    max_connections: 0,
4082                    connection_timeout_ms: 30000,
4083                };
4084                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
4085                chaos_config.enabled = true; // Enable chaos when applying a profile
4086                drop(chaos_config);
4087                info!("Network profile '{}' applied to chaos API state", request.profile_name);
4088            }
4089        }
4090
4091        Json(serde_json::json!({
4092            "success": true,
4093            "message": format!("Network profile '{}' applied", request.profile_name),
4094            "profile": {
4095                "name": profile.name,
4096                "description": profile.description,
4097            }
4098        }))
4099        .into_response()
4100    } else {
4101        (
4102            StatusCode::NOT_FOUND,
4103            Json(serde_json::json!({
4104                "error": "Profile not found",
4105                "message": format!("Network profile '{}' not found", request.profile_name)
4106            })),
4107        )
4108            .into_response()
4109    }
4110}
4111
4112/// Build the management API router with UI Builder support
4113pub fn management_router_with_ui_builder(
4114    state: ManagementState,
4115    server_config: mockforge_core::config::ServerConfig,
4116) -> Router {
4117    use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
4118
4119    // Create the base management router
4120    let management = management_router(state);
4121
4122    // Create UI Builder state and router
4123    let ui_builder_state = UIBuilderState::new(server_config);
4124    let ui_builder = create_ui_builder_router(ui_builder_state);
4125
4126    // Nest UI Builder under /ui-builder
4127    management.nest("/ui-builder", ui_builder)
4128}
4129
4130/// Build management router with spec import API
4131pub fn management_router_with_spec_import(state: ManagementState) -> Router {
4132    use crate::spec_import::{spec_import_router, SpecImportState};
4133
4134    // Create base management router
4135    let management = management_router(state);
4136
4137    // Merge with spec import router
4138    Router::new()
4139        .merge(management)
4140        .merge(spec_import_router(SpecImportState::new()))
4141}
4142
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an enabled mock that answers with status 200 and the given JSON `body`.
    ///
    /// Every optional field (`headers`, `latency_ms`, `request_match`, `priority` and the
    /// scenario fields) is left as `None`. Tests that need something different override the
    /// relevant fields with struct update syntax, which keeps each test focused on the
    /// fields it actually cares about.
    fn basic_mock(
        id: &str,
        name: &str,
        method: &str,
        path: &str,
        body: serde_json::Value,
    ) -> MockConfig {
        MockConfig {
            id: id.to_string(),
            name: name.to_string(),
            method: method.to_string(),
            path: path.to_string(),
            response: MockResponse {
                body,
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        }
    }

    /// Builds a `POST /xml` mock whose only match criterion is the given XPath expression.
    fn xpath_mock(id: &str, name: &str, xpath: &str) -> MockConfig {
        MockConfig {
            request_match: Some(RequestMatchCriteria {
                xpath: Some(xpath.to_string()),
                ..Default::default()
            }),
            ..basic_mock(id, name, "POST", "/xml", serde_json::json!({"ok": true}))
        }
    }

    /// Runs `mock_matches_request` for a `POST /xml` request that has no headers, no query
    /// parameters and a fixed XML order document as its body.
    fn matches_order_xml(mock: &MockConfig) -> bool {
        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        mock_matches_request(mock, "POST", "/xml", &headers, &query, Some(body))
    }

    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        let mock = basic_mock(
            "test-1",
            "Test Mock",
            "GET",
            "/test",
            serde_json::json!({"message": "test"}),
        );

        // Create mock; the write guard is scoped so it is released before reading.
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(mock);
        }

        // Get mock
        let mocks = state.mocks.read().await;
        let found = mocks
            .iter()
            .find(|m| m.id == "test-1")
            .expect("mock pushed above should be found by its id");
        assert_eq!(found.name, "Test Mock");
    }

    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        // Add one enabled and one disabled mock.
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(basic_mock("1", "Mock 1", "GET", "/test1", serde_json::json!({})));
            mocks.push(MockConfig {
                enabled: false,
                status_code: Some(201),
                ..basic_mock("2", "Mock 2", "POST", "/test2", serde_json::json!({}))
            });
        }

        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }

    #[test]
    fn test_mock_matches_request_with_xpath_absolute_path() {
        let mock = xpath_mock("xpath-1", "XPath Match", "/root/order/id");

        assert!(matches_order_xml(&mock));
    }

    #[test]
    fn test_mock_matches_request_with_xpath_text_predicate() {
        let mock = xpath_mock("xpath-2", "XPath Predicate Match", "//order/id[text()='123']");

        assert!(matches_order_xml(&mock));
    }

    #[test]
    fn test_mock_matches_request_with_xpath_no_match() {
        // The document's id is 123, so a predicate looking for 456 must not match.
        let mock = xpath_mock("xpath-3", "XPath No Match", "//order/id[text()='456']");

        assert!(!matches_order_xml(&mock));
    }
}