Skip to main content

mockforge_http/
management.rs

1#[cfg(any(feature = "mqtt", feature = "kafka"))]
2use axum::response::sse::{Event, Sse};
3/// Management API for MockForge
4///
5/// Provides REST endpoints for controlling mocks, server configuration,
6/// and integration with developer tools (VS Code extension, CI/CD, etc.)
7use axum::{
8    extract::{Path, Query, State},
9    http::StatusCode,
10    response::{IntoResponse, Json},
11    routing::{delete, get, post, put},
12    Router,
13};
14#[cfg(any(feature = "mqtt", feature = "kafka"))]
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18    BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23#[cfg(any(feature = "mqtt", feature = "kafka"))]
24use std::convert::Infallible;
25use std::sync::Arc;
26use tokio::sync::{broadcast, RwLock};
27use tracing::*;
28
29/// Default broadcast channel capacity for message events
30#[cfg(any(feature = "mqtt", feature = "kafka"))]
31const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;
32
/// Resolve the capacity of the message-event broadcast channel.
///
/// Reads `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY` from the environment and falls
/// back to `DEFAULT_MESSAGE_BROADCAST_CAPACITY` when the variable is unset, is
/// not a valid `usize`, or lies outside the range accepted by
/// `tokio::sync::broadcast::channel`.
#[cfg(any(feature = "mqtt", feature = "kafka"))]
fn get_message_broadcast_capacity() -> usize {
    std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        // `broadcast::channel` panics for a capacity of 0 or above `usize::MAX / 2`;
        // a misconfigured environment must not be able to crash state construction.
        .filter(|capacity| (1..=usize::MAX / 2).contains(capacity))
        .unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY)
}
41
/// Message event types for real-time monitoring
///
/// Serialized in serde's adjacently tagged form: the lowercased variant name
/// (for example `"mqtt"`) is written under the `protocol` key and the variant
/// payload under the `data` key. Each variant only exists when the matching
/// Cargo feature is enabled.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    #[cfg(feature = "mqtt")]
    /// MQTT message event
    Mqtt(MqttMessageEvent),
    #[cfg(feature = "kafka")]
    /// Kafka message event
    Kafka(KafkaMessageEvent),
}
54
#[cfg(feature = "mqtt")]
/// MQTT message event for real-time monitoring
///
/// Carried as the payload of `MessageEvent::Mqtt`; only compiled when the
/// `mqtt` feature is enabled.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// MQTT topic name
    pub topic: String,
    /// Message payload content
    pub payload: String,
    /// Quality of Service level (0, 1, or 2)
    pub qos: u8,
    /// Whether the message is retained
    pub retain: bool,
    /// RFC3339 formatted timestamp
    pub timestamp: String,
}
70
#[cfg(feature = "kafka")]
#[allow(missing_docs)]
// Kafka message event; carried as the payload of `MessageEvent::Kafka` and only
// compiled when the `kafka` feature is enabled.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    // Topic of the message.
    pub topic: String,
    // Optional record key.
    pub key: Option<String>,
    // Record value as a string.
    pub value: String,
    // Partition number (presumably the partition the record belongs to — TODO confirm).
    pub partition: i32,
    // Offset (presumably the record's offset within the partition — TODO confirm).
    pub offset: i64,
    // Optional string-to-string record headers.
    pub headers: Option<std::collections::HashMap<String, String>>,
    // NOTE(review): presumably RFC3339 like `MqttMessageEvent::timestamp` — confirm.
    pub timestamp: String,
}
83
/// Mock configuration representation
///
/// This is both the in-memory form of a mock and its JSON wire format for the
/// management API. Optional fields are omitted from serialized output when
/// they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockConfig {
    /// Unique identifier for the mock
    ///
    /// Defaults to an empty string when absent from the input and is omitted
    /// from serialized output while empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub id: String,
    /// Human-readable name for the mock
    pub name: String,
    /// HTTP method (GET, POST, etc.)
    pub method: String,
    /// API path pattern to match
    pub path: String,
    /// Response configuration
    pub response: MockResponse,
    /// Whether this mock is currently enabled
    ///
    /// Defaults to `true` when absent from the input.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Optional latency to inject in milliseconds
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_ms: Option<u64>,
    /// Optional HTTP status code override
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status_code: Option<u16>,
    /// Request matching criteria (headers, query params, body patterns)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_match: Option<RequestMatchCriteria>,
    /// Priority for mock ordering (higher priority mocks are matched first)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub priority: Option<i32>,
    /// Scenario name for stateful mocking
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scenario: Option<String>,
    /// Required scenario state for this mock to be active
    #[serde(skip_serializing_if = "Option::is_none")]
    pub required_scenario_state: Option<String>,
    /// New scenario state after this mock is matched
    #[serde(skip_serializing_if = "Option::is_none")]
    pub new_scenario_state: Option<String>,
    /// Optimistic locking version — starts at 1 on creation, increments on each update.
    /// When updating, the submitted version must match the stored version
    /// or the update is rejected with HTTP 409 Conflict.
    ///
    /// NOTE(review): when the field is absent from the input, deserialization
    /// uses `default_version`, which yields 0 rather than 1 — confirm that an
    /// omitted version is meant to be distinguishable from a freshly created mock.
    #[serde(default = "default_version")]
    pub version: u64,
}
128
/// Serde default provider that yields `true`; used for `MockConfig::enabled`
/// so mocks are enabled unless the input says otherwise.
fn default_true() -> bool {
    true
}
132
/// Serde default provider for `MockConfig::version`: yields 0 when the field
/// is absent from the deserialized input.
// NOTE(review): the field documentation says versions start at 1 on creation;
// 0 here presumably marks "no version supplied" — confirm against the update handler.
fn default_version() -> u64 {
    0
}
136
/// Mock response configuration
///
/// Holds the JSON body and, optionally, extra headers for a mock's response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Response body as JSON
    pub body: serde_json::Value,
    /// Optional custom response headers
    ///
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
146
147/// Request matching criteria for advanced request matching
148#[derive(Debug, Clone, Serialize, Deserialize, Default)]
149pub struct RequestMatchCriteria {
150    /// Headers that must be present and match (case-insensitive header names)
151    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
152    pub headers: std::collections::HashMap<String, String>,
153    /// Query parameters that must be present and match
154    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
155    pub query_params: std::collections::HashMap<String, String>,
156    /// Request body pattern (supports exact match or regex)
157    #[serde(skip_serializing_if = "Option::is_none")]
158    pub body_pattern: Option<String>,
159    /// JSONPath expression for JSON body matching
160    #[serde(skip_serializing_if = "Option::is_none")]
161    pub json_path: Option<String>,
162    /// XPath expression for XML body matching
163    #[serde(skip_serializing_if = "Option::is_none")]
164    pub xpath: Option<String>,
165    /// Custom matcher expression (e.g., "headers.content-type == \"application/json\"")
166    #[serde(skip_serializing_if = "Option::is_none")]
167    pub custom_matcher: Option<String>,
168}
169
170/// Check if a request matches the given mock configuration
171///
172/// This function implements comprehensive request matching including:
173/// - Method and path matching
174/// - Header matching (with regex support)
175/// - Query parameter matching
176/// - Body pattern matching (exact, regex, JSONPath, XPath)
177/// - Custom matcher expressions
178pub fn mock_matches_request(
179    mock: &MockConfig,
180    method: &str,
181    path: &str,
182    headers: &std::collections::HashMap<String, String>,
183    query_params: &std::collections::HashMap<String, String>,
184    body: Option<&[u8]>,
185) -> bool {
186    use regex::Regex;
187
188    // Check if mock is enabled
189    if !mock.enabled {
190        return false;
191    }
192
193    // Check method (case-insensitive)
194    if mock.method.to_uppercase() != method.to_uppercase() {
195        return false;
196    }
197
198    // Check path pattern (supports wildcards and path parameters)
199    if !path_matches_pattern(&mock.path, path) {
200        return false;
201    }
202
203    // Check request matching criteria if present
204    if let Some(criteria) = &mock.request_match {
205        // Check headers
206        for (key, expected_value) in &criteria.headers {
207            let header_key_lower = key.to_lowercase();
208            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
209
210            if let Some((_, actual_value)) = found {
211                // Try regex match first, then exact match
212                if let Ok(re) = Regex::new(expected_value) {
213                    if !re.is_match(actual_value) {
214                        return false;
215                    }
216                } else if actual_value != expected_value {
217                    return false;
218                }
219            } else {
220                return false; // Header not found
221            }
222        }
223
224        // Check query parameters
225        for (key, expected_value) in &criteria.query_params {
226            if let Some(actual_value) = query_params.get(key) {
227                if actual_value != expected_value {
228                    return false;
229                }
230            } else {
231                return false; // Query param not found
232            }
233        }
234
235        // Check body pattern
236        if let Some(pattern) = &criteria.body_pattern {
237            if let Some(body_bytes) = body {
238                let body_str = String::from_utf8_lossy(body_bytes);
239                // Try regex first, then exact match
240                if let Ok(re) = Regex::new(pattern) {
241                    if !re.is_match(&body_str) {
242                        return false;
243                    }
244                } else if body_str.as_ref() != pattern {
245                    return false;
246                }
247            } else {
248                return false; // Body required but not present
249            }
250        }
251
252        // Check JSONPath (simplified implementation)
253        if let Some(json_path) = &criteria.json_path {
254            if let Some(body_bytes) = body {
255                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
256                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
257                        // Simple JSONPath check
258                        if !json_path_exists(&json_value, json_path) {
259                            return false;
260                        }
261                    }
262                }
263            }
264        }
265
266        // Check XPath (supports a focused subset)
267        if let Some(xpath) = &criteria.xpath {
268            if let Some(body_bytes) = body {
269                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
270                    if !xml_xpath_exists(body_str, xpath) {
271                        return false;
272                    }
273                } else {
274                    return false;
275                }
276            } else {
277                return false; // Body required but not present
278            }
279        }
280
281        // Check custom matcher
282        if let Some(custom) = &criteria.custom_matcher {
283            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
284                return false;
285            }
286        }
287    }
288
289    true
290}
291
292/// Check if a path matches a pattern (supports wildcards and path parameters)
293fn path_matches_pattern(pattern: &str, path: &str) -> bool {
294    // Exact match
295    if pattern == path {
296        return true;
297    }
298
299    // Wildcard match
300    if pattern == "*" {
301        return true;
302    }
303
304    // Path parameter matching (e.g., /users/{id} matches /users/123)
305    let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
306    let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
307
308    if pattern_parts.len() != path_parts.len() {
309        // Check for wildcard patterns
310        if pattern.contains('*') {
311            return matches_wildcard_pattern(pattern, path);
312        }
313        return false;
314    }
315
316    for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
317        // Check for path parameters {param}
318        if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
319            continue; // Matches any value
320        }
321
322        if pattern_part != path_part {
323            return false;
324        }
325    }
326
327    true
328}
329
330/// Check if path matches a wildcard pattern
331fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
332    use regex::Regex;
333
334    // Convert pattern to regex
335    let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
336
337    if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
338        return re.is_match(path);
339    }
340
341    false
342}
343
344/// Check if a JSONPath exists in a JSON value
345///
346/// Supports:
347/// - `$` — root element
348/// - `$.field.subfield` — nested object access
349/// - `$.items[0].name` — array index access
350/// - `$.items[*]` — array wildcard (checks array is non-empty)
351fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
352    let path = if json_path == "$" {
353        return true;
354    } else if let Some(p) = json_path.strip_prefix("$.") {
355        p
356    } else if let Some(p) = json_path.strip_prefix('$') {
357        p.strip_prefix('.').unwrap_or(p)
358    } else {
359        json_path
360    };
361
362    let mut current = json;
363    for segment in split_json_path_segments(path) {
364        match segment {
365            JsonPathSegment::Field(name) => {
366                if let Some(obj) = current.as_object() {
367                    if let Some(value) = obj.get(name) {
368                        current = value;
369                    } else {
370                        return false;
371                    }
372                } else {
373                    return false;
374                }
375            }
376            JsonPathSegment::Index(idx) => {
377                if let Some(arr) = current.as_array() {
378                    if let Some(value) = arr.get(idx) {
379                        current = value;
380                    } else {
381                        return false;
382                    }
383                } else {
384                    return false;
385                }
386            }
387            JsonPathSegment::Wildcard => {
388                if let Some(arr) = current.as_array() {
389                    return !arr.is_empty();
390                }
391                return false;
392            }
393        }
394    }
395    true
396}
397
/// One step of a parsed JSONPath expression.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JsonPathSegment<'a> {
    /// Object member access by key (`.name`)
    Field(&'a str),
    /// Array element access by zero-based position (`[3]`)
    Index(usize),
    /// Array wildcard (`[*]`)
    Wildcard,
}

/// Split a JSONPath (without the leading `$`) into segments
///
/// The path is split on `.`; empty parts are skipped. Each part may be a plain
/// field name optionally followed by one or more bracket groups
/// (`items[0]`, `grid[1][*]`, or just `[0]`).
///
/// Parsing is lenient and never panics: a bracket group whose content is
/// neither `*` nor an unsigned integer is skipped, and an unterminated bracket
/// (`items[`) ends parsing of that part.
fn split_json_path_segments(path: &str) -> Vec<JsonPathSegment<'_>> {
    let mut segments = Vec::new();

    for part in path.split('.').filter(|part| !part.is_empty()) {
        let bracket_start = match part.find('[') {
            Some(pos) => pos,
            None => {
                segments.push(JsonPathSegment::Field(part));
                continue;
            }
        };

        let field_name = &part[..bracket_start];
        if !field_name.is_empty() {
            segments.push(JsonPathSegment::Field(field_name));
        }

        // Consume consecutive `[...]` groups. Locating the closing bracket with
        // `find` (instead of assuming it is the last byte) avoids the inverted
        // slice range that used to panic on input such as `items[`.
        let mut rest = &part[bracket_start..];
        while let Some(after_open) = rest.strip_prefix('[') {
            let close = match after_open.find(']') {
                Some(pos) => pos,
                None => break, // Unterminated bracket: ignore the remainder
            };

            let content = &after_open[..close];
            if content == "*" {
                segments.push(JsonPathSegment::Wildcard);
            } else if let Ok(idx) = content.parse::<usize>() {
                segments.push(JsonPathSegment::Index(idx));
            }

            rest = &after_open[close + 1..];
        }
    }

    segments
}
428
/// One step of a parsed XPath expression: an element name plus an optional
/// requirement on the element's text.
#[derive(Debug, Clone, PartialEq, Eq)]
struct XPathSegment {
    /// Element tag name to match
    name: String,
    /// When set, the text the element must have
    text_equals: Option<String>,
}

/// Parse a single `/`-separated XPath step.
///
/// Accepted forms:
/// - `name`
/// - `name[text()="value"]` or `name[text()='value']`, with optional
///   whitespace around the `=`
///
/// Returns `None` for an empty or whitespace-only segment, a predicate without
/// an element name, an unterminated predicate, or any predicate other than a
/// quoted `text()` comparison.
fn parse_xpath_segment(segment: &str) -> Option<XPathSegment> {
    let trimmed = segment.trim();
    if trimmed.is_empty() {
        return None;
    }

    let bracket_start = match trimmed.find('[') {
        Some(pos) => pos,
        None => {
            return Some(XPathSegment {
                name: trimmed.to_string(),
                text_equals: None,
            });
        }
    };

    let name = trimmed[..bracket_start].trim();
    if name.is_empty() {
        return None;
    }

    // The predicate must be closed by the final character of the segment
    let predicate = trimmed[bracket_start + 1..].strip_suffix(']')?.trim();

    // Support simple predicate: [text()="value"] or [text()='value']
    let literal = predicate
        .strip_prefix("text()")?
        .trim_start()
        .strip_prefix('=')?
        .trim();

    let is_quoted = literal.len() >= 2
        && ((literal.starts_with('"') && literal.ends_with('"'))
            || (literal.starts_with('\'') && literal.ends_with('\'')));
    if !is_quoted {
        return None;
    }

    Some(XPathSegment {
        name: name.to_string(),
        // Quotes are single-byte, so trimming one byte at each end is safe
        text_equals: Some(literal[1..literal.len() - 1].to_string()),
    })
}
475
476fn segment_matches(node: roxmltree::Node<'_, '_>, segment: &XPathSegment) -> bool {
477    if !node.is_element() {
478        return false;
479    }
480    if node.tag_name().name() != segment.name {
481        return false;
482    }
483    match &segment.text_equals {
484        Some(expected) => node.text().map(str::trim).unwrap_or_default() == expected,
485        None => true,
486    }
487}
488
489/// Check if an XPath expression matches an XML body.
490///
491/// Supported subset:
492/// - Absolute paths: `/root/child/item`
493/// - Descendant search: `//item` and `//parent/child`
494/// - Optional text predicate per segment: `item[text()="value"]`
495fn xml_xpath_exists(xml_body: &str, xpath: &str) -> bool {
496    let doc = match roxmltree::Document::parse(xml_body) {
497        Ok(doc) => doc,
498        Err(err) => {
499            tracing::warn!("Failed to parse XML for XPath matching: {}", err);
500            return false;
501        }
502    };
503
504    let expr = xpath.trim();
505    if expr.is_empty() {
506        return false;
507    }
508
509    let (is_descendant, path_str) = if let Some(rest) = expr.strip_prefix("//") {
510        (true, rest)
511    } else if let Some(rest) = expr.strip_prefix('/') {
512        (false, rest)
513    } else {
514        tracing::warn!("Unsupported XPath expression (must start with / or //): {}", expr);
515        return false;
516    };
517
518    let segments: Vec<XPathSegment> = path_str
519        .split('/')
520        .filter(|s| !s.trim().is_empty())
521        .filter_map(parse_xpath_segment)
522        .collect();
523
524    if segments.is_empty() {
525        return false;
526    }
527
528    if is_descendant {
529        let first = &segments[0];
530        for node in doc.descendants().filter(|n| segment_matches(*n, first)) {
531            let mut frontier = vec![node];
532            for segment in &segments[1..] {
533                let mut next_frontier = Vec::new();
534                for parent in &frontier {
535                    for child in parent.children().filter(|n| segment_matches(*n, segment)) {
536                        next_frontier.push(child);
537                    }
538                }
539                if next_frontier.is_empty() {
540                    frontier.clear();
541                    break;
542                }
543                frontier = next_frontier;
544            }
545            if !frontier.is_empty() {
546                return true;
547            }
548        }
549        false
550    } else {
551        let mut frontier = vec![doc.root_element()];
552        for (index, segment) in segments.iter().enumerate() {
553            let mut next_frontier = Vec::new();
554            for parent in &frontier {
555                if index == 0 {
556                    if segment_matches(*parent, segment) {
557                        next_frontier.push(*parent);
558                    }
559                    continue;
560                }
561                for child in parent.children().filter(|n| segment_matches(*n, segment)) {
562                    next_frontier.push(child);
563                }
564            }
565            if next_frontier.is_empty() {
566                return false;
567            }
568            frontier = next_frontier;
569        }
570        !frontier.is_empty()
571    }
572}
573
574/// Evaluate a custom matcher expression
575fn evaluate_custom_matcher(
576    expression: &str,
577    method: &str,
578    path: &str,
579    headers: &std::collections::HashMap<String, String>,
580    query_params: &std::collections::HashMap<String, String>,
581    body: Option<&[u8]>,
582) -> bool {
583    use regex::Regex;
584
585    let expr = expression.trim();
586
587    // Handle equality expressions (field == "value")
588    if expr.contains("==") {
589        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
590        if parts.len() != 2 {
591            return false;
592        }
593
594        let field = parts[0];
595        let expected_value = parts[1].trim_matches('"').trim_matches('\'');
596
597        match field {
598            "method" => method == expected_value,
599            "path" => path == expected_value,
600            _ if field.starts_with("headers.") => {
601                let header_name = &field[8..];
602                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
603            }
604            _ if field.starts_with("query.") => {
605                let param_name = &field[6..];
606                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
607            }
608            _ => false,
609        }
610    }
611    // Handle regex match expressions (field =~ "pattern")
612    else if expr.contains("=~") {
613        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
614        if parts.len() != 2 {
615            return false;
616        }
617
618        let field = parts[0];
619        let pattern = parts[1].trim_matches('"').trim_matches('\'');
620
621        if let Ok(re) = Regex::new(pattern) {
622            match field {
623                "method" => re.is_match(method),
624                "path" => re.is_match(path),
625                _ if field.starts_with("headers.") => {
626                    let header_name = &field[8..];
627                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
628                }
629                _ if field.starts_with("query.") => {
630                    let param_name = &field[6..];
631                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
632                }
633                _ => false,
634            }
635        } else {
636            false
637        }
638    }
639    // Handle contains expressions (field contains "value")
640    else if expr.contains("contains") {
641        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
642        if parts.len() != 2 {
643            return false;
644        }
645
646        let field = parts[0];
647        let search_value = parts[1].trim_matches('"').trim_matches('\'');
648
649        match field {
650            "path" => path.contains(search_value),
651            _ if field.starts_with("headers.") => {
652                let header_name = &field[8..];
653                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
654            }
655            _ if field.starts_with("body") => {
656                if let Some(body_bytes) = body {
657                    let body_str = String::from_utf8_lossy(body_bytes);
658                    body_str.contains(search_value)
659                } else {
660                    false
661                }
662            }
663            _ => false,
664        }
665    } else {
666        // Unknown expression format
667        tracing::warn!("Unknown custom matcher expression format: {}", expr);
668        false
669    }
670}
671
/// Server statistics
///
/// Plain serializable snapshot of counters; it holds no references to live
/// server state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Server uptime in seconds
    pub uptime_seconds: u64,
    /// Total number of requests processed
    pub total_requests: u64,
    /// Number of active mock configurations
    pub active_mocks: usize,
    /// Number of currently enabled mocks
    pub enabled_mocks: usize,
    /// Number of registered API routes
    pub registered_routes: usize,
}
686
/// Server configuration info
///
/// Serializable summary reported by the management API. This is a different
/// type from `mockforge_core::config::ServerConfig`, which this file also
/// references by its full path.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// MockForge version string
    pub version: String,
    /// Server port number
    pub port: u16,
    /// Whether an OpenAPI spec is loaded
    pub has_openapi_spec: bool,
    /// Optional path to the OpenAPI spec file
    ///
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
700
/// Shared state for the management API
///
/// The type is `Clone`, and its mutable collections live behind
/// `Arc<RwLock<..>>`, so clones observe and modify the same mocks, request
/// counter and rule explanations. Optional integrations are `None` until set
/// through the matching `with_*` builder method; several fields only exist
/// when their Cargo feature is enabled.
#[derive(Clone)]
pub struct ManagementState {
    /// Collection of mock configurations
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// Optional OpenAPI specification
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Optional path to the OpenAPI spec file
    pub spec_path: Option<String>,
    /// Server port number
    pub port: u16,
    /// Server start time for uptime calculation
    pub start_time: std::time::Instant,
    /// Counter for total requests processed
    pub request_counter: Arc<RwLock<u64>>,
    /// Optional proxy configuration for migration pipeline
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// Optional SMTP registry for email mocking
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// Optional MQTT broker for message mocking
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Optional Kafka broker for event streaming
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel for message events (MQTT & Kafka)
    ///
    /// Only the sender half is stored.
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// State machine manager for scenario state machines
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// Optional WebSocket broadcast channel for real-time updates
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Lifecycle hook registry for extensibility
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Rule explanations storage (in-memory for now)
    ///
    /// Map keyed by `String` (presumably the rule identifier — TODO confirm).
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
    /// Optional chaos API state for chaos config management
    #[cfg(feature = "chaos")]
    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
    /// Optional server configuration for profile application
    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
    /// Conformance testing state
    // NOTE(review): stored by value, not behind an `Arc`; whether clones share
    // it depends on `ConformanceState`'s own `Clone` — confirm.
    #[cfg(feature = "conformance")]
    pub conformance_state: crate::handlers::conformance::ConformanceState,
}
755
756impl ManagementState {
757    /// Create a new management state
758    ///
759    /// # Arguments
760    /// * `spec` - Optional OpenAPI specification
761    /// * `spec_path` - Optional path to the OpenAPI spec file
762    /// * `port` - Server port number
763    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
764        Self {
765            mocks: Arc::new(RwLock::new(Vec::new())),
766            spec,
767            spec_path,
768            port,
769            start_time: std::time::Instant::now(),
770            request_counter: Arc::new(RwLock::new(0)),
771            proxy_config: None,
772            #[cfg(feature = "smtp")]
773            smtp_registry: None,
774            #[cfg(feature = "mqtt")]
775            mqtt_broker: None,
776            #[cfg(feature = "kafka")]
777            kafka_broker: None,
778            #[cfg(any(feature = "mqtt", feature = "kafka"))]
779            message_events: {
780                let capacity = get_message_broadcast_capacity();
781                let (tx, _) = broadcast::channel(capacity);
782                Arc::new(tx)
783            },
784            state_machine_manager: Arc::new(RwLock::new(
785                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
786            )),
787            ws_broadcast: None,
788            lifecycle_hooks: None,
789            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
790            #[cfg(feature = "chaos")]
791            chaos_api_state: None,
792            server_config: None,
793            #[cfg(feature = "conformance")]
794            conformance_state: crate::handlers::conformance::ConformanceState::new(),
795        }
796    }
797
798    /// Add lifecycle hook registry to management state
799    pub fn with_lifecycle_hooks(
800        mut self,
801        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
802    ) -> Self {
803        self.lifecycle_hooks = Some(hooks);
804        self
805    }
806
807    /// Add WebSocket broadcast channel to management state
808    pub fn with_ws_broadcast(
809        mut self,
810        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
811    ) -> Self {
812        self.ws_broadcast = Some(ws_broadcast);
813        self
814    }
815
816    /// Add proxy configuration to management state
817    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
818        self.proxy_config = Some(proxy_config);
819        self
820    }
821
822    #[cfg(feature = "smtp")]
823    /// Add SMTP registry to management state
824    pub fn with_smtp_registry(
825        mut self,
826        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
827    ) -> Self {
828        self.smtp_registry = Some(smtp_registry);
829        self
830    }
831
832    #[cfg(feature = "mqtt")]
833    /// Add MQTT broker to management state
834    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
835        self.mqtt_broker = Some(mqtt_broker);
836        self
837    }
838
839    #[cfg(feature = "kafka")]
840    /// Add Kafka broker to management state
841    pub fn with_kafka_broker(
842        mut self,
843        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
844    ) -> Self {
845        self.kafka_broker = Some(kafka_broker);
846        self
847    }
848
849    #[cfg(feature = "chaos")]
850    /// Add chaos API state to management state
851    pub fn with_chaos_api_state(
852        mut self,
853        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
854    ) -> Self {
855        self.chaos_api_state = Some(chaos_api_state);
856        self
857    }
858
859    /// Add server configuration to management state
860    pub fn with_server_config(
861        mut self,
862        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
863    ) -> Self {
864        self.server_config = Some(server_config);
865        self
866    }
867}
868
869/// List all mocks
870async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
871    let mocks = state.mocks.read().await;
872    Json(serde_json::json!({
873        "mocks": *mocks,
874        "total": mocks.len(),
875        "enabled": mocks.iter().filter(|m| m.enabled).count()
876    }))
877}
878
879/// Get a specific mock by ID
880async fn get_mock(
881    State(state): State<ManagementState>,
882    Path(id): Path<String>,
883) -> Result<Json<MockConfig>, StatusCode> {
884    let mocks = state.mocks.read().await;
885    mocks
886        .iter()
887        .find(|m| m.id == id)
888        .cloned()
889        .map(Json)
890        .ok_or(StatusCode::NOT_FOUND)
891}
892
893/// Create a new mock
894async fn create_mock(
895    State(state): State<ManagementState>,
896    Json(mut mock): Json<MockConfig>,
897) -> Result<Json<MockConfig>, StatusCode> {
898    let mut mocks = state.mocks.write().await;
899
900    // Generate ID if not provided
901    if mock.id.is_empty() {
902        mock.id = uuid::Uuid::new_v4().to_string();
903    }
904
905    // Check for duplicate ID
906    if mocks.iter().any(|m| m.id == mock.id) {
907        return Err(StatusCode::CONFLICT);
908    }
909
910    // Initialize optimistic locking version
911    mock.version = 1;
912
913    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
914
915    // Invoke lifecycle hooks
916    if let Some(hooks) = &state.lifecycle_hooks {
917        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
918            id: mock.id.clone(),
919            name: mock.name.clone(),
920            config: serde_json::to_value(&mock).unwrap_or_default(),
921        };
922        hooks.invoke_mock_created(&event).await;
923    }
924
925    mocks.push(mock.clone());
926
927    // Broadcast WebSocket event
928    if let Some(tx) = &state.ws_broadcast {
929        if let Err(e) = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone())) {
930            tracing::warn!("Failed to broadcast mock_created event: no active WebSocket receivers (event: {:?})", std::mem::discriminant(&e.0));
931        }
932    }
933
934    Ok(Json(mock))
935}
936
937/// Update an existing mock
938async fn update_mock(
939    State(state): State<ManagementState>,
940    Path(id): Path<String>,
941    Json(mut updated_mock): Json<MockConfig>,
942) -> Result<Json<MockConfig>, (StatusCode, Json<serde_json::Value>)> {
943    let mut mocks = state.mocks.write().await;
944
945    let position = mocks
946        .iter()
947        .position(|m| m.id == id)
948        .ok_or((StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Mock not found"}))))?;
949
950    // Get old mock for comparison
951    let old_mock = mocks[position].clone();
952
953    // Optimistic locking: reject update if the submitted version doesn't match
954    if updated_mock.version != old_mock.version {
955        return Err((
956            StatusCode::CONFLICT,
957            Json(serde_json::json!({
958                "error": "Version conflict",
959                "message": format!(
960                    "Mock '{}' has been modified by another request. \
961                     Expected version {}, but current version is {}. \
962                     Please fetch the latest version and retry.",
963                    id, updated_mock.version, old_mock.version
964                ),
965                "current_version": old_mock.version
966            })),
967        ));
968    }
969
970    // Increment version on successful update
971    updated_mock.version = old_mock.version + 1;
972
973    info!(
974        "Updating mock: {} (version {} -> {})",
975        id, old_mock.version, updated_mock.version
976    );
977    mocks[position] = updated_mock.clone();
978
979    // Invoke lifecycle hooks
980    if let Some(hooks) = &state.lifecycle_hooks {
981        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
982            id: updated_mock.id.clone(),
983            name: updated_mock.name.clone(),
984            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
985        };
986        hooks.invoke_mock_updated(&event).await;
987
988        // Check if enabled state changed
989        if old_mock.enabled != updated_mock.enabled {
990            let state_event = if updated_mock.enabled {
991                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
992                    id: updated_mock.id.clone(),
993                }
994            } else {
995                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
996                    id: updated_mock.id.clone(),
997                }
998            };
999            hooks.invoke_mock_state_changed(&state_event).await;
1000        }
1001    }
1002
1003    // Broadcast WebSocket event
1004    if let Some(tx) = &state.ws_broadcast {
1005        if let Err(e) = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()))
1006        {
1007            tracing::warn!("Failed to broadcast mock_updated event: no active WebSocket receivers (event: {:?})", std::mem::discriminant(&e.0));
1008        }
1009    }
1010
1011    Ok(Json(updated_mock))
1012}
1013
1014/// Delete a mock
1015async fn delete_mock(
1016    State(state): State<ManagementState>,
1017    Path(id): Path<String>,
1018) -> Result<StatusCode, StatusCode> {
1019    let mut mocks = state.mocks.write().await;
1020
1021    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
1022
1023    // Get mock info before deletion for lifecycle hooks
1024    let deleted_mock = mocks[position].clone();
1025
1026    info!("Deleting mock: {}", id);
1027    mocks.remove(position);
1028
1029    // Invoke lifecycle hooks
1030    if let Some(hooks) = &state.lifecycle_hooks {
1031        let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
1032            id: deleted_mock.id.clone(),
1033            name: deleted_mock.name.clone(),
1034        };
1035        hooks.invoke_mock_deleted(&event).await;
1036    }
1037
1038    // Broadcast WebSocket event
1039    if let Some(tx) = &state.ws_broadcast {
1040        if let Err(e) = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone())) {
1041            tracing::warn!("Failed to broadcast mock_deleted event: no active WebSocket receivers (event: {:?})", std::mem::discriminant(&e.0));
1042        }
1043    }
1044
1045    Ok(StatusCode::NO_CONTENT)
1046}
1047
/// Request to validate configuration
///
/// Deserialized from the JSON body received by `validate_config`.
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// Configuration to validate (as JSON)
    pub config: serde_json::Value,
    /// Format of the configuration ("json" or "yaml")
    ///
    /// Falls back to `"json"` (see `default_format`) when the field is omitted.
    #[serde(default = "default_format")]
    pub format: String,
}
1057
/// Serde default for `ValidateConfigRequest::format`: the string `"json"`.
fn default_format() -> String {
    String::from("json")
}
1061
1062/// Validate configuration without applying it
1063async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
1064    use mockforge_core::config::ServerConfig;
1065
1066    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
1067        "yaml" | "yml" => {
1068            let yaml_str = match serde_json::to_string(&request.config) {
1069                Ok(s) => s,
1070                Err(e) => {
1071                    return (
1072                        StatusCode::BAD_REQUEST,
1073                        Json(serde_json::json!({
1074                            "valid": false,
1075                            "error": format!("Failed to convert to string: {}", e),
1076                            "message": "Configuration validation failed"
1077                        })),
1078                    )
1079                        .into_response();
1080                }
1081            };
1082            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
1083        }
1084        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
1085    };
1086
1087    match config_result {
1088        Ok(_) => Json(serde_json::json!({
1089            "valid": true,
1090            "message": "Configuration is valid"
1091        }))
1092        .into_response(),
1093        Err(e) => (
1094            StatusCode::BAD_REQUEST,
1095            Json(serde_json::json!({
1096                "valid": false,
1097                "error": format!("Invalid configuration: {}", e),
1098                "message": "Configuration validation failed"
1099            })),
1100        )
1101            .into_response(),
1102    }
1103}
1104
/// Request for bulk configuration update
///
/// Deserialized from the JSON body received by `bulk_update_config`.
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    /// Partial configuration updates.
    ///
    /// Must be a JSON object; `bulk_update_config` rejects anything else with
    /// `400 Bad Request`. Its top-level keys are merged over a base
    /// configuration before the result is validated.
    pub updates: serde_json::Value,
}
1111
1112/// Bulk update configuration
1113///
1114/// This endpoint allows updating multiple configuration options at once.
1115/// Only the specified fields in the updates object will be modified.
1116///
1117/// Configuration updates are applied to the server configuration if available
1118/// in ManagementState. Changes take effect immediately for supported settings.
1119async fn bulk_update_config(
1120    State(state): State<ManagementState>,
1121    Json(request): Json<BulkConfigUpdateRequest>,
1122) -> impl IntoResponse {
1123    // Validate the updates structure
1124    if !request.updates.is_object() {
1125        return (
1126            StatusCode::BAD_REQUEST,
1127            Json(serde_json::json!({
1128                "error": "Invalid request",
1129                "message": "Updates must be a JSON object"
1130            })),
1131        )
1132            .into_response();
1133    }
1134
1135    // Try to validate as partial ServerConfig
1136    use mockforge_core::config::ServerConfig;
1137
1138    // Create a minimal valid config and try to merge updates
1139    let base_config = ServerConfig::default();
1140    let base_json = match serde_json::to_value(&base_config) {
1141        Ok(v) => v,
1142        Err(e) => {
1143            return (
1144                StatusCode::INTERNAL_SERVER_ERROR,
1145                Json(serde_json::json!({
1146                    "error": "Internal error",
1147                    "message": format!("Failed to serialize base config: {}", e)
1148                })),
1149            )
1150                .into_response();
1151        }
1152    };
1153
1154    // Merge updates into base config (simplified merge)
1155    let mut merged = base_json.clone();
1156    if let (Some(merged_obj), Some(updates_obj)) =
1157        (merged.as_object_mut(), request.updates.as_object())
1158    {
1159        for (key, value) in updates_obj {
1160            merged_obj.insert(key.clone(), value.clone());
1161        }
1162    }
1163
1164    // Validate the merged config
1165    match serde_json::from_value::<ServerConfig>(merged) {
1166        Ok(validated_config) => {
1167            // Apply config if server_config is available in ManagementState
1168            if let Some(ref config_lock) = state.server_config {
1169                let mut config = config_lock.write().await;
1170                *config = validated_config;
1171                Json(serde_json::json!({
1172                    "success": true,
1173                    "message": "Bulk configuration update applied successfully",
1174                    "updates_received": request.updates,
1175                    "validated": true,
1176                    "applied": true
1177                }))
1178                .into_response()
1179            } else {
1180                Json(serde_json::json!({
1181                    "success": true,
1182                    "message": "Bulk configuration update validated but not applied (no server config in state). Use .with_server_config() when building ManagementState.",
1183                    "updates_received": request.updates,
1184                    "validated": true,
1185                    "applied": false
1186                }))
1187                .into_response()
1188            }
1189        }
1190        Err(e) => (
1191            StatusCode::BAD_REQUEST,
1192            Json(serde_json::json!({
1193                "error": "Invalid configuration",
1194                "message": format!("Configuration validation failed: {}", e),
1195                "validated": false
1196            })),
1197        )
1198            .into_response(),
1199    }
1200}
1201
1202/// Get server statistics
1203async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
1204    let mocks = state.mocks.read().await;
1205    let request_count = *state.request_counter.read().await;
1206
1207    Json(ServerStats {
1208        uptime_seconds: state.start_time.elapsed().as_secs(),
1209        total_requests: request_count,
1210        active_mocks: mocks.len(),
1211        enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
1212        registered_routes: mocks.len(), // This could be enhanced with actual route registry info
1213    })
1214}
1215
1216/// Get server configuration
1217async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
1218    Json(ServerConfig {
1219        version: env!("CARGO_PKG_VERSION").to_string(),
1220        port: state.port,
1221        has_openapi_spec: state.spec.is_some(),
1222        spec_path: state.spec_path.clone(),
1223    })
1224}
1225
1226/// Serve the loaded OpenAPI spec as JSON
1227async fn get_openapi_spec(
1228    State(state): State<ManagementState>,
1229) -> Result<Json<serde_json::Value>, StatusCode> {
1230    match &state.spec {
1231        Some(spec) => match &spec.raw_document {
1232            Some(doc) => Ok(Json(doc.clone())),
1233            None => {
1234                // Fall back to serializing the parsed spec
1235                match serde_json::to_value(&spec.spec) {
1236                    Ok(val) => Ok(Json(val)),
1237                    Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
1238                }
1239            }
1240        },
1241        None => Err(StatusCode::NOT_FOUND),
1242    }
1243}
1244
1245/// Health check endpoint
1246async fn health_check() -> Json<serde_json::Value> {
1247    Json(serde_json::json!({
1248        "status": "healthy",
1249        "service": "mockforge-management",
1250        "timestamp": chrono::Utc::now().to_rfc3339()
1251    }))
1252}
1253
/// Export format for mock configurations
///
/// Serialized and deserialized using lowercase variant names (`json`,
/// `yaml`). `export_mocks` picks a variant from its `format` query parameter.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// JSON format
    Json,
    /// YAML format
    Yaml,
}
1263
1264/// Export mocks in specified format
1265async fn export_mocks(
1266    State(state): State<ManagementState>,
1267    Query(params): Query<std::collections::HashMap<String, String>>,
1268) -> Result<(StatusCode, String), StatusCode> {
1269    let mocks = state.mocks.read().await;
1270
1271    let format = params
1272        .get("format")
1273        .map(|f| match f.as_str() {
1274            "yaml" | "yml" => ExportFormat::Yaml,
1275            _ => ExportFormat::Json,
1276        })
1277        .unwrap_or(ExportFormat::Json);
1278
1279    match format {
1280        ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
1281            .map(|json| (StatusCode::OK, json))
1282            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1283        ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
1284            .map(|yaml| (StatusCode::OK, yaml))
1285            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1286    }
1287}
1288
1289/// Import mocks from JSON/YAML
1290async fn import_mocks(
1291    State(state): State<ManagementState>,
1292    Json(mocks): Json<Vec<MockConfig>>,
1293) -> impl IntoResponse {
1294    let mut current_mocks = state.mocks.write().await;
1295    current_mocks.clear();
1296    current_mocks.extend(mocks);
1297    Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1298}
1299
#[cfg(feature = "smtp")]
/// List the emails currently held in the SMTP mailbox.
///
/// Responds `501 Not Implemented` when no SMTP registry is attached to the
/// state, `500 Internal Server Error` when the registry reports an error, and
/// `200 OK` with the emails as JSON otherwise.
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    match registry.get_emails() {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to retrieve emails",
                "message": e.to_string()
            })),
        ),
    }
}
1324
/// Fetch a single SMTP email by ID.
///
/// Responds `501 Not Implemented` when no SMTP registry is attached to the
/// state, `404 Not Found` when the registry has no email with that ID,
/// `500 Internal Server Error` when the lookup fails, and `200 OK` with the
/// email as JSON otherwise.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    let (status, body) = match registry.get_email_by_id(&id) {
        Ok(Some(email)) => (StatusCode::OK, serde_json::json!(email)),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            serde_json::json!({
                "error": "Email not found",
                "id": id
            }),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            serde_json::json!({
                "error": "Failed to retrieve email",
                "message": e.to_string()
            }),
        ),
    };
    (status, Json(body))
}
1359
/// Remove every email from the SMTP mailbox.
///
/// Responds `501 Not Implemented` when no SMTP registry is attached to the
/// state, `500 Internal Server Error` when clearing fails, and `200 OK` with
/// a confirmation message otherwise.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    let (status, body) = match registry.clear_mailbox() {
        Ok(()) => (
            StatusCode::OK,
            serde_json::json!({
                "message": "Mailbox cleared successfully"
            }),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            serde_json::json!({
                "error": "Failed to clear mailbox",
                "message": e.to_string()
            }),
        ),
    };
    (status, Json(body))
}
1389
/// Export SMTP mailbox (not available over HTTP).
///
/// Always responds `501 Not Implemented`. The response echoes the `format`
/// query parameter back as `requested_format`, using `"json"` when the
/// parameter is absent.
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let requested_format = params
        .get("format")
        .cloned()
        .unwrap_or_else(|| "json".to_string());

    let body = serde_json::json!({
        "error": "SMTP mailbox management not available via HTTP API",
        "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
        "requested_format": requested_format
    });
    (StatusCode::NOT_IMPLEMENTED, Json(body))
}
1405
/// Search the SMTP mailbox using query-string filters.
///
/// Recognized query parameters: `sender`, `recipient`, `subject`, `body`
/// (passed through as text), `since` / `until` (RFC 3339 timestamps converted
/// to UTC; values that fail to parse are ignored), and `regex` /
/// `case_sensitive` (enabled only by the exact value `true`).
///
/// Responds `501 Not Implemented` when no SMTP registry is attached to the
/// state, `500 Internal Server Error` when the search fails, and `200 OK`
/// with the matching emails as JSON otherwise.
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    // Optional RFC 3339 timestamp, normalized to UTC; unparsable input is dropped.
    let timestamp = |key: &str| {
        params
            .get(key)
            .and_then(|raw| chrono::DateTime::parse_from_rfc3339(raw).ok())
            .map(|parsed| parsed.with_timezone(&chrono::Utc))
    };
    // Boolean switch that is on only for the literal value "true".
    let flag = |key: &str| matches!(params.get(key).map(String::as_str), Some("true"));

    let filters = EmailSearchFilters {
        sender: params.get("sender").cloned(),
        recipient: params.get("recipient").cloned(),
        subject: params.get("subject").cloned(),
        body: params.get("body").cloned(),
        since: timestamp("since"),
        until: timestamp("until"),
        use_regex: flag("regex"),
        case_sensitive: flag("case_sensitive"),
    };

    match registry.search_emails(filters) {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to search emails",
                "message": e.to_string()
            })),
        ),
    }
}
1450
/// MQTT broker statistics
///
/// Built by `get_mqtt_stats` and returned as its JSON response body.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of connected MQTT clients
    pub connected_clients: usize,
    /// Number of active MQTT topics
    pub active_topics: usize,
    /// Number of retained messages
    pub retained_messages: usize,
    /// Total number of subscriptions
    pub total_subscriptions: usize,
}
1464
/// Report MQTT broker statistics.
///
/// Collects the number of connected clients and active topics plus the
/// broker's topic statistics into an `MqttBrokerStats` JSON body. Responds
/// `503 Service Unavailable` when no MQTT broker is attached to the state.
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let broker = match state.mqtt_broker.as_ref() {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let connected_clients = broker.get_connected_clients().await.len();
    let active_topics = broker.get_active_topics().await.len();
    let topic_stats = broker.get_topic_stats().await;

    Json(MqttBrokerStats {
        connected_clients,
        active_topics,
        retained_messages: topic_stats.retained_messages,
        total_subscriptions: topic_stats.total_subscriptions,
    })
    .into_response()
}
1485
#[cfg(feature = "mqtt")]
/// List the clients currently connected to the MQTT broker.
///
/// Responds with `{"clients": [...]}`, or `503 Service Unavailable` when no
/// MQTT broker is attached to the state.
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    let broker = match state.mqtt_broker.as_ref() {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let clients = broker.get_connected_clients().await;
    let body = serde_json::json!({
        "clients": clients
    });
    Json(body).into_response()
}
1498
#[cfg(feature = "mqtt")]
/// List the topics currently active on the MQTT broker.
///
/// Responds with `{"topics": [...]}`, or `503 Service Unavailable` when no
/// MQTT broker is attached to the state.
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let broker = match state.mqtt_broker.as_ref() {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let topics = broker.get_active_topics().await;
    let body = serde_json::json!({
        "topics": topics
    });
    Json(body).into_response()
}
1511
#[cfg(feature = "mqtt")]
/// Ask the MQTT broker to disconnect the client named in the path.
///
/// Responds `200 OK` with a plain-text confirmation on success,
/// `500 Internal Server Error` with the broker's error text on failure, and
/// `503 Service Unavailable` when no MQTT broker is attached to the state.
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let broker = match state.mqtt_broker.as_ref() {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let (status, text) = match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("Failed to disconnect client: {}", e),
        ),
    };
    (status, text).into_response()
}
1531
1532// ========== MQTT Publish Handler ==========
1533
#[cfg(feature = "mqtt")]
/// Request to publish a single MQTT message
///
/// Also the element type of `MqttBatchPublishRequest::messages`.
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Topic to publish to
    pub topic: String,
    /// Message payload (string or JSON)
    pub payload: String,
    /// QoS level (0, 1, or 2); defaults to 0 (see `default_qos`) when omitted
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Whether to retain the message; defaults to `false` when omitted
    #[serde(default)]
    pub retain: bool,
}
1549
#[cfg(feature = "mqtt")]
/// Serde default for `MqttPublishRequest::qos`: QoS level 0.
fn default_qos() -> u8 {
    0
}
1554
#[cfg(feature = "mqtt")]
/// Publish a message to an MQTT topic (only compiled when mqtt feature is enabled)
///
/// The JSON body is inspected field by field:
/// - `topic` and `payload` must both be strings, otherwise the request is
///   rejected with `400 Bad Request`.
/// - `qos` defaults to 0 when absent or not an unsigned integer. Any value
///   above 2 is rejected with `400 Bad Request`; the range check runs on the
///   full integer before narrowing, so out-of-range values such as 256 cannot
///   wrap around into a valid QoS level.
/// - `retain` defaults to `false` when absent or not a boolean.
///
/// The message is published through the broker under the client ID
/// `mockforge-management-api`. On success a `MessageEvent::Mqtt` is sent on
/// `state.message_events` (a failed send is ignored) and `200 OK` is returned.
/// Responds `500 Internal Server Error` when the broker rejects the publish
/// and `503 Service Unavailable` when no MQTT broker is attached to the state.
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Extract fields from JSON manually
    let topic = request.get("topic").and_then(|v| v.as_str());
    let payload = request.get("payload").and_then(|v| v.as_str());
    let raw_qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0);
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    let (topic, payload) = match (topic, payload) {
        (Some(topic), Some(payload)) => (topic.to_string(), payload.to_string()),
        _ => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid request",
                    "message": "Missing required fields: topic and payload"
                })),
            );
        }
    };

    let broker = match state.mqtt_broker.as_ref() {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "MQTT broker not available",
                    "message": "MQTT broker is not enabled or not available."
                })),
            );
        }
    };

    // Validate QoS on the untruncated value; casting first would let e.g. 256
    // wrap to 0 and slip through.
    if raw_qos > 2 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid QoS",
                "message": "QoS must be 0, 1, or 2"
            })),
        );
    }
    // Lossless: raw_qos is known to be in 0..=2 here.
    let qos = raw_qos as u8;

    // Convert payload to bytes
    let payload_bytes = payload.as_bytes().to_vec();
    let client_id = "mockforge-management-api".to_string();

    let publish_result = broker
        .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
        .await
        .map_err(|e| e.to_string());

    match publish_result {
        Ok(_) => {
            // Emit message event for real-time monitoring
            let event = MessageEvent::Mqtt(MqttMessageEvent {
                topic: topic.clone(),
                payload,
                qos,
                retain,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            // Best effort: the publish already succeeded, so a failed send
            // must not fail the request.
            let _ = state.message_events.send(event);

            (
                StatusCode::OK,
                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message published to topic '{}'", topic),
                    "topic": topic,
                    "qos": qos,
                    "retain": retain
                })),
            )
        }
        Err(error_msg) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to publish message",
                "message": error_msg
            })),
        ),
    }
}
1642
1643#[cfg(not(feature = "mqtt"))]
1644/// Publish a message to an MQTT topic (stub when mqtt feature is disabled)
1645async fn publish_mqtt_message_handler(
1646    State(_state): State<ManagementState>,
1647    Json(_request): Json<serde_json::Value>,
1648) -> impl IntoResponse {
1649    (
1650        StatusCode::SERVICE_UNAVAILABLE,
1651        Json(serde_json::json!({
1652            "error": "MQTT feature not enabled",
1653            "message": "MQTT support is not compiled into this build"
1654        })),
1655    )
1656}
1657
#[cfg(feature = "mqtt")]
/// Request to publish multiple MQTT messages
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// List of messages to publish
    pub messages: Vec<MqttPublishRequest>,
    /// Delay between messages in milliseconds; defaults to 100 (see
    /// `default_delay`) when omitted
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1668
#[cfg(feature = "mqtt")]
/// Serde default for `MqttBatchPublishRequest::delay_ms`: 100 milliseconds.
fn default_delay() -> u64 {
    100
}
1673
1674#[cfg(feature = "mqtt")]
1675/// Publish multiple messages to MQTT topics (only compiled when mqtt feature is enabled)
1676async fn publish_mqtt_batch_handler(
1677    State(state): State<ManagementState>,
1678    Json(request): Json<serde_json::Value>,
1679) -> impl IntoResponse {
1680    // Extract fields from JSON manually
1681    let messages_json = request.get("messages").and_then(|v| v.as_array());
1682    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);
1683
1684    if messages_json.is_none() {
1685        return (
1686            StatusCode::BAD_REQUEST,
1687            Json(serde_json::json!({
1688                "error": "Invalid request",
1689                "message": "Missing required field: messages"
1690            })),
1691        );
1692    }
1693
1694    let messages_json = messages_json.unwrap();
1695
1696    if let Some(broker) = &state.mqtt_broker {
1697        if messages_json.is_empty() {
1698            return (
1699                StatusCode::BAD_REQUEST,
1700                Json(serde_json::json!({
1701                    "error": "Empty batch",
1702                    "message": "At least one message is required"
1703                })),
1704            );
1705        }
1706
1707        let mut results = Vec::new();
1708        let client_id = "mockforge-management-api".to_string();
1709
1710        for (index, msg_json) in messages_json.iter().enumerate() {
1711            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
1712            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
1713            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
1714            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);
1715
1716            if topic.is_none() || payload.is_none() {
1717                results.push(serde_json::json!({
1718                    "index": index,
1719                    "success": false,
1720                    "error": "Missing required fields: topic and payload"
1721                }));
1722                continue;
1723            }
1724
1725            let topic = topic.unwrap();
1726            let payload = payload.unwrap();
1727
1728            // Validate QoS
1729            if qos > 2 {
1730                results.push(serde_json::json!({
1731                    "index": index,
1732                    "success": false,
1733                    "error": "Invalid QoS (must be 0, 1, or 2)"
1734                }));
1735                continue;
1736            }
1737
1738            // Convert payload to bytes
1739            let payload_bytes = payload.as_bytes().to_vec();
1740
1741            let publish_result = broker
1742                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
1743                .await
1744                .map_err(|e| format!("{}", e));
1745
1746            match publish_result {
1747                Ok(_) => {
1748                    // Emit message event
1749                    let event = MessageEvent::Mqtt(MqttMessageEvent {
1750                        topic: topic.clone(),
1751                        payload: payload.clone(),
1752                        qos,
1753                        retain,
1754                        timestamp: chrono::Utc::now().to_rfc3339(),
1755                    });
1756                    let _ = state.message_events.send(event);
1757
1758                    results.push(serde_json::json!({
1759                        "index": index,
1760                        "success": true,
1761                        "topic": topic,
1762                        "qos": qos
1763                    }));
1764                }
1765                Err(error_msg) => {
1766                    results.push(serde_json::json!({
1767                        "index": index,
1768                        "success": false,
1769                        "error": error_msg
1770                    }));
1771                }
1772            }
1773
1774            // Add delay between messages (except for the last one)
1775            if index < messages_json.len() - 1 && delay_ms > 0 {
1776                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1777            }
1778        }
1779
1780        let success_count =
1781            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();
1782
1783        (
1784            StatusCode::OK,
1785            Json(serde_json::json!({
1786                "success": true,
1787                "total": messages_json.len(),
1788                "succeeded": success_count,
1789                "failed": messages_json.len() - success_count,
1790                "results": results
1791            })),
1792        )
1793    } else {
1794        (
1795            StatusCode::SERVICE_UNAVAILABLE,
1796            Json(serde_json::json!({
1797                "error": "MQTT broker not available",
1798                "message": "MQTT broker is not enabled or not available."
1799            })),
1800        )
1801    }
1802}
1803
1804#[cfg(not(feature = "mqtt"))]
1805/// Publish multiple messages to MQTT topics (stub when mqtt feature is disabled)
1806async fn publish_mqtt_batch_handler(
1807    State(_state): State<ManagementState>,
1808    Json(_request): Json<serde_json::Value>,
1809) -> impl IntoResponse {
1810    (
1811        StatusCode::SERVICE_UNAVAILABLE,
1812        Json(serde_json::json!({
1813            "error": "MQTT feature not enabled",
1814            "message": "MQTT support is not compiled into this build"
1815        })),
1816    )
1817}
1818
// Migration pipeline handlers
1820
1821/// Request to set migration mode
1822#[derive(Debug, Deserialize)]
1823struct SetMigrationModeRequest {
1824    mode: String,
1825}
1826
1827/// Get all migration routes
1828async fn get_migration_routes(
1829    State(state): State<ManagementState>,
1830) -> Result<Json<serde_json::Value>, StatusCode> {
1831    let proxy_config = match &state.proxy_config {
1832        Some(config) => config,
1833        None => {
1834            return Ok(Json(serde_json::json!({
1835                "error": "Migration not configured. Proxy config not available."
1836            })));
1837        }
1838    };
1839
1840    let config = proxy_config.read().await;
1841    let routes = config.get_migration_routes();
1842
1843    Ok(Json(serde_json::json!({
1844        "routes": routes
1845    })))
1846}
1847
1848/// Toggle a route's migration mode
1849async fn toggle_route_migration(
1850    State(state): State<ManagementState>,
1851    Path(pattern): Path<String>,
1852) -> Result<Json<serde_json::Value>, StatusCode> {
1853    let proxy_config = match &state.proxy_config {
1854        Some(config) => config,
1855        None => {
1856            return Ok(Json(serde_json::json!({
1857                "error": "Migration not configured. Proxy config not available."
1858            })));
1859        }
1860    };
1861
1862    let mut config = proxy_config.write().await;
1863    let new_mode = match config.toggle_route_migration(&pattern) {
1864        Some(mode) => mode,
1865        None => {
1866            return Ok(Json(serde_json::json!({
1867                "error": format!("Route pattern not found: {}", pattern)
1868            })));
1869        }
1870    };
1871
1872    Ok(Json(serde_json::json!({
1873        "pattern": pattern,
1874        "mode": format!("{:?}", new_mode).to_lowercase()
1875    })))
1876}
1877
1878/// Set a route's migration mode explicitly
1879async fn set_route_migration_mode(
1880    State(state): State<ManagementState>,
1881    Path(pattern): Path<String>,
1882    Json(request): Json<SetMigrationModeRequest>,
1883) -> Result<Json<serde_json::Value>, StatusCode> {
1884    let proxy_config = match &state.proxy_config {
1885        Some(config) => config,
1886        None => {
1887            return Ok(Json(serde_json::json!({
1888                "error": "Migration not configured. Proxy config not available."
1889            })));
1890        }
1891    };
1892
1893    use mockforge_core::proxy::config::MigrationMode;
1894    let mode = match request.mode.to_lowercase().as_str() {
1895        "mock" => MigrationMode::Mock,
1896        "shadow" => MigrationMode::Shadow,
1897        "real" => MigrationMode::Real,
1898        "auto" => MigrationMode::Auto,
1899        _ => {
1900            return Ok(Json(serde_json::json!({
1901                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1902            })));
1903        }
1904    };
1905
1906    let mut config = proxy_config.write().await;
1907    let updated = config.update_rule_migration_mode(&pattern, mode);
1908
1909    if !updated {
1910        return Ok(Json(serde_json::json!({
1911            "error": format!("Route pattern not found: {}", pattern)
1912        })));
1913    }
1914
1915    Ok(Json(serde_json::json!({
1916        "pattern": pattern,
1917        "mode": format!("{:?}", mode).to_lowercase()
1918    })))
1919}
1920
1921/// Toggle a group's migration mode
1922async fn toggle_group_migration(
1923    State(state): State<ManagementState>,
1924    Path(group): Path<String>,
1925) -> Result<Json<serde_json::Value>, StatusCode> {
1926    let proxy_config = match &state.proxy_config {
1927        Some(config) => config,
1928        None => {
1929            return Ok(Json(serde_json::json!({
1930                "error": "Migration not configured. Proxy config not available."
1931            })));
1932        }
1933    };
1934
1935    let mut config = proxy_config.write().await;
1936    let new_mode = config.toggle_group_migration(&group);
1937
1938    Ok(Json(serde_json::json!({
1939        "group": group,
1940        "mode": format!("{:?}", new_mode).to_lowercase()
1941    })))
1942}
1943
1944/// Set a group's migration mode explicitly
1945async fn set_group_migration_mode(
1946    State(state): State<ManagementState>,
1947    Path(group): Path<String>,
1948    Json(request): Json<SetMigrationModeRequest>,
1949) -> Result<Json<serde_json::Value>, StatusCode> {
1950    let proxy_config = match &state.proxy_config {
1951        Some(config) => config,
1952        None => {
1953            return Ok(Json(serde_json::json!({
1954                "error": "Migration not configured. Proxy config not available."
1955            })));
1956        }
1957    };
1958
1959    use mockforge_core::proxy::config::MigrationMode;
1960    let mode = match request.mode.to_lowercase().as_str() {
1961        "mock" => MigrationMode::Mock,
1962        "shadow" => MigrationMode::Shadow,
1963        "real" => MigrationMode::Real,
1964        "auto" => MigrationMode::Auto,
1965        _ => {
1966            return Ok(Json(serde_json::json!({
1967                "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1968            })));
1969        }
1970    };
1971
1972    let mut config = proxy_config.write().await;
1973    config.update_group_migration_mode(&group, mode);
1974
1975    Ok(Json(serde_json::json!({
1976        "group": group,
1977        "mode": format!("{:?}", mode).to_lowercase()
1978    })))
1979}
1980
1981/// Get all migration groups
1982async fn get_migration_groups(
1983    State(state): State<ManagementState>,
1984) -> Result<Json<serde_json::Value>, StatusCode> {
1985    let proxy_config = match &state.proxy_config {
1986        Some(config) => config,
1987        None => {
1988            return Ok(Json(serde_json::json!({
1989                "error": "Migration not configured. Proxy config not available."
1990            })));
1991        }
1992    };
1993
1994    let config = proxy_config.read().await;
1995    let groups = config.get_migration_groups();
1996
1997    // Convert to JSON-serializable format
1998    let groups_json: serde_json::Map<String, serde_json::Value> = groups
1999        .into_iter()
2000        .map(|(name, info)| {
2001            (
2002                name,
2003                serde_json::json!({
2004                    "name": info.name,
2005                    "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
2006                    "route_count": info.route_count
2007                }),
2008            )
2009        })
2010        .collect();
2011
2012    Ok(Json(serde_json::json!(groups_json)))
2013}
2014
2015/// Get overall migration status
2016async fn get_migration_status(
2017    State(state): State<ManagementState>,
2018) -> Result<Json<serde_json::Value>, StatusCode> {
2019    let proxy_config = match &state.proxy_config {
2020        Some(config) => config,
2021        None => {
2022            return Ok(Json(serde_json::json!({
2023                "error": "Migration not configured. Proxy config not available."
2024            })));
2025        }
2026    };
2027
2028    let config = proxy_config.read().await;
2029    let routes = config.get_migration_routes();
2030    let groups = config.get_migration_groups();
2031
2032    let mut mock_count = 0;
2033    let mut shadow_count = 0;
2034    let mut real_count = 0;
2035    let mut auto_count = 0;
2036
2037    for route in &routes {
2038        match route.migration_mode {
2039            mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
2040            mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
2041            mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
2042            mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
2043        }
2044    }
2045
2046    Ok(Json(serde_json::json!({
2047        "total_routes": routes.len(),
2048        "mock_routes": mock_count,
2049        "shadow_routes": shadow_count,
2050        "real_routes": real_count,
2051        "auto_routes": auto_count,
2052        "total_groups": groups.len(),
2053        "migration_enabled": config.migration_enabled
2054    })))
2055}
2056
// ========== Proxy Replacement Rules Management ==========
2058
2059/// Request body for creating/updating proxy replacement rules
2060#[derive(Debug, Deserialize, Serialize)]
2061pub struct ProxyRuleRequest {
2062    /// URL pattern to match (supports wildcards like "/api/users/*")
2063    pub pattern: String,
2064    /// Rule type: "request" or "response"
2065    #[serde(rename = "type")]
2066    pub rule_type: String,
2067    /// Optional status code filter for response rules
2068    #[serde(default)]
2069    pub status_codes: Vec<u16>,
2070    /// Body transformations to apply
2071    pub body_transforms: Vec<BodyTransformRequest>,
2072    /// Whether this rule is enabled
2073    #[serde(default = "default_true")]
2074    pub enabled: bool,
2075}
2076
2077/// Request body for individual body transformations
2078#[derive(Debug, Deserialize, Serialize)]
2079pub struct BodyTransformRequest {
2080    /// JSONPath expression to target (e.g., "$.userId", "$.email")
2081    pub path: String,
2082    /// Replacement value (supports template expansion like "{{uuid}}", "{{faker.email}}")
2083    pub replace: String,
2084    /// Operation to perform: "replace", "add", or "remove"
2085    #[serde(default)]
2086    pub operation: String,
2087}
2088
2089/// Response format for proxy rules
2090#[derive(Debug, Serialize)]
2091pub struct ProxyRuleResponse {
2092    /// Rule ID (index in the array)
2093    pub id: usize,
2094    /// URL pattern
2095    pub pattern: String,
2096    /// Rule type
2097    #[serde(rename = "type")]
2098    pub rule_type: String,
2099    /// Status codes (for response rules)
2100    pub status_codes: Vec<u16>,
2101    /// Body transformations
2102    pub body_transforms: Vec<BodyTransformRequest>,
2103    /// Whether enabled
2104    pub enabled: bool,
2105}
2106
2107/// List all proxy replacement rules
2108async fn list_proxy_rules(
2109    State(state): State<ManagementState>,
2110) -> Result<Json<serde_json::Value>, StatusCode> {
2111    let proxy_config = match &state.proxy_config {
2112        Some(config) => config,
2113        None => {
2114            return Ok(Json(serde_json::json!({
2115                "error": "Proxy not configured. Proxy config not available."
2116            })));
2117        }
2118    };
2119
2120    let config = proxy_config.read().await;
2121
2122    let mut rules: Vec<ProxyRuleResponse> = Vec::new();
2123
2124    // Add request replacement rules
2125    for (idx, rule) in config.request_replacements.iter().enumerate() {
2126        rules.push(ProxyRuleResponse {
2127            id: idx,
2128            pattern: rule.pattern.clone(),
2129            rule_type: "request".to_string(),
2130            status_codes: Vec::new(),
2131            body_transforms: rule
2132                .body_transforms
2133                .iter()
2134                .map(|t| BodyTransformRequest {
2135                    path: t.path.clone(),
2136                    replace: t.replace.clone(),
2137                    operation: format!("{:?}", t.operation).to_lowercase(),
2138                })
2139                .collect(),
2140            enabled: rule.enabled,
2141        });
2142    }
2143
2144    // Add response replacement rules
2145    let request_count = config.request_replacements.len();
2146    for (idx, rule) in config.response_replacements.iter().enumerate() {
2147        rules.push(ProxyRuleResponse {
2148            id: request_count + idx,
2149            pattern: rule.pattern.clone(),
2150            rule_type: "response".to_string(),
2151            status_codes: rule.status_codes.clone(),
2152            body_transforms: rule
2153                .body_transforms
2154                .iter()
2155                .map(|t| BodyTransformRequest {
2156                    path: t.path.clone(),
2157                    replace: t.replace.clone(),
2158                    operation: format!("{:?}", t.operation).to_lowercase(),
2159                })
2160                .collect(),
2161            enabled: rule.enabled,
2162        });
2163    }
2164
2165    Ok(Json(serde_json::json!({
2166        "rules": rules
2167    })))
2168}
2169
2170/// Create a new proxy replacement rule
2171async fn create_proxy_rule(
2172    State(state): State<ManagementState>,
2173    Json(request): Json<ProxyRuleRequest>,
2174) -> Result<Json<serde_json::Value>, StatusCode> {
2175    let proxy_config = match &state.proxy_config {
2176        Some(config) => config,
2177        None => {
2178            return Ok(Json(serde_json::json!({
2179                "error": "Proxy not configured. Proxy config not available."
2180            })));
2181        }
2182    };
2183
2184    // Validate request
2185    if request.body_transforms.is_empty() {
2186        return Ok(Json(serde_json::json!({
2187            "error": "At least one body transform is required"
2188        })));
2189    }
2190
2191    let body_transforms: Vec<BodyTransform> = request
2192        .body_transforms
2193        .iter()
2194        .map(|t| {
2195            let op = match t.operation.as_str() {
2196                "replace" => TransformOperation::Replace,
2197                "add" => TransformOperation::Add,
2198                "remove" => TransformOperation::Remove,
2199                _ => TransformOperation::Replace,
2200            };
2201            BodyTransform {
2202                path: t.path.clone(),
2203                replace: t.replace.clone(),
2204                operation: op,
2205            }
2206        })
2207        .collect();
2208
2209    let new_rule = BodyTransformRule {
2210        pattern: request.pattern.clone(),
2211        status_codes: request.status_codes.clone(),
2212        body_transforms,
2213        enabled: request.enabled,
2214    };
2215
2216    let mut config = proxy_config.write().await;
2217
2218    let rule_id = if request.rule_type == "request" {
2219        config.request_replacements.push(new_rule);
2220        config.request_replacements.len() - 1
2221    } else if request.rule_type == "response" {
2222        config.response_replacements.push(new_rule);
2223        config.request_replacements.len() + config.response_replacements.len() - 1
2224    } else {
2225        return Ok(Json(serde_json::json!({
2226            "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
2227        })));
2228    };
2229
2230    Ok(Json(serde_json::json!({
2231        "id": rule_id,
2232        "message": "Rule created successfully"
2233    })))
2234}
2235
2236/// Get a specific proxy replacement rule
2237async fn get_proxy_rule(
2238    State(state): State<ManagementState>,
2239    Path(id): Path<String>,
2240) -> Result<Json<serde_json::Value>, StatusCode> {
2241    let proxy_config = match &state.proxy_config {
2242        Some(config) => config,
2243        None => {
2244            return Ok(Json(serde_json::json!({
2245                "error": "Proxy not configured. Proxy config not available."
2246            })));
2247        }
2248    };
2249
2250    let config = proxy_config.read().await;
2251    let rule_id: usize = match id.parse() {
2252        Ok(id) => id,
2253        Err(_) => {
2254            return Ok(Json(serde_json::json!({
2255                "error": format!("Invalid rule ID: {}", id)
2256            })));
2257        }
2258    };
2259
2260    let request_count = config.request_replacements.len();
2261
2262    if rule_id < request_count {
2263        // Request rule
2264        let rule = &config.request_replacements[rule_id];
2265        Ok(Json(serde_json::json!({
2266            "id": rule_id,
2267            "pattern": rule.pattern,
2268            "type": "request",
2269            "status_codes": [],
2270            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2271                "path": t.path,
2272                "replace": t.replace,
2273                "operation": format!("{:?}", t.operation).to_lowercase()
2274            })).collect::<Vec<_>>(),
2275            "enabled": rule.enabled
2276        })))
2277    } else if rule_id < request_count + config.response_replacements.len() {
2278        // Response rule
2279        let response_idx = rule_id - request_count;
2280        let rule = &config.response_replacements[response_idx];
2281        Ok(Json(serde_json::json!({
2282            "id": rule_id,
2283            "pattern": rule.pattern,
2284            "type": "response",
2285            "status_codes": rule.status_codes,
2286            "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2287                "path": t.path,
2288                "replace": t.replace,
2289                "operation": format!("{:?}", t.operation).to_lowercase()
2290            })).collect::<Vec<_>>(),
2291            "enabled": rule.enabled
2292        })))
2293    } else {
2294        Ok(Json(serde_json::json!({
2295            "error": format!("Rule ID {} not found", rule_id)
2296        })))
2297    }
2298}
2299
2300/// Update a proxy replacement rule
2301async fn update_proxy_rule(
2302    State(state): State<ManagementState>,
2303    Path(id): Path<String>,
2304    Json(request): Json<ProxyRuleRequest>,
2305) -> Result<Json<serde_json::Value>, StatusCode> {
2306    let proxy_config = match &state.proxy_config {
2307        Some(config) => config,
2308        None => {
2309            return Ok(Json(serde_json::json!({
2310                "error": "Proxy not configured. Proxy config not available."
2311            })));
2312        }
2313    };
2314
2315    let mut config = proxy_config.write().await;
2316    let rule_id: usize = match id.parse() {
2317        Ok(id) => id,
2318        Err(_) => {
2319            return Ok(Json(serde_json::json!({
2320                "error": format!("Invalid rule ID: {}", id)
2321            })));
2322        }
2323    };
2324
2325    let body_transforms: Vec<BodyTransform> = request
2326        .body_transforms
2327        .iter()
2328        .map(|t| {
2329            let op = match t.operation.as_str() {
2330                "replace" => TransformOperation::Replace,
2331                "add" => TransformOperation::Add,
2332                "remove" => TransformOperation::Remove,
2333                _ => TransformOperation::Replace,
2334            };
2335            BodyTransform {
2336                path: t.path.clone(),
2337                replace: t.replace.clone(),
2338                operation: op,
2339            }
2340        })
2341        .collect();
2342
2343    let updated_rule = BodyTransformRule {
2344        pattern: request.pattern.clone(),
2345        status_codes: request.status_codes.clone(),
2346        body_transforms,
2347        enabled: request.enabled,
2348    };
2349
2350    let request_count = config.request_replacements.len();
2351
2352    if rule_id < request_count {
2353        // Update request rule
2354        config.request_replacements[rule_id] = updated_rule;
2355    } else if rule_id < request_count + config.response_replacements.len() {
2356        // Update response rule
2357        let response_idx = rule_id - request_count;
2358        config.response_replacements[response_idx] = updated_rule;
2359    } else {
2360        return Ok(Json(serde_json::json!({
2361            "error": format!("Rule ID {} not found", rule_id)
2362        })));
2363    }
2364
2365    Ok(Json(serde_json::json!({
2366        "id": rule_id,
2367        "message": "Rule updated successfully"
2368    })))
2369}
2370
2371/// Delete a proxy replacement rule
2372async fn delete_proxy_rule(
2373    State(state): State<ManagementState>,
2374    Path(id): Path<String>,
2375) -> Result<Json<serde_json::Value>, StatusCode> {
2376    let proxy_config = match &state.proxy_config {
2377        Some(config) => config,
2378        None => {
2379            return Ok(Json(serde_json::json!({
2380                "error": "Proxy not configured. Proxy config not available."
2381            })));
2382        }
2383    };
2384
2385    let mut config = proxy_config.write().await;
2386    let rule_id: usize = match id.parse() {
2387        Ok(id) => id,
2388        Err(_) => {
2389            return Ok(Json(serde_json::json!({
2390                "error": format!("Invalid rule ID: {}", id)
2391            })));
2392        }
2393    };
2394
2395    let request_count = config.request_replacements.len();
2396
2397    if rule_id < request_count {
2398        // Delete request rule
2399        config.request_replacements.remove(rule_id);
2400    } else if rule_id < request_count + config.response_replacements.len() {
2401        // Delete response rule
2402        let response_idx = rule_id - request_count;
2403        config.response_replacements.remove(response_idx);
2404    } else {
2405        return Ok(Json(serde_json::json!({
2406            "error": format!("Rule ID {} not found", rule_id)
2407        })));
2408    }
2409
2410    Ok(Json(serde_json::json!({
2411        "id": rule_id,
2412        "message": "Rule deleted successfully"
2413    })))
2414}
2415
2416/// Get proxy rules and transformation configuration for inspection
2417async fn get_proxy_inspect(
2418    State(state): State<ManagementState>,
2419    Query(params): Query<std::collections::HashMap<String, String>>,
2420) -> Result<Json<serde_json::Value>, StatusCode> {
2421    let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2422    let offset: usize = params.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);
2423
2424    let proxy_config = match &state.proxy_config {
2425        Some(config) => config.read().await,
2426        None => {
2427            return Ok(Json(serde_json::json!({
2428                "error": "Proxy not configured. Proxy config not available."
2429            })));
2430        }
2431    };
2432
2433    let mut rules = Vec::new();
2434    for (idx, rule) in proxy_config.request_replacements.iter().enumerate() {
2435        rules.push(serde_json::json!({
2436            "id": idx,
2437            "kind": "request",
2438            "pattern": rule.pattern,
2439            "enabled": rule.enabled,
2440            "status_codes": rule.status_codes,
2441            "transform_count": rule.body_transforms.len(),
2442            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2443                "path": t.path,
2444                "operation": t.operation,
2445                "replace": t.replace
2446            })).collect::<Vec<_>>()
2447        }));
2448    }
2449    let request_rule_count = rules.len();
2450    for (idx, rule) in proxy_config.response_replacements.iter().enumerate() {
2451        rules.push(serde_json::json!({
2452            "id": request_rule_count + idx,
2453            "kind": "response",
2454            "pattern": rule.pattern,
2455            "enabled": rule.enabled,
2456            "status_codes": rule.status_codes,
2457            "transform_count": rule.body_transforms.len(),
2458            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2459                "path": t.path,
2460                "operation": t.operation,
2461                "replace": t.replace
2462            })).collect::<Vec<_>>()
2463        }));
2464    }
2465
2466    let total = rules.len();
2467    let paged_rules: Vec<_> = rules.into_iter().skip(offset).take(limit).collect();
2468
2469    Ok(Json(serde_json::json!({
2470        "enabled": proxy_config.enabled,
2471        "target_url": proxy_config.target_url,
2472        "prefix": proxy_config.prefix,
2473        "timeout_seconds": proxy_config.timeout_seconds,
2474        "follow_redirects": proxy_config.follow_redirects,
2475        "passthrough_by_default": proxy_config.passthrough_by_default,
2476        "rules": paged_rules,
2477        "request_rule_count": request_rule_count,
2478        "response_rule_count": total.saturating_sub(request_rule_count),
2479        "limit": limit,
2480        "offset": offset,
2481        "total": total
2482    })))
2483}
2484
2485/// Build the management API router
2486pub fn management_router(state: ManagementState) -> Router {
2487    let router = Router::new()
2488        .route("/health", get(health_check))
2489        .route("/stats", get(get_stats))
2490        .route("/config", get(get_config))
2491        .route("/config/validate", post(validate_config))
2492        .route("/config/bulk", post(bulk_update_config))
2493        .route("/mocks", get(list_mocks))
2494        .route("/mocks", post(create_mock))
2495        .route("/mocks/{id}", get(get_mock))
2496        .route("/mocks/{id}", put(update_mock))
2497        .route("/mocks/{id}", delete(delete_mock))
2498        .route("/export", get(export_mocks))
2499        .route("/import", post(import_mocks))
2500        .route("/spec", get(get_openapi_spec));
2501
2502    #[cfg(feature = "smtp")]
2503    let router = router
2504        .route("/smtp/mailbox", get(list_smtp_emails))
2505        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
2506        .route("/smtp/mailbox/{id}", get(get_smtp_email))
2507        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
2508        .route("/smtp/mailbox/search", get(search_smtp_emails));
2509
2510    #[cfg(not(feature = "smtp"))]
2511    let router = router;
2512
2513    // MQTT routes
2514    #[cfg(feature = "mqtt")]
2515    let router = router
2516        .route("/mqtt/stats", get(get_mqtt_stats))
2517        .route("/mqtt/clients", get(get_mqtt_clients))
2518        .route("/mqtt/topics", get(get_mqtt_topics))
2519        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
2520        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
2521        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2522        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2523
2524    #[cfg(not(feature = "mqtt"))]
2525    let router = router
2526        .route("/mqtt/publish", post(publish_mqtt_message_handler))
2527        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2528
2529    #[cfg(feature = "kafka")]
2530    let router = router
2531        .route("/kafka/stats", get(get_kafka_stats))
2532        .route("/kafka/topics", get(get_kafka_topics))
2533        .route("/kafka/topics/{topic}", get(get_kafka_topic))
2534        .route("/kafka/groups", get(get_kafka_groups))
2535        .route("/kafka/groups/{group_id}", get(get_kafka_group))
2536        .route("/kafka/produce", post(produce_kafka_message))
2537        .route("/kafka/produce/batch", post(produce_kafka_batch))
2538        .route("/kafka/messages/stream", get(kafka_messages_stream));
2539
2540    #[cfg(not(feature = "kafka"))]
2541    let router = router;
2542
2543    // Migration pipeline routes
2544    let router = router
2545        .route("/migration/routes", get(get_migration_routes))
2546        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
2547        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
2548        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
2549        .route("/migration/groups/{group}", put(set_group_migration_mode))
2550        .route("/migration/groups", get(get_migration_groups))
2551        .route("/migration/status", get(get_migration_status));
2552
2553    // Proxy replacement rules routes
2554    let router = router
2555        .route("/proxy/rules", get(list_proxy_rules))
2556        .route("/proxy/rules", post(create_proxy_rule))
2557        .route("/proxy/rules/{id}", get(get_proxy_rule))
2558        .route("/proxy/rules/{id}", put(update_proxy_rule))
2559        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
2560        .route("/proxy/inspect", get(get_proxy_inspect));
2561
2562    // AI-powered features
2563    let router = router.route("/ai/generate-spec", post(generate_ai_spec));
2564
2565    // Snapshot diff endpoints
2566    let router = router.nest(
2567        "/snapshot-diff",
2568        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
2569    );
2570
2571    #[cfg(feature = "behavioral-cloning")]
2572    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));
2573
2574    let router = router
2575        .route("/mockai/learn", post(learn_from_examples))
2576        .route("/mockai/rules/explanations", get(list_rule_explanations))
2577        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
2578        .route("/chaos/config", get(get_chaos_config))
2579        .route("/chaos/config", post(update_chaos_config))
2580        .route("/network/profiles", get(list_network_profiles))
2581        .route("/network/profile/apply", post(apply_network_profile));
2582
2583    // State machine API routes
2584    let router =
2585        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());
2586
2587    // Conformance testing API routes
2588    #[cfg(feature = "conformance")]
2589    let router = router.nest_service(
2590        "/conformance",
2591        crate::handlers::conformance::conformance_router(state.conformance_state.clone()),
2592    );
2593    #[cfg(not(feature = "conformance"))]
2594    let router = router;
2595
2596    router.with_state(state)
2597}
2598
#[cfg(feature = "kafka")]
/// Kafka broker statistics
///
/// Aggregate counters serialized as the JSON body of the `/kafka/stats`
/// route (populated by `get_kafka_stats`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// Number of topics currently present in the broker's topic map
    pub topics: usize,
    /// Total number of partitions, summed over every topic
    pub partitions: usize,
    /// Number of consumer groups known to the broker
    pub consumer_groups: usize,
    /// Total messages produced, taken from the broker metrics snapshot
    pub messages_produced: u64,
    /// Total messages consumed, taken from the broker metrics snapshot
    pub messages_consumed: u64,
}
2614
#[cfg(feature = "kafka")]
/// Summary of a single Kafka topic, as listed by `get_kafka_topics`
/// under the `"topics"` key of its JSON response.
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name (the key of the broker's topic map)
    pub name: String,
    /// Number of partitions the topic currently has
    pub partitions: usize,
    /// Replication factor from the topic's config, cast to `i32`
    pub replication_factor: i32,
}
2623
#[cfg(feature = "kafka")]
/// Summary of a single Kafka consumer group, as listed by
/// `get_kafka_groups` under the `"groups"` key of its JSON response.
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group identifier
    pub group_id: String,
    /// Number of members currently in the group
    pub members: usize,
    /// Group state label; the listing handler currently always reports `"Stable"`
    pub state: String,
}
2632
#[cfg(feature = "kafka")]
/// Report aggregate statistics for the Kafka broker.
///
/// Responds with a serialized [`KafkaBrokerStats`] built from the broker's
/// topic map, consumer-group registry and metrics snapshot, or with
/// `503 Service Unavailable` when the management state holds no broker.
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    // Guard clause: nothing to report without a broker.
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    // Both read guards stay alive while the counters are gathered.
    let topic_map = broker.topics.read().await;
    let group_registry = broker.consumer_groups.read().await;

    let partition_total = topic_map.values().map(|topic| topic.partitions.len()).sum::<usize>();

    // Message counters come from the metrics snapshot rather than the topic data.
    let snapshot = broker.metrics().snapshot();

    Json(KafkaBrokerStats {
        topics: topic_map.len(),
        partitions: partition_total,
        consumer_groups: group_registry.groups().len(),
        messages_produced: snapshot.messages_produced_total,
        messages_consumed: snapshot.messages_consumed_total,
    })
    .into_response()
}
2665
#[cfg(feature = "kafka")]
/// List every topic known to the Kafka broker.
///
/// Responds with `{"topics": [KafkaTopicInfo, ...]}`, or with
/// `503 Service Unavailable` when the management state holds no broker.
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topic_map = broker.topics.read().await;

    // One summary entry per topic in the broker's map.
    let mut topic_list: Vec<KafkaTopicInfo> = Vec::new();
    for (topic_name, topic) in topic_map.iter() {
        topic_list.push(KafkaTopicInfo {
            name: topic_name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor as i32,
        });
    }

    Json(serde_json::json!({
        "topics": topic_list
    }))
    .into_response()
}
2695
#[cfg(feature = "kafka")]
/// Describe a single Kafka topic, including a per-partition breakdown.
///
/// Responds with the topic's name, partition count, replication factor and
/// one `partitions_detail` entry per partition (index plus message count;
/// leader and replicas are fixed to broker `0`). Returns `404 Not Found`
/// for an unknown topic and `503 Service Unavailable` when no broker is set.
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topic_map = broker.topics.read().await;

    let Some(topic) = topic_map.get(&topic_name) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Topic not found",
                "topic": topic_name
            })),
        )
            .into_response();
    };

    // Partition ids are simply the positions within the topic's partition list.
    let partitions_detail: Vec<serde_json::Value> = topic
        .partitions
        .iter()
        .enumerate()
        .map(|(position, partition)| {
            serde_json::json!({
                "id": position as i32,
                "leader": 0,
                "replicas": vec![0],
                "message_count": partition.messages.len()
            })
        })
        .collect();

    Json(serde_json::json!({
        "name": topic_name,
        "partitions": topic.partitions.len(),
        "replication_factor": topic.config.replication_factor,
        "partitions_detail": partitions_detail
    }))
    .into_response()
}
2737
#[cfg(feature = "kafka")]
/// List every consumer group known to the Kafka broker.
///
/// Responds with `{"groups": [KafkaConsumerGroupInfo, ...]}`, or with
/// `503 Service Unavailable` when the management state holds no broker.
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let group_registry = broker.consumer_groups.read().await;

    let mut groups: Vec<KafkaConsumerGroupInfo> = Vec::new();
    for (group_id, group) in group_registry.groups().iter() {
        groups.push(KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            // The state is not tracked in detail; every group is reported as stable.
            state: String::from("Stable"),
        });
    }

    Json(serde_json::json!({
        "groups": groups
    }))
    .into_response()
}
2768
#[cfg(feature = "kafka")]
/// Describe a single Kafka consumer group.
///
/// Responds with the group's id, member count, a fixed `"Stable"` state,
/// one `members_detail` entry per member (with its topic/partition
/// assignments) and the group's committed `offsets`. Returns
/// `404 Not Found` for an unknown group and `503 Service Unavailable`
/// when the management state holds no broker.
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let group_registry = broker.consumer_groups.read().await;

    match group_registry.groups().get(&group_id) {
        Some(group) => {
            // Per-member view: identity plus the partitions assigned to it.
            let members_detail: Vec<serde_json::Value> = group
                .members
                .iter()
                .map(|(member_id, member)| {
                    let assignments: Vec<serde_json::Value> = member
                        .assignment
                        .iter()
                        .map(|assigned| {
                            serde_json::json!({
                                "topic": assigned.topic,
                                "partitions": assigned.partitions
                            })
                        })
                        .collect();

                    serde_json::json!({
                        "member_id": member_id,
                        "client_id": member.client_id,
                        "assignments": assignments
                    })
                })
                .collect();

            // Offsets are keyed by (topic, partition).
            let offsets: Vec<serde_json::Value> = group
                .offsets
                .iter()
                .map(|((topic, partition), offset)| {
                    serde_json::json!({
                        "topic": topic,
                        "partition": partition,
                        "offset": offset
                    })
                })
                .collect();

            Json(serde_json::json!({
                "group_id": group_id,
                "members": group.members.len(),
                "state": "Stable",
                "members_detail": members_detail,
                "offsets": offsets
            }))
            .into_response()
        }
        None => (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Consumer group not found",
                "group_id": group_id
            })),
        )
            .into_response(),
    }
}
2817
2818// ========== Kafka Produce Handler ==========
2819
#[cfg(feature = "kafka")]
/// Request body for producing a Kafka message
///
/// Used on its own by `produce_kafka_message` and as the element type of
/// `KafkaBatchProduceRequest::messages`.
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Topic to produce to; the produce handlers create it with the default
    /// topic config if it does not exist yet
    pub topic: String,
    /// Message key (optional); its bytes are passed to the topic's partition
    /// assignment when no explicit partition is given
    #[serde(default)]
    pub key: Option<String>,
    /// Message value (JSON string or plain string); stored as the string's bytes
    pub value: String,
    /// Partition ID (optional, auto-assigned if not provided); must be
    /// non-negative and below the topic's partition count, otherwise the
    /// message is rejected
    #[serde(default)]
    pub partition: Option<i32>,
    /// Message headers (optional, key-value pairs); header values are stored
    /// as the strings' bytes
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2838
#[cfg(feature = "kafka")]
/// Produce a message to a Kafka topic
///
/// The target topic is created with `TopicConfig::default()` when it does not
/// exist yet. If the request names no partition, one is chosen by the topic's
/// `assign_partition` from the message key.
///
/// Responses:
/// - JSON with `success`, `topic`, `partition` and `offset` when the message
///   was appended;
/// - `400 Bad Request` when the partition is negative or not below the topic's
///   partition count;
/// - `500 Internal Server Error` when the topic fails to append the message;
/// - `503 Service Unavailable` when the management state holds no broker.
///
/// On success the broker's produced-message metric is incremented and a
/// [`MessageEvent::Kafka`] is broadcast to the real-time monitoring channel.
///
/// Note: the topic is looked up/created before the partition is validated, so
/// a request rejected with `400` can still leave a newly created topic behind.
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let mut topics = broker.topics.write().await;

    // Get or create the topic
    let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
        mockforge_kafka::topics::Topic::new(
            request.topic.clone(),
            mockforge_kafka::topics::TopicConfig::default(),
        )
    });

    // Determine partition: explicit request wins, otherwise derive it from the key
    let partition_id = match request.partition {
        Some(partition) => partition,
        None => topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes())),
    };

    // Validate partition exists
    if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            })),
        )
            .into_response();
    }

    // Create the message. Key, value and headers are only borrowed here so the
    // owned strings can be moved into the monitoring event below without
    // intermediate clones.
    let message = mockforge_kafka::partitions::KafkaMessage {
        offset: 0, // Will be set by partition.append
        timestamp: chrono::Utc::now().timestamp_millis(),
        key: request.key.as_ref().map(|k| k.as_bytes().to_vec()),
        value: request.value.as_bytes().to_vec(),
        headers: request
            .headers
            .iter()
            .flatten()
            .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
            .collect(),
    };

    // Produce to partition
    match topic_entry.produce(partition_id, message).await {
        Ok(offset) => {
            // Record metrics for successful message production
            broker.metrics().record_messages_produced(1);

            // Emit message event for real-time monitoring. `topic` is cloned
            // because the response below still needs it; the other fields are
            // no longer used and are moved.
            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: request.topic.clone(),
                key: request.key,
                value: request.value,
                partition: partition_id,
                offset,
                headers: request.headers,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            // Best effort: a send error is deliberately ignored so that
            // producing never fails because of monitoring.
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", request.topic),
                "topic": request.topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2943
#[cfg(feature = "kafka")]
/// Request body for producing a batch of Kafka messages
///
/// Consumed by `produce_kafka_batch`, which rejects an empty `messages` list
/// and processes the entries in order.
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// List of messages to produce (must contain at least one entry)
    pub messages: Vec<KafkaProduceRequest>,
    /// Delay between messages in milliseconds; `0` disables the delay and no
    /// delay is applied after the last message. When the field is omitted
    /// serde calls `default_delay` (defined outside this section).
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2954
#[cfg(feature = "kafka")]
/// Produce multiple messages to Kafka topics
///
/// Messages are processed in request order. Each one is handled like a single
/// produce request: the topic is created with the default config if missing,
/// the partition is taken from the request or assigned from the key, and the
/// partition is validated before the message is appended.
///
/// A failure of an individual message does not abort the batch; it is
/// recorded in `results` with `success: false`. The response carries
/// `total`, `succeeded`, `failed` and the per-message `results`; its top-level
/// `success` flag is always `true` once the batch has been processed.
///
/// Responses:
/// - `503 Service Unavailable` when the management state holds no broker;
/// - `400 Bad Request` when `messages` is empty;
/// - otherwise the JSON summary described above.
///
/// Between messages the handler sleeps for `delay_ms` milliseconds (skipped
/// after the last message, when `delay_ms` is `0`, and after an entry that
/// was rejected for an invalid partition). The write lock on the broker's
/// topics is released before sleeping so that other readers and writers are
/// not blocked for the duration of the delay.
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    if request.messages.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        )
            .into_response();
    }

    let total = request.messages.len();
    let mut results = Vec::with_capacity(total);

    for (index, msg_request) in request.messages.iter().enumerate() {
        let mut topics = broker.topics.write().await;

        // Get or create the topic
        let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
            mockforge_kafka::topics::Topic::new(
                msg_request.topic.clone(),
                mockforge_kafka::topics::TopicConfig::default(),
            )
        });

        // Determine partition: explicit request wins, otherwise derive it from the key
        let partition_id = match msg_request.partition {
            Some(partition) => partition,
            None => topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes())),
        };

        // Validate partition exists
        if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
            results.push(serde_json::json!({
                "index": index,
                "success": false,
                "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            }));
            continue;
        }

        // Create the message (borrowing the request fields; only the byte
        // buffers the message needs are allocated)
        let message = mockforge_kafka::partitions::KafkaMessage {
            offset: 0, // Will be set when the message is appended
            timestamp: chrono::Utc::now().timestamp_millis(),
            key: msg_request.key.as_ref().map(|k| k.as_bytes().to_vec()),
            value: msg_request.value.as_bytes().to_vec(),
            headers: msg_request
                .headers
                .iter()
                .flatten()
                .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
                .collect(),
        };

        // Produce to partition
        match topic_entry.produce(partition_id, message).await {
            Ok(offset) => {
                // Record metrics for successful message production
                broker.metrics().record_messages_produced(1);

                // Emit message event for real-time monitoring
                let event = MessageEvent::Kafka(KafkaMessageEvent {
                    topic: msg_request.topic.clone(),
                    key: msg_request.key.clone(),
                    value: msg_request.value.clone(),
                    partition: partition_id,
                    offset,
                    headers: msg_request.headers.clone(),
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                // Best effort: a send error is deliberately ignored so that
                // producing never fails because of monitoring.
                let _ = state.message_events.send(event);

                results.push(serde_json::json!({
                    "index": index,
                    "success": true,
                    "topic": msg_request.topic,
                    "partition": partition_id,
                    "offset": offset
                }));
            }
            Err(e) => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": e.to_string()
                }));
            }
        }

        // Release the topics write lock before the inter-message delay;
        // holding it across the sleep would stall every other topic reader
        // and writer for a caller-controlled amount of time.
        drop(topics);

        // Add delay between messages (except for the last one)
        if index + 1 < total && request.delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
        }
    }

    let success_count =
        results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

    Json(serde_json::json!({
        "success": true,
        "total": total,
        "succeeded": success_count,
        "failed": total - success_count,
        "results": results
    }))
    .into_response()
}
3083
3084// ========== Real-time Message Streaming (SSE) ==========
3085
#[cfg(feature = "mqtt")]
/// Server-sent-events feed of MQTT messages.
///
/// Subscribes to the shared message-event broadcast channel and forwards
/// every MQTT event as an SSE event named `mqtt_message` whose data is a JSON
/// object (`protocol`, `topic`, `payload`, `qos`, `retain`, `timestamp`).
///
/// An optional `topic` query parameter restricts the feed to events whose
/// topic contains that string (substring match). Events from other protocols
/// are skipped, a lagging receiver logs a warning and keeps going, and the
/// stream ends once the channel is closed. A keep-alive is sent every
/// 15 seconds.
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let receiver = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    // The receiver and the filter travel together as the unfold state, so each
    // step hands both on to the next one.
    let events = stream::unfold(
        (receiver, topic_filter),
        |(mut receiver, topic_filter)| async move {
            loop {
                let event = match receiver.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => event,
                    // Skip Kafka events in MQTT stream
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => continue,
                    Err(broadcast::error::RecvError::Closed) => return None,
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                };

                // Apply topic filter if specified
                let filtered_out = matches!(
                    topic_filter.as_deref(),
                    Some(filter) if !event.topic.contains(filter)
                );
                if filtered_out {
                    continue;
                }

                let body = serde_json::json!({
                    "protocol": "mqtt",
                    "topic": event.topic,
                    "payload": event.payload,
                    "qos": event.qos,
                    "retain": event.retain,
                    "timestamp": event.timestamp,
                });

                // An event that cannot be serialized is dropped and the loop
                // waits for the next one.
                match serde_json::to_string(&body) {
                    Ok(data) => {
                        let sse_event = Event::default().event("mqtt_message").data(data);
                        return Some((Ok(sse_event), (receiver, topic_filter)));
                    }
                    Err(_) => continue,
                }
            }
        },
    );

    let keep_alive = axum::response::sse::KeepAlive::new()
        .interval(std::time::Duration::from_secs(15))
        .text("keep-alive-text");

    Sse::new(events).keep_alive(keep_alive)
}
3146
#[cfg(feature = "kafka")]
/// Server-sent-events feed of Kafka messages.
///
/// Subscribes to the shared message-event broadcast channel and forwards
/// every Kafka event as an SSE event named `kafka_message` whose data is a
/// JSON object (`protocol`, `topic`, `key`, `value`, `partition`, `offset`,
/// `headers`, `timestamp`).
///
/// An optional `topic` query parameter restricts the feed to events whose
/// topic contains that string (substring match). Events from other protocols
/// are skipped, a lagging receiver logs a warning and keeps going, and the
/// stream ends once the channel is closed. A keep-alive is sent every
/// 15 seconds.
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let receiver = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    // The receiver and the filter travel together as the unfold state, so each
    // step hands both on to the next one.
    let events = stream::unfold(
        (receiver, topic_filter),
        |(mut receiver, topic_filter)| async move {
            loop {
                let event = match receiver.recv().await {
                    // Skip MQTT events in Kafka stream
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => continue,
                    Ok(MessageEvent::Kafka(event)) => event,
                    Err(broadcast::error::RecvError::Closed) => return None,
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                };

                // Apply topic filter if specified
                let filtered_out = matches!(
                    topic_filter.as_deref(),
                    Some(filter) if !event.topic.contains(filter)
                );
                if filtered_out {
                    continue;
                }

                let body = serde_json::json!({
                    "protocol": "kafka",
                    "topic": event.topic,
                    "key": event.key,
                    "value": event.value,
                    "partition": event.partition,
                    "offset": event.offset,
                    "headers": event.headers,
                    "timestamp": event.timestamp,
                });

                // An event that cannot be serialized is dropped and the loop
                // waits for the next one.
                match serde_json::to_string(&body) {
                    Ok(data) => {
                        let sse_event = Event::default().event("kafka_message").data(data);
                        return Some((Ok(sse_event), (receiver, topic_filter)));
                    }
                    Err(_) => continue,
                }
            }
        },
    );

    let keep_alive = axum::response::sse::KeepAlive::new()
        .interval(std::time::Duration::from_secs(15))
        .text("keep-alive-text");

    Sse::new(events).keep_alive(keep_alive)
}
3210
3211// ========== AI-Powered Features ==========
3212
/// Request for AI-powered API specification generation
///
/// Deserialized from the JSON body of the `/ai/generate-spec` route.
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Natural language description of the API to generate; inserted into the
    /// LLM prompt as the user requirements
    pub query: String,
    /// Type of specification to generate: "openapi", "graphql", or "asyncapi".
    /// The generation handler treats any other value like "openapi" when
    /// building the prompt.
    pub spec_type: String,
    /// Optional API version (e.g., "3.0.0" for OpenAPI); the generation
    /// handler falls back to "3.0.0" when omitted
    pub api_version: Option<String>,
}
3223
/// Request for OpenAPI generation from recorded traffic
///
/// Every field is optional in the JSON body: the `Option` fields default to
/// `None` and `min_confidence` defaults to the value returned by
/// `default_min_confidence`.
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to recorder database (optional, defaults to ./recordings.db)
    #[serde(default)]
    pub database_path: Option<String>,
    /// Start time for filtering (ISO 8601 format, e.g., 2025-01-01T00:00:00Z)
    #[serde(default)]
    pub since: Option<String>,
    /// End time for filtering (ISO 8601 format)
    #[serde(default)]
    pub until: Option<String>,
    /// Path pattern filter (supports wildcards, e.g., /api/*)
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence score for including paths (0.0 to 1.0);
    /// defaults to 0.7 when omitted
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
3243
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`.
fn default_min_confidence() -> f64 {
    const DEFAULT_MIN_CONFIDENCE: f64 = 0.7;
    DEFAULT_MIN_CONFIDENCE
}
3247
3248/// Generate API specification from natural language using AI
3249#[cfg(feature = "data-faker")]
3250async fn generate_ai_spec(
3251    State(_state): State<ManagementState>,
3252    Json(request): Json<GenerateSpecRequest>,
3253) -> impl IntoResponse {
3254    use mockforge_data::rag::{
3255        config::{LlmProvider, RagConfig},
3256        engine::RagEngine,
3257        storage::DocumentStorage,
3258    };
3259    use std::sync::Arc;
3260
3261    // Build RAG config from environment variables
3262    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
3263        .ok()
3264        .or_else(|| std::env::var("OPENAI_API_KEY").ok());
3265
3266    // Check if RAG is configured - require API key
3267    if api_key.is_none() {
3268        return (
3269            StatusCode::SERVICE_UNAVAILABLE,
3270            Json(serde_json::json!({
3271                "error": "AI service not configured",
3272                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
3273            })),
3274        )
3275            .into_response();
3276    }
3277
3278    // Build RAG configuration
3279    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
3280        .unwrap_or_else(|_| "openai".to_string())
3281        .to_lowercase();
3282
3283    let provider = match provider_str.as_str() {
3284        "openai" => LlmProvider::OpenAI,
3285        "anthropic" => LlmProvider::Anthropic,
3286        "ollama" => LlmProvider::Ollama,
3287        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
3288        _ => LlmProvider::OpenAI,
3289    };
3290
3291    let api_endpoint =
3292        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
3293            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
3294            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
3295            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
3296            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
3297        });
3298
3299    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
3300        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
3301        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
3302        LlmProvider::Ollama => "llama2".to_string(),
3303        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
3304    });
3305
3306    // Build RagConfig using struct literal with defaults
3307    let rag_config = RagConfig {
3308        provider,
3309        api_endpoint,
3310        api_key,
3311        model,
3312        max_tokens: std::env::var("MOCKFORGE_RAG_MAX_TOKENS")
3313            .unwrap_or_else(|_| "4096".to_string())
3314            .parse()
3315            .unwrap_or(4096),
3316        temperature: std::env::var("MOCKFORGE_RAG_TEMPERATURE")
3317            .unwrap_or_else(|_| "0.3".to_string())
3318            .parse()
3319            .unwrap_or(0.3), // Lower temperature for more structured output
3320        timeout_secs: std::env::var("MOCKFORGE_RAG_TIMEOUT")
3321            .unwrap_or_else(|_| "60".to_string())
3322            .parse()
3323            .unwrap_or(60),
3324        max_context_length: std::env::var("MOCKFORGE_RAG_CONTEXT_WINDOW")
3325            .unwrap_or_else(|_| "4000".to_string())
3326            .parse()
3327            .unwrap_or(4000),
3328        ..Default::default()
3329    };
3330
3331    // Build the prompt for spec generation
3332    let spec_type_label = match request.spec_type.as_str() {
3333        "openapi" => "OpenAPI 3.0",
3334        "graphql" => "GraphQL",
3335        "asyncapi" => "AsyncAPI",
3336        _ => "OpenAPI 3.0",
3337    };
3338
3339    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");
3340
3341    let prompt = format!(
3342        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.
3343
3344User Requirements:
3345{}
3346
3347Instructions:
33481. Generate a complete, valid {} specification
33492. Include all paths, operations, request/response schemas, and components
33503. Use realistic field names and data types
33514. Include proper descriptions and examples
33525. Follow {} best practices
33536. Return ONLY the specification, no additional explanation
33547. For OpenAPI, use version {}
3355
3356Return the specification in {} format."#,
3357        spec_type_label,
3358        request.query,
3359        spec_type_label,
3360        spec_type_label,
3361        api_version,
3362        if request.spec_type == "graphql" {
3363            "GraphQL SDL"
3364        } else {
3365            "YAML"
3366        }
3367    );
3368
3369    // Create in-memory storage for RAG engine
3370    // Note: StorageFactory::create_memory() returns Box<dyn DocumentStorage>
3371    // We need to use unsafe transmute or create a wrapper, but for now we'll use
3372    // a simpler approach: create InMemoryStorage directly
3373    use mockforge_data::rag::storage::InMemoryStorage;
3374    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());
3375
3376    // Create RAG engine
3377    let mut rag_engine = match RagEngine::new(rag_config.clone(), storage) {
3378        Ok(engine) => engine,
3379        Err(e) => {
3380            return (
3381                StatusCode::INTERNAL_SERVER_ERROR,
3382                Json(serde_json::json!({
3383                    "error": "Failed to initialize RAG engine",
3384                    "message": e.to_string()
3385                })),
3386            )
3387                .into_response();
3388        }
3389    };
3390
3391    // Generate using RAG engine
3392    match rag_engine.generate(&prompt, None).await {
3393        Ok(generated_text) => {
3394            // Try to extract just the YAML/JSON/SDL content if LLM added explanation
3395            let spec = if request.spec_type == "graphql" {
3396                // For GraphQL, extract SDL
3397                extract_graphql_schema(&generated_text)
3398            } else {
3399                // For OpenAPI/AsyncAPI, extract YAML
3400                extract_yaml_spec(&generated_text)
3401            };
3402
3403            Json(serde_json::json!({
3404                "success": true,
3405                "spec": spec,
3406                "spec_type": request.spec_type,
3407            }))
3408            .into_response()
3409        }
3410        Err(e) => (
3411            StatusCode::INTERNAL_SERVER_ERROR,
3412            Json(serde_json::json!({
3413                "error": "AI generation failed",
3414                "message": e.to_string()
3415            })),
3416        )
3417            .into_response(),
3418    }
3419}
3420
3421#[cfg(not(feature = "data-faker"))]
3422async fn generate_ai_spec(
3423    State(_state): State<ManagementState>,
3424    Json(_request): Json<GenerateSpecRequest>,
3425) -> impl IntoResponse {
3426    (
3427        StatusCode::NOT_IMPLEMENTED,
3428        Json(serde_json::json!({
3429            "error": "AI features not enabled",
3430            "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3431        })),
3432    )
3433        .into_response()
3434}
3435
/// Generate an OpenAPI specification from recorded traffic.
///
/// Pipeline:
/// 1. open the recorder database (`request.database_path`, or
///    `recordings.db` in the current working directory),
/// 2. parse the optional `since` / `until` RFC 3339 bounds,
/// 3. query at most 1000 recorded HTTP exchanges matching the filters,
/// 4. feed them to `OpenApiSpecGenerator`,
/// 5. return the spec together with generation metadata.
///
/// Failure responses all share the `{"error", "message"}` JSON shape:
/// `400` for an unopenable database or a malformed date, `404` when no
/// exchange matches, `500` for query, generation or serialization errors.
#[cfg(feature = "behavioral-cloning")]
async fn generate_openapi_from_traffic(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
) -> impl IntoResponse {
    use chrono::{DateTime, Utc};
    use mockforge_core::intelligent_behavior::{
        openapi_generator::{
            HttpExchange as LocalHttpExchange, OpenApiGenerationConfig, OpenApiSpecGenerator,
        },
        IntelligentBehaviorConfig,
    };
    use mockforge_recorder::{
        database::RecorderDatabase,
        openapi_export::{QueryFilters, RecordingsToOpenApi},
    };
    use std::path::PathBuf;

    // Every failure path answers with the same two-key JSON body.
    let failure = |status: StatusCode, error: &str, message: String| {
        (
            status,
            Json(serde_json::json!({
                "error": error,
                "message": message
            })),
        )
            .into_response()
    };

    // Explicit path wins; otherwise look next to the process' working directory.
    let db_path = match request.database_path {
        Some(ref custom) => PathBuf::from(custom),
        None => std::env::current_dir()
            .unwrap_or_else(|_| PathBuf::from("."))
            .join("recordings.db"),
    };

    let db = match RecorderDatabase::new(&db_path).await {
        Ok(handle) => handle,
        Err(e) => {
            return failure(
                StatusCode::BAD_REQUEST,
                "Database error",
                format!("Failed to open recorder database: {}", e),
            );
        }
    };

    // Lower time bound (normalized to UTC).
    let since_dt = match request.since {
        Some(ref raw) => match DateTime::parse_from_rfc3339(raw) {
            Ok(parsed) => Some(parsed.with_timezone(&Utc)),
            Err(e) => {
                return failure(
                    StatusCode::BAD_REQUEST,
                    "Invalid date format",
                    format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e),
                );
            }
        },
        None => None,
    };

    // Upper time bound (normalized to UTC).
    let until_dt = match request.until {
        Some(ref raw) => match DateTime::parse_from_rfc3339(raw) {
            Ok(parsed) => Some(parsed.with_timezone(&Utc)),
            Err(e) => {
                return failure(
                    StatusCode::BAD_REQUEST,
                    "Invalid date format",
                    format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e),
                );
            }
        },
        None => None,
    };

    let query_filters = QueryFilters {
        since: since_dt,
        until: until_dt,
        path_pattern: request.path_pattern.clone(),
        min_status_code: None,
        max_requests: Some(1000),
    };

    // The recorder hands back its own `HttpExchange` type; it is converted to
    // the mockforge-core one further down to avoid a version mismatch.
    let recorded = match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await
    {
        Ok(found) => found,
        Err(e) => {
            return failure(
                StatusCode::INTERNAL_SERVER_ERROR,
                "Query error",
                format!("Failed to query HTTP exchanges: {}", e),
            );
        }
    };

    if recorded.is_empty() {
        return failure(
            StatusCode::NOT_FOUND,
            "No exchanges found",
            "No HTTP exchanges found matching the specified filters".to_string(),
        );
    }

    // Field-by-field move into the local exchange type.
    let exchanges: Vec<LocalHttpExchange> = recorded
        .into_iter()
        .map(|item| LocalHttpExchange {
            method: item.method,
            path: item.path,
            query_params: item.query_params,
            headers: item.headers,
            body: item.body,
            body_encoding: item.body_encoding,
            status_code: item.status_code,
            response_headers: item.response_headers,
            response_body: item.response_body,
            response_body_encoding: item.response_body_encoding,
            timestamp: item.timestamp,
        })
        .collect();

    // Generator settings: caller-supplied confidence, default behavior model.
    let gen_config = OpenApiGenerationConfig {
        min_confidence: request.min_confidence,
        behavior_model: Some(IntelligentBehaviorConfig::default().behavior_model),
    };

    let result = match OpenApiSpecGenerator::new(gen_config)
        .generate_from_exchanges(exchanges)
        .await
    {
        Ok(generated) => generated,
        Err(e) => {
            return failure(
                StatusCode::INTERNAL_SERVER_ERROR,
                "Generation error",
                format!("Failed to generate OpenAPI spec: {}", e),
            );
        }
    };

    // Prefer the raw document when the generator kept one; otherwise serialize
    // the typed spec.
    let spec_json = match result.spec.raw_document {
        Some(ref raw) => raw.clone(),
        None => match serde_json::to_value(&result.spec.spec) {
            Ok(json) => json,
            Err(e) => {
                return failure(
                    StatusCode::INTERNAL_SERVER_ERROR,
                    "Serialization error",
                    format!("Failed to serialize OpenAPI spec: {}", e),
                );
            }
        },
    };

    Json(serde_json::json!({
        "spec": spec_json,
        "metadata": {
            "requests_analyzed": result.metadata.requests_analyzed,
            "paths_inferred": result.metadata.paths_inferred,
            "path_confidence": result.metadata.path_confidence,
            "generated_at": result.metadata.generated_at.to_rfc3339(),
            "duration_ms": result.metadata.duration_ms,
        }
    }))
    .into_response()
}
3628
3629/// List all rule explanations
3630async fn list_rule_explanations(
3631    State(state): State<ManagementState>,
3632    Query(params): Query<std::collections::HashMap<String, String>>,
3633) -> impl IntoResponse {
3634    use mockforge_core::intelligent_behavior::RuleType;
3635
3636    let explanations = state.rule_explanations.read().await;
3637    let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3638
3639    // Filter by rule type if provided
3640    if let Some(rule_type_str) = params.get("rule_type") {
3641        if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3642            explanations_vec.retain(|e| e.rule_type == rule_type);
3643        }
3644    }
3645
3646    // Filter by minimum confidence if provided
3647    if let Some(min_confidence_str) = params.get("min_confidence") {
3648        if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3649            explanations_vec.retain(|e| e.confidence >= min_confidence);
3650        }
3651    }
3652
3653    // Sort by confidence (descending) and then by generated_at (descending)
3654    explanations_vec.sort_by(|a, b| {
3655        b.confidence
3656            .partial_cmp(&a.confidence)
3657            .unwrap_or(std::cmp::Ordering::Equal)
3658            .then_with(|| b.generated_at.cmp(&a.generated_at))
3659    });
3660
3661    Json(serde_json::json!({
3662        "explanations": explanations_vec,
3663        "total": explanations_vec.len(),
3664    }))
3665    .into_response()
3666}
3667
3668/// Get a specific rule explanation by ID
3669async fn get_rule_explanation(
3670    State(state): State<ManagementState>,
3671    Path(rule_id): Path<String>,
3672) -> impl IntoResponse {
3673    let explanations = state.rule_explanations.read().await;
3674
3675    match explanations.get(&rule_id) {
3676        Some(explanation) => Json(serde_json::json!({
3677            "explanation": explanation,
3678        }))
3679        .into_response(),
3680        None => (
3681            StatusCode::NOT_FOUND,
3682            Json(serde_json::json!({
3683                "error": "Rule explanation not found",
3684                "message": format!("No explanation found for rule ID: {}", rule_id)
3685            })),
3686        )
3687            .into_response(),
3688    }
3689}
3690
/// Request for learning from examples
///
/// JSON body consumed by the `learn_from_examples` handler.
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example request/response pairs to learn from.
    ///
    /// `learn_from_examples` answers `400 Bad Request` when this list is empty.
    pub examples: Vec<ExamplePairRequest>,
    /// Optional configuration override
    ///
    /// When present, `learn_from_examples` tries to deserialize it as an
    /// `IntelligentBehaviorConfig` and uses its `behavior_model`; if it does
    /// not deserialize, the default configuration is used instead. Omitted
    /// in the JSON body means `None`.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3700
/// Example pair request format
///
/// Both halves are free-form JSON; `learn_from_examples` picks out the keys
/// listed on each field and ignores everything else.
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request data (method, path, body, etc.)
    ///
    /// Keys read by `learn_from_examples`: `method` (string, defaults to
    /// `"GET"`), `path` (string, defaults to `"/"`), `body` (any JSON),
    /// `query_params` and `headers` (objects; only string values are kept).
    pub request: serde_json::Value,
    /// Response data (status_code, body, etc.)
    ///
    /// Keys read by `learn_from_examples`: `status_code`, or `status` as a
    /// fallback (unsigned integer, defaults to 200), and `body` (any JSON).
    pub response: serde_json::Value,
}
3709
3710/// Learn behavioral rules from example pairs
3711///
3712/// This endpoint accepts example request/response pairs, generates behavioral rules
3713/// with explanations, and stores the explanations for later retrieval.
3714async fn learn_from_examples(
3715    State(state): State<ManagementState>,
3716    Json(request): Json<LearnFromExamplesRequest>,
3717) -> impl IntoResponse {
3718    use mockforge_core::intelligent_behavior::{
3719        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3720        rule_generator::{ExamplePair, RuleGenerator},
3721    };
3722
3723    if request.examples.is_empty() {
3724        return (
3725            StatusCode::BAD_REQUEST,
3726            Json(serde_json::json!({
3727                "error": "No examples provided",
3728                "message": "At least one example pair is required"
3729            })),
3730        )
3731            .into_response();
3732    }
3733
3734    // Convert request examples to ExamplePair format
3735    let example_pairs: Result<Vec<ExamplePair>, String> = request
3736        .examples
3737        .into_iter()
3738        .enumerate()
3739        .map(|(idx, ex)| {
3740            // Parse request JSON to extract method, path, body, etc.
3741            let method = ex
3742                .request
3743                .get("method")
3744                .and_then(|v| v.as_str())
3745                .map(|s| s.to_string())
3746                .unwrap_or_else(|| "GET".to_string());
3747            let path = ex
3748                .request
3749                .get("path")
3750                .and_then(|v| v.as_str())
3751                .map(|s| s.to_string())
3752                .unwrap_or_else(|| "/".to_string());
3753            let request_body = ex.request.get("body").cloned();
3754            let query_params = ex
3755                .request
3756                .get("query_params")
3757                .and_then(|v| v.as_object())
3758                .map(|obj| {
3759                    obj.iter()
3760                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3761                        .collect()
3762                })
3763                .unwrap_or_default();
3764            let headers = ex
3765                .request
3766                .get("headers")
3767                .and_then(|v| v.as_object())
3768                .map(|obj| {
3769                    obj.iter()
3770                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3771                        .collect()
3772                })
3773                .unwrap_or_default();
3774
3775            // Parse response JSON to extract status, body, etc.
3776            let status = ex
3777                .response
3778                .get("status_code")
3779                .or_else(|| ex.response.get("status"))
3780                .and_then(|v| v.as_u64())
3781                .map(|n| n as u16)
3782                .unwrap_or(200);
3783            let response_body = ex.response.get("body").cloned();
3784
3785            Ok(ExamplePair {
3786                method,
3787                path,
3788                request: request_body,
3789                status,
3790                response: response_body,
3791                query_params,
3792                headers,
3793                metadata: {
3794                    let mut meta = std::collections::HashMap::new();
3795                    meta.insert("source".to_string(), "api".to_string());
3796                    meta.insert("example_index".to_string(), idx.to_string());
3797                    meta
3798                },
3799            })
3800        })
3801        .collect();
3802
3803    let example_pairs = match example_pairs {
3804        Ok(pairs) => pairs,
3805        Err(e) => {
3806            return (
3807                StatusCode::BAD_REQUEST,
3808                Json(serde_json::json!({
3809                    "error": "Invalid examples",
3810                    "message": e
3811                })),
3812            )
3813                .into_response();
3814        }
3815    };
3816
3817    // Create behavior config (use provided config or default)
3818    let behavior_config = if let Some(config_json) = request.config {
3819        // Try to deserialize custom config, fall back to default
3820        serde_json::from_value(config_json)
3821            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3822            .behavior_model
3823    } else {
3824        BehaviorModelConfig::default()
3825    };
3826
3827    // Create rule generator
3828    let generator = RuleGenerator::new(behavior_config);
3829
3830    // Generate rules with explanations
3831    let (rules, explanations) =
3832        match generator.generate_rules_with_explanations(example_pairs).await {
3833            Ok(result) => result,
3834            Err(e) => {
3835                return (
3836                    StatusCode::INTERNAL_SERVER_ERROR,
3837                    Json(serde_json::json!({
3838                        "error": "Rule generation failed",
3839                        "message": format!("Failed to generate rules: {}", e)
3840                    })),
3841                )
3842                    .into_response();
3843            }
3844        };
3845
3846    // Store explanations in ManagementState
3847    {
3848        let mut stored_explanations = state.rule_explanations.write().await;
3849        for explanation in &explanations {
3850            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3851        }
3852    }
3853
3854    // Prepare response
3855    let response = serde_json::json!({
3856        "success": true,
3857        "rules_generated": {
3858            "consistency_rules": rules.consistency_rules.len(),
3859            "schemas": rules.schemas.len(),
3860            "state_machines": rules.state_transitions.len(),
3861            "system_prompt": !rules.system_prompt.is_empty(),
3862        },
3863        "explanations": explanations.iter().map(|e| serde_json::json!({
3864            "rule_id": e.rule_id,
3865            "rule_type": e.rule_type,
3866            "confidence": e.confidence,
3867            "reasoning": e.reasoning,
3868        })).collect::<Vec<_>>(),
3869        "total_explanations": explanations.len(),
3870    });
3871
3872    Json(response).into_response()
3873}
3874
/// Return the trimmed body of the most relevant Markdown code fence in `text`.
///
/// Lookup order:
/// 1. the first fence opened with one of `tags` (e.g. `` ```yaml ``), tried in
///    the order given;
/// 2. otherwise the first fence of any kind. If the rest of the opening line
///    is a bare language tag (`json`, `yml`, ...) it is dropped, so the tag
///    does not end up at the top of the extracted document.
///
/// A fence without a closing `` ``` `` is not considered a match. Returns
/// `None` when no complete fence is found.
// Pure string logic, intentionally not feature-gated so it can be unit-tested
// in any build; only the `data-faker` wrappers below call it.
#[cfg_attr(not(feature = "data-faker"), allow(dead_code))]
fn extract_fenced_block<'a>(text: &'a str, tags: &[&str]) -> Option<&'a str> {
    const FENCE: &str = "```";

    // Pass 1: a fence explicitly labelled with a preferred tag.
    let labelled = tags.iter().find_map(|tag| {
        let opener = format!("{}{}", FENCE, tag);
        let start = text.find(opener.as_str())?;
        let body = &text[start + opener.len()..];
        let end = body.find(FENCE)?;
        Some(body[..end].trim())
    });
    if labelled.is_some() {
        return labelled;
    }

    // Pass 2: any fence at all.
    let start = text.find(FENCE)?;
    let after_fence = &text[start + FENCE.len()..];
    // Skip the opening line only when it looks like a language tag; anything
    // with spaces, colons, braces etc. is treated as content and kept.
    let body = match after_fence.split_once('\n') {
        Some((first_line, rest)) => {
            let tag = first_line.trim();
            let is_language_tag = !tag.is_empty()
                && tag
                    .chars()
                    .all(|c| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '+' | '.' | '#'));
            if is_language_tag {
                rest
            } else {
                after_fence
            }
        }
        None => after_fence,
    };
    let end = body.find(FENCE)?;
    Some(body[..end].trim())
}

/// Extract an OpenAPI/AsyncAPI document from LLM output.
///
/// Prefers a `` ```yaml `` / `` ```yml `` fence, then any other fence (with its
/// language tag removed); if the text contains no complete fence it is
/// returned as-is, trimmed.
#[cfg(feature = "data-faker")]
fn extract_yaml_spec(text: &str) -> String {
    extract_fenced_block(text, &["yaml", "yml"])
        .unwrap_or_else(|| text.trim())
        .to_string()
}

/// Extract GraphQL schema from text content
///
/// Prefers a `` ```graphql `` / `` ```gql `` fence, then any other fence (with
/// its language tag removed); if the text contains no complete fence it is
/// returned as-is, trimmed.
#[cfg(feature = "data-faker")]
fn extract_graphql_schema(text: &str) -> String {
    extract_fenced_block(text, &["graphql", "gql"])
        .unwrap_or_else(|| text.trim())
        .to_string()
}
3924
3925// ========== Chaos Engineering Management ==========
3926
3927/// Get current chaos engineering configuration
3928async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
3929    #[cfg(feature = "chaos")]
3930    {
3931        if let Some(chaos_state) = &_state.chaos_api_state {
3932            let config = chaos_state.config.read().await;
3933            // Convert ChaosConfig to JSON response format
3934            Json(serde_json::json!({
3935                "enabled": config.enabled,
3936                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
3937                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
3938                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
3939                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
3940            }))
3941            .into_response()
3942        } else {
3943            // Chaos API not available, return default
3944            Json(serde_json::json!({
3945                "enabled": false,
3946                "latency": null,
3947                "fault_injection": null,
3948                "rate_limit": null,
3949                "traffic_shaping": null,
3950            }))
3951            .into_response()
3952        }
3953    }
3954    #[cfg(not(feature = "chaos"))]
3955    {
3956        // Chaos feature not enabled
3957        Json(serde_json::json!({
3958            "enabled": false,
3959            "latency": null,
3960            "fault_injection": null,
3961            "rate_limit": null,
3962            "traffic_shaping": null,
3963        }))
3964        .into_response()
3965    }
3966}
3967
/// Request to update chaos configuration
///
/// JSON body consumed by `update_chaos_config`. Every field is optional; a
/// field that is absent leaves the corresponding setting untouched. The four
/// section fields are carried as raw JSON and deserialized by the handler
/// into the matching `mockforge_chaos::config` type.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Whether to enable chaos engineering
    pub enabled: Option<bool>,
    /// Latency configuration (deserialized as `LatencyConfig`)
    pub latency: Option<serde_json::Value>,
    /// Fault injection configuration (deserialized as `FaultInjectionConfig`)
    pub fault_injection: Option<serde_json::Value>,
    /// Rate limiting configuration (deserialized as `RateLimitConfig`)
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic shaping configuration (deserialized as `TrafficShapingConfig`)
    pub traffic_shaping: Option<serde_json::Value>,
}
3982
3983/// Update chaos engineering configuration
3984async fn update_chaos_config(
3985    State(_state): State<ManagementState>,
3986    Json(_config_update): Json<ChaosConfigUpdate>,
3987) -> impl IntoResponse {
3988    #[cfg(feature = "chaos")]
3989    {
3990        if let Some(chaos_state) = &_state.chaos_api_state {
3991            use mockforge_chaos::config::{
3992                FaultInjectionConfig, LatencyConfig, RateLimitConfig, TrafficShapingConfig,
3993            };
3994
3995            let mut config = chaos_state.config.write().await;
3996
3997            // Update enabled flag if provided
3998            if let Some(enabled) = _config_update.enabled {
3999                config.enabled = enabled;
4000            }
4001
4002            // Update latency config if provided
4003            if let Some(latency_json) = _config_update.latency {
4004                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
4005                    config.latency = Some(latency);
4006                }
4007            }
4008
4009            // Update fault injection config if provided
4010            if let Some(fault_json) = _config_update.fault_injection {
4011                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
4012                    config.fault_injection = Some(fault);
4013                }
4014            }
4015
4016            // Update rate limit config if provided
4017            if let Some(rate_json) = _config_update.rate_limit {
4018                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
4019                    config.rate_limit = Some(rate);
4020                }
4021            }
4022
4023            // Update traffic shaping config if provided
4024            if let Some(traffic_json) = _config_update.traffic_shaping {
4025                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
4026                    config.traffic_shaping = Some(traffic);
4027                }
4028            }
4029
4030            // Reinitialize middleware injectors with new config
4031            // The middleware will pick up the changes on the next request
4032            drop(config);
4033
4034            info!("Chaos configuration updated successfully");
4035            Json(serde_json::json!({
4036                "success": true,
4037                "message": "Chaos configuration updated and applied"
4038            }))
4039            .into_response()
4040        } else {
4041            (
4042                StatusCode::SERVICE_UNAVAILABLE,
4043                Json(serde_json::json!({
4044                    "success": false,
4045                    "error": "Chaos API not available",
4046                    "message": "Chaos engineering is not enabled or configured"
4047                })),
4048            )
4049                .into_response()
4050        }
4051    }
4052    #[cfg(not(feature = "chaos"))]
4053    {
4054        (
4055            StatusCode::NOT_IMPLEMENTED,
4056            Json(serde_json::json!({
4057                "success": false,
4058                "error": "Chaos feature not enabled",
4059                "message": "Chaos engineering feature is not compiled into this build"
4060            })),
4061        )
4062            .into_response()
4063    }
4064}
4065
4066// ========== Network Profile Management ==========
4067
4068/// List available network profiles
4069async fn list_network_profiles() -> impl IntoResponse {
4070    use mockforge_core::network_profiles::NetworkProfileCatalog;
4071
4072    let catalog = NetworkProfileCatalog::default();
4073    let profiles: Vec<serde_json::Value> = catalog
4074        .list_profiles_with_description()
4075        .iter()
4076        .map(|(name, description)| {
4077            serde_json::json!({
4078                "name": name,
4079                "description": description,
4080            })
4081        })
4082        .collect();
4083
4084    Json(serde_json::json!({
4085        "profiles": profiles
4086    }))
4087    .into_response()
4088}
4089
/// Request to apply a network profile
///
/// JSON body consumed by `apply_network_profile`.
#[derive(Debug, Deserialize)]
pub struct ApplyNetworkProfileRequest {
    /// Name of the network profile to apply; looked up in the default
    /// `NetworkProfileCatalog`, and an unknown name yields `404 Not Found`.
    pub profile_name: String,
}
4096
4097/// Apply a network profile
4098async fn apply_network_profile(
4099    State(state): State<ManagementState>,
4100    Json(request): Json<ApplyNetworkProfileRequest>,
4101) -> impl IntoResponse {
4102    use mockforge_core::network_profiles::NetworkProfileCatalog;
4103
4104    let catalog = NetworkProfileCatalog::default();
4105    if let Some(profile) = catalog.get(&request.profile_name) {
4106        // Apply profile to server configuration if available
4107        // NetworkProfile contains latency and traffic_shaping configs
4108        if let Some(server_config) = &state.server_config {
4109            let mut config = server_config.write().await;
4110
4111            // Apply network profile's traffic shaping to core config
4112            use mockforge_core::config::NetworkShapingConfig;
4113
4114            // Convert NetworkProfile's TrafficShapingConfig to NetworkShapingConfig
4115            // NetworkProfile uses mockforge_core::traffic_shaping::TrafficShapingConfig
4116            // which has bandwidth and burst_loss fields
4117            let network_shaping = NetworkShapingConfig {
4118                enabled: profile.traffic_shaping.bandwidth.enabled
4119                    || profile.traffic_shaping.burst_loss.enabled,
4120                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
4121                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
4122                max_connections: 1000, // Default value
4123            };
4124
4125            // Update chaos config if it exists, or create it
4126            // Chaos config is in observability.chaos, not core.chaos
4127            if let Some(ref mut chaos) = config.observability.chaos {
4128                chaos.traffic_shaping = Some(network_shaping);
4129            } else {
4130                // Create minimal chaos config with traffic shaping
4131                use mockforge_core::config::ChaosEngConfig;
4132                config.observability.chaos = Some(ChaosEngConfig {
4133                    enabled: true,
4134                    latency: None,
4135                    fault_injection: None,
4136                    rate_limit: None,
4137                    traffic_shaping: Some(network_shaping),
4138                    scenario: None,
4139                });
4140            }
4141
4142            info!("Network profile '{}' applied to server configuration", request.profile_name);
4143        } else {
4144            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
4145        }
4146
4147        // Also update chaos API state if available
4148        #[cfg(feature = "chaos")]
4149        {
4150            if let Some(chaos_state) = &state.chaos_api_state {
4151                use mockforge_chaos::config::TrafficShapingConfig;
4152
4153                let mut chaos_config = chaos_state.config.write().await;
4154                // Apply profile's traffic shaping to chaos API state
4155                let chaos_traffic_shaping = TrafficShapingConfig {
4156                    enabled: profile.traffic_shaping.bandwidth.enabled
4157                        || profile.traffic_shaping.burst_loss.enabled,
4158                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, // Convert bytes to bits
4159                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
4160                    max_connections: 0,
4161                    connection_timeout_ms: 30000,
4162                };
4163                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
4164                chaos_config.enabled = true; // Enable chaos when applying a profile
4165                drop(chaos_config);
4166                info!("Network profile '{}' applied to chaos API state", request.profile_name);
4167            }
4168        }
4169
4170        Json(serde_json::json!({
4171            "success": true,
4172            "message": format!("Network profile '{}' applied", request.profile_name),
4173            "profile": {
4174                "name": profile.name,
4175                "description": profile.description,
4176            }
4177        }))
4178        .into_response()
4179    } else {
4180        (
4181            StatusCode::NOT_FOUND,
4182            Json(serde_json::json!({
4183                "error": "Profile not found",
4184                "message": format!("Network profile '{}' not found", request.profile_name)
4185            })),
4186        )
4187            .into_response()
4188    }
4189}
4190
4191/// Build the management API router with UI Builder support
4192pub fn management_router_with_ui_builder(
4193    state: ManagementState,
4194    server_config: mockforge_core::config::ServerConfig,
4195) -> Router {
4196    use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
4197
4198    // Create the base management router
4199    let management = management_router(state);
4200
4201    // Create UI Builder state and router
4202    let ui_builder_state = UIBuilderState::new(server_config);
4203    let ui_builder = create_ui_builder_router(ui_builder_state);
4204
4205    // Nest UI Builder under /ui-builder
4206    management.nest("/ui-builder", ui_builder)
4207}
4208
4209/// Build management router with spec import API
4210pub fn management_router_with_spec_import(state: ManagementState) -> Router {
4211    use crate::spec_import::{spec_import_router, SpecImportState};
4212
4213    // Create base management router
4214    let management = management_router(state);
4215
4216    // Merge with spec import router
4217    Router::new()
4218        .merge(management)
4219        .merge(spec_import_router(SpecImportState::new()))
4220}
4221
/// Unit tests for mock storage, XPath request matching and the optimistic
/// versioning behaviour of the `/mocks` HTTP endpoints.
///
/// Fixtures are built through small helpers so each test spells out only the
/// fields it actually cares about.
#[cfg(test)]
mod tests {
    use super::*;

    /// Helper to build a test MockConfig with sensible defaults
    fn test_mock(id: &str, name: &str) -> MockConfig {
        MockConfig {
            id: id.to_string(),
            name: name.to_string(),
            method: "GET".to_string(),
            path: "/test".to_string(),
            response: MockResponse {
                body: serde_json::json!({"message": "test"}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
            version: 0, // will be set by create_mock
        }
    }

    /// Pushes `mock` straight into the state's mock list, bypassing the HTTP API.
    ///
    /// The write guard is a temporary, so the lock is released as soon as the
    /// push completes.
    async fn seed(state: &ManagementState, mock: MockConfig) {
        state.mocks.write().await.push(mock);
    }

    /// Builds an enabled `POST /xml` mock whose only match criterion is `xpath`.
    fn xpath_mock(id: &str, name: &str, xpath: &str) -> MockConfig {
        MockConfig {
            method: "POST".to_string(),
            path: "/xml".to_string(),
            response: MockResponse {
                body: serde_json::json!({"ok": true}),
                headers: None,
            },
            request_match: Some(RequestMatchCriteria {
                xpath: Some(xpath.to_string()),
                ..Default::default()
            }),
            version: 1,
            ..test_mock(id, name)
        }
    }

    /// Evaluates `mock` against a `POST /xml` request that carries the shared
    /// XML order document and has no headers or query parameters.
    fn matches_order_xml(mock: &MockConfig) -> bool {
        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        mock_matches_request(mock, "POST", "/xml", &headers, &query, Some(body))
    }

    /// Sends one request through a management router built over a clone of
    /// `state` and returns the response status together with the raw body.
    ///
    /// When `json_body` is `Some`, it is sent with a
    /// `content-type: application/json` header; otherwise the request body is
    /// empty and no content type is set.
    async fn send(
        state: &ManagementState,
        method: &str,
        uri: &str,
        json_body: Option<String>,
    ) -> (StatusCode, Vec<u8>) {
        use axum::http::Request;
        use tower::ServiceExt;

        let builder = Request::builder().method(method).uri(uri);
        let request = match json_body {
            Some(payload) => builder
                .header("content-type", "application/json")
                .body(axum::body::Body::from(payload)),
            None => builder.body(axum::body::Body::empty()),
        }
        .unwrap();

        let response = management_router(state.clone()).oneshot(request).await.unwrap();
        let status = response.status();
        let bytes = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();

        (status, bytes.to_vec())
    }

    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        // Create mock
        seed(
            &state,
            MockConfig {
                version: 1,
                ..test_mock("test-1", "Test Mock")
            },
        )
        .await;

        // Get mock
        let mocks = state.mocks.read().await;
        let found = mocks
            .iter()
            .find(|m| m.id == "test-1")
            .expect("seeded mock should be retrievable by id");
        assert_eq!(found.name, "Test Mock");
    }

    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);
        let empty_response = || MockResponse {
            body: serde_json::json!({}),
            headers: None,
        };

        // Add some mocks: one enabled, one disabled
        seed(
            &state,
            MockConfig {
                path: "/test1".to_string(),
                response: empty_response(),
                version: 1,
                ..test_mock("1", "Mock 1")
            },
        )
        .await;
        seed(
            &state,
            MockConfig {
                method: "POST".to_string(),
                path: "/test2".to_string(),
                response: empty_response(),
                enabled: false,
                status_code: Some(201),
                version: 1,
                ..test_mock("2", "Mock 2")
            },
        )
        .await;

        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }

    #[test]
    fn test_mock_matches_request_with_xpath_absolute_path() {
        let mock = xpath_mock("xpath-1", "XPath Match", "/root/order/id");

        assert!(matches_order_xml(&mock));
    }

    #[test]
    fn test_mock_matches_request_with_xpath_text_predicate() {
        let mock = xpath_mock("xpath-2", "XPath Predicate Match", "//order/id[text()='123']");

        assert!(matches_order_xml(&mock));
    }

    #[test]
    fn test_mock_matches_request_with_xpath_no_match() {
        // The document's id is 123, so a predicate on 456 must not match.
        let mock = xpath_mock("xpath-3", "XPath No Match", "//order/id[text()='456']");

        assert!(!matches_order_xml(&mock));
    }

    #[tokio::test]
    async fn test_create_mock_sets_version_to_1() {
        let state = ManagementState::new(None, None, 3000);

        // Empty id and version 0: both are expected to be filled in by the server.
        let payload = serde_json::to_string(&test_mock("", "Version Test")).unwrap();

        let (status, bytes) = send(&state, "POST", "/mocks", Some(payload)).await;
        assert_eq!(status, StatusCode::OK);

        let created: MockConfig = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(created.version, 1, "Newly created mock should have version 1");
        assert!(!created.id.is_empty(), "Mock should get an auto-generated ID");
    }

    #[tokio::test]
    async fn test_update_mock_with_correct_version_succeeds() {
        let state = ManagementState::new(None, None, 3000);

        // Seed a mock with version 1
        seed(
            &state,
            MockConfig {
                version: 1,
                ..test_mock("v-test-1", "Original")
            },
        )
        .await;

        // Update with matching version
        let updated = MockConfig {
            version: 1, // matches current stored version
            ..test_mock("v-test-1", "Updated")
        };
        let payload = serde_json::to_string(&updated).unwrap();

        let (status, bytes) = send(&state, "PUT", "/mocks/v-test-1", Some(payload)).await;
        assert_eq!(status, StatusCode::OK);

        let result: MockConfig = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(result.version, 2, "Version should be incremented to 2");
        assert_eq!(result.name, "Updated");
    }

    #[tokio::test]
    async fn test_update_mock_with_stale_version_returns_409() {
        let state = ManagementState::new(None, None, 3000);

        // Seed a mock at version 3 (simulating previous updates)
        seed(
            &state,
            MockConfig {
                version: 3,
                ..test_mock("v-test-2", "Current")
            },
        )
        .await;

        // Attempt update with stale version (1 instead of 3)
        let stale = MockConfig {
            version: 1,
            ..test_mock("v-test-2", "Stale Update")
        };
        let payload = serde_json::to_string(&stale).unwrap();

        let (status, bytes) = send(&state, "PUT", "/mocks/v-test-2", Some(payload)).await;
        assert_eq!(
            status,
            StatusCode::CONFLICT,
            "Stale version should be rejected with 409 Conflict"
        );

        let error: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(error["error"], "Version conflict");
        assert_eq!(error["current_version"], 3);

        // Verify the mock was NOT modified
        let mocks = state.mocks.read().await;
        let mock = mocks.iter().find(|m| m.id == "v-test-2").unwrap();
        assert_eq!(mock.name, "Current", "Mock should not have been modified");
        assert_eq!(mock.version, 3, "Version should remain unchanged");
    }

    #[tokio::test]
    async fn test_get_mock_includes_version() {
        let state = ManagementState::new(None, None, 3000);

        seed(
            &state,
            MockConfig {
                version: 5,
                ..test_mock("v-get-1", "Get Test")
            },
        )
        .await;

        let (status, bytes) = send(&state, "GET", "/mocks/v-get-1", None).await;
        assert_eq!(status, StatusCode::OK);

        let mock: MockConfig = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(mock.version, 5, "GET should return the stored version");
    }
}