mockforge_http/
management.rs

1/// Management API for MockForge
2///
3/// Provides REST endpoints for controlling mocks, server configuration,
4/// and integration with developer tools (VS Code extension, CI/CD, etc.)
5use axum::{
6    extract::{Path, Query, State},
7    http::StatusCode,
8    response::{
9        sse::{Event, Sse},
10        IntoResponse, Json, Response,
11    },
12    routing::{delete, get, post, put},
13    Router,
14};
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17#[cfg(feature = "smtp")]
18use mockforge_smtp::EmailSearchFilters;
19use serde::{Deserialize, Serialize};
20use std::convert::Infallible;
21use std::sync::Arc;
22use tokio::sync::{broadcast, RwLock};
23use tracing::*;
24
25/// Message event types for real-time monitoring
26#[derive(Debug, Clone, Serialize, Deserialize)]
27#[serde(tag = "protocol", content = "data")]
28#[serde(rename_all = "lowercase")]
29pub enum MessageEvent {
30    #[cfg(feature = "mqtt")]
31    /// MQTT message event
32    Mqtt(MqttMessageEvent),
33    #[cfg(feature = "kafka")]
34    /// Kafka message event
35    Kafka(KafkaMessageEvent),
36}
37
38#[cfg(feature = "mqtt")]
39/// MQTT message event for real-time monitoring
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct MqttMessageEvent {
42    /// MQTT topic name
43    pub topic: String,
44    /// Message payload content
45    pub payload: String,
46    /// Quality of Service level (0, 1, or 2)
47    pub qos: u8,
48    /// Whether the message is retained
49    pub retain: bool,
50    /// RFC3339 formatted timestamp
51    pub timestamp: String,
52}
53
54#[cfg(feature = "kafka")]
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct KafkaMessageEvent {
57    pub topic: String,
58    pub key: Option<String>,
59    pub value: String,
60    pub partition: i32,
61    pub offset: i64,
62    pub headers: Option<std::collections::HashMap<String, String>>,
63    pub timestamp: String,
64}
65
66/// Mock configuration representation
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct MockConfig {
69    /// Unique identifier for the mock
70    pub id: String,
71    /// Human-readable name for the mock
72    pub name: String,
73    /// HTTP method (GET, POST, etc.)
74    pub method: String,
75    /// API path pattern to match
76    pub path: String,
77    /// Response configuration
78    pub response: MockResponse,
79    /// Whether this mock is currently enabled
80    pub enabled: bool,
81    /// Optional latency to inject in milliseconds
82    #[serde(skip_serializing_if = "Option::is_none")]
83    pub latency_ms: Option<u64>,
84    /// Optional HTTP status code override
85    #[serde(skip_serializing_if = "Option::is_none")]
86    pub status_code: Option<u16>,
87}
88
89/// Mock response configuration
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct MockResponse {
92    /// Response body as JSON
93    pub body: serde_json::Value,
94    /// Optional custom response headers
95    #[serde(skip_serializing_if = "Option::is_none")]
96    pub headers: Option<std::collections::HashMap<String, String>>,
97}
98
99/// Server statistics
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct ServerStats {
102    /// Server uptime in seconds
103    pub uptime_seconds: u64,
104    /// Total number of requests processed
105    pub total_requests: u64,
106    /// Number of active mock configurations
107    pub active_mocks: usize,
108    /// Number of currently enabled mocks
109    pub enabled_mocks: usize,
110    /// Number of registered API routes
111    pub registered_routes: usize,
112}
113
114/// Server configuration info
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct ServerConfig {
117    /// MockForge version string
118    pub version: String,
119    /// Server port number
120    pub port: u16,
121    /// Whether an OpenAPI spec is loaded
122    pub has_openapi_spec: bool,
123    /// Optional path to the OpenAPI spec file
124    #[serde(skip_serializing_if = "Option::is_none")]
125    pub spec_path: Option<String>,
126}
127
128/// Shared state for the management API
129#[derive(Clone)]
130pub struct ManagementState {
131    /// Collection of mock configurations
132    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
133    /// Optional OpenAPI specification
134    pub spec: Option<Arc<OpenApiSpec>>,
135    /// Optional path to the OpenAPI spec file
136    pub spec_path: Option<String>,
137    /// Server port number
138    pub port: u16,
139    /// Server start time for uptime calculation
140    pub start_time: std::time::Instant,
141    /// Counter for total requests processed
142    pub request_counter: Arc<RwLock<u64>>,
143    /// Optional SMTP registry for email mocking
144    #[cfg(feature = "smtp")]
145    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
146    /// Optional MQTT broker for message mocking
147    #[cfg(feature = "mqtt")]
148    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
149    /// Optional Kafka broker for event streaming
150    #[cfg(feature = "kafka")]
151    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
152    /// Broadcast channel for message events (MQTT & Kafka)
153    #[cfg(any(feature = "mqtt", feature = "kafka"))]
154    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
155}
156
157impl ManagementState {
158    /// Create a new management state
159    ///
160    /// # Arguments
161    /// * `spec` - Optional OpenAPI specification
162    /// * `spec_path` - Optional path to the OpenAPI spec file
163    /// * `port` - Server port number
164    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
165        Self {
166            mocks: Arc::new(RwLock::new(Vec::new())),
167            spec,
168            spec_path,
169            port,
170            start_time: std::time::Instant::now(),
171            request_counter: Arc::new(RwLock::new(0)),
172            #[cfg(feature = "smtp")]
173            smtp_registry: None,
174            #[cfg(feature = "mqtt")]
175            mqtt_broker: None,
176            #[cfg(feature = "kafka")]
177            kafka_broker: None,
178            #[cfg(any(feature = "mqtt", feature = "kafka"))]
179            message_events: {
180                let (tx, _) = broadcast::channel(1000);
181                Arc::new(tx)
182            },
183        }
184    }
185
186    #[cfg(feature = "smtp")]
187    /// Add SMTP registry to management state
188    pub fn with_smtp_registry(
189        mut self,
190        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
191    ) -> Self {
192        self.smtp_registry = Some(smtp_registry);
193        self
194    }
195
196    #[cfg(feature = "mqtt")]
197    /// Add MQTT broker to management state
198    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
199        self.mqtt_broker = Some(mqtt_broker);
200        self
201    }
202
203    #[cfg(feature = "kafka")]
204    /// Add Kafka broker to management state
205    pub fn with_kafka_broker(
206        mut self,
207        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
208    ) -> Self {
209        self.kafka_broker = Some(kafka_broker);
210        self
211    }
212}
213
214/// List all mocks
215async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
216    let mocks = state.mocks.read().await;
217    Json(serde_json::json!({
218        "mocks": *mocks,
219        "total": mocks.len(),
220        "enabled": mocks.iter().filter(|m| m.enabled).count()
221    }))
222}
223
224/// Get a specific mock by ID
225async fn get_mock(
226    State(state): State<ManagementState>,
227    Path(id): Path<String>,
228) -> Result<Json<MockConfig>, StatusCode> {
229    let mocks = state.mocks.read().await;
230    mocks
231        .iter()
232        .find(|m| m.id == id)
233        .cloned()
234        .map(Json)
235        .ok_or(StatusCode::NOT_FOUND)
236}
237
238/// Create a new mock
239async fn create_mock(
240    State(state): State<ManagementState>,
241    Json(mut mock): Json<MockConfig>,
242) -> Result<Json<MockConfig>, StatusCode> {
243    let mut mocks = state.mocks.write().await;
244
245    // Generate ID if not provided
246    if mock.id.is_empty() {
247        mock.id = uuid::Uuid::new_v4().to_string();
248    }
249
250    // Check for duplicate ID
251    if mocks.iter().any(|m| m.id == mock.id) {
252        return Err(StatusCode::CONFLICT);
253    }
254
255    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
256    mocks.push(mock.clone());
257    Ok(Json(mock))
258}
259
260/// Update an existing mock
261async fn update_mock(
262    State(state): State<ManagementState>,
263    Path(id): Path<String>,
264    Json(updated_mock): Json<MockConfig>,
265) -> Result<Json<MockConfig>, StatusCode> {
266    let mut mocks = state.mocks.write().await;
267
268    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
269
270    info!("Updating mock: {}", id);
271    mocks[position] = updated_mock.clone();
272    Ok(Json(updated_mock))
273}
274
275/// Delete a mock
276async fn delete_mock(
277    State(state): State<ManagementState>,
278    Path(id): Path<String>,
279) -> Result<StatusCode, StatusCode> {
280    let mut mocks = state.mocks.write().await;
281
282    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
283
284    info!("Deleting mock: {}", id);
285    mocks.remove(position);
286    Ok(StatusCode::NO_CONTENT)
287}
288
289/// Get server statistics
290async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
291    let mocks = state.mocks.read().await;
292    let request_count = *state.request_counter.read().await;
293
294    Json(ServerStats {
295        uptime_seconds: state.start_time.elapsed().as_secs(),
296        total_requests: request_count,
297        active_mocks: mocks.len(),
298        enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
299        registered_routes: mocks.len(), // This could be enhanced with actual route registry info
300    })
301}
302
303/// Get server configuration
304async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
305    Json(ServerConfig {
306        version: env!("CARGO_PKG_VERSION").to_string(),
307        port: state.port,
308        has_openapi_spec: state.spec.is_some(),
309        spec_path: state.spec_path.clone(),
310    })
311}
312
313/// Health check endpoint
314async fn health_check() -> Json<serde_json::Value> {
315    Json(serde_json::json!({
316        "status": "healthy",
317        "service": "mockforge-management",
318        "timestamp": chrono::Utc::now().to_rfc3339()
319    }))
320}
321
322/// Export format for mock configurations
323#[derive(Debug, Clone, Serialize, Deserialize)]
324#[serde(rename_all = "lowercase")]
325pub enum ExportFormat {
326    /// JSON format
327    Json,
328    /// YAML format
329    Yaml,
330}
331
332/// Export mocks in specified format
333async fn export_mocks(
334    State(state): State<ManagementState>,
335    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
336) -> Result<(StatusCode, String), StatusCode> {
337    let mocks = state.mocks.read().await;
338
339    let format = params
340        .get("format")
341        .map(|f| match f.as_str() {
342            "yaml" | "yml" => ExportFormat::Yaml,
343            _ => ExportFormat::Json,
344        })
345        .unwrap_or(ExportFormat::Json);
346
347    match format {
348        ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
349            .map(|json| (StatusCode::OK, json))
350            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
351        ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
352            .map(|yaml| (StatusCode::OK, yaml))
353            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
354    }
355}
356
357/// Import mocks from JSON/YAML
358async fn import_mocks(
359    State(state): State<ManagementState>,
360    Json(mocks): Json<Vec<MockConfig>>,
361) -> impl IntoResponse {
362    let mut current_mocks = state.mocks.write().await;
363    current_mocks.clear();
364    current_mocks.extend(mocks);
365    Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
366}
367
368#[cfg(feature = "smtp")]
369/// List SMTP emails in mailbox
370async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
371    if let Some(ref smtp_registry) = state.smtp_registry {
372        match smtp_registry.get_emails() {
373            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
374            Err(e) => (
375                StatusCode::INTERNAL_SERVER_ERROR,
376                Json(serde_json::json!({
377                    "error": "Failed to retrieve emails",
378                    "message": e.to_string()
379                })),
380            ),
381        }
382    } else {
383        (
384            StatusCode::NOT_IMPLEMENTED,
385            Json(serde_json::json!({
386                "error": "SMTP mailbox management not available",
387                "message": "SMTP server is not enabled or registry not available."
388            })),
389        )
390    }
391}
392
393/// Get specific SMTP email
394#[cfg(feature = "smtp")]
395async fn get_smtp_email(
396    State(state): State<ManagementState>,
397    Path(id): Path<String>,
398) -> impl IntoResponse {
399    if let Some(ref smtp_registry) = state.smtp_registry {
400        match smtp_registry.get_email_by_id(&id) {
401            Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
402            Ok(None) => (
403                StatusCode::NOT_FOUND,
404                Json(serde_json::json!({
405                    "error": "Email not found",
406                    "id": id
407                })),
408            ),
409            Err(e) => (
410                StatusCode::INTERNAL_SERVER_ERROR,
411                Json(serde_json::json!({
412                    "error": "Failed to retrieve email",
413                    "message": e.to_string()
414                })),
415            ),
416        }
417    } else {
418        (
419            StatusCode::NOT_IMPLEMENTED,
420            Json(serde_json::json!({
421                "error": "SMTP mailbox management not available",
422                "message": "SMTP server is not enabled or registry not available."
423            })),
424        )
425    }
426}
427
428/// Clear SMTP mailbox
429#[cfg(feature = "smtp")]
430async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
431    if let Some(ref smtp_registry) = state.smtp_registry {
432        match smtp_registry.clear_mailbox() {
433            Ok(()) => (
434                StatusCode::OK,
435                Json(serde_json::json!({
436                    "message": "Mailbox cleared successfully"
437                })),
438            ),
439            Err(e) => (
440                StatusCode::INTERNAL_SERVER_ERROR,
441                Json(serde_json::json!({
442                    "error": "Failed to clear mailbox",
443                    "message": e.to_string()
444                })),
445            ),
446        }
447    } else {
448        (
449            StatusCode::NOT_IMPLEMENTED,
450            Json(serde_json::json!({
451                "error": "SMTP mailbox management not available",
452                "message": "SMTP server is not enabled or registry not available."
453            })),
454        )
455    }
456}
457
458/// Export SMTP mailbox
459#[cfg(feature = "smtp")]
460async fn export_smtp_mailbox(
461    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
462) -> impl IntoResponse {
463    let format = params.get("format").unwrap_or(&"json".to_string()).clone();
464    (
465        StatusCode::NOT_IMPLEMENTED,
466        Json(serde_json::json!({
467            "error": "SMTP mailbox management not available via HTTP API",
468            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
469            "requested_format": format
470        })),
471    )
472}
473
474/// Search SMTP emails
475#[cfg(feature = "smtp")]
476async fn search_smtp_emails(
477    State(state): State<ManagementState>,
478    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
479) -> impl IntoResponse {
480    if let Some(ref smtp_registry) = state.smtp_registry {
481        let filters = EmailSearchFilters {
482            sender: params.get("sender").cloned(),
483            recipient: params.get("recipient").cloned(),
484            subject: params.get("subject").cloned(),
485            body: params.get("body").cloned(),
486            since: params
487                .get("since")
488                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
489                .map(|dt| dt.with_timezone(&chrono::Utc)),
490            until: params
491                .get("until")
492                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
493                .map(|dt| dt.with_timezone(&chrono::Utc)),
494            use_regex: params.get("regex").map(|s| s == "true").unwrap_or(false),
495            case_sensitive: params.get("case_sensitive").map(|s| s == "true").unwrap_or(false),
496        };
497
498        match smtp_registry.search_emails(filters) {
499            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
500            Err(e) => (
501                StatusCode::INTERNAL_SERVER_ERROR,
502                Json(serde_json::json!({
503                    "error": "Failed to search emails",
504                    "message": e.to_string()
505                })),
506            ),
507        }
508    } else {
509        (
510            StatusCode::NOT_IMPLEMENTED,
511            Json(serde_json::json!({
512                "error": "SMTP mailbox management not available",
513                "message": "SMTP server is not enabled or registry not available."
514            })),
515        )
516    }
517}
518
519/// MQTT broker statistics
520#[cfg(feature = "mqtt")]
521#[derive(Debug, Clone, Serialize, Deserialize)]
522pub struct MqttBrokerStats {
523    /// Number of connected MQTT clients
524    pub connected_clients: usize,
525    /// Number of active MQTT topics
526    pub active_topics: usize,
527    /// Number of retained messages
528    pub retained_messages: usize,
529    /// Total number of subscriptions
530    pub total_subscriptions: usize,
531}
532
533/// MQTT management handlers
534#[cfg(feature = "mqtt")]
535async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
536    if let Some(broker) = &state.mqtt_broker {
537        let connected_clients = broker.get_connected_clients().await.len();
538        let active_topics = broker.get_active_topics().await.len();
539        let stats = broker.get_topic_stats().await;
540
541        let broker_stats = MqttBrokerStats {
542            connected_clients,
543            active_topics,
544            retained_messages: stats.retained_messages,
545            total_subscriptions: stats.total_subscriptions,
546        };
547
548        Json(broker_stats).into_response()
549    } else {
550        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
551    }
552}
553
554#[cfg(feature = "mqtt")]
555async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
556    if let Some(broker) = &state.mqtt_broker {
557        let clients = broker.get_connected_clients().await;
558        Json(serde_json::json!({
559            "clients": clients
560        }))
561        .into_response()
562    } else {
563        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
564    }
565}
566
567#[cfg(feature = "mqtt")]
568async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
569    if let Some(broker) = &state.mqtt_broker {
570        let topics = broker.get_active_topics().await;
571        Json(serde_json::json!({
572            "topics": topics
573        }))
574        .into_response()
575    } else {
576        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
577    }
578}
579
580#[cfg(feature = "mqtt")]
581async fn disconnect_mqtt_client(
582    State(state): State<ManagementState>,
583    Path(client_id): Path<String>,
584) -> impl IntoResponse {
585    if let Some(broker) = &state.mqtt_broker {
586        match broker.disconnect_client(&client_id).await {
587            Ok(_) => {
588                (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response()
589            }
590            Err(e) => {
591                (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
592                    .into_response()
593            }
594        }
595    } else {
596        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
597    }
598}
599
600// ========== MQTT Publish Handler ==========
601
602#[cfg(feature = "mqtt")]
603/// Request to publish a single MQTT message
604#[derive(Debug, Deserialize)]
605pub struct MqttPublishRequest {
606    /// Topic to publish to
607    pub topic: String,
608    /// Message payload (string or JSON)
609    pub payload: String,
610    /// QoS level (0, 1, or 2)
611    #[serde(default = "default_qos")]
612    pub qos: u8,
613    /// Whether to retain the message
614    #[serde(default)]
615    pub retain: bool,
616}
617
618#[cfg(feature = "mqtt")]
619fn default_qos() -> u8 {
620    0
621}
622
623#[cfg(feature = "mqtt")]
624/// Publish a message to an MQTT topic (only compiled when mqtt feature is enabled)
625async fn publish_mqtt_message_handler(
626    State(state): State<ManagementState>,
627    Json(request): Json<serde_json::Value>,
628) -> impl IntoResponse {
629    // Extract fields from JSON manually
630    let topic = request.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
631    let payload = request.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
632    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
633    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);
634
635    if topic.is_none() || payload.is_none() {
636        return (
637            StatusCode::BAD_REQUEST,
638            Json(serde_json::json!({
639                "error": "Invalid request",
640                "message": "Missing required fields: topic and payload"
641            })),
642        );
643    }
644
645    let topic = topic.unwrap();
646    let payload = payload.unwrap();
647
648    if let Some(broker) = &state.mqtt_broker {
649        // Validate QoS
650        if qos > 2 {
651            return (
652                StatusCode::BAD_REQUEST,
653                Json(serde_json::json!({
654                    "error": "Invalid QoS",
655                    "message": "QoS must be 0, 1, or 2"
656                })),
657            );
658        }
659
660        // Convert payload to bytes
661        let payload_bytes = payload.as_bytes().to_vec();
662        let client_id = "mockforge-management-api".to_string();
663
664        let publish_result = broker
665            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
666            .await
667            .map_err(|e| format!("{}", e));
668
669        match publish_result {
670            Ok(_) => {
671                // Emit message event for real-time monitoring
672                let event = MessageEvent::Mqtt(MqttMessageEvent {
673                    topic: topic.clone(),
674                    payload: payload.clone(),
675                    qos,
676                    retain,
677                    timestamp: chrono::Utc::now().to_rfc3339(),
678                });
679                let _ = state.message_events.send(event);
680
681                (
682                    StatusCode::OK,
683                    Json(serde_json::json!({
684                        "success": true,
685                        "message": format!("Message published to topic '{}'", topic),
686                        "topic": topic,
687                        "qos": qos,
688                        "retain": retain
689                    })),
690                )
691            }
692            Err(error_msg) => (
693                StatusCode::INTERNAL_SERVER_ERROR,
694                Json(serde_json::json!({
695                    "error": "Failed to publish message",
696                    "message": error_msg
697                })),
698            ),
699        }
700    } else {
701        (
702            StatusCode::SERVICE_UNAVAILABLE,
703            Json(serde_json::json!({
704                "error": "MQTT broker not available",
705                "message": "MQTT broker is not enabled or not available."
706            })),
707        )
708    }
709}
710
711#[cfg(not(feature = "mqtt"))]
712/// Publish a message to an MQTT topic (stub when mqtt feature is disabled)
713async fn publish_mqtt_message_handler(
714    State(_state): State<ManagementState>,
715    Json(_request): Json<serde_json::Value>,
716) -> impl IntoResponse {
717    (
718        StatusCode::SERVICE_UNAVAILABLE,
719        Json(serde_json::json!({
720            "error": "MQTT feature not enabled",
721            "message": "MQTT support is not compiled into this build"
722        })),
723    )
724}
725
726#[cfg(feature = "mqtt")]
727/// Request to publish multiple MQTT messages
728#[derive(Debug, Deserialize)]
729pub struct MqttBatchPublishRequest {
730    /// List of messages to publish
731    pub messages: Vec<MqttPublishRequest>,
732    /// Delay between messages in milliseconds
733    #[serde(default = "default_delay")]
734    pub delay_ms: u64,
735}
736
737#[cfg(feature = "mqtt")]
738fn default_delay() -> u64 {
739    100
740}
741
742#[cfg(feature = "mqtt")]
743/// Publish multiple messages to MQTT topics (only compiled when mqtt feature is enabled)
744async fn publish_mqtt_batch_handler(
745    State(state): State<ManagementState>,
746    Json(request): Json<serde_json::Value>,
747) -> impl IntoResponse {
748    // Extract fields from JSON manually
749    let messages_json = request.get("messages").and_then(|v| v.as_array());
750    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);
751
752    if messages_json.is_none() {
753        return (
754            StatusCode::BAD_REQUEST,
755            Json(serde_json::json!({
756                "error": "Invalid request",
757                "message": "Missing required field: messages"
758            })),
759        );
760    }
761
762    let messages_json = messages_json.unwrap();
763
764    if let Some(broker) = &state.mqtt_broker {
765        if messages_json.is_empty() {
766            return (
767                StatusCode::BAD_REQUEST,
768                Json(serde_json::json!({
769                    "error": "Empty batch",
770                    "message": "At least one message is required"
771                })),
772            );
773        }
774
775        let mut results = Vec::new();
776        let client_id = "mockforge-management-api".to_string();
777
778        for (index, msg_json) in messages_json.iter().enumerate() {
779            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
780            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
781            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
782            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);
783
784            if topic.is_none() || payload.is_none() {
785                results.push(serde_json::json!({
786                    "index": index,
787                    "success": false,
788                    "error": "Missing required fields: topic and payload"
789                }));
790                continue;
791            }
792
793            let topic = topic.unwrap();
794            let payload = payload.unwrap();
795
796            // Validate QoS
797            if qos > 2 {
798                results.push(serde_json::json!({
799                    "index": index,
800                    "success": false,
801                    "error": "Invalid QoS (must be 0, 1, or 2)"
802                }));
803                continue;
804            }
805
806            // Convert payload to bytes
807            let payload_bytes = payload.as_bytes().to_vec();
808
809            let publish_result = broker
810                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
811                .await
812                .map_err(|e| format!("{}", e));
813
814            match publish_result {
815                Ok(_) => {
816                    // Emit message event
817                    let event = MessageEvent::Mqtt(MqttMessageEvent {
818                        topic: topic.clone(),
819                        payload: payload.clone(),
820                        qos,
821                        retain,
822                        timestamp: chrono::Utc::now().to_rfc3339(),
823                    });
824                    let _ = state.message_events.send(event);
825
826                    results.push(serde_json::json!({
827                        "index": index,
828                        "success": true,
829                        "topic": topic,
830                        "qos": qos
831                    }));
832                }
833                Err(error_msg) => {
834                    results.push(serde_json::json!({
835                        "index": index,
836                        "success": false,
837                        "error": error_msg
838                    }));
839                }
840            }
841
842            // Add delay between messages (except for the last one)
843            if index < messages_json.len() - 1 && delay_ms > 0 {
844                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
845            }
846        }
847
848        let success_count =
849            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();
850
851        (
852            StatusCode::OK,
853            Json(serde_json::json!({
854                "success": true,
855                "total": messages_json.len(),
856                "succeeded": success_count,
857                "failed": messages_json.len() - success_count,
858                "results": results
859            })),
860        )
861    } else {
862        (
863            StatusCode::SERVICE_UNAVAILABLE,
864            Json(serde_json::json!({
865                "error": "MQTT broker not available",
866                "message": "MQTT broker is not enabled or not available."
867            })),
868        )
869    }
870}
871
872#[cfg(not(feature = "mqtt"))]
873/// Publish multiple messages to MQTT topics (stub when mqtt feature is disabled)
874async fn publish_mqtt_batch_handler(
875    State(_state): State<ManagementState>,
876    Json(_request): Json<serde_json::Value>,
877) -> impl IntoResponse {
878    (
879        StatusCode::SERVICE_UNAVAILABLE,
880        Json(serde_json::json!({
881            "error": "MQTT feature not enabled",
882            "message": "MQTT support is not compiled into this build"
883        })),
884    )
885}
886
887/// Build the management API router
888pub fn management_router(state: ManagementState) -> Router {
889    let router = Router::new()
890        .route("/health", get(health_check))
891        .route("/stats", get(get_stats))
892        .route("/config", get(get_config))
893        .route("/mocks", get(list_mocks))
894        .route("/mocks", post(create_mock))
895        .route("/mocks/{id}", get(get_mock))
896        .route("/mocks/{id}", put(update_mock))
897        .route("/mocks/{id}", delete(delete_mock))
898        .route("/export", get(export_mocks))
899        .route("/import", post(import_mocks));
900
901    #[cfg(feature = "smtp")]
902    let router = router
903        .route("/smtp/mailbox", get(list_smtp_emails))
904        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
905        .route("/smtp/mailbox/{id}", get(get_smtp_email))
906        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
907        .route("/smtp/mailbox/search", get(search_smtp_emails));
908
909    #[cfg(not(feature = "smtp"))]
910    let router = router;
911
912    // MQTT routes
913    #[cfg(feature = "mqtt")]
914    let router = router
915        .route("/mqtt/stats", get(get_mqtt_stats))
916        .route("/mqtt/clients", get(get_mqtt_clients))
917        .route("/mqtt/topics", get(get_mqtt_topics))
918        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
919        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
920        .route("/mqtt/publish", post(publish_mqtt_message_handler))
921        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
922
923    #[cfg(not(feature = "mqtt"))]
924    let router = router
925        .route("/mqtt/publish", post(publish_mqtt_message_handler))
926        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
927
928    #[cfg(feature = "kafka")]
929    let router = router
930        .route("/kafka/stats", get(get_kafka_stats))
931        .route("/kafka/topics", get(get_kafka_topics))
932        .route("/kafka/topics/{topic}", get(get_kafka_topic))
933        .route("/kafka/groups", get(get_kafka_groups))
934        .route("/kafka/groups/{group_id}", get(get_kafka_group))
935        .route("/kafka/produce", post(produce_kafka_message))
936        .route("/kafka/produce/batch", post(produce_kafka_batch))
937        .route("/kafka/messages/stream", get(kafka_messages_stream));
938
939    #[cfg(not(feature = "kafka"))]
940    let router = router;
941
942    // AI-powered features
943    let router = router
944        .route("/ai/generate-spec", post(generate_ai_spec))
945        .route("/chaos/config", get(get_chaos_config))
946        .route("/chaos/config", post(update_chaos_config))
947        .route("/network/profiles", get(list_network_profiles))
948        .route("/network/profile/apply", post(apply_network_profile));
949
950    router.with_state(state)
951}
952
953#[cfg(feature = "kafka")]
954#[derive(Debug, Clone, Serialize, Deserialize)]
955pub struct KafkaBrokerStats {
956    /// Number of topics
957    pub topics: usize,
958    /// Total number of partitions
959    pub partitions: usize,
960    /// Number of consumer groups
961    pub consumer_groups: usize,
962    /// Total messages produced
963    pub messages_produced: u64,
964    /// Total messages consumed
965    pub messages_consumed: u64,
966}
967
968#[cfg(feature = "kafka")]
969#[derive(Debug, Clone, Serialize, Deserialize)]
970pub struct KafkaTopicInfo {
971    pub name: String,
972    pub partitions: usize,
973    pub replication_factor: i32,
974}
975
976#[cfg(feature = "kafka")]
977#[derive(Debug, Clone, Serialize, Deserialize)]
978pub struct KafkaConsumerGroupInfo {
979    pub group_id: String,
980    pub members: usize,
981    pub state: String,
982}
983
984#[cfg(feature = "kafka")]
985/// Get Kafka broker statistics
986async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
987    if let Some(broker) = &state.kafka_broker {
988        let topics = broker.topics.read().await;
989        let consumer_groups = broker.consumer_groups.read().await;
990        let metrics = broker.metrics.clone();
991
992        let total_partitions: usize = topics.values().map(|t| t.partitions.len()).sum();
993        let snapshot = metrics.snapshot();
994        let messages_produced = snapshot.messages_produced_total;
995        let messages_consumed = snapshot.messages_consumed_total;
996
997        let stats = KafkaBrokerStats {
998            topics: topics.len(),
999            partitions: total_partitions,
1000            consumer_groups: consumer_groups.groups().len(),
1001            messages_produced,
1002            messages_consumed,
1003        };
1004
1005        Json(stats).into_response()
1006    } else {
1007        (
1008            StatusCode::SERVICE_UNAVAILABLE,
1009            Json(serde_json::json!({
1010                "error": "Kafka broker not available",
1011                "message": "Kafka broker is not enabled or not available."
1012            })),
1013        )
1014            .into_response()
1015    }
1016}
1017
1018#[cfg(feature = "kafka")]
1019/// List Kafka topics
1020async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
1021    if let Some(broker) = &state.kafka_broker {
1022        let topics = broker.topics.read().await;
1023        let topic_list: Vec<KafkaTopicInfo> = topics
1024            .iter()
1025            .map(|(name, topic)| KafkaTopicInfo {
1026                name: name.clone(),
1027                partitions: topic.partitions.len(),
1028                replication_factor: topic.config.replication_factor,
1029            })
1030            .collect();
1031
1032        Json(serde_json::json!({
1033            "topics": topic_list
1034        }))
1035        .into_response()
1036    } else {
1037        (
1038            StatusCode::SERVICE_UNAVAILABLE,
1039            Json(serde_json::json!({
1040                "error": "Kafka broker not available",
1041                "message": "Kafka broker is not enabled or not available."
1042            })),
1043        )
1044            .into_response()
1045    }
1046}
1047
1048#[cfg(feature = "kafka")]
1049/// Get Kafka topic details
1050async fn get_kafka_topic(
1051    State(state): State<ManagementState>,
1052    Path(topic_name): Path<String>,
1053) -> impl IntoResponse {
1054    if let Some(broker) = &state.kafka_broker {
1055        let topics = broker.topics.read().await;
1056        if let Some(topic) = topics.get(&topic_name) {
1057            Json(serde_json::json!({
1058                "name": topic_name,
1059                "partitions": topic.partitions.len(),
1060                "replication_factor": topic.config.replication_factor,
1061                "partitions_detail": topic.partitions.iter().enumerate().map(|(idx, partition)| serde_json::json!({
1062                    "id": idx as i32,
1063                    "leader": 0,
1064                    "replicas": vec![0],
1065                    "message_count": partition.messages.len()
1066                })).collect::<Vec<_>>()
1067            })).into_response()
1068        } else {
1069            (
1070                StatusCode::NOT_FOUND,
1071                Json(serde_json::json!({
1072                    "error": "Topic not found",
1073                    "topic": topic_name
1074                })),
1075            )
1076                .into_response()
1077        }
1078    } else {
1079        (
1080            StatusCode::SERVICE_UNAVAILABLE,
1081            Json(serde_json::json!({
1082                "error": "Kafka broker not available",
1083                "message": "Kafka broker is not enabled or not available."
1084            })),
1085        )
1086            .into_response()
1087    }
1088}
1089
1090#[cfg(feature = "kafka")]
1091/// List Kafka consumer groups
1092async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
1093    if let Some(broker) = &state.kafka_broker {
1094        let consumer_groups = broker.consumer_groups.read().await;
1095        let groups: Vec<KafkaConsumerGroupInfo> = consumer_groups
1096            .groups()
1097            .iter()
1098            .map(|(group_id, group)| KafkaConsumerGroupInfo {
1099                group_id: group_id.clone(),
1100                members: group.members.len(),
1101                state: "Stable".to_string(), // Simplified - could be more detailed
1102            })
1103            .collect();
1104
1105        Json(serde_json::json!({
1106            "groups": groups
1107        }))
1108        .into_response()
1109    } else {
1110        (
1111            StatusCode::SERVICE_UNAVAILABLE,
1112            Json(serde_json::json!({
1113                "error": "Kafka broker not available",
1114                "message": "Kafka broker is not enabled or not available."
1115            })),
1116        )
1117            .into_response()
1118    }
1119}
1120
1121#[cfg(feature = "kafka")]
1122/// Get Kafka consumer group details
1123async fn get_kafka_group(
1124    State(state): State<ManagementState>,
1125    Path(group_id): Path<String>,
1126) -> impl IntoResponse {
1127    if let Some(broker) = &state.kafka_broker {
1128        let consumer_groups = broker.consumer_groups.read().await;
1129        if let Some(group) = consumer_groups.groups().get(&group_id) {
1130            Json(serde_json::json!({
1131                "group_id": group_id,
1132                "members": group.members.len(),
1133                "state": "Stable",
1134                "members_detail": group.members.iter().map(|(member_id, member)| serde_json::json!({
1135                    "member_id": member_id,
1136                    "client_id": member.client_id,
1137                    "assignments": member.assignment.iter().map(|a| serde_json::json!({
1138                        "topic": a.topic,
1139                        "partitions": a.partitions
1140                    })).collect::<Vec<_>>()
1141                })).collect::<Vec<_>>(),
1142                "offsets": group.offsets.iter().map(|((topic, partition), offset)| serde_json::json!({
1143                    "topic": topic,
1144                    "partition": partition,
1145                    "offset": offset
1146                })).collect::<Vec<_>>()
1147            })).into_response()
1148        } else {
1149            (
1150                StatusCode::NOT_FOUND,
1151                Json(serde_json::json!({
1152                    "error": "Consumer group not found",
1153                    "group_id": group_id
1154                })),
1155            )
1156                .into_response()
1157        }
1158    } else {
1159        (
1160            StatusCode::SERVICE_UNAVAILABLE,
1161            Json(serde_json::json!({
1162                "error": "Kafka broker not available",
1163                "message": "Kafka broker is not enabled or not available."
1164            })),
1165        )
1166            .into_response()
1167    }
1168}
1169
1170// ========== Kafka Produce Handler ==========
1171
1172#[cfg(feature = "kafka")]
1173#[derive(Debug, Deserialize)]
1174pub struct KafkaProduceRequest {
1175    /// Topic to produce to
1176    pub topic: String,
1177    /// Message key (optional)
1178    #[serde(default)]
1179    pub key: Option<String>,
1180    /// Message value (JSON string or plain string)
1181    pub value: String,
1182    /// Partition ID (optional, auto-assigned if not provided)
1183    #[serde(default)]
1184    pub partition: Option<i32>,
1185    /// Message headers (optional, key-value pairs)
1186    #[serde(default)]
1187    pub headers: Option<std::collections::HashMap<String, String>>,
1188}
1189
1190#[cfg(feature = "kafka")]
1191/// Produce a message to a Kafka topic
1192async fn produce_kafka_message(
1193    State(state): State<ManagementState>,
1194    Json(request): Json<KafkaProduceRequest>,
1195) -> impl IntoResponse {
1196    if let Some(broker) = &state.kafka_broker {
1197        let mut topics = broker.topics.write().await;
1198
1199        // Get or create the topic
1200        let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
1201            crate::topics::Topic::new(request.topic.clone(), crate::topics::TopicConfig::default())
1202        });
1203
1204        // Determine partition
1205        let partition_id = if let Some(partition) = request.partition {
1206            partition
1207        } else {
1208            topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes()))
1209        };
1210
1211        // Validate partition exists
1212        if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
1213            return (
1214                StatusCode::BAD_REQUEST,
1215                Json(serde_json::json!({
1216                    "error": "Invalid partition",
1217                    "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
1218                })),
1219            )
1220                .into_response();
1221        }
1222
1223        // Create the message
1224        let message = crate::partitions::KafkaMessage {
1225            offset: 0, // Will be set by partition.append
1226            timestamp: chrono::Utc::now().timestamp_millis(),
1227            key: request.key.map(|k| k.as_bytes().to_vec()),
1228            value: request.value.as_bytes().to_vec(),
1229            headers: request
1230                .headers
1231                .unwrap_or_default()
1232                .into_iter()
1233                .map(|(k, v)| (k, v.as_bytes().to_vec()))
1234                .collect(),
1235        };
1236
1237        // Produce to partition
1238        match topic_entry.produce(partition_id, message).await {
1239            Ok(offset) => {
1240                // Record metrics
1241                broker.metrics.record_messages_produced(1);
1242
1243                // Emit message event for real-time monitoring
1244                #[cfg(feature = "kafka")]
1245                {
1246                    let event = MessageEvent::Kafka(KafkaMessageEvent {
1247                        topic: request.topic.clone(),
1248                        key: request.key.clone(),
1249                        value: request.value.clone(),
1250                        partition: partition_id,
1251                        offset,
1252                        headers: request.headers.clone(),
1253                        timestamp: chrono::Utc::now().to_rfc3339(),
1254                    });
1255                    let _ = state.message_events.send(event);
1256                }
1257
1258                Json(serde_json::json!({
1259                    "success": true,
1260                    "message": format!("Message produced to topic '{}'", request.topic),
1261                    "topic": request.topic,
1262                    "partition": partition_id,
1263                    "offset": offset
1264                }))
1265                .into_response()
1266            }
1267            Err(e) => (
1268                StatusCode::INTERNAL_SERVER_ERROR,
1269                Json(serde_json::json!({
1270                    "error": "Failed to produce message",
1271                    "message": e.to_string()
1272                })),
1273            )
1274                .into_response(),
1275        }
1276    } else {
1277        (
1278            StatusCode::SERVICE_UNAVAILABLE,
1279            Json(serde_json::json!({
1280                "error": "Kafka broker not available",
1281                "message": "Kafka broker is not enabled or not available."
1282            })),
1283        )
1284            .into_response()
1285    }
1286}
1287
1288#[cfg(feature = "kafka")]
1289#[derive(Debug, Deserialize)]
1290pub struct KafkaBatchProduceRequest {
1291    /// List of messages to produce
1292    pub messages: Vec<KafkaProduceRequest>,
1293    /// Delay between messages in milliseconds
1294    #[serde(default = "default_delay")]
1295    pub delay_ms: u64,
1296}
1297
1298#[cfg(feature = "kafka")]
1299/// Produce multiple messages to Kafka topics
1300async fn produce_kafka_batch(
1301    State(state): State<ManagementState>,
1302    Json(request): Json<KafkaBatchProduceRequest>,
1303) -> impl IntoResponse {
1304    if let Some(broker) = &state.kafka_broker {
1305        if request.messages.is_empty() {
1306            return (
1307                StatusCode::BAD_REQUEST,
1308                Json(serde_json::json!({
1309                    "error": "Empty batch",
1310                    "message": "At least one message is required"
1311                })),
1312            )
1313                .into_response();
1314        }
1315
1316        let mut results = Vec::new();
1317
1318        for (index, msg_request) in request.messages.iter().enumerate() {
1319            let mut topics = broker.topics.write().await;
1320
1321            // Get or create the topic
1322            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
1323                crate::topics::Topic::new(
1324                    msg_request.topic.clone(),
1325                    crate::topics::TopicConfig::default(),
1326                )
1327            });
1328
1329            // Determine partition
1330            let partition_id = if let Some(partition) = msg_request.partition {
1331                partition
1332            } else {
1333                topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
1334            };
1335
1336            // Validate partition exists
1337            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
1338                results.push(serde_json::json!({
1339                    "index": index,
1340                    "success": false,
1341                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
1342                }));
1343                continue;
1344            }
1345
1346            // Create the message
1347            let message = crate::partitions::KafkaMessage {
1348                offset: 0,
1349                timestamp: chrono::Utc::now().timestamp_millis(),
1350                key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
1351                value: msg_request.value.as_bytes().to_vec(),
1352                headers: msg_request
1353                    .headers
1354                    .clone()
1355                    .unwrap_or_default()
1356                    .into_iter()
1357                    .map(|(k, v)| (k, v.as_bytes().to_vec()))
1358                    .collect(),
1359            };
1360
1361            // Produce to partition
1362            match topic_entry.produce(partition_id, message).await {
1363                Ok(offset) => {
1364                    broker.metrics.record_messages_produced(1);
1365
1366                    // Emit message event
1367                    let event = MessageEvent::Kafka(KafkaMessageEvent {
1368                        topic: msg_request.topic.clone(),
1369                        key: msg_request.key.clone(),
1370                        value: msg_request.value.clone(),
1371                        partition: partition_id,
1372                        offset,
1373                        headers: msg_request.headers.clone(),
1374                        timestamp: chrono::Utc::now().to_rfc3339(),
1375                    });
1376                    let _ = state.message_events.send(event);
1377
1378                    results.push(serde_json::json!({
1379                        "index": index,
1380                        "success": true,
1381                        "topic": msg_request.topic,
1382                        "partition": partition_id,
1383                        "offset": offset
1384                    }));
1385                }
1386                Err(e) => {
1387                    results.push(serde_json::json!({
1388                        "index": index,
1389                        "success": false,
1390                        "error": e.to_string()
1391                    }));
1392                }
1393            }
1394
1395            // Add delay between messages (except for the last one)
1396            if index < request.messages.len() - 1 && request.delay_ms > 0 {
1397                tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
1398            }
1399        }
1400
1401        let success_count =
1402            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();
1403
1404        Json(serde_json::json!({
1405            "success": true,
1406            "total": request.messages.len(),
1407            "succeeded": success_count,
1408            "failed": request.messages.len() - success_count,
1409            "results": results
1410        }))
1411        .into_response()
1412    } else {
1413        (
1414            StatusCode::SERVICE_UNAVAILABLE,
1415            Json(serde_json::json!({
1416                "error": "Kafka broker not available",
1417                "message": "Kafka broker is not enabled or not available."
1418            })),
1419        )
1420            .into_response()
1421    }
1422}
1423
1424// ========== Real-time Message Streaming (SSE) ==========
1425
1426#[cfg(feature = "mqtt")]
1427/// SSE stream for MQTT messages
1428async fn mqtt_messages_stream(
1429    State(state): State<ManagementState>,
1430    Query(params): Query<std::collections::HashMap<String, String>>,
1431) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
1432    let mut rx = state.message_events.subscribe();
1433    let topic_filter = params.get("topic").cloned();
1434
1435    let stream = stream::unfold(rx, move |mut rx| {
1436        let topic_filter = topic_filter.clone();
1437
1438        async move {
1439            loop {
1440                match rx.recv().await {
1441                    Ok(MessageEvent::Mqtt(event)) => {
1442                        // Apply topic filter if specified
1443                        if let Some(filter) = &topic_filter {
1444                            if !event.topic.contains(filter) {
1445                                continue;
1446                            }
1447                        }
1448
1449                        let event_json = serde_json::json!({
1450                            "protocol": "mqtt",
1451                            "topic": event.topic,
1452                            "payload": event.payload,
1453                            "qos": event.qos,
1454                            "retain": event.retain,
1455                            "timestamp": event.timestamp,
1456                        });
1457
1458                        if let Ok(event_data) = serde_json::to_string(&event_json) {
1459                            let sse_event = Event::default().event("mqtt_message").data(event_data);
1460                            return Some((Ok(sse_event), rx));
1461                        }
1462                    }
1463                    #[cfg(feature = "kafka")]
1464                    Ok(MessageEvent::Kafka(_)) => {
1465                        // Skip Kafka events in MQTT stream
1466                        continue;
1467                    }
1468                    Err(broadcast::error::RecvError::Closed) => {
1469                        return None;
1470                    }
1471                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
1472                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
1473                        continue;
1474                    }
1475                }
1476            }
1477        }
1478    });
1479
1480    Sse::new(stream).keep_alive(
1481        axum::response::sse::KeepAlive::new()
1482            .interval(std::time::Duration::from_secs(15))
1483            .text("keep-alive-text"),
1484    )
1485}
1486
1487#[cfg(feature = "kafka")]
1488/// SSE stream for Kafka messages
1489async fn kafka_messages_stream(
1490    State(state): State<ManagementState>,
1491    Query(params): Query<std::collections::HashMap<String, String>>,
1492) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
1493    let mut rx = state.message_events.subscribe();
1494    let topic_filter = params.get("topic").cloned();
1495
1496    let stream = stream::unfold(rx, move |mut rx| {
1497        let topic_filter = topic_filter.clone();
1498
1499        async move {
1500            loop {
1501                match rx.recv().await {
1502                    #[cfg(feature = "mqtt")]
1503                    Ok(MessageEvent::Mqtt(_)) => {
1504                        // Skip MQTT events in Kafka stream
1505                        continue;
1506                    }
1507                    Ok(MessageEvent::Kafka(event)) => {
1508                        // Apply topic filter if specified
1509                        if let Some(filter) = &topic_filter {
1510                            if !event.topic.contains(filter) {
1511                                continue;
1512                            }
1513                        }
1514
1515                        let event_json = serde_json::json!({
1516                            "protocol": "kafka",
1517                            "topic": event.topic,
1518                            "key": event.key,
1519                            "value": event.value,
1520                            "partition": event.partition,
1521                            "offset": event.offset,
1522                            "headers": event.headers,
1523                            "timestamp": event.timestamp,
1524                        });
1525
1526                        if let Ok(event_data) = serde_json::to_string(&event_json) {
1527                            let sse_event =
1528                                Event::default().event("kafka_message").data(event_data);
1529                            return Some((Ok(sse_event), rx));
1530                        }
1531                    }
1532                    Err(broadcast::error::RecvError::Closed) => {
1533                        return None;
1534                    }
1535                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
1536                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
1537                        continue;
1538                    }
1539                }
1540            }
1541        }
1542    });
1543
1544    Sse::new(stream).keep_alive(
1545        axum::response::sse::KeepAlive::new()
1546            .interval(std::time::Duration::from_secs(15))
1547            .text("keep-alive-text"),
1548    )
1549}
1550
1551// ========== AI-Powered Features ==========
1552
1553/// Request for AI-powered API specification generation
1554#[derive(Debug, Deserialize)]
1555pub struct GenerateSpecRequest {
1556    /// Natural language description of the API to generate
1557    pub query: String,
1558    /// Type of specification to generate: "openapi", "graphql", or "asyncapi"
1559    pub spec_type: String,
1560    /// Optional API version (e.g., "3.0.0" for OpenAPI)
1561    pub api_version: Option<String>,
1562}
1563
1564/// Generate API specification from natural language using AI
1565#[cfg(feature = "data-faker")]
1566async fn generate_ai_spec(
1567    State(_state): State<ManagementState>,
1568    Json(request): Json<GenerateSpecRequest>,
1569) -> impl IntoResponse {
1570    use mockforge_data::rag::{
1571        config::{EmbeddingProvider, LlmProvider, RagConfig},
1572        engine::RagEngine,
1573        storage::{DocumentStorage, StorageFactory},
1574    };
1575    use std::sync::Arc;
1576
1577    // Build RAG config from environment variables
1578    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
1579        .ok()
1580        .or_else(|| std::env::var("OPENAI_API_KEY").ok());
1581
1582    // Check if RAG is configured - require API key
1583    if api_key.is_none() {
1584        return (
1585            StatusCode::SERVICE_UNAVAILABLE,
1586            Json(serde_json::json!({
1587                "error": "AI service not configured",
1588                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
1589            })),
1590        )
1591            .into_response();
1592    }
1593
1594    // Build RAG configuration
1595    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
1596        .unwrap_or_else(|_| "openai".to_string())
1597        .to_lowercase();
1598
1599    let provider = match provider_str.as_str() {
1600        "openai" => LlmProvider::OpenAI,
1601        "anthropic" => LlmProvider::Anthropic,
1602        "ollama" => LlmProvider::Ollama,
1603        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
1604        _ => LlmProvider::OpenAI,
1605    };
1606
1607    let api_endpoint =
1608        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
1609            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
1610            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
1611            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
1612            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
1613        });
1614
1615    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
1616        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
1617        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
1618        LlmProvider::Ollama => "llama2".to_string(),
1619        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
1620    });
1621
1622    // Build RagConfig using default() and override fields
1623    let mut rag_config = RagConfig::default();
1624    rag_config.provider = provider;
1625    rag_config.api_endpoint = api_endpoint;
1626    rag_config.api_key = api_key;
1627    rag_config.model = model;
1628    rag_config.max_tokens = std::env::var("MOCKFORGE_RAG_MAX_TOKENS")
1629        .unwrap_or_else(|_| "4096".to_string())
1630        .parse()
1631        .unwrap_or(4096);
1632    rag_config.temperature = std::env::var("MOCKFORGE_RAG_TEMPERATURE")
1633        .unwrap_or_else(|_| "0.3".to_string())
1634        .parse()
1635        .unwrap_or(0.3); // Lower temperature for more structured output
1636    rag_config.timeout_secs = std::env::var("MOCKFORGE_RAG_TIMEOUT")
1637        .unwrap_or_else(|_| "60".to_string())
1638        .parse()
1639        .unwrap_or(60);
1640    rag_config.max_context_length = std::env::var("MOCKFORGE_RAG_CONTEXT_WINDOW")
1641        .unwrap_or_else(|_| "4000".to_string())
1642        .parse()
1643        .unwrap_or(4000);
1644
1645    // Build the prompt for spec generation
1646    let spec_type_label = match request.spec_type.as_str() {
1647        "openapi" => "OpenAPI 3.0",
1648        "graphql" => "GraphQL",
1649        "asyncapi" => "AsyncAPI",
1650        _ => "OpenAPI 3.0",
1651    };
1652
1653    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");
1654
1655    let prompt = format!(
1656        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.
1657
1658User Requirements:
1659{}
1660
1661Instructions:
16621. Generate a complete, valid {} specification
16632. Include all paths, operations, request/response schemas, and components
16643. Use realistic field names and data types
16654. Include proper descriptions and examples
16665. Follow {} best practices
16676. Return ONLY the specification, no additional explanation
16687. For OpenAPI, use version {}
1669
1670Return the specification in {} format."#,
1671        spec_type_label,
1672        request.query,
1673        spec_type_label,
1674        spec_type_label,
1675        api_version,
1676        if request.spec_type == "graphql" {
1677            "GraphQL SDL"
1678        } else {
1679            "YAML"
1680        }
1681    );
1682
1683    // Create in-memory storage for RAG engine
1684    // Note: StorageFactory::create_memory() returns Box<dyn DocumentStorage>
1685    // We need to use unsafe transmute or create a wrapper, but for now we'll use
1686    // a simpler approach: create InMemoryStorage directly
1687    use mockforge_data::rag::storage::InMemoryStorage;
1688    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());
1689
1690    // Create RAG engine
1691    let mut rag_engine = match RagEngine::new(rag_config.clone(), storage) {
1692        Ok(engine) => engine,
1693        Err(e) => {
1694            return (
1695                StatusCode::INTERNAL_SERVER_ERROR,
1696                Json(serde_json::json!({
1697                    "error": "Failed to initialize RAG engine",
1698                    "message": e.to_string()
1699                })),
1700            )
1701                .into_response();
1702        }
1703    };
1704
1705    // Generate using RAG engine
1706    match rag_engine.generate(&prompt, None).await {
1707        Ok(generated_text) => {
1708            // Try to extract just the YAML/JSON/SDL content if LLM added explanation
1709            let spec = if request.spec_type == "graphql" {
1710                // For GraphQL, extract SDL
1711                extract_graphql_schema(&generated_text)
1712            } else {
1713                // For OpenAPI/AsyncAPI, extract YAML
1714                extract_yaml_spec(&generated_text)
1715            };
1716
1717            Json(serde_json::json!({
1718                "success": true,
1719                "spec": spec,
1720                "spec_type": request.spec_type,
1721            }))
1722            .into_response()
1723        }
1724        Err(e) => (
1725            StatusCode::INTERNAL_SERVER_ERROR,
1726            Json(serde_json::json!({
1727                "error": "AI generation failed",
1728                "message": e.to_string()
1729            })),
1730        )
1731            .into_response(),
1732    }
1733}
1734
1735#[cfg(not(feature = "data-faker"))]
1736async fn generate_ai_spec(
1737    State(_state): State<ManagementState>,
1738    Json(_request): Json<GenerateSpecRequest>,
1739) -> impl IntoResponse {
1740    (
1741        StatusCode::NOT_IMPLEMENTED,
1742        Json(serde_json::json!({
1743            "error": "AI features not enabled",
1744            "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
1745        })),
1746    )
1747        .into_response()
1748}
1749
1750fn extract_yaml_spec(text: &str) -> String {
1751    // Try to find YAML code blocks
1752    if let Some(start) = text.find("```yaml") {
1753        let yaml_start = text[start + 7..].trim_start();
1754        if let Some(end) = yaml_start.find("```") {
1755            return yaml_start[..end].trim().to_string();
1756        }
1757    }
1758    if let Some(start) = text.find("```") {
1759        let content_start = text[start + 3..].trim_start();
1760        if let Some(end) = content_start.find("```") {
1761            return content_start[..end].trim().to_string();
1762        }
1763    }
1764
1765    // Check if it starts with openapi: or asyncapi:
1766    if text.trim_start().starts_with("openapi:") || text.trim_start().starts_with("asyncapi:") {
1767        return text.trim().to_string();
1768    }
1769
1770    // Return as-is if no code blocks found
1771    text.trim().to_string()
1772}
1773
1774fn extract_graphql_schema(text: &str) -> String {
1775    // Try to find GraphQL code blocks
1776    if let Some(start) = text.find("```graphql") {
1777        let schema_start = text[start + 10..].trim_start();
1778        if let Some(end) = schema_start.find("```") {
1779            return schema_start[..end].trim().to_string();
1780        }
1781    }
1782    if let Some(start) = text.find("```") {
1783        let content_start = text[start + 3..].trim_start();
1784        if let Some(end) = content_start.find("```") {
1785            return content_start[..end].trim().to_string();
1786        }
1787    }
1788
1789    // Check if it looks like GraphQL SDL (starts with type, schema, etc.)
1790    if text.trim_start().starts_with("type ") || text.trim_start().starts_with("schema ") {
1791        return text.trim().to_string();
1792    }
1793
1794    text.trim().to_string()
1795}
1796
1797// ========== Chaos Engineering Management ==========
1798
1799/// Get current chaos engineering configuration
1800async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
1801    // TODO: Get from state when chaos config is stored
1802    Json(serde_json::json!({
1803        "enabled": false,
1804        "latency": null,
1805        "fault_injection": null,
1806        "rate_limit": null,
1807        "traffic_shaping": null,
1808    }))
1809    .into_response()
1810}
1811
1812/// Request to update chaos configuration
1813#[derive(Debug, Deserialize)]
1814pub struct ChaosConfigUpdate {
1815    /// Whether to enable chaos engineering
1816    pub enabled: Option<bool>,
1817    /// Latency configuration
1818    pub latency: Option<serde_json::Value>,
1819    /// Fault injection configuration
1820    pub fault_injection: Option<serde_json::Value>,
1821    /// Rate limiting configuration
1822    pub rate_limit: Option<serde_json::Value>,
1823    /// Traffic shaping configuration
1824    pub traffic_shaping: Option<serde_json::Value>,
1825}
1826
1827/// Update chaos engineering configuration
1828async fn update_chaos_config(
1829    State(_state): State<ManagementState>,
1830    Json(config): Json<ChaosConfigUpdate>,
1831) -> impl IntoResponse {
1832    // TODO: Apply chaos config to server
1833    Json(serde_json::json!({
1834        "success": true,
1835        "message": "Chaos configuration updated"
1836    }))
1837    .into_response()
1838}
1839
1840// ========== Network Profile Management ==========
1841
1842/// List available network profiles
1843async fn list_network_profiles() -> impl IntoResponse {
1844    use mockforge_core::network_profiles::NetworkProfileCatalog;
1845
1846    let catalog = NetworkProfileCatalog::default();
1847    let profiles: Vec<serde_json::Value> = catalog
1848        .list_profiles_with_description()
1849        .iter()
1850        .map(|(name, description)| {
1851            serde_json::json!({
1852                "name": name,
1853                "description": description,
1854            })
1855        })
1856        .collect();
1857
1858    Json(serde_json::json!({
1859        "profiles": profiles
1860    }))
1861    .into_response()
1862}
1863
1864#[derive(Debug, Deserialize)]
1865/// Request to apply a network profile
1866pub struct ApplyNetworkProfileRequest {
1867    /// Name of the network profile to apply
1868    pub profile_name: String,
1869}
1870
1871/// Apply a network profile
1872async fn apply_network_profile(
1873    State(_state): State<ManagementState>,
1874    Json(request): Json<ApplyNetworkProfileRequest>,
1875) -> impl IntoResponse {
1876    use mockforge_core::network_profiles::NetworkProfileCatalog;
1877
1878    let catalog = NetworkProfileCatalog::default();
1879    if let Some(profile) = catalog.get(&request.profile_name) {
1880        // TODO: Apply profile to server configuration
1881        Json(serde_json::json!({
1882            "success": true,
1883            "message": format!("Network profile '{}' applied", request.profile_name),
1884            "profile": {
1885                "name": profile.name,
1886                "description": profile.description,
1887            }
1888        }))
1889        .into_response()
1890    } else {
1891        (
1892            StatusCode::NOT_FOUND,
1893            Json(serde_json::json!({
1894                "error": "Profile not found",
1895                "message": format!("Network profile '{}' not found", request.profile_name)
1896            })),
1897        )
1898            .into_response()
1899    }
1900}
1901
1902/// Build the management API router with UI Builder support
1903pub fn management_router_with_ui_builder(
1904    state: ManagementState,
1905    server_config: mockforge_core::config::ServerConfig,
1906) -> Router {
1907    use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
1908
1909    // Create the base management router
1910    let management = management_router(state);
1911
1912    // Create UI Builder state and router
1913    let ui_builder_state = UIBuilderState::new(server_config);
1914    let ui_builder = create_ui_builder_router(ui_builder_state);
1915
1916    // Nest UI Builder under /ui-builder
1917    management.nest("/ui-builder", ui_builder)
1918}
1919
1920/// Build management router with spec import API
1921pub fn management_router_with_spec_import(state: ManagementState) -> Router {
1922    use crate::spec_import::{spec_import_router, SpecImportState};
1923
1924    // Create base management router
1925    let management = management_router(state);
1926
1927    // Merge with spec import router
1928    Router::new()
1929        .merge(management)
1930        .merge(spec_import_router(SpecImportState::new()))
1931}
1932
1933#[cfg(test)]
1934mod tests {
1935    use super::*;
1936
1937    #[tokio::test]
1938    async fn test_create_and_get_mock() {
1939        let state = ManagementState::new(None, None, 3000);
1940
1941        let mock = MockConfig {
1942            id: "test-1".to_string(),
1943            name: "Test Mock".to_string(),
1944            method: "GET".to_string(),
1945            path: "/test".to_string(),
1946            response: MockResponse {
1947                body: serde_json::json!({"message": "test"}),
1948                headers: None,
1949            },
1950            enabled: true,
1951            latency_ms: None,
1952            status_code: Some(200),
1953        };
1954
1955        // Create mock
1956        {
1957            let mut mocks = state.mocks.write().await;
1958            mocks.push(mock.clone());
1959        }
1960
1961        // Get mock
1962        let mocks = state.mocks.read().await;
1963        let found = mocks.iter().find(|m| m.id == "test-1");
1964        assert!(found.is_some());
1965        assert_eq!(found.unwrap().name, "Test Mock");
1966    }
1967
1968    #[tokio::test]
1969    async fn test_server_stats() {
1970        let state = ManagementState::new(None, None, 3000);
1971
1972        // Add some mocks
1973        {
1974            let mut mocks = state.mocks.write().await;
1975            mocks.push(MockConfig {
1976                id: "1".to_string(),
1977                name: "Mock 1".to_string(),
1978                method: "GET".to_string(),
1979                path: "/test1".to_string(),
1980                response: MockResponse {
1981                    body: serde_json::json!({}),
1982                    headers: None,
1983                },
1984                enabled: true,
1985                latency_ms: None,
1986                status_code: Some(200),
1987            });
1988            mocks.push(MockConfig {
1989                id: "2".to_string(),
1990                name: "Mock 2".to_string(),
1991                method: "POST".to_string(),
1992                path: "/test2".to_string(),
1993                response: MockResponse {
1994                    body: serde_json::json!({}),
1995                    headers: None,
1996                },
1997                enabled: false,
1998                latency_ms: None,
1999                status_code: Some(201),
2000            });
2001        }
2002
2003        let mocks = state.mocks.read().await;
2004        assert_eq!(mocks.len(), 2);
2005        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
2006    }
2007}