use axum::{
    extract::{Path, Query, State},
    http::StatusCode,
    response::{
        sse::{Event, Sse},
        IntoResponse, Json,
    },
    routing::{delete, get, post, put},
    Router,
};
use futures::stream::{self, Stream};
use mockforge_core::openapi::OpenApiSpec;
#[cfg(feature = "smtp")]
use mockforge_smtp::EmailSearchFilters;
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use tracing::*;

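/// Event broadcast to SSE subscribers whenever a message passes through one of
/// the mocked brokers. Serialized with a `protocol` tag and a `data` payload.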
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    #[cfg(feature = "mqtt")]
    Mqtt(MqttMessageEvent),
    #[cfg(feature = "kafka")]
    Kafka(KafkaMessageEvent),
}

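/// An MQTT publish as observed by the management API.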
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    pub topic: String,
    pub payload: String,
    pub qos: u8,
    pub retain: bool,
    pub timestamp: String,
}

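/// A Kafka produce as observed by the management API.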
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    pub topic: String,
    pub key: Option<String>,
    pub value: String,
    pub partition: i32,
    pub offset: i64,
    pub headers: Option<std::collections::HashMap<String, String>>,
    pub timestamp: String,
}

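/// A single HTTP mock managed through the `/mocks` endpoints.
///
/// Illustrative request body for `POST /mocks` (an empty `id` is replaced with
/// a generated UUID; `latency_ms` and `status_code` are optional):
///
/// ```json
/// {
///   "id": "",
///   "name": "Get user",
///   "method": "GET",
///   "path": "/users/1",
///   "response": { "body": { "id": 1, "name": "Ada" } },
///   "enabled": true,
///   "status_code": 200
/// }
/// ```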
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockConfig {
    pub id: String,
    pub name: String,
    pub method: String,
    pub path: String,
    pub response: MockResponse,
    pub enabled: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_ms: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status_code: Option<u16>,
}

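/// The response body (and optional headers) a mock returns.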
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    pub body: serde_json::Value,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}

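/// Runtime statistics reported by `GET /stats`.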
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    pub uptime_seconds: u64,
    pub total_requests: u64,
    pub active_mocks: usize,
    pub enabled_mocks: usize,
    pub registered_routes: usize,
}

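/// Server configuration reported by `GET /config`.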
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    pub version: String,
    pub port: u16,
    pub has_openapi_spec: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}

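/// Shared state for the management API. Cloning is inexpensive: the larger
/// fields are `Arc`-wrapped, so axum can clone the state per request.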
#[derive(Clone)]
pub struct ManagementState {
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    pub spec: Option<Arc<OpenApiSpec>>,
    pub spec_path: Option<String>,
    pub port: u16,
    pub start_time: std::time::Instant,
    pub request_counter: Arc<RwLock<u64>>,
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
}

impl ManagementState {
    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
        Self {
            mocks: Arc::new(RwLock::new(Vec::new())),
            spec,
            spec_path,
            port,
            start_time: std::time::Instant::now(),
            request_counter: Arc::new(RwLock::new(0)),
            #[cfg(feature = "smtp")]
            smtp_registry: None,
            #[cfg(feature = "mqtt")]
            mqtt_broker: None,
            #[cfg(feature = "kafka")]
            kafka_broker: None,
            #[cfg(any(feature = "mqtt", feature = "kafka"))]
            message_events: {
                let (tx, _) = broadcast::channel(1000);
                Arc::new(tx)
            },
        }
    }

    #[cfg(feature = "smtp")]
    pub fn with_smtp_registry(
        mut self,
        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
    ) -> Self {
        self.smtp_registry = Some(smtp_registry);
        self
    }

    #[cfg(feature = "mqtt")]
    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
        self.mqtt_broker = Some(mqtt_broker);
        self
    }

    #[cfg(feature = "kafka")]
    pub fn with_kafka_broker(
        mut self,
        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
    ) -> Self {
        self.kafka_broker = Some(kafka_broker);
        self
    }
}

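/// Lists all registered mocks along with total and enabled counts.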
async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
    let mocks = state.mocks.read().await;
    Json(serde_json::json!({
        "mocks": *mocks,
        "total": mocks.len(),
        "enabled": mocks.iter().filter(|m| m.enabled).count()
    }))
}

async fn get_mock(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> Result<Json<MockConfig>, StatusCode> {
    let mocks = state.mocks.read().await;
    mocks
        .iter()
        .find(|m| m.id == id)
        .cloned()
        .map(Json)
        .ok_or(StatusCode::NOT_FOUND)
}

async fn create_mock(
    State(state): State<ManagementState>,
    Json(mut mock): Json<MockConfig>,
) -> Result<Json<MockConfig>, StatusCode> {
    let mut mocks = state.mocks.write().await;

    if mock.id.is_empty() {
        mock.id = uuid::Uuid::new_v4().to_string();
    }

    if mocks.iter().any(|m| m.id == mock.id) {
        return Err(StatusCode::CONFLICT);
    }

    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
    mocks.push(mock.clone());
    Ok(Json(mock))
}

async fn update_mock(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
    Json(updated_mock): Json<MockConfig>,
) -> Result<Json<MockConfig>, StatusCode> {
    let mut mocks = state.mocks.write().await;

    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;

    info!("Updating mock: {}", id);
    mocks[position] = updated_mock.clone();
    Ok(Json(updated_mock))
}

async fn delete_mock(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> Result<StatusCode, StatusCode> {
    let mut mocks = state.mocks.write().await;

    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;

    info!("Deleting mock: {}", id);
    mocks.remove(position);
    Ok(StatusCode::NO_CONTENT)
}

async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
    let mocks = state.mocks.read().await;
    let request_count = *state.request_counter.read().await;

    Json(ServerStats {
        uptime_seconds: state.start_time.elapsed().as_secs(),
        total_requests: request_count,
        active_mocks: mocks.len(),
        enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
        registered_routes: mocks.len(),
    })
}

async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
    Json(ServerConfig {
        version: env!("CARGO_PKG_VERSION").to_string(),
        port: state.port,
        has_openapi_spec: state.spec.is_some(),
        spec_path: state.spec_path.clone(),
    })
}

async fn health_check() -> Json<serde_json::Value> {
    Json(serde_json::json!({
        "status": "healthy",
        "service": "mockforge-management",
        "timestamp": chrono::Utc::now().to_rfc3339()
    }))
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    Json,
    Yaml,
}

async fn export_mocks(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Result<(StatusCode, String), StatusCode> {
    let mocks = state.mocks.read().await;

    let format = params
        .get("format")
        .map(|f| match f.as_str() {
            "yaml" | "yml" => ExportFormat::Yaml,
            _ => ExportFormat::Json,
        })
        .unwrap_or(ExportFormat::Json);

    match format {
        ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
            .map(|json| (StatusCode::OK, json))
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
        ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
            .map(|yaml| (StatusCode::OK, yaml))
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
    }
}

async fn import_mocks(
    State(state): State<ManagementState>,
    Json(mocks): Json<Vec<MockConfig>>,
) -> impl IntoResponse {
    let mut current_mocks = state.mocks.write().await;
    current_mocks.clear();
    current_mocks.extend(mocks);
    Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
}

#[cfg(feature = "smtp")]
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        match smtp_registry.get_emails() {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve emails",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}

#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        match smtp_registry.get_email_by_id(&id) {
            Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
            Ok(None) => (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Email not found",
                    "id": id
                })),
            ),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve email",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}

#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        match smtp_registry.clear_mailbox() {
            Ok(()) => (
                StatusCode::OK,
                Json(serde_json::json!({
                    "message": "Mailbox cleared successfully"
                })),
            ),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to clear mailbox",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}

#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let format = params.get("format").map(String::as_str).unwrap_or("json").to_string();
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "SMTP mailbox management not available via HTTP API",
            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
            "requested_format": format
        })),
    )
}

#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        let filters = EmailSearchFilters {
            sender: params.get("sender").cloned(),
            recipient: params.get("recipient").cloned(),
            subject: params.get("subject").cloned(),
            body: params.get("body").cloned(),
            since: params
                .get("since")
                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
                .map(|dt| dt.with_timezone(&chrono::Utc)),
            until: params
                .get("until")
                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
                .map(|dt| dt.with_timezone(&chrono::Utc)),
            use_regex: params.get("regex").map(|s| s == "true").unwrap_or(false),
            case_sensitive: params.get("case_sensitive").map(|s| s == "true").unwrap_or(false),
        };

        match smtp_registry.search_emails(filters) {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to search emails",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}

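/// Aggregate statistics reported by `GET /mqtt/stats`.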
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    pub connected_clients: usize,
    pub active_topics: usize,
    pub retained_messages: usize,
    pub total_subscriptions: usize,
}

#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    if let Some(broker) = &state.mqtt_broker {
        let connected_clients = broker.get_connected_clients().await.len();
        let active_topics = broker.get_active_topics().await.len();
        let stats = broker.get_topic_stats().await;

        let broker_stats = MqttBrokerStats {
            connected_clients,
            active_topics,
            retained_messages: stats.retained_messages,
            total_subscriptions: stats.total_subscriptions,
        };

        Json(broker_stats).into_response()
    } else {
        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
    }
}

#[cfg(feature = "mqtt")]
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    if let Some(broker) = &state.mqtt_broker {
        let clients = broker.get_connected_clients().await;
        Json(serde_json::json!({
            "clients": clients
        }))
        .into_response()
    } else {
        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
    }
}

#[cfg(feature = "mqtt")]
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    if let Some(broker) = &state.mqtt_broker {
        let topics = broker.get_active_topics().await;
        Json(serde_json::json!({
            "topics": topics
        }))
        .into_response()
    } else {
        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
    }
}

#[cfg(feature = "mqtt")]
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    if let Some(broker) = &state.mqtt_broker {
        match broker.disconnect_client(&client_id).await {
            Ok(_) => {
                (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response()
            }
            Err(e) => {
                (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
                    .into_response()
            }
        }
    } else {
        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
    }
}

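/// Expected body shape for `POST /mqtt/publish`.
///
/// Note: the handler below parses a raw `serde_json::Value` and validates the
/// fields by hand; this struct documents the accepted shape, e.g.
/// `{"topic": "sensors/1", "payload": "42", "qos": 1, "retain": false}`.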
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    pub topic: String,
    pub payload: String,
    #[serde(default = "default_qos")]
    pub qos: u8,
    #[serde(default)]
    pub retain: bool,
}

#[cfg(feature = "mqtt")]
fn default_qos() -> u8 {
    0
}

#[cfg(feature = "mqtt")]
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let topic = request.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
    let payload = request.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    if topic.is_none() || payload.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required fields: topic and payload"
            })),
        );
    }

    let topic = topic.unwrap();
    let payload = payload.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        if qos > 2 {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid QoS",
                    "message": "QoS must be 0, 1, or 2"
                })),
            );
        }

        let payload_bytes = payload.as_bytes().to_vec();
        let client_id = "mockforge-management-api".to_string();

        let publish_result = broker
            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
            .await
            .map_err(|e| format!("{}", e));

        match publish_result {
            Ok(_) => {
                let event = MessageEvent::Mqtt(MqttMessageEvent {
                    topic: topic.clone(),
                    payload: payload.clone(),
                    qos,
                    retain,
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                (
                    StatusCode::OK,
                    Json(serde_json::json!({
                        "success": true,
                        "message": format!("Message published to topic '{}'", topic),
                        "topic": topic,
                        "qos": qos,
                        "retain": retain
                    })),
                )
            }
            Err(error_msg) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to publish message",
                    "message": error_msg
                })),
            ),
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}

#[cfg(not(feature = "mqtt"))]
async fn publish_mqtt_message_handler(
    State(_state): State<ManagementState>,
    Json(_request): Json<serde_json::Value>,
) -> impl IntoResponse {
    (
        StatusCode::SERVICE_UNAVAILABLE,
        Json(serde_json::json!({
            "error": "MQTT feature not enabled",
            "message": "MQTT support is not compiled into this build"
        })),
    )
}

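/// Expected body shape for `POST /mqtt/publish/batch`: a list of messages plus
/// an optional inter-message delay (defaults to 100 ms).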
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    pub messages: Vec<MqttPublishRequest>,
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}

#[cfg(feature = "mqtt")]
fn default_delay() -> u64 {
    100
}

#[cfg(feature = "mqtt")]
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let messages_json = request.get("messages").and_then(|v| v.as_array());
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    if messages_json.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required field: messages"
            })),
        );
    }

    let messages_json = messages_json.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        if messages_json.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            );
        }

        let mut results = Vec::new();
        let client_id = "mockforge-management-api".to_string();

        for (index, msg_json) in messages_json.iter().enumerate() {
            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

            if topic.is_none() || payload.is_none() {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }

            let topic = topic.unwrap();
            let payload = payload.unwrap();

            if qos > 2 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Invalid QoS (must be 0, 1, or 2)"
                }));
                continue;
            }

            let payload_bytes = payload.as_bytes().to_vec();

            let publish_result = broker
                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
                .await
                .map_err(|e| format!("{}", e));

            match publish_result {
                Ok(_) => {
                    let event = MessageEvent::Mqtt(MqttMessageEvent {
                        topic: topic.clone(),
                        payload: payload.clone(),
                        qos,
                        retain,
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": topic,
                        "qos": qos
                    }));
                }
                Err(error_msg) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": error_msg
                    }));
                }
            }

            if index < messages_json.len() - 1 && delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        (
            StatusCode::OK,
            Json(serde_json::json!({
                "success": true,
                "total": messages_json.len(),
                "succeeded": success_count,
                "failed": messages_json.len() - success_count,
                "results": results
            })),
        )
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}

#[cfg(not(feature = "mqtt"))]
async fn publish_mqtt_batch_handler(
    State(_state): State<ManagementState>,
    Json(_request): Json<serde_json::Value>,
) -> impl IntoResponse {
    (
        StatusCode::SERVICE_UNAVAILABLE,
        Json(serde_json::json!({
            "error": "MQTT feature not enabled",
            "message": "MQTT support is not compiled into this build"
        })),
    )
}

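/// Builds the management API [`Router`], mounting feature-gated routes for
/// SMTP, MQTT, and Kafka alongside the core mock CRUD, export/import, AI,
/// chaos, and network-profile endpoints.
///
/// Minimal usage sketch (the bind address and port are illustrative):
///
/// ```ignore
/// let state = ManagementState::new(None, None, 3000);
/// let app = management_router(state);
/// let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
/// axum::serve(listener, app).await?;
/// ```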
pub fn management_router(state: ManagementState) -> Router {
    let router = Router::new()
        .route("/health", get(health_check))
        .route("/stats", get(get_stats))
        .route("/config", get(get_config))
        .route("/mocks", get(list_mocks))
        .route("/mocks", post(create_mock))
        .route("/mocks/{id}", get(get_mock))
        .route("/mocks/{id}", put(update_mock))
        .route("/mocks/{id}", delete(delete_mock))
        .route("/export", get(export_mocks))
        .route("/import", post(import_mocks));

    #[cfg(feature = "smtp")]
    let router = router
        .route("/smtp/mailbox", get(list_smtp_emails))
        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
        .route("/smtp/mailbox/{id}", get(get_smtp_email))
        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
        .route("/smtp/mailbox/search", get(search_smtp_emails));

    #[cfg(not(feature = "smtp"))]
    let router = router;

    #[cfg(feature = "mqtt")]
    let router = router
        .route("/mqtt/stats", get(get_mqtt_stats))
        .route("/mqtt/clients", get(get_mqtt_clients))
        .route("/mqtt/topics", get(get_mqtt_topics))
        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    #[cfg(not(feature = "mqtt"))]
    let router = router
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    #[cfg(feature = "kafka")]
    let router = router
        .route("/kafka/stats", get(get_kafka_stats))
        .route("/kafka/topics", get(get_kafka_topics))
        .route("/kafka/topics/{topic}", get(get_kafka_topic))
        .route("/kafka/groups", get(get_kafka_groups))
        .route("/kafka/groups/{group_id}", get(get_kafka_group))
        .route("/kafka/produce", post(produce_kafka_message))
        .route("/kafka/produce/batch", post(produce_kafka_batch))
        .route("/kafka/messages/stream", get(kafka_messages_stream));

    #[cfg(not(feature = "kafka"))]
    let router = router;

    let router = router
        .route("/ai/generate-spec", post(generate_ai_spec))
        .route("/chaos/config", get(get_chaos_config))
        .route("/chaos/config", post(update_chaos_config))
        .route("/network/profiles", get(list_network_profiles))
        .route("/network/profile/apply", post(apply_network_profile));

    router.with_state(state)
}

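/// Aggregate statistics reported by `GET /kafka/stats`.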
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    pub topics: usize,
    pub partitions: usize,
    pub consumer_groups: usize,
    pub messages_produced: u64,
    pub messages_consumed: u64,
}

#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    pub name: String,
    pub partitions: usize,
    pub replication_factor: i32,
}

#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    pub group_id: String,
    pub members: usize,
    pub state: String,
}

#[cfg(feature = "kafka")]
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let topics = broker.topics.read().await;
        let consumer_groups = broker.consumer_groups.read().await;
        let metrics = broker.metrics.clone();

        let total_partitions: usize = topics.values().map(|t| t.partitions.len()).sum();
        let snapshot = metrics.snapshot();
        let messages_produced = snapshot.messages_produced_total;
        let messages_consumed = snapshot.messages_consumed_total;

        let stats = KafkaBrokerStats {
            topics: topics.len(),
            partitions: total_partitions,
            consumer_groups: consumer_groups.groups().len(),
            messages_produced,
            messages_consumed,
        };

        Json(stats).into_response()
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
        .into_response()
    }
}

#[cfg(feature = "kafka")]
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let topics = broker.topics.read().await;
        let topic_list: Vec<KafkaTopicInfo> = topics
            .iter()
            .map(|(name, topic)| KafkaTopicInfo {
                name: name.clone(),
                partitions: topic.partitions.len(),
                replication_factor: topic.config.replication_factor,
            })
            .collect();

        Json(serde_json::json!({
            "topics": topic_list
        }))
        .into_response()
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
        .into_response()
    }
}

#[cfg(feature = "kafka")]
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let topics = broker.topics.read().await;
        if let Some(topic) = topics.get(&topic_name) {
            Json(serde_json::json!({
                "name": topic_name,
                "partitions": topic.partitions.len(),
                "replication_factor": topic.config.replication_factor,
                "partitions_detail": topic.partitions.iter().enumerate().map(|(idx, partition)| serde_json::json!({
                    "id": idx as i32,
                    "leader": 0,
                    "replicas": vec![0],
                    "message_count": partition.messages.len()
                })).collect::<Vec<_>>()
            }))
            .into_response()
        } else {
            (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Topic not found",
                    "topic": topic_name
                })),
            )
            .into_response()
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
        .into_response()
    }
}

#[cfg(feature = "kafka")]
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let consumer_groups = broker.consumer_groups.read().await;
        let groups: Vec<KafkaConsumerGroupInfo> = consumer_groups
            .groups()
            .iter()
            .map(|(group_id, group)| KafkaConsumerGroupInfo {
                group_id: group_id.clone(),
                members: group.members.len(),
                // Group state is not tracked by the mock broker; report a stable default.
                state: "Stable".to_string(),
            })
            .collect();

        Json(serde_json::json!({
            "groups": groups
        }))
        .into_response()
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
        .into_response()
    }
}

#[cfg(feature = "kafka")]
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let consumer_groups = broker.consumer_groups.read().await;
        if let Some(group) = consumer_groups.groups().get(&group_id) {
            Json(serde_json::json!({
                "group_id": group_id,
                "members": group.members.len(),
                "state": "Stable",
                "members_detail": group.members.iter().map(|(member_id, member)| serde_json::json!({
                    "member_id": member_id,
                    "client_id": member.client_id,
                    "assignments": member.assignment.iter().map(|a| serde_json::json!({
                        "topic": a.topic,
                        "partitions": a.partitions
                    })).collect::<Vec<_>>()
                })).collect::<Vec<_>>(),
                "offsets": group.offsets.iter().map(|((topic, partition), offset)| serde_json::json!({
                    "topic": topic,
                    "partition": partition,
                    "offset": offset
                })).collect::<Vec<_>>()
            }))
            .into_response()
        } else {
            (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Consumer group not found",
                    "group_id": group_id
                })),
            )
            .into_response()
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
        .into_response()
    }
}

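/// Expected body for `POST /kafka/produce`, e.g.
/// `{"topic": "orders", "key": "user-1", "value": "{...}"}`. When `partition`
/// is omitted, the topic assigns one based on the message key.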
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    pub topic: String,
    #[serde(default)]
    pub key: Option<String>,
    pub value: String,
    #[serde(default)]
    pub partition: Option<i32>,
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}

#[cfg(feature = "kafka")]
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let mut topics = broker.topics.write().await;

        // Auto-create the topic with default configuration if it does not exist yet.
        let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
            crate::topics::Topic::new(request.topic.clone(), crate::topics::TopicConfig::default())
        });

        let partition_id = if let Some(partition) = request.partition {
            partition
        } else {
            topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes()))
        };

        if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid partition",
                    "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                })),
            )
            .into_response();
        }

        let message = crate::partitions::KafkaMessage {
            offset: 0, // placeholder; the partition assigns the real offset on produce
            timestamp: chrono::Utc::now().timestamp_millis(),
            key: request.key.clone().map(|k| k.as_bytes().to_vec()),
            value: request.value.as_bytes().to_vec(),
            headers: request
                .headers
                .clone()
                .unwrap_or_default()
                .into_iter()
                .map(|(k, v)| (k, v.as_bytes().to_vec()))
                .collect(),
        };

        match topic_entry.produce(partition_id, message).await {
            Ok(offset) => {
                broker.metrics.record_messages_produced(1);

                let event = MessageEvent::Kafka(KafkaMessageEvent {
                    topic: request.topic.clone(),
                    key: request.key.clone(),
                    value: request.value.clone(),
                    partition: partition_id,
                    offset,
                    headers: request.headers.clone(),
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message produced to topic '{}'", request.topic),
                    "topic": request.topic,
                    "partition": partition_id,
                    "offset": offset
                }))
                .into_response()
            }
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to produce message",
                    "message": e.to_string()
                })),
            )
            .into_response(),
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
        .into_response()
    }
}

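/// Expected body for `POST /kafka/produce/batch`: a list of produce requests
/// plus an optional inter-message delay (defaults to 100 ms).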
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    pub messages: Vec<KafkaProduceRequest>,
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}

#[cfg(feature = "kafka")]
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        if request.messages.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            )
            .into_response();
        }

        let mut results = Vec::new();

        for (index, msg_request) in request.messages.iter().enumerate() {
            let mut topics = broker.topics.write().await;

            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                crate::topics::Topic::new(
                    msg_request.topic.clone(),
                    crate::topics::TopicConfig::default(),
                )
            });

            let partition_id = if let Some(partition) = msg_request.partition {
                partition
            } else {
                topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
            };

            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                }));
                continue;
            }

            let message = crate::partitions::KafkaMessage {
                offset: 0, // placeholder; the partition assigns the real offset on produce
                timestamp: chrono::Utc::now().timestamp_millis(),
                key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
                value: msg_request.value.as_bytes().to_vec(),
                headers: msg_request
                    .headers
                    .clone()
                    .unwrap_or_default()
                    .into_iter()
                    .map(|(k, v)| (k, v.as_bytes().to_vec()))
                    .collect(),
            };

            match topic_entry.produce(partition_id, message).await {
                Ok(offset) => {
                    broker.metrics.record_messages_produced(1);

                    let event = MessageEvent::Kafka(KafkaMessageEvent {
                        topic: msg_request.topic.clone(),
                        key: msg_request.key.clone(),
                        value: msg_request.value.clone(),
                        partition: partition_id,
                        offset,
                        headers: msg_request.headers.clone(),
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": msg_request.topic,
                        "partition": partition_id,
                        "offset": offset
                    }));
                }
                Err(e) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": e.to_string()
                    }));
                }
            }

            if index < request.messages.len() - 1 && request.delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        Json(serde_json::json!({
            "success": true,
            "total": request.messages.len(),
            "succeeded": success_count,
            "failed": request.messages.len() - success_count,
            "results": results
        }))
        .into_response()
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
        .into_response()
    }
}

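/// Streams MQTT publishes as Server-Sent Events (`mqtt_message` events).
/// Accepts an optional `topic` query parameter that filters by substring,
/// e.g. `GET /mqtt/messages/stream?topic=sensors`.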
#[cfg(feature = "mqtt")]
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => {
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "mqtt",
                            "topic": event.topic,
                            "payload": event.payload,
                            "qos": event.qos,
                            "retain": event.retain,
                            "timestamp": event.timestamp,
                        });

                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event = Event::default().event("mqtt_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => {
                        // Kafka events are served by the Kafka stream endpoint.
                        continue;
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}

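/// Streams Kafka produces as Server-Sent Events (`kafka_message` events).
/// Accepts the same optional `topic` substring filter as the MQTT stream.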
#[cfg(feature = "kafka")]
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => {
                        // MQTT events are served by the MQTT stream endpoint.
                        continue;
                    }
                    Ok(MessageEvent::Kafka(event)) => {
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "kafka",
                            "topic": event.topic,
                            "key": event.key,
                            "value": event.value,
                            "partition": event.partition,
                            "offset": event.offset,
                            "headers": event.headers,
                            "timestamp": event.timestamp,
                        });

                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event =
                                Event::default().event("kafka_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}

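/// Request body for `POST /ai/generate-spec`. `spec_type` is one of
/// `"openapi"`, `"graphql"`, or `"asyncapi"`; `api_version` defaults to
/// `"3.0.0"` for OpenAPI.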
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    pub query: String,
    pub spec_type: String,
    pub api_version: Option<String>,
}

#[cfg(feature = "data-faker")]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::{DocumentStorage, InMemoryStorage},
    };

    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
        .into_response();
    }

    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    let mut rag_config = RagConfig::default();
    rag_config.provider = provider;
    rag_config.api_endpoint = api_endpoint;
    rag_config.api_key = api_key;
    rag_config.model = model;
    rag_config.max_tokens = std::env::var("MOCKFORGE_RAG_MAX_TOKENS")
        .unwrap_or_else(|_| "4096".to_string())
        .parse()
        .unwrap_or(4096);
    rag_config.temperature = std::env::var("MOCKFORGE_RAG_TEMPERATURE")
        .unwrap_or_else(|_| "0.3".to_string())
        .parse()
        .unwrap_or(0.3);
    rag_config.timeout_secs = std::env::var("MOCKFORGE_RAG_TIMEOUT")
        .unwrap_or_else(|_| "60".to_string())
        .parse()
        .unwrap_or(60);
    rag_config.max_context_length = std::env::var("MOCKFORGE_RAG_CONTEXT_WINDOW")
        .unwrap_or_else(|_| "4000".to_string())
        .parse()
        .unwrap_or(4000);

    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        if request.spec_type == "graphql" {
            "GraphQL SDL"
        } else {
            "YAML"
        }
    );

    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    let mut rag_engine = match RagEngine::new(rag_config.clone(), storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
            .into_response();
        }
    };

    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            let spec = if request.spec_type == "graphql" {
                extract_graphql_schema(&generated_text)
            } else {
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
        .into_response(),
    }
}

#[cfg(not(feature = "data-faker"))]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(_request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "AI features not enabled",
            "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
        })),
    )
    .into_response()
}

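/// Extracts a YAML specification from LLM output, preferring a fenced yaml
/// block, then any fenced block, then a bare `openapi:`/`asyncapi:` document;
/// falls back to the trimmed text.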
fn extract_yaml_spec(text: &str) -> String {
    if let Some(start) = text.find("```yaml") {
        let yaml_start = text[start + 7..].trim_start();
        if let Some(end) = yaml_start.find("```") {
            return yaml_start[..end].trim().to_string();
        }
    }
    if let Some(start) = text.find("```") {
        let content_start = text[start + 3..].trim_start();
        if let Some(end) = content_start.find("```") {
            return content_start[..end].trim().to_string();
        }
    }

    if text.trim_start().starts_with("openapi:") || text.trim_start().starts_with("asyncapi:") {
        return text.trim().to_string();
    }

    text.trim().to_string()
}

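/// Extracts a GraphQL SDL schema from LLM output, preferring a fenced graphql
/// block, then any fenced block, then bare `type `/`schema ` definitions;
/// falls back to the trimmed text.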
fn extract_graphql_schema(text: &str) -> String {
    if let Some(start) = text.find("```graphql") {
        let schema_start = text[start + 10..].trim_start();
        if let Some(end) = schema_start.find("```") {
            return schema_start[..end].trim().to_string();
        }
    }
    if let Some(start) = text.find("```") {
        let content_start = text[start + 3..].trim_start();
        if let Some(end) = content_start.find("```") {
            return content_start[..end].trim().to_string();
        }
    }

    if text.trim_start().starts_with("type ") || text.trim_start().starts_with("schema ") {
        return text.trim().to_string();
    }

    text.trim().to_string()
}

async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
    // Chaos configuration is not yet persisted; report the defaults.
    Json(serde_json::json!({
        "enabled": false,
        "latency": null,
        "fault_injection": null,
        "rate_limit": null,
        "traffic_shaping": null,
    }))
    .into_response()
}

#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    pub enabled: Option<bool>,
    pub latency: Option<serde_json::Value>,
    pub fault_injection: Option<serde_json::Value>,
    pub rate_limit: Option<serde_json::Value>,
    pub traffic_shaping: Option<serde_json::Value>,
}

async fn update_chaos_config(
    State(_state): State<ManagementState>,
    Json(_config): Json<ChaosConfigUpdate>,
) -> impl IntoResponse {
    // Stub: the submitted configuration is accepted but not yet applied.
    Json(serde_json::json!({
        "success": true,
        "message": "Chaos configuration updated"
    }))
    .into_response()
}

async fn list_network_profiles() -> impl IntoResponse {
    use mockforge_core::network_profiles::NetworkProfileCatalog;

    let catalog = NetworkProfileCatalog::default();
    let profiles: Vec<serde_json::Value> = catalog
        .list_profiles_with_description()
        .iter()
        .map(|(name, description)| {
            serde_json::json!({
                "name": name,
                "description": description,
            })
        })
        .collect();

    Json(serde_json::json!({
        "profiles": profiles
    }))
    .into_response()
}

#[derive(Debug, Deserialize)]
pub struct ApplyNetworkProfileRequest {
    pub profile_name: String,
}

async fn apply_network_profile(
    State(_state): State<ManagementState>,
    Json(request): Json<ApplyNetworkProfileRequest>,
) -> impl IntoResponse {
    use mockforge_core::network_profiles::NetworkProfileCatalog;

    let catalog = NetworkProfileCatalog::default();
    if let Some(profile) = catalog.get(&request.profile_name) {
        Json(serde_json::json!({
            "success": true,
            "message": format!("Network profile '{}' applied", request.profile_name),
            "profile": {
                "name": profile.name,
                "description": profile.description,
            }
        }))
        .into_response()
    } else {
        (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Profile not found",
                "message": format!("Network profile '{}' not found", request.profile_name)
            })),
        )
        .into_response()
    }
}

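/// Builds the management router with the UI builder nested under `/ui-builder`.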
pub fn management_router_with_ui_builder(
    state: ManagementState,
    server_config: mockforge_core::config::ServerConfig,
) -> Router {
    use crate::ui_builder::{create_ui_builder_router, UIBuilderState};

    let management = management_router(state);

    let ui_builder_state = UIBuilderState::new(server_config);
    let ui_builder = create_ui_builder_router(ui_builder_state);

    management.nest("/ui-builder", ui_builder)
}

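/// Builds the management router merged with the spec-import routes.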
pub fn management_router_with_spec_import(state: ManagementState) -> Router {
    use crate::spec_import::{spec_import_router, SpecImportState};

    let management = management_router(state);

    Router::new()
        .merge(management)
        .merge(spec_import_router(SpecImportState::new()))
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        let mock = MockConfig {
            id: "test-1".to_string(),
            name: "Test Mock".to_string(),
            method: "GET".to_string(),
            path: "/test".to_string(),
            response: MockResponse {
                body: serde_json::json!({"message": "test"}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
        };

        {
            let mut mocks = state.mocks.write().await;
            mocks.push(mock.clone());
        }

        let mocks = state.mocks.read().await;
        let found = mocks.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        {
            let mut mocks = state.mocks.write().await;
            mocks.push(MockConfig {
                id: "1".to_string(),
                name: "Mock 1".to_string(),
                method: "GET".to_string(),
                path: "/test1".to_string(),
                response: MockResponse {
                    body: serde_json::json!({}),
                    headers: None,
                },
                enabled: true,
                latency_ms: None,
                status_code: Some(200),
            });
            mocks.push(MockConfig {
                id: "2".to_string(),
                name: "Mock 2".to_string(),
                method: "POST".to_string(),
                path: "/test2".to_string(),
                response: MockResponse {
                    body: serde_json::json!({}),
                    headers: None,
                },
                enabled: false,
                latency_ms: None,
                status_code: Some(201),
            });
        }

        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }
}