1use axum::{
6 extract::{Path, Query, State},
7 http::StatusCode,
8 response::{
9 sse::{Event, Sse},
10 IntoResponse, Json,
11 },
12 routing::{delete, get, post, put},
13 Router,
14};
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18 BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23use std::convert::Infallible;
24use std::sync::Arc;
25use tokio::sync::{broadcast, RwLock};
26use tracing::*;
27
/// Default capacity for the message-event broadcast channel.
const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;

/// Resolve the broadcast-channel capacity from the
/// `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY` environment variable.
///
/// Falls back to [`DEFAULT_MESSAGE_BROADCAST_CAPACITY`] when the variable
/// is unset, not a valid `usize`, or zero — a zero capacity would panic
/// later in `tokio::sync::broadcast::channel`.
fn get_message_broadcast_capacity() -> usize {
    std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY")
        .ok()
        .and_then(|s| s.parse().ok())
        .filter(|&capacity| capacity > 0)
        .unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY)
}
38
/// A message observed on one of the mocked message-broker protocols,
/// broadcast to management listeners via the `message_events` channel.
///
/// Serialized with an adjacent tag: `{"protocol": "mqtt", "data": {...}}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    /// An MQTT publish event.
    #[cfg(feature = "mqtt")]
    Mqtt(MqttMessageEvent),
    /// A Kafka produce event.
    #[cfg(feature = "kafka")]
    Kafka(KafkaMessageEvent),
}
51
/// Snapshot of a single MQTT publish, as exposed to management clients.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// Topic the message was published to.
    pub topic: String,
    /// Message payload as text.
    pub payload: String,
    /// Quality-of-service level (0, 1, or 2).
    pub qos: u8,
    /// Whether the message was published with the retain flag set.
    pub retain: bool,
    /// RFC 3339 timestamp of when the event was recorded.
    pub timestamp: String,
}
67
/// Snapshot of a single Kafka record, as exposed to management clients.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Topic the record was produced to.
    pub topic: String,
    /// Optional record key.
    pub key: Option<String>,
    /// Record value as text.
    pub value: String,
    /// Partition the record landed on.
    pub partition: i32,
    /// Offset assigned within the partition.
    pub offset: i64,
    /// Optional record headers.
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// RFC 3339 timestamp of when the event was recorded.
    pub timestamp: String,
}
79
/// A single configured mock endpoint managed via the management API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockConfig {
    /// Unique identifier; generated server-side when empty on creation.
    #[serde(skip_serializing_if = "String::is_empty")]
    pub id: String,
    /// Human-readable display name.
    pub name: String,
    /// HTTP method to match (compared case-insensitively).
    pub method: String,
    /// Path pattern; supports `{param}` segments and `*` wildcards.
    pub path: String,
    /// Response returned when the mock matches.
    pub response: MockResponse,
    /// Disabled mocks never match; defaults to `true` when omitted.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Optional artificial latency in milliseconds.
    /// NOTE(review): not consumed anywhere in this file — presumably
    /// applied by the serving layer; confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_ms: Option<u64>,
    /// Optional HTTP status code override for the response.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status_code: Option<u16>,
    /// Extra matching constraints beyond method + path.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_match: Option<RequestMatchCriteria>,
    /// Relative priority among matching mocks (semantics live in the
    /// serving layer, not in this file).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub priority: Option<i32>,
    /// Scenario this mock belongs to, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scenario: Option<String>,
    /// Scenario state required for this mock to be eligible — handled by
    /// the scenario engine, not in this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub required_scenario_state: Option<String>,
    /// Scenario state to transition to after this mock fires — handled by
    /// the scenario engine, not in this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub new_scenario_state: Option<String>,
}
119
/// Serde default helper: mocks are enabled unless explicitly disabled.
fn default_true() -> bool {
    true
}
123
/// The canned response a mock returns when it matches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Response body as arbitrary JSON.
    pub body: serde_json::Value,
    /// Optional extra response headers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
133
/// Optional fine-grained matching criteria for a mock; evaluated by
/// [`mock_matches_request`].
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RequestMatchCriteria {
    /// Required headers: keys compared case-insensitively; values treated
    /// as a regex when they compile, literal string otherwise.
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub headers: std::collections::HashMap<String, String>,
    /// Required query parameters (exact string equality).
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub query_params: std::collections::HashMap<String, String>,
    /// Pattern the request body must match (regex if valid, else literal).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub body_pattern: Option<String>,
    /// Simple JSONPath (e.g. `$.a.b`) that must resolve in a JSON body.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub json_path: Option<String>,
    /// XPath expression — matching is currently a stub that always passes.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub xpath: Option<String>,
    /// Mini-DSL expression (`==`, `=~`, `contains`) evaluated by
    /// `evaluate_custom_matcher`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub custom_matcher: Option<String>,
}
156
/// Decide whether an incoming request matches a configured mock.
///
/// Checks in order: enabled flag, HTTP method (case-insensitive), path
/// (exact / `{param}` segments / wildcard), then any optional
/// `request_match` criteria (headers, query params, body pattern,
/// JSONPath, custom matcher). Returns `true` only if every applicable
/// check passes.
pub fn mock_matches_request(
    mock: &MockConfig,
    method: &str,
    path: &str,
    headers: &std::collections::HashMap<String, String>,
    query_params: &std::collections::HashMap<String, String>,
    body: Option<&[u8]>,
) -> bool {
    use regex::Regex;

    // Disabled mocks never match.
    if !mock.enabled {
        return false;
    }

    // Method comparison is case-insensitive.
    if mock.method.to_uppercase() != method.to_uppercase() {
        return false;
    }

    if !path_matches_pattern(&mock.path, path) {
        return false;
    }

    if let Some(criteria) = &mock.request_match {
        // Header matching: keys compared case-insensitively; the expected
        // value acts as a regex when it compiles, literal otherwise.
        // NOTE(review): a compiled regex uses substring semantics
        // (`is_match`), so "json" matches "application/json" — confirm
        // this is intended.
        for (key, expected_value) in &criteria.headers {
            let header_key_lower = key.to_lowercase();
            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);

            if let Some((_, actual_value)) = found {
                if let Ok(re) = Regex::new(expected_value) {
                    if !re.is_match(actual_value) {
                        return false;
                    }
                } else if actual_value != expected_value {
                    return false;
                }
            } else {
                // Required header missing.
                return false;
            }
        }

        // Query parameters must match exactly (no regex).
        for (key, expected_value) in &criteria.query_params {
            if let Some(actual_value) = query_params.get(key) {
                if actual_value != expected_value {
                    return false;
                }
            } else {
                // Required query parameter missing.
                return false;
            }
        }

        // Body pattern: regex when it compiles, literal comparison
        // otherwise. A mock that requires a body never matches a
        // bodyless request.
        if let Some(pattern) = &criteria.body_pattern {
            if let Some(body_bytes) = body {
                let body_str = String::from_utf8_lossy(body_bytes);
                if let Ok(re) = Regex::new(pattern) {
                    if !re.is_match(&body_str) {
                        return false;
                    }
                } else if body_str.as_ref() != pattern {
                    return false;
                }
            } else {
                return false;
            }
        }

        // JSONPath is only enforced when the body parses as JSON.
        // NOTE(review): a missing or non-JSON body silently passes this
        // check, unlike `body_pattern` above, which fails it — confirm
        // this asymmetry is intentional.
        if let Some(json_path) = &criteria.json_path {
            if let Some(body_bytes) = body {
                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
                        if !json_path_exists(&json_value, json_path) {
                            return false;
                        }
                    }
                }
            }
        }

        // XPath matching is a stub: warns and matches unconditionally.
        if let Some(_xpath) = &criteria.xpath {
            tracing::warn!("XPath matching not yet fully implemented");
        }

        if let Some(custom) = &criteria.custom_matcher {
            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
                return false;
            }
        }
    }

    true
}
270
271fn path_matches_pattern(pattern: &str, path: &str) -> bool {
273 if pattern == path {
275 return true;
276 }
277
278 if pattern == "*" {
280 return true;
281 }
282
283 let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
285 let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
286
287 if pattern_parts.len() != path_parts.len() {
288 if pattern.contains('*') {
290 return matches_wildcard_pattern(pattern, path);
291 }
292 return false;
293 }
294
295 for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
296 if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
298 continue; }
300
301 if pattern_part != path_part {
302 return false;
303 }
304 }
305
306 true
307}
308
309fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
311 use regex::Regex;
312
313 let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
315
316 if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
317 return re.is_match(path);
318 }
319
320 false
321}
322
323fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
325 if let Some(path) = json_path.strip_prefix("$.") {
328 let parts: Vec<&str> = path.split('.').collect();
329
330 let mut current = json;
331 for part in parts {
332 if let Some(obj) = current.as_object() {
333 if let Some(value) = obj.get(part) {
334 current = value;
335 } else {
336 return false;
337 }
338 } else {
339 return false;
340 }
341 }
342 true
343 } else {
344 tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
346 false
347 }
348}
349
350fn evaluate_custom_matcher(
352 expression: &str,
353 method: &str,
354 path: &str,
355 headers: &std::collections::HashMap<String, String>,
356 query_params: &std::collections::HashMap<String, String>,
357 body: Option<&[u8]>,
358) -> bool {
359 use regex::Regex;
360
361 let expr = expression.trim();
362
363 if expr.contains("==") {
365 let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
366 if parts.len() != 2 {
367 return false;
368 }
369
370 let field = parts[0];
371 let expected_value = parts[1].trim_matches('"').trim_matches('\'');
372
373 match field {
374 "method" => method == expected_value,
375 "path" => path == expected_value,
376 _ if field.starts_with("headers.") => {
377 let header_name = &field[8..];
378 headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
379 }
380 _ if field.starts_with("query.") => {
381 let param_name = &field[6..];
382 query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
383 }
384 _ => false,
385 }
386 }
387 else if expr.contains("=~") {
389 let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
390 if parts.len() != 2 {
391 return false;
392 }
393
394 let field = parts[0];
395 let pattern = parts[1].trim_matches('"').trim_matches('\'');
396
397 if let Ok(re) = Regex::new(pattern) {
398 match field {
399 "method" => re.is_match(method),
400 "path" => re.is_match(path),
401 _ if field.starts_with("headers.") => {
402 let header_name = &field[8..];
403 headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
404 }
405 _ if field.starts_with("query.") => {
406 let param_name = &field[6..];
407 query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
408 }
409 _ => false,
410 }
411 } else {
412 false
413 }
414 }
415 else if expr.contains("contains") {
417 let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
418 if parts.len() != 2 {
419 return false;
420 }
421
422 let field = parts[0];
423 let search_value = parts[1].trim_matches('"').trim_matches('\'');
424
425 match field {
426 "path" => path.contains(search_value),
427 _ if field.starts_with("headers.") => {
428 let header_name = &field[8..];
429 headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
430 }
431 _ if field.starts_with("body") => {
432 if let Some(body_bytes) = body {
433 let body_str = String::from_utf8_lossy(body_bytes);
434 body_str.contains(search_value)
435 } else {
436 false
437 }
438 }
439 _ => false,
440 }
441 } else {
442 tracing::warn!("Unknown custom matcher expression format: {}", expr);
444 false
445 }
446}
447
/// Runtime counters reported by the stats endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Seconds since the management state was created.
    pub uptime_seconds: u64,
    /// Total requests observed via the shared request counter.
    pub total_requests: u64,
    /// Number of mocks currently registered.
    pub active_mocks: usize,
    /// Number of registered mocks with `enabled == true`.
    pub enabled_mocks: usize,
    /// Registered routes; currently reported as the mock count.
    pub registered_routes: usize,
}
462
/// Static server information reported by the config endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// Crate version (from `CARGO_PKG_VERSION`).
    pub version: String,
    /// Port the management server reports itself on.
    pub port: u16,
    /// Whether an OpenAPI spec is loaded.
    pub has_openapi_spec: bool,
    /// Filesystem path of the loaded spec, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
476
/// Shared state for the management API; cheaply cloneable (all heavy
/// members are behind `Arc`).
#[derive(Clone)]
pub struct ManagementState {
    /// Registry of configured mocks.
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// Loaded OpenAPI spec, if any.
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Filesystem path the spec was loaded from, if known.
    pub spec_path: Option<String>,
    /// Port reported via the config endpoint.
    pub port: u16,
    /// Startup instant, used for uptime reporting.
    pub start_time: std::time::Instant,
    /// Shared request counter surfaced in stats.
    pub request_counter: Arc<RwLock<u64>>,
    /// Optional proxy configuration.
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// Optional SMTP registry backing the mailbox endpoints.
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// Optional embedded MQTT broker.
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Optional embedded Kafka mock broker.
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel fanning out broker message events to listeners.
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// Scenario state machines (drives stateful mocks).
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// WebSocket broadcast for mock lifecycle events, when enabled.
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Lifecycle hook registry invoked on mock create/update/delete.
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Cached rule explanations, keyed by an id string.
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
    /// Chaos-engineering API state, when the feature is enabled.
    #[cfg(feature = "chaos")]
    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
    /// Full server configuration, when shared with this state.
    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
}
528
impl ManagementState {
    /// Create a fresh management state with an empty mock registry and
    /// every optional integration (proxy, SMTP, MQTT, Kafka, chaos,
    /// hooks, WebSocket) unset.
    ///
    /// `spec`/`spec_path` describe an optional OpenAPI spec backing the
    /// server; `port` is the port reported by the config endpoint.
    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
        Self {
            mocks: Arc::new(RwLock::new(Vec::new())),
            spec,
            spec_path,
            port,
            start_time: std::time::Instant::now(),
            request_counter: Arc::new(RwLock::new(0)),
            proxy_config: None,
            #[cfg(feature = "smtp")]
            smtp_registry: None,
            #[cfg(feature = "mqtt")]
            mqtt_broker: None,
            #[cfg(feature = "kafka")]
            kafka_broker: None,
            #[cfg(any(feature = "mqtt", feature = "kafka"))]
            message_events: {
                // Channel capacity is tunable via the environment; see
                // get_message_broadcast_capacity.
                let capacity = get_message_broadcast_capacity();
                let (tx, _) = broadcast::channel(capacity);
                Arc::new(tx)
            },
            state_machine_manager: Arc::new(RwLock::new(
                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
            )),
            ws_broadcast: None,
            lifecycle_hooks: None,
            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
            #[cfg(feature = "chaos")]
            chaos_api_state: None,
            server_config: None,
        }
    }

    /// Builder-style setter: attach a lifecycle hook registry.
    pub fn with_lifecycle_hooks(
        mut self,
        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
    ) -> Self {
        self.lifecycle_hooks = Some(hooks);
        self
    }

    /// Builder-style setter: attach a WebSocket broadcast channel for
    /// mock lifecycle events.
    pub fn with_ws_broadcast(
        mut self,
        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
    ) -> Self {
        self.ws_broadcast = Some(ws_broadcast);
        self
    }

    /// Builder-style setter: attach a shared proxy configuration.
    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
        self.proxy_config = Some(proxy_config);
        self
    }

    /// Builder-style setter: attach the SMTP registry backing mailbox
    /// endpoints.
    #[cfg(feature = "smtp")]
    pub fn with_smtp_registry(
        mut self,
        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
    ) -> Self {
        self.smtp_registry = Some(smtp_registry);
        self
    }

    /// Builder-style setter: attach the embedded MQTT broker.
    #[cfg(feature = "mqtt")]
    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
        self.mqtt_broker = Some(mqtt_broker);
        self
    }

    /// Builder-style setter: attach the embedded Kafka mock broker.
    #[cfg(feature = "kafka")]
    pub fn with_kafka_broker(
        mut self,
        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
    ) -> Self {
        self.kafka_broker = Some(kafka_broker);
        self
    }

    /// Builder-style setter: attach chaos-engineering API state.
    #[cfg(feature = "chaos")]
    pub fn with_chaos_api_state(
        mut self,
        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
    ) -> Self {
        self.chaos_api_state = Some(chaos_api_state);
        self
    }

    /// Builder-style setter: attach the shared server configuration.
    pub fn with_server_config(
        mut self,
        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
    ) -> Self {
        self.server_config = Some(server_config);
        self
    }
}
639
640async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
642 let mocks = state.mocks.read().await;
643 Json(serde_json::json!({
644 "mocks": *mocks,
645 "total": mocks.len(),
646 "enabled": mocks.iter().filter(|m| m.enabled).count()
647 }))
648}
649
650async fn get_mock(
652 State(state): State<ManagementState>,
653 Path(id): Path<String>,
654) -> Result<Json<MockConfig>, StatusCode> {
655 let mocks = state.mocks.read().await;
656 mocks
657 .iter()
658 .find(|m| m.id == id)
659 .cloned()
660 .map(Json)
661 .ok_or(StatusCode::NOT_FOUND)
662}
663
664async fn create_mock(
666 State(state): State<ManagementState>,
667 Json(mut mock): Json<MockConfig>,
668) -> Result<Json<MockConfig>, StatusCode> {
669 let mut mocks = state.mocks.write().await;
670
671 if mock.id.is_empty() {
673 mock.id = uuid::Uuid::new_v4().to_string();
674 }
675
676 if mocks.iter().any(|m| m.id == mock.id) {
678 return Err(StatusCode::CONFLICT);
679 }
680
681 info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
682
683 if let Some(hooks) = &state.lifecycle_hooks {
685 let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
686 id: mock.id.clone(),
687 name: mock.name.clone(),
688 config: serde_json::to_value(&mock).unwrap_or_default(),
689 };
690 hooks.invoke_mock_created(&event).await;
691 }
692
693 mocks.push(mock.clone());
694
695 if let Some(tx) = &state.ws_broadcast {
697 let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
698 }
699
700 Ok(Json(mock))
701}
702
703async fn update_mock(
705 State(state): State<ManagementState>,
706 Path(id): Path<String>,
707 Json(updated_mock): Json<MockConfig>,
708) -> Result<Json<MockConfig>, StatusCode> {
709 let mut mocks = state.mocks.write().await;
710
711 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
712
713 let old_mock = mocks[position].clone();
715
716 info!("Updating mock: {}", id);
717 mocks[position] = updated_mock.clone();
718
719 if let Some(hooks) = &state.lifecycle_hooks {
721 let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
722 id: updated_mock.id.clone(),
723 name: updated_mock.name.clone(),
724 config: serde_json::to_value(&updated_mock).unwrap_or_default(),
725 };
726 hooks.invoke_mock_updated(&event).await;
727
728 if old_mock.enabled != updated_mock.enabled {
730 let state_event = if updated_mock.enabled {
731 mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
732 id: updated_mock.id.clone(),
733 }
734 } else {
735 mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
736 id: updated_mock.id.clone(),
737 }
738 };
739 hooks.invoke_mock_state_changed(&state_event).await;
740 }
741 }
742
743 if let Some(tx) = &state.ws_broadcast {
745 let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
746 }
747
748 Ok(Json(updated_mock))
749}
750
751async fn delete_mock(
753 State(state): State<ManagementState>,
754 Path(id): Path<String>,
755) -> Result<StatusCode, StatusCode> {
756 let mut mocks = state.mocks.write().await;
757
758 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
759
760 let deleted_mock = mocks[position].clone();
762
763 info!("Deleting mock: {}", id);
764 mocks.remove(position);
765
766 if let Some(hooks) = &state.lifecycle_hooks {
768 let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
769 id: deleted_mock.id.clone(),
770 name: deleted_mock.name.clone(),
771 };
772 hooks.invoke_mock_deleted(&event).await;
773 }
774
775 if let Some(tx) = &state.ws_broadcast {
777 let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
778 }
779
780 Ok(StatusCode::NO_CONTENT)
781}
782
/// Request body for the config-validation endpoint.
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// Configuration document to validate.
    pub config: serde_json::Value,
    /// Parser to use: "yaml"/"yml" or anything else for JSON; defaults
    /// to "json".
    #[serde(default = "default_format")]
    pub format: String,
}
792
/// Serde default: configuration payloads are treated as JSON unless a
/// format is given.
fn default_format() -> String {
    String::from("json")
}
796
/// `POST` handler: check that a payload deserializes into
/// `mockforge_core::config::ServerConfig`.
///
/// `request.format` selects the parser: "yaml"/"yml" re-serializes the
/// JSON body to a string and parses it with serde_yaml (valid JSON is
/// also valid YAML); any other value deserializes the JSON value
/// directly. Responds 200 with `{"valid": true}` or 400 with the error.
/// NOTE(review): the request body always arrives as JSON, so the YAML
/// branch is effectively a JSON round-trip — confirm this is intended.
async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
    use mockforge_core::config::ServerConfig;

    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
        "yaml" | "yml" => {
            // Re-serialize the JSON value so serde_yaml can parse it.
            let yaml_str = match serde_json::to_string(&request.config) {
                Ok(s) => s,
                Err(e) => {
                    return (
                        StatusCode::BAD_REQUEST,
                        Json(serde_json::json!({
                            "valid": false,
                            "error": format!("Failed to convert to string: {}", e),
                            "message": "Configuration validation failed"
                        })),
                    )
                        .into_response();
                }
            };
            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
        }
        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
    };

    match config_result {
        Ok(_) => Json(serde_json::json!({
            "valid": true,
            "message": "Configuration is valid"
        }))
        .into_response(),
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "valid": false,
                "error": format!("Invalid configuration: {}", e),
                "message": "Configuration validation failed"
            })),
        )
            .into_response(),
    }
}
839
/// Request body for the bulk config-update endpoint.
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    /// JSON object of top-level `ServerConfig` keys to override.
    pub updates: serde_json::Value,
}
846
847async fn bulk_update_config(
855 State(state): State<ManagementState>,
856 Json(request): Json<BulkConfigUpdateRequest>,
857) -> impl IntoResponse {
858 if !request.updates.is_object() {
860 return (
861 StatusCode::BAD_REQUEST,
862 Json(serde_json::json!({
863 "error": "Invalid request",
864 "message": "Updates must be a JSON object"
865 })),
866 )
867 .into_response();
868 }
869
870 use mockforge_core::config::ServerConfig;
872
873 let base_config = ServerConfig::default();
875 let base_json = match serde_json::to_value(&base_config) {
876 Ok(v) => v,
877 Err(e) => {
878 return (
879 StatusCode::INTERNAL_SERVER_ERROR,
880 Json(serde_json::json!({
881 "error": "Internal error",
882 "message": format!("Failed to serialize base config: {}", e)
883 })),
884 )
885 .into_response();
886 }
887 };
888
889 let mut merged = base_json.clone();
891 if let (Some(merged_obj), Some(updates_obj)) =
892 (merged.as_object_mut(), request.updates.as_object())
893 {
894 for (key, value) in updates_obj {
895 merged_obj.insert(key.clone(), value.clone());
896 }
897 }
898
899 match serde_json::from_value::<ServerConfig>(merged) {
901 Ok(_) => {
902 Json(serde_json::json!({
909 "success": true,
910 "message": "Bulk configuration update validated successfully. Note: Runtime application requires ServerConfig in ManagementState and hot-reload support.",
911 "updates_received": request.updates,
912 "validated": true
913 }))
914 .into_response()
915 }
916 Err(e) => (
917 StatusCode::BAD_REQUEST,
918 Json(serde_json::json!({
919 "error": "Invalid configuration",
920 "message": format!("Configuration validation failed: {}", e),
921 "validated": false
922 })),
923 )
924 .into_response(),
925 }
926}
927
928async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
930 let mocks = state.mocks.read().await;
931 let request_count = *state.request_counter.read().await;
932
933 Json(ServerStats {
934 uptime_seconds: state.start_time.elapsed().as_secs(),
935 total_requests: request_count,
936 active_mocks: mocks.len(),
937 enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
938 registered_routes: mocks.len(), })
940}
941
942async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
944 Json(ServerConfig {
945 version: env!("CARGO_PKG_VERSION").to_string(),
946 port: state.port,
947 has_openapi_spec: state.spec.is_some(),
948 spec_path: state.spec_path.clone(),
949 })
950}
951
952async fn health_check() -> Json<serde_json::Value> {
954 Json(serde_json::json!({
955 "status": "healthy",
956 "service": "mockforge-management",
957 "timestamp": chrono::Utc::now().to_rfc3339()
958 }))
959}
960
/// Output format for the mock export endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// Pretty-printed JSON (the default).
    Json,
    /// YAML via serde_yaml.
    Yaml,
}
970
971async fn export_mocks(
973 State(state): State<ManagementState>,
974 axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
975) -> Result<(StatusCode, String), StatusCode> {
976 let mocks = state.mocks.read().await;
977
978 let format = params
979 .get("format")
980 .map(|f| match f.as_str() {
981 "yaml" | "yml" => ExportFormat::Yaml,
982 _ => ExportFormat::Json,
983 })
984 .unwrap_or(ExportFormat::Json);
985
986 match format {
987 ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
988 .map(|json| (StatusCode::OK, json))
989 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
990 ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
991 .map(|yaml| (StatusCode::OK, yaml))
992 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
993 }
994}
995
996async fn import_mocks(
998 State(state): State<ManagementState>,
999 Json(mocks): Json<Vec<MockConfig>>,
1000) -> impl IntoResponse {
1001 let mut current_mocks = state.mocks.write().await;
1002 current_mocks.clear();
1003 current_mocks.extend(mocks);
1004 Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1005}
1006
/// `GET` handler: all emails captured by the SMTP registry.
#[cfg(feature = "smtp")]
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.smtp_registry.as_deref() {
        Some(registry) => match registry.get_emails() {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve emails",
                    "message": e.to_string()
                })),
            ),
        },
        // No registry configured — the feature is compiled in but inactive.
        None => (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        ),
    }
}
1031
/// `GET` handler: fetch one captured email by id.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    match state.smtp_registry.as_deref() {
        Some(registry) => match registry.get_email_by_id(&id) {
            Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
            Ok(None) => (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Email not found",
                    "id": id
                })),
            ),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve email",
                    "message": e.to_string()
                })),
            ),
        },
        None => (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        ),
    }
}
1066
/// `DELETE` handler: drop every captured email from the mailbox.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.smtp_registry.as_deref() {
        Some(registry) => match registry.clear_mailbox() {
            Ok(()) => (
                StatusCode::OK,
                Json(serde_json::json!({
                    "message": "Mailbox cleared successfully"
                })),
            ),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to clear mailbox",
                    "message": e.to_string()
                })),
            ),
        },
        None => (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        ),
    }
}
1096
/// `GET` handler: export the SMTP mailbox — currently unimplemented over
/// HTTP; the response echoes the requested format.
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    // Clone the provided value, or build the default lazily — the old
    // `unwrap_or(&"json".to_string()).clone()` allocated a temporary
    // String on every request, even when a format was supplied.
    let format = params.get("format").cloned().unwrap_or_else(|| "json".to_string());
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "SMTP mailbox management not available via HTTP API",
            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
            "requested_format": format
        })),
    )
}
1112
/// `GET` handler: filter captured emails by query parameters.
///
/// Recognized params: `sender`, `recipient`, `subject`, `body` (content
/// filters), `since`/`until` (RFC 3339 timestamps; unparsable values
/// are silently ignored), `regex` ("true" treats filters as regexes),
/// and `case_sensitive` ("true" enables case-sensitive comparison).
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        let filters = EmailSearchFilters {
            sender: params.get("sender").cloned(),
            recipient: params.get("recipient").cloned(),
            subject: params.get("subject").cloned(),
            body: params.get("body").cloned(),
            // Invalid timestamps fall back to None (no bound) rather
            // than erroring.
            since: params
                .get("since")
                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
                .map(|dt| dt.with_timezone(&chrono::Utc)),
            until: params
                .get("until")
                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
                .map(|dt| dt.with_timezone(&chrono::Utc)),
            use_regex: params.get("regex").map(|s| s == "true").unwrap_or(false),
            case_sensitive: params.get("case_sensitive").map(|s| s == "true").unwrap_or(false),
        };

        match smtp_registry.search_emails(filters) {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to search emails",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1157
/// Broker-wide MQTT counters reported by the stats endpoint.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of currently connected clients.
    pub connected_clients: usize,
    /// Number of topics with recent activity.
    pub active_topics: usize,
    /// Number of retained messages held by the broker.
    pub retained_messages: usize,
    /// Total subscriptions across all clients.
    pub total_subscriptions: usize,
}
1171
/// `GET` handler: snapshot of broker-wide MQTT counters; 503 when no
/// broker is attached.
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let connected = broker.get_connected_clients().await.len();
            let topics = broker.get_active_topics().await.len();
            let topic_stats = broker.get_topic_stats().await;

            Json(MqttBrokerStats {
                connected_clients: connected,
                active_topics: topics,
                retained_messages: topic_stats.retained_messages,
                total_subscriptions: topic_stats.total_subscriptions,
            })
            .into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1192
/// `GET` handler: ids of currently connected MQTT clients.
#[cfg(feature = "mqtt")]
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let clients = broker.get_connected_clients().await;
            Json(serde_json::json!({ "clients": clients })).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1205
/// `GET` handler: topics currently active on the MQTT broker.
#[cfg(feature = "mqtt")]
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let topics = broker.get_active_topics().await;
            Json(serde_json::json!({ "topics": topics })).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1218
/// Handler: force-disconnect an MQTT client by id.
#[cfg(feature = "mqtt")]
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let broker = match &state.mqtt_broker {
        Some(b) => b,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
        }
    };

    match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response(),
        Err(e) => {
            (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
                .into_response()
        }
    }
}
1238
/// Typed request body for publishing an MQTT message.
///
/// NOTE(review): `publish_mqtt_message_handler` currently parses a raw
/// `serde_json::Value` instead of this struct — confirm whether this
/// type should be wired into the handler or removed.
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Topic to publish to.
    pub topic: String,
    /// Message payload as text.
    pub payload: String,
    /// Quality-of-service level; defaults to 0.
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Retain flag; defaults to false.
    #[serde(default)]
    pub retain: bool,
}
1256
/// Serde default for `MqttPublishRequest::qos`: QoS 0 (at most once).
#[cfg(feature = "mqtt")]
fn default_qos() -> u8 {
    0
}
1261
/// `POST` handler: publish a message through the embedded MQTT broker
/// and broadcast a corresponding [`MessageEvent`] to listeners.
///
/// Expects a JSON body with `topic` and `payload` (required strings),
/// plus optional `qos` (0-2, default 0) and `retain` (default false).
/// NOTE(review): this parses a raw `serde_json::Value` instead of the
/// typed `MqttPublishRequest` extractor — presumably to return a custom
/// 400 body rather than axum's default 422; confirm.
#[cfg(feature = "mqtt")]
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Pull fields out of the untyped body; missing optionals fall back
    // to their defaults.
    let topic = request.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
    let payload = request.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    if topic.is_none() || payload.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required fields: topic and payload"
            })),
        );
    }

    // Both checked for presence above.
    let topic = topic.unwrap();
    let payload = payload.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        // MQTT defines only QoS levels 0, 1, and 2.
        if qos > 2 {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid QoS",
                    "message": "QoS must be 0, 1, or 2"
                })),
            );
        }

        let payload_bytes = payload.as_bytes().to_vec();
        // Publishes issued through this API appear under a fixed client id.
        let client_id = "mockforge-management-api".to_string();

        let publish_result = broker
            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
            .await
            .map_err(|e| format!("{}", e));

        match publish_result {
            Ok(_) => {
                // Mirror the publish to message-event subscribers; a send
                // error only means nobody is currently listening.
                let event = MessageEvent::Mqtt(MqttMessageEvent {
                    topic: topic.clone(),
                    payload: payload.clone(),
                    qos,
                    retain,
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                (
                    StatusCode::OK,
                    Json(serde_json::json!({
                        "success": true,
                        "message": format!("Message published to topic '{}'", topic),
                        "topic": topic,
                        "qos": qos,
                        "retain": retain
                    })),
                )
            }
            Err(error_msg) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to publish message",
                    "message": error_msg
                })),
            ),
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1349
1350#[cfg(not(feature = "mqtt"))]
1351async fn publish_mqtt_message_handler(
1353 State(_state): State<ManagementState>,
1354 Json(_request): Json<serde_json::Value>,
1355) -> impl IntoResponse {
1356 (
1357 StatusCode::SERVICE_UNAVAILABLE,
1358 Json(serde_json::json!({
1359 "error": "MQTT feature not enabled",
1360 "message": "MQTT support is not compiled into this build"
1361 })),
1362 )
1363}
1364
/// Request body for `POST /mqtt/publish/batch`.
///
/// NOTE(review): like `MqttPublishRequest`, this type is not currently used
/// by the batch handler, which parses the raw JSON value field-by-field.
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// Messages to publish, in order.
    pub messages: Vec<MqttPublishRequest>,
    /// Pause between consecutive messages, in milliseconds; defaults to 100.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}

/// Serde default for [`MqttBatchPublishRequest::delay_ms`].
#[cfg(feature = "mqtt")]
fn default_delay() -> u64 {
    100
}
1380
/// POST /mqtt/publish/batch — publish a sequence of MQTT messages.
///
/// Each entry is validated and published independently; a per-message result
/// (success flag plus error text) is accumulated and the endpoint responds
/// 200 even when some messages failed — callers must inspect `results`.
/// An optional `delay_ms` pause is inserted between consecutive messages.
#[cfg(feature = "mqtt")]
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Manual extraction mirrors the single-publish handler so malformed
    // bodies get a tailored 400 instead of a generic 422.
    let messages_json = request.get("messages").and_then(|v| v.as_array());
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    if messages_json.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required field: messages"
            })),
        );
    }

    let messages_json = messages_json.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        if messages_json.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            );
        }

        let mut results = Vec::new();
        let client_id = "mockforge-management-api".to_string();

        for (index, msg_json) in messages_json.iter().enumerate() {
            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

            // Per-message validation failures are recorded but do not abort
            // the rest of the batch.
            if topic.is_none() || payload.is_none() {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }

            let topic = topic.unwrap();
            let payload = payload.unwrap();

            if qos > 2 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Invalid QoS (must be 0, 1, or 2)"
                }));
                continue;
            }

            let payload_bytes = payload.as_bytes().to_vec();

            let publish_result = broker
                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
                .await
                .map_err(|e| format!("{}", e));

            match publish_result {
                Ok(_) => {
                    // Mirror successful publishes onto the SSE broadcast
                    // channel; a send error just means no live subscribers.
                    let event = MessageEvent::Mqtt(MqttMessageEvent {
                        topic: topic.clone(),
                        payload: payload.clone(),
                        qos,
                        retain,
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": topic,
                        "qos": qos
                    }));
                }
                Err(error_msg) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": error_msg
                    }));
                }
            }

            // Throttle between messages, but not after the last one.
            // (messages_json is non-empty here, so len() - 1 cannot underflow.)
            if index < messages_json.len() - 1 && delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        (
            StatusCode::OK,
            Json(serde_json::json!({
                "success": true,
                "total": messages_json.len(),
                "succeeded": success_count,
                "failed": messages_json.len() - success_count,
                "results": results
            })),
        )
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1510
1511#[cfg(not(feature = "mqtt"))]
1512async fn publish_mqtt_batch_handler(
1514 State(_state): State<ManagementState>,
1515 Json(_request): Json<serde_json::Value>,
1516) -> impl IntoResponse {
1517 (
1518 StatusCode::SERVICE_UNAVAILABLE,
1519 Json(serde_json::json!({
1520 "error": "MQTT feature not enabled",
1521 "message": "MQTT support is not compiled into this build"
1522 })),
1523 )
1524}
1525
/// Body for the migration-mode PUT endpoints.
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    // One of "mock", "shadow", "real", "auto" (case-insensitive).
    mode: String,
}
1533
1534async fn get_migration_routes(
1536 State(state): State<ManagementState>,
1537) -> Result<Json<serde_json::Value>, StatusCode> {
1538 let proxy_config = match &state.proxy_config {
1539 Some(config) => config,
1540 None => {
1541 return Ok(Json(serde_json::json!({
1542 "error": "Migration not configured. Proxy config not available."
1543 })));
1544 }
1545 };
1546
1547 let config = proxy_config.read().await;
1548 let routes = config.get_migration_routes();
1549
1550 Ok(Json(serde_json::json!({
1551 "routes": routes
1552 })))
1553}
1554
1555async fn toggle_route_migration(
1557 State(state): State<ManagementState>,
1558 Path(pattern): Path<String>,
1559) -> Result<Json<serde_json::Value>, StatusCode> {
1560 let proxy_config = match &state.proxy_config {
1561 Some(config) => config,
1562 None => {
1563 return Ok(Json(serde_json::json!({
1564 "error": "Migration not configured. Proxy config not available."
1565 })));
1566 }
1567 };
1568
1569 let mut config = proxy_config.write().await;
1570 let new_mode = match config.toggle_route_migration(&pattern) {
1571 Some(mode) => mode,
1572 None => {
1573 return Ok(Json(serde_json::json!({
1574 "error": format!("Route pattern not found: {}", pattern)
1575 })));
1576 }
1577 };
1578
1579 Ok(Json(serde_json::json!({
1580 "pattern": pattern,
1581 "mode": format!("{:?}", new_mode).to_lowercase()
1582 })))
1583}
1584
1585async fn set_route_migration_mode(
1587 State(state): State<ManagementState>,
1588 Path(pattern): Path<String>,
1589 Json(request): Json<SetMigrationModeRequest>,
1590) -> Result<Json<serde_json::Value>, StatusCode> {
1591 let proxy_config = match &state.proxy_config {
1592 Some(config) => config,
1593 None => {
1594 return Ok(Json(serde_json::json!({
1595 "error": "Migration not configured. Proxy config not available."
1596 })));
1597 }
1598 };
1599
1600 use mockforge_core::proxy::config::MigrationMode;
1601 let mode = match request.mode.to_lowercase().as_str() {
1602 "mock" => MigrationMode::Mock,
1603 "shadow" => MigrationMode::Shadow,
1604 "real" => MigrationMode::Real,
1605 "auto" => MigrationMode::Auto,
1606 _ => {
1607 return Ok(Json(serde_json::json!({
1608 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1609 })));
1610 }
1611 };
1612
1613 let mut config = proxy_config.write().await;
1614 let updated = config.update_rule_migration_mode(&pattern, mode);
1615
1616 if !updated {
1617 return Ok(Json(serde_json::json!({
1618 "error": format!("Route pattern not found: {}", pattern)
1619 })));
1620 }
1621
1622 Ok(Json(serde_json::json!({
1623 "pattern": pattern,
1624 "mode": format!("{:?}", mode).to_lowercase()
1625 })))
1626}
1627
1628async fn toggle_group_migration(
1630 State(state): State<ManagementState>,
1631 Path(group): Path<String>,
1632) -> Result<Json<serde_json::Value>, StatusCode> {
1633 let proxy_config = match &state.proxy_config {
1634 Some(config) => config,
1635 None => {
1636 return Ok(Json(serde_json::json!({
1637 "error": "Migration not configured. Proxy config not available."
1638 })));
1639 }
1640 };
1641
1642 let mut config = proxy_config.write().await;
1643 let new_mode = config.toggle_group_migration(&group);
1644
1645 Ok(Json(serde_json::json!({
1646 "group": group,
1647 "mode": format!("{:?}", new_mode).to_lowercase()
1648 })))
1649}
1650
1651async fn set_group_migration_mode(
1653 State(state): State<ManagementState>,
1654 Path(group): Path<String>,
1655 Json(request): Json<SetMigrationModeRequest>,
1656) -> Result<Json<serde_json::Value>, StatusCode> {
1657 let proxy_config = match &state.proxy_config {
1658 Some(config) => config,
1659 None => {
1660 return Ok(Json(serde_json::json!({
1661 "error": "Migration not configured. Proxy config not available."
1662 })));
1663 }
1664 };
1665
1666 use mockforge_core::proxy::config::MigrationMode;
1667 let mode = match request.mode.to_lowercase().as_str() {
1668 "mock" => MigrationMode::Mock,
1669 "shadow" => MigrationMode::Shadow,
1670 "real" => MigrationMode::Real,
1671 "auto" => MigrationMode::Auto,
1672 _ => {
1673 return Ok(Json(serde_json::json!({
1674 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1675 })));
1676 }
1677 };
1678
1679 let mut config = proxy_config.write().await;
1680 config.update_group_migration_mode(&group, mode);
1681
1682 Ok(Json(serde_json::json!({
1683 "group": group,
1684 "mode": format!("{:?}", mode).to_lowercase()
1685 })))
1686}
1687
1688async fn get_migration_groups(
1690 State(state): State<ManagementState>,
1691) -> Result<Json<serde_json::Value>, StatusCode> {
1692 let proxy_config = match &state.proxy_config {
1693 Some(config) => config,
1694 None => {
1695 return Ok(Json(serde_json::json!({
1696 "error": "Migration not configured. Proxy config not available."
1697 })));
1698 }
1699 };
1700
1701 let config = proxy_config.read().await;
1702 let groups = config.get_migration_groups();
1703
1704 let groups_json: serde_json::Map<String, serde_json::Value> = groups
1706 .into_iter()
1707 .map(|(name, info)| {
1708 (
1709 name,
1710 serde_json::json!({
1711 "name": info.name,
1712 "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1713 "route_count": info.route_count
1714 }),
1715 )
1716 })
1717 .collect();
1718
1719 Ok(Json(serde_json::json!(groups_json)))
1720}
1721
1722async fn get_migration_status(
1724 State(state): State<ManagementState>,
1725) -> Result<Json<serde_json::Value>, StatusCode> {
1726 let proxy_config = match &state.proxy_config {
1727 Some(config) => config,
1728 None => {
1729 return Ok(Json(serde_json::json!({
1730 "error": "Migration not configured. Proxy config not available."
1731 })));
1732 }
1733 };
1734
1735 let config = proxy_config.read().await;
1736 let routes = config.get_migration_routes();
1737 let groups = config.get_migration_groups();
1738
1739 let mut mock_count = 0;
1740 let mut shadow_count = 0;
1741 let mut real_count = 0;
1742 let mut auto_count = 0;
1743
1744 for route in &routes {
1745 match route.migration_mode {
1746 mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1747 mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1748 mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1749 mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1750 }
1751 }
1752
1753 Ok(Json(serde_json::json!({
1754 "total_routes": routes.len(),
1755 "mock_routes": mock_count,
1756 "shadow_routes": shadow_count,
1757 "real_routes": real_count,
1758 "auto_routes": auto_count,
1759 "total_groups": groups.len(),
1760 "migration_enabled": config.migration_enabled
1761 })))
1762}
1763
/// Body for creating/updating a proxy body-transform rule.
#[derive(Debug, Deserialize, Serialize)]
pub struct ProxyRuleRequest {
    /// Route pattern the rule applies to.
    pub pattern: String,
    /// "request" or "response" — which direction the rule transforms.
    #[serde(rename = "type")]
    pub rule_type: String,
    /// For response rules: only apply when the upstream status is listed.
    /// Ignored for request rules.
    #[serde(default)]
    pub status_codes: Vec<u16>,
    /// Transforms to apply; the create endpoint rejects an empty list.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active; defaults to true.
    #[serde(default = "default_true")]
    pub enabled: bool,
}

/// Wire form of a single body transform.
#[derive(Debug, Deserialize, Serialize)]
pub struct BodyTransformRequest {
    /// Path within the body to transform.
    pub path: String,
    /// Replacement value.
    pub replace: String,
    /// "replace", "add", or "remove"; anything else (including the empty
    /// default) is treated as "replace" by the handlers.
    #[serde(default)]
    pub operation: String,
}

/// Wire form of a stored rule, as returned by GET /proxy/rules.
#[derive(Debug, Serialize)]
pub struct ProxyRuleResponse {
    /// Positional ID: request rules first, then response rules. IDs shift
    /// when a rule is deleted, so clients should re-list after mutations.
    pub id: usize,
    /// Route pattern the rule applies to.
    pub pattern: String,
    /// "request" or "response".
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status-code filter (empty for request rules).
    pub status_codes: Vec<u16>,
    /// Transforms carried by the rule.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active.
    pub enabled: bool,
}
1813
1814async fn list_proxy_rules(
1816 State(state): State<ManagementState>,
1817) -> Result<Json<serde_json::Value>, StatusCode> {
1818 let proxy_config = match &state.proxy_config {
1819 Some(config) => config,
1820 None => {
1821 return Ok(Json(serde_json::json!({
1822 "error": "Proxy not configured. Proxy config not available."
1823 })));
1824 }
1825 };
1826
1827 let config = proxy_config.read().await;
1828
1829 let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1830
1831 for (idx, rule) in config.request_replacements.iter().enumerate() {
1833 rules.push(ProxyRuleResponse {
1834 id: idx,
1835 pattern: rule.pattern.clone(),
1836 rule_type: "request".to_string(),
1837 status_codes: Vec::new(),
1838 body_transforms: rule
1839 .body_transforms
1840 .iter()
1841 .map(|t| BodyTransformRequest {
1842 path: t.path.clone(),
1843 replace: t.replace.clone(),
1844 operation: format!("{:?}", t.operation).to_lowercase(),
1845 })
1846 .collect(),
1847 enabled: rule.enabled,
1848 });
1849 }
1850
1851 let request_count = config.request_replacements.len();
1853 for (idx, rule) in config.response_replacements.iter().enumerate() {
1854 rules.push(ProxyRuleResponse {
1855 id: request_count + idx,
1856 pattern: rule.pattern.clone(),
1857 rule_type: "response".to_string(),
1858 status_codes: rule.status_codes.clone(),
1859 body_transforms: rule
1860 .body_transforms
1861 .iter()
1862 .map(|t| BodyTransformRequest {
1863 path: t.path.clone(),
1864 replace: t.replace.clone(),
1865 operation: format!("{:?}", t.operation).to_lowercase(),
1866 })
1867 .collect(),
1868 enabled: rule.enabled,
1869 });
1870 }
1871
1872 Ok(Json(serde_json::json!({
1873 "rules": rules
1874 })))
1875}
1876
1877async fn create_proxy_rule(
1879 State(state): State<ManagementState>,
1880 Json(request): Json<ProxyRuleRequest>,
1881) -> Result<Json<serde_json::Value>, StatusCode> {
1882 let proxy_config = match &state.proxy_config {
1883 Some(config) => config,
1884 None => {
1885 return Ok(Json(serde_json::json!({
1886 "error": "Proxy not configured. Proxy config not available."
1887 })));
1888 }
1889 };
1890
1891 if request.body_transforms.is_empty() {
1893 return Ok(Json(serde_json::json!({
1894 "error": "At least one body transform is required"
1895 })));
1896 }
1897
1898 let body_transforms: Vec<BodyTransform> = request
1899 .body_transforms
1900 .iter()
1901 .map(|t| {
1902 let op = match t.operation.as_str() {
1903 "replace" => TransformOperation::Replace,
1904 "add" => TransformOperation::Add,
1905 "remove" => TransformOperation::Remove,
1906 _ => TransformOperation::Replace,
1907 };
1908 BodyTransform {
1909 path: t.path.clone(),
1910 replace: t.replace.clone(),
1911 operation: op,
1912 }
1913 })
1914 .collect();
1915
1916 let new_rule = BodyTransformRule {
1917 pattern: request.pattern.clone(),
1918 status_codes: request.status_codes.clone(),
1919 body_transforms,
1920 enabled: request.enabled,
1921 };
1922
1923 let mut config = proxy_config.write().await;
1924
1925 let rule_id = if request.rule_type == "request" {
1926 config.request_replacements.push(new_rule);
1927 config.request_replacements.len() - 1
1928 } else if request.rule_type == "response" {
1929 config.response_replacements.push(new_rule);
1930 config.request_replacements.len() + config.response_replacements.len() - 1
1931 } else {
1932 return Ok(Json(serde_json::json!({
1933 "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
1934 })));
1935 };
1936
1937 Ok(Json(serde_json::json!({
1938 "id": rule_id,
1939 "message": "Rule created successfully"
1940 })))
1941}
1942
1943async fn get_proxy_rule(
1945 State(state): State<ManagementState>,
1946 Path(id): Path<String>,
1947) -> Result<Json<serde_json::Value>, StatusCode> {
1948 let proxy_config = match &state.proxy_config {
1949 Some(config) => config,
1950 None => {
1951 return Ok(Json(serde_json::json!({
1952 "error": "Proxy not configured. Proxy config not available."
1953 })));
1954 }
1955 };
1956
1957 let config = proxy_config.read().await;
1958 let rule_id: usize = match id.parse() {
1959 Ok(id) => id,
1960 Err(_) => {
1961 return Ok(Json(serde_json::json!({
1962 "error": format!("Invalid rule ID: {}", id)
1963 })));
1964 }
1965 };
1966
1967 let request_count = config.request_replacements.len();
1968
1969 if rule_id < request_count {
1970 let rule = &config.request_replacements[rule_id];
1972 Ok(Json(serde_json::json!({
1973 "id": rule_id,
1974 "pattern": rule.pattern,
1975 "type": "request",
1976 "status_codes": [],
1977 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1978 "path": t.path,
1979 "replace": t.replace,
1980 "operation": format!("{:?}", t.operation).to_lowercase()
1981 })).collect::<Vec<_>>(),
1982 "enabled": rule.enabled
1983 })))
1984 } else if rule_id < request_count + config.response_replacements.len() {
1985 let response_idx = rule_id - request_count;
1987 let rule = &config.response_replacements[response_idx];
1988 Ok(Json(serde_json::json!({
1989 "id": rule_id,
1990 "pattern": rule.pattern,
1991 "type": "response",
1992 "status_codes": rule.status_codes,
1993 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1994 "path": t.path,
1995 "replace": t.replace,
1996 "operation": format!("{:?}", t.operation).to_lowercase()
1997 })).collect::<Vec<_>>(),
1998 "enabled": rule.enabled
1999 })))
2000 } else {
2001 Ok(Json(serde_json::json!({
2002 "error": format!("Rule ID {} not found", rule_id)
2003 })))
2004 }
2005}
2006
2007async fn update_proxy_rule(
2009 State(state): State<ManagementState>,
2010 Path(id): Path<String>,
2011 Json(request): Json<ProxyRuleRequest>,
2012) -> Result<Json<serde_json::Value>, StatusCode> {
2013 let proxy_config = match &state.proxy_config {
2014 Some(config) => config,
2015 None => {
2016 return Ok(Json(serde_json::json!({
2017 "error": "Proxy not configured. Proxy config not available."
2018 })));
2019 }
2020 };
2021
2022 let mut config = proxy_config.write().await;
2023 let rule_id: usize = match id.parse() {
2024 Ok(id) => id,
2025 Err(_) => {
2026 return Ok(Json(serde_json::json!({
2027 "error": format!("Invalid rule ID: {}", id)
2028 })));
2029 }
2030 };
2031
2032 let body_transforms: Vec<BodyTransform> = request
2033 .body_transforms
2034 .iter()
2035 .map(|t| {
2036 let op = match t.operation.as_str() {
2037 "replace" => TransformOperation::Replace,
2038 "add" => TransformOperation::Add,
2039 "remove" => TransformOperation::Remove,
2040 _ => TransformOperation::Replace,
2041 };
2042 BodyTransform {
2043 path: t.path.clone(),
2044 replace: t.replace.clone(),
2045 operation: op,
2046 }
2047 })
2048 .collect();
2049
2050 let updated_rule = BodyTransformRule {
2051 pattern: request.pattern.clone(),
2052 status_codes: request.status_codes.clone(),
2053 body_transforms,
2054 enabled: request.enabled,
2055 };
2056
2057 let request_count = config.request_replacements.len();
2058
2059 if rule_id < request_count {
2060 config.request_replacements[rule_id] = updated_rule;
2062 } else if rule_id < request_count + config.response_replacements.len() {
2063 let response_idx = rule_id - request_count;
2065 config.response_replacements[response_idx] = updated_rule;
2066 } else {
2067 return Ok(Json(serde_json::json!({
2068 "error": format!("Rule ID {} not found", rule_id)
2069 })));
2070 }
2071
2072 Ok(Json(serde_json::json!({
2073 "id": rule_id,
2074 "message": "Rule updated successfully"
2075 })))
2076}
2077
2078async fn delete_proxy_rule(
2080 State(state): State<ManagementState>,
2081 Path(id): Path<String>,
2082) -> Result<Json<serde_json::Value>, StatusCode> {
2083 let proxy_config = match &state.proxy_config {
2084 Some(config) => config,
2085 None => {
2086 return Ok(Json(serde_json::json!({
2087 "error": "Proxy not configured. Proxy config not available."
2088 })));
2089 }
2090 };
2091
2092 let mut config = proxy_config.write().await;
2093 let rule_id: usize = match id.parse() {
2094 Ok(id) => id,
2095 Err(_) => {
2096 return Ok(Json(serde_json::json!({
2097 "error": format!("Invalid rule ID: {}", id)
2098 })));
2099 }
2100 };
2101
2102 let request_count = config.request_replacements.len();
2103
2104 if rule_id < request_count {
2105 config.request_replacements.remove(rule_id);
2107 } else if rule_id < request_count + config.response_replacements.len() {
2108 let response_idx = rule_id - request_count;
2110 config.response_replacements.remove(response_idx);
2111 } else {
2112 return Ok(Json(serde_json::json!({
2113 "error": format!("Rule ID {} not found", rule_id)
2114 })));
2115 }
2116
2117 Ok(Json(serde_json::json!({
2118 "id": rule_id,
2119 "message": "Rule deleted successfully"
2120 })))
2121}
2122
2123async fn get_proxy_inspect(
2126 State(_state): State<ManagementState>,
2127 Query(params): Query<std::collections::HashMap<String, String>>,
2128) -> Result<Json<serde_json::Value>, StatusCode> {
2129 let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2130
2131 Ok(Json(serde_json::json!({
2137 "requests": [],
2138 "responses": [],
2139 "limit": limit,
2140 "total": 0,
2141 "message": "Request/response inspection not yet implemented. This endpoint will return intercepted traffic when proxy inspection is fully integrated."
2142 })))
2143}
2144
/// Build the management API [`Router`], wiring feature-gated route groups
/// (SMTP, MQTT, Kafka) around the always-present core endpoints.
///
/// Each `#[cfg]` block shadows `router` so the chain composes regardless of
/// which features are compiled in; the `#[cfg(not(...))] let router = router;`
/// lines keep the shadowing chain well-formed when a feature is absent.
pub fn management_router(state: ManagementState) -> Router {
    // Core endpoints: health/stats/config plus mock CRUD and import/export.
    let router = Router::new()
        .route("/health", get(health_check))
        .route("/stats", get(get_stats))
        .route("/config", get(get_config))
        .route("/config/validate", post(validate_config))
        .route("/config/bulk", post(bulk_update_config))
        .route("/mocks", get(list_mocks))
        .route("/mocks", post(create_mock))
        .route("/mocks/{id}", get(get_mock))
        .route("/mocks/{id}", put(update_mock))
        .route("/mocks/{id}", delete(delete_mock))
        .route("/export", get(export_mocks))
        .route("/import", post(import_mocks));

    // SMTP mailbox endpoints only exist when the smtp feature is enabled.
    #[cfg(feature = "smtp")]
    let router = router
        .route("/smtp/mailbox", get(list_smtp_emails))
        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
        .route("/smtp/mailbox/{id}", get(get_smtp_email))
        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
        .route("/smtp/mailbox/search", get(search_smtp_emails));

    #[cfg(not(feature = "smtp"))]
    let router = router;

    // Full MQTT surface when the feature is on ...
    #[cfg(feature = "mqtt")]
    let router = router
        .route("/mqtt/stats", get(get_mqtt_stats))
        .route("/mqtt/clients", get(get_mqtt_clients))
        .route("/mqtt/topics", get(get_mqtt_topics))
        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // ... and 503-returning publish stubs when it is off, so the route
    // shape is stable across builds.
    #[cfg(not(feature = "mqtt"))]
    let router = router
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    #[cfg(feature = "kafka")]
    let router = router
        .route("/kafka/stats", get(get_kafka_stats))
        .route("/kafka/topics", get(get_kafka_topics))
        .route("/kafka/topics/{topic}", get(get_kafka_topic))
        .route("/kafka/groups", get(get_kafka_groups))
        .route("/kafka/groups/{group_id}", get(get_kafka_group))
        .route("/kafka/produce", post(produce_kafka_message))
        .route("/kafka/produce/batch", post(produce_kafka_batch))
        .route("/kafka/messages/stream", get(kafka_messages_stream));

    #[cfg(not(feature = "kafka"))]
    let router = router;

    // Migration endpoints (mock/shadow/real/auto per route and per group).
    let router = router
        .route("/migration/routes", get(get_migration_routes))
        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
        .route("/migration/groups/{group}", put(set_group_migration_mode))
        .route("/migration/groups", get(get_migration_groups))
        .route("/migration/status", get(get_migration_status));

    // Proxy body-transform rule CRUD plus the inspection placeholder.
    let router = router
        .route("/proxy/rules", get(list_proxy_rules))
        .route("/proxy/rules", post(create_proxy_rule))
        .route("/proxy/rules/{id}", get(get_proxy_rule))
        .route("/proxy/rules/{id}", put(update_proxy_rule))
        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
        .route("/proxy/inspect", get(get_proxy_inspect));

    let router = router.route("/ai/generate-spec", post(generate_ai_spec));

    let router = router.nest(
        "/snapshot-diff",
        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
    );

    #[cfg(feature = "behavioral-cloning")]
    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));

    // MockAI learning/explanations, chaos config, and network profiles.
    let router = router
        .route("/mockai/learn", post(learn_from_examples))
        .route("/mockai/rules/explanations", get(list_rule_explanations))
        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
        .route("/chaos/config", get(get_chaos_config))
        .route("/chaos/config", post(update_chaos_config))
        .route("/network/profiles", get(list_network_profiles))
        .route("/network/profile/apply", post(apply_network_profile));

    let router =
        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());

    router.with_state(state)
}
2248
/// Aggregate statistics for the embedded Kafka broker (GET /kafka/stats).
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// Number of topics currently known to the broker.
    pub topics: usize,
    /// Total partitions summed across all topics.
    pub partitions: usize,
    /// Number of consumer groups.
    pub consumer_groups: usize,
    /// Lifetime count of messages produced (from the metrics snapshot).
    pub messages_produced: u64,
    /// Lifetime count of messages consumed (from the metrics snapshot).
    pub messages_consumed: u64,
}

/// One topic's summary as returned by GET /kafka/topics.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    pub name: String,
    pub partitions: usize,
    pub replication_factor: i32,
}

/// One consumer group's summary as returned by GET /kafka/groups.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    pub group_id: String,
    pub members: usize,
    // Always "Stable" — the mock broker does not model rebalancing states.
    pub state: String,
}
2279
/// GET /kafka/stats — broker-wide counters: topic/partition/group totals
/// plus produced/consumed message counts from the metrics snapshot.
#[cfg(feature = "kafka")]
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let consumer_groups = broker.consumer_groups.read().await;
    let metrics_snapshot = broker.metrics().snapshot();

    let stats = KafkaBrokerStats {
        topics: topics.len(),
        partitions: topics.values().map(|t| t.partitions.len()).sum(),
        consumer_groups: consumer_groups.groups().len(),
        messages_produced: metrics_snapshot.messages_produced_total,
        messages_consumed: metrics_snapshot.messages_consumed_total,
    };

    Json(stats).into_response()
}
2312
/// GET /kafka/topics — list every topic with its partition count and
/// replication factor.
#[cfg(feature = "kafka")]
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let mut topic_list = Vec::with_capacity(topics.len());
    for (name, topic) in topics.iter() {
        topic_list.push(KafkaTopicInfo {
            name: name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor as i32,
        });
    }

    Json(serde_json::json!({ "topics": topic_list })).into_response()
}
2342
/// GET /kafka/topics/{topic} — detailed view of one topic, including a
/// per-partition breakdown with message counts.
#[cfg(feature = "kafka")]
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let Some(topic) = topics.get(&topic_name) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Topic not found",
                "topic": topic_name
            })),
        )
            .into_response();
    };

    // Single-node mock broker: leader is always 0 with a single replica.
    let partitions_detail: Vec<_> = topic
        .partitions
        .iter()
        .enumerate()
        .map(|(idx, partition)| {
            serde_json::json!({
                "id": idx as i32,
                "leader": 0,
                "replicas": vec![0],
                "message_count": partition.messages.len()
            })
        })
        .collect();

    Json(serde_json::json!({
        "name": topic_name,
        "partitions": topic.partitions.len(),
        "replication_factor": topic.config.replication_factor,
        "partitions_detail": partitions_detail
    }))
    .into_response()
}
2384
/// GET /kafka/groups — list consumer groups with member counts.
#[cfg(feature = "kafka")]
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;
    let groups: Vec<KafkaConsumerGroupInfo> = consumer_groups
        .groups()
        .iter()
        .map(|(group_id, group)| KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            // The mock broker does not model rebalancing, so every group is
            // reported as Stable.
            state: "Stable".to_string(),
        })
        .collect();

    Json(serde_json::json!({ "groups": groups })).into_response()
}
2415
/// GET /kafka/groups/{group_id} — detailed view of one consumer group:
/// member assignments and committed offsets per topic-partition.
#[cfg(feature = "kafka")]
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let consumer_groups = broker.consumer_groups.read().await;
        if let Some(group) = consumer_groups.groups().get(&group_id) {
            // "state" is hard-coded: the mock broker does not model
            // rebalancing, so groups are always reported as Stable.
            Json(serde_json::json!({
                "group_id": group_id,
                "members": group.members.len(),
                "state": "Stable",
                "members_detail": group.members.iter().map(|(member_id, member)| serde_json::json!({
                    "member_id": member_id,
                    "client_id": member.client_id,
                    "assignments": member.assignment.iter().map(|a| serde_json::json!({
                        "topic": a.topic,
                        "partitions": a.partitions
                    })).collect::<Vec<_>>()
                })).collect::<Vec<_>>(),
                // Offsets are keyed by (topic, partition) pairs; flattened
                // here into an array of objects for the JSON response.
                "offsets": group.offsets.iter().map(|((topic, partition), offset)| serde_json::json!({
                    "topic": topic,
                    "partition": partition,
                    "offset": offset
                })).collect::<Vec<_>>()
            })).into_response()
        } else {
            (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Consumer group not found",
                    "group_id": group_id
                })),
            )
                .into_response()
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2464
#[cfg(feature = "kafka")]
/// Request body for producing a single Kafka message via the management API.
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Target topic; auto-created with default config on first use.
    pub topic: String,
    /// Optional message key; when no explicit partition is given, the
    /// broker assigns a partition from this key.
    #[serde(default)]
    pub key: Option<String>,
    /// Message payload (stored as its UTF-8 bytes).
    pub value: String,
    /// Explicit target partition; must be within the topic's partition
    /// range when supplied.
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional string headers attached to the message.
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2484
/// Produce a single message to the mock Kafka broker.
///
/// The topic is created on demand; when no partition is supplied, one is
/// assigned from the message key. On success a [`MessageEvent::Kafka`]
/// is broadcast so SSE subscribers see the message.
///
/// Responses: 200 on success, 400 for an out-of-range partition,
/// 500 on produce failure, 503 when the broker is not configured.
#[cfg(feature = "kafka")]
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let mut topics = broker.topics.write().await;

    // Auto-create the topic with default configuration on first use.
    let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
        mockforge_kafka::topics::Topic::new(
            request.topic.clone(),
            mockforge_kafka::topics::TopicConfig::default(),
        )
    });

    // Explicit partition wins; otherwise derive one from the key bytes.
    let partition_id = match request.partition {
        Some(partition) => partition,
        None => topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes())),
    };

    // Reject partitions outside the topic's range before touching the log.
    if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            })),
        )
            .into_response();
    }

    let message = mockforge_kafka::partitions::KafkaMessage {
        // The real offset is assigned by the partition log on append.
        offset: 0,
        timestamp: chrono::Utc::now().timestamp_millis(),
        key: request.key.as_ref().map(|k| k.as_bytes().to_vec()),
        value: request.value.as_bytes().to_vec(),
        headers: request
            .headers
            .clone()
            .unwrap_or_default()
            .into_iter()
            .map(|(k, v)| (k, v.as_bytes().to_vec()))
            .collect(),
    };

    match topic_entry.produce(partition_id, message).await {
        Ok(offset) => {
            // `broker` is already bound above; the original redundantly
            // re-checked `state.kafka_broker` here.
            broker.metrics().record_messages_produced(1);

            // Notify SSE subscribers of the Kafka message stream.
            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: request.topic.clone(),
                key: request.key.clone(),
                value: request.value.clone(),
                partition: partition_id,
                offset,
                headers: request.headers.clone(),
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            // Send only fails when there are no subscribers; ignore that.
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", request.topic),
                "topic": request.topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2589
#[cfg(feature = "kafka")]
/// Request body for producing an ordered batch of Kafka messages with an
/// optional pause between consecutive messages.
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// Messages to produce, in order; must be non-empty.
    pub messages: Vec<KafkaProduceRequest>,
    /// Delay between consecutive messages in milliseconds; defaults via
    /// `default_delay` (defined elsewhere in this file).
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2599
/// Produce a batch of messages to the mock Kafka broker.
///
/// Each message is produced independently; failures are reported per
/// message in `results` rather than aborting the batch. An optional
/// `delay_ms` pause is inserted between consecutive messages.
///
/// Responses: 200 with per-message results, 400 for an empty batch,
/// 503 when the broker is not configured.
#[cfg(feature = "kafka")]
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    if request.messages.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        )
            .into_response();
    }

    let mut results = Vec::with_capacity(request.messages.len());

    for (index, msg_request) in request.messages.iter().enumerate() {
        // Scope the topics write lock so it is released before the
        // inter-message delay below. The original held the lock across
        // the sleep, blocking all other topic access for the whole delay.
        {
            let mut topics = broker.topics.write().await;

            // Auto-create the topic with default configuration on first use.
            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                mockforge_kafka::topics::Topic::new(
                    msg_request.topic.clone(),
                    mockforge_kafka::topics::TopicConfig::default(),
                )
            });

            // Explicit partition wins; otherwise derive one from the key.
            let partition_id = match msg_request.partition {
                Some(partition) => partition,
                None => {
                    topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
                }
            };

            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                }));
                // Matches the original: an invalid partition also skips
                // the inter-message delay.
                continue;
            }

            let message = mockforge_kafka::partitions::KafkaMessage {
                // The real offset is assigned by the partition log on append.
                offset: 0,
                timestamp: chrono::Utc::now().timestamp_millis(),
                key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
                value: msg_request.value.as_bytes().to_vec(),
                headers: msg_request
                    .headers
                    .clone()
                    .unwrap_or_default()
                    .into_iter()
                    .map(|(k, v)| (k, v.as_bytes().to_vec()))
                    .collect(),
            };

            match topic_entry.produce(partition_id, message).await {
                Ok(offset) => {
                    // `broker` is already bound; the original redundantly
                    // re-checked `state.kafka_broker` here.
                    broker.metrics().record_messages_produced(1);

                    // Notify SSE subscribers of the Kafka message stream.
                    let event = MessageEvent::Kafka(KafkaMessageEvent {
                        topic: msg_request.topic.clone(),
                        key: msg_request.key.clone(),
                        value: msg_request.value.clone(),
                        partition: partition_id,
                        offset,
                        headers: msg_request.headers.clone(),
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": msg_request.topic,
                        "partition": partition_id,
                        "offset": offset
                    }));
                }
                Err(e) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": e.to_string()
                    }));
                }
            }
        }

        // Pause between messages (but not after the last one), with the
        // topics lock released.
        if index < request.messages.len() - 1 && request.delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
        }
    }

    let success_count =
        results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

    Json(serde_json::json!({
        "success": true,
        "total": request.messages.len(),
        "succeeded": success_count,
        "failed": request.messages.len() - success_count,
        "results": results
    }))
    .into_response()
}
2728
/// SSE endpoint streaming MQTT messages published through the mock broker.
///
/// Subscribes to the shared `message_events` broadcast channel and
/// forwards only MQTT events; an optional `?topic=` query parameter
/// filters by substring match on the topic name.
#[cfg(feature = "mqtt")]
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let rx = state.message_events.subscribe();
    // Substring filter on topic names (e.g. ?topic=sensors).
    let topic_filter = params.get("topic").cloned();

    // `unfold` drives the broadcast receiver: each step loops until it
    // finds an event worth emitting, then yields it together with the
    // receiver for the next step.
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => {
                        // Skip events whose topic doesn't match the filter.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "mqtt",
                            "topic": event.topic,
                            "payload": event.payload,
                            "qos": event.qos,
                            "retain": event.retain,
                            "timestamp": event.timestamp,
                        });

                        // Serialization failures are silently dropped and
                        // the loop waits for the next event.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event = Event::default().event("mqtt_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    // Kafka events share the same channel; ignore them here.
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => {
                        continue;
                    }
                    // Channel closed: terminate the SSE stream.
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    // The receiver fell behind the broadcast buffer; log
                    // how many messages were dropped and keep streaming.
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Periodic keep-alive comments prevent idle proxies/clients from
    // timing out the connection.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
2791
/// SSE endpoint streaming Kafka messages produced through the mock broker.
///
/// Subscribes to the shared `message_events` broadcast channel and
/// forwards only Kafka events; an optional `?topic=` query parameter
/// filters by substring match on the topic name.
#[cfg(feature = "kafka")]
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // The receiver is moved straight into `unfold` (which rebinds it as
    // `mut rx`), so no `mut` is needed here. The original's `let mut rx`
    // triggered an unused_mut warning and diverged from the MQTT twin.
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    // MQTT events share the same channel; ignore them here.
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => {
                        continue;
                    }
                    Ok(MessageEvent::Kafka(event)) => {
                        // Skip events whose topic doesn't match the filter.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "kafka",
                            "topic": event.topic,
                            "key": event.key,
                            "value": event.value,
                            "partition": event.partition,
                            "offset": event.offset,
                            "headers": event.headers,
                            "timestamp": event.timestamp,
                        });

                        // Serialization failures are silently dropped and
                        // the loop waits for the next event.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event =
                                Event::default().event("kafka_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    // Channel closed: terminate the SSE stream.
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    // Receiver fell behind the broadcast buffer; log and
                    // continue from the oldest retained message.
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Periodic keep-alive comments prevent idle proxies/clients from
    // timing out the connection.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
2855
/// Request body for AI-assisted API specification generation.
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Natural-language description of the desired API.
    pub query: String,
    /// Target format: "openapi", "graphql", or "asyncapi"; any other
    /// value falls back to OpenAPI.
    pub spec_type: String,
    /// OpenAPI version to request; defaults to "3.0.0" when omitted.
    pub api_version: Option<String>,
}
2868
/// Request body for generating an OpenAPI spec from recorded HTTP traffic.
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to the recorder database; defaults to `./recordings.db` in
    /// the current working directory.
    #[serde(default)]
    pub database_path: Option<String>,
    /// Lower time bound for exchanges, as an RFC 3339 timestamp.
    #[serde(default)]
    pub since: Option<String>,
    /// Upper time bound for exchanges, as an RFC 3339 timestamp.
    #[serde(default)]
    pub until: Option<String>,
    /// Optional path pattern used to filter recorded exchanges.
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence for inferred paths; defaults to 0.7.
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
2888
/// Serde default for `min_confidence`: the confidence threshold used by
/// traffic-based OpenAPI inference when the caller does not supply one.
fn default_min_confidence() -> f64 {
    const DEFAULT_MIN_CONFIDENCE: f64 = 0.7;
    DEFAULT_MIN_CONFIDENCE
}
2892
/// Generate an API specification (OpenAPI / GraphQL / AsyncAPI) from a
/// natural-language description using the configured RAG/LLM backend.
///
/// All backend settings come from `MOCKFORGE_RAG_*` environment
/// variables; the API key may also be supplied via `OPENAI_API_KEY`.
/// Responds 503 when no API key is configured and 500 on engine or
/// generation failures.
#[cfg(feature = "data-faker")]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::{DocumentStorage, InMemoryStorage},
    };

    // Parse an environment variable, falling back to `default` when the
    // variable is unset or unparsable (the original repeated this
    // unwrap_or_else/parse/unwrap_or dance four times).
    fn env_parse<T: std::str::FromStr>(key: &str, default: T) -> T {
        std::env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
    }

    // Resolve the API key; without one the AI backend cannot be used.
    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    }

    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        // Unknown values fall back to OpenAI rather than failing.
        _ => LlmProvider::OpenAI,
    };

    // Per-provider default endpoints/models, overridable via env.
    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    let mut rag_config = RagConfig::default();
    rag_config.provider = provider;
    rag_config.api_endpoint = api_endpoint;
    rag_config.api_key = api_key;
    rag_config.model = model;
    rag_config.max_tokens = env_parse("MOCKFORGE_RAG_MAX_TOKENS", 4096);
    rag_config.temperature = env_parse("MOCKFORGE_RAG_TEMPERATURE", 0.3);
    rag_config.timeout_secs = env_parse("MOCKFORGE_RAG_TIMEOUT", 60);
    rag_config.max_context_length = env_parse("MOCKFORGE_RAG_CONTEXT_WINDOW", 4000);

    // Human-readable label used inside the prompt.
    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        if request.spec_type == "graphql" {
            "GraphQL SDL"
        } else {
            "YAML"
        }
    );

    // The generation is one-shot, so an in-memory document store
    // satisfies the engine's storage requirement without persisting.
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    let mut rag_engine = match RagEngine::new(rag_config.clone(), storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Strip Markdown fences / prose around the generated spec.
            let spec = if request.spec_type == "graphql" {
                extract_graphql_schema(&generated_text)
            } else {
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
3063
3064#[cfg(not(feature = "data-faker"))]
3065async fn generate_ai_spec(
3066 State(_state): State<ManagementState>,
3067 Json(_request): Json<GenerateSpecRequest>,
3068) -> impl IntoResponse {
3069 (
3070 StatusCode::NOT_IMPLEMENTED,
3071 Json(serde_json::json!({
3072 "error": "AI features not enabled",
3073 "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3074 })),
3075 )
3076 .into_response()
3077}
3078
/// Generate an OpenAPI specification from recorded HTTP traffic.
///
/// Reads exchanges from the recorder database (optionally filtered by a
/// time window and path pattern), converts them into the generator's
/// exchange type, and runs the intelligent-behavior OpenAPI generator.
///
/// Responses: 200 with the spec and generation metadata, 400 for bad
/// database/timestamps, 404 when no exchanges match, 500 on query,
/// generation, or serialization failures.
#[cfg(feature = "behavioral-cloning")]
async fn generate_openapi_from_traffic(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
) -> impl IntoResponse {
    use chrono::{DateTime, Utc};
    use mockforge_core::intelligent_behavior::{
        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
        IntelligentBehaviorConfig,
    };
    use mockforge_recorder::{
        database::RecorderDatabase,
        openapi_export::{QueryFilters, RecordingsToOpenApi},
    };
    use std::path::PathBuf;

    // Parse an optional RFC 3339 timestamp, producing a ready-made 400
    // response on failure. Shared by the since/until filters (the
    // original duplicated this logic verbatim). `label` is interpolated
    // into the error message, which is kept byte-identical to the
    // original ("--since" / "--until").
    fn parse_timestamp(
        label: &str,
        value: Option<&String>,
    ) -> Result<Option<DateTime<Utc>>, axum::response::Response> {
        match value {
            None => Ok(None),
            Some(s) => match DateTime::parse_from_rfc3339(s) {
                Ok(dt) => Ok(Some(dt.with_timezone(&Utc))),
                Err(e) => Err((
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid {} format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", label, e)
                    })),
                )
                    .into_response()),
            },
        }
    }

    // Default to ./recordings.db next to the current working directory.
    let db_path = if let Some(ref path) = request.database_path {
        PathBuf::from(path)
    } else {
        std::env::current_dir()
            .unwrap_or_else(|_| PathBuf::from("."))
            .join("recordings.db")
    };

    let db = match RecorderDatabase::new(&db_path).await {
        Ok(db) => db,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Database error",
                    "message": format!("Failed to open recorder database: {}", e)
                })),
            )
                .into_response();
        }
    };

    let since_dt = match parse_timestamp("--since", request.since.as_ref()) {
        Ok(dt) => dt,
        Err(resp) => return resp,
    };
    let until_dt = match parse_timestamp("--until", request.until.as_ref()) {
        Ok(dt) => dt,
        Err(resp) => return resp,
    };

    let query_filters = QueryFilters {
        since: since_dt,
        until: until_dt,
        path_pattern: request.path_pattern.clone(),
        min_status_code: None,
        // Safety cap so a huge recording database cannot exhaust memory.
        max_requests: Some(1000),
    };

    let exchanges_from_recorder =
        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
            Ok(exchanges) => exchanges,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Query error",
                        "message": format!("Failed to query HTTP exchanges: {}", e)
                    })),
                )
                    .into_response();
            }
        };

    if exchanges_from_recorder.is_empty() {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "No exchanges found",
                "message": "No HTTP exchanges found matching the specified filters"
            })),
        )
            .into_response();
    }

    // Bridge the recorder's exchange type into the generator's type
    // (field-for-field move; the two structs mirror each other).
    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
        .into_iter()
        .map(|e| LocalHttpExchange {
            method: e.method,
            path: e.path,
            query_params: e.query_params,
            headers: e.headers,
            body: e.body,
            body_encoding: e.body_encoding,
            status_code: e.status_code,
            response_headers: e.response_headers,
            response_body: e.response_body,
            response_body_encoding: e.response_body_encoding,
            timestamp: e.timestamp,
        })
        .collect();

    let behavior_config = IntelligentBehaviorConfig::default();
    let gen_config = OpenApiGenerationConfig {
        min_confidence: request.min_confidence,
        behavior_model: Some(behavior_config.behavior_model),
    };

    let generator = OpenApiSpecGenerator::new(gen_config);
    let result = match generator.generate_from_exchanges(exchanges).await {
        Ok(result) => result,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Generation error",
                    "message": format!("Failed to generate OpenAPI spec: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Prefer the raw document when present — presumably it keeps more of
    // the generated output intact than re-serializing the typed spec
    // (TODO confirm against OpenApiSpecGenerator docs).
    let spec_json = if let Some(ref raw) = result.spec.raw_document {
        raw.clone()
    } else {
        match serde_json::to_value(&result.spec.spec) {
            Ok(json) => json,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Serialization error",
                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
                    })),
                )
                    .into_response();
            }
        }
    };

    let response = serde_json::json!({
        "spec": spec_json,
        "metadata": {
            "requests_analyzed": result.metadata.requests_analyzed,
            "paths_inferred": result.metadata.paths_inferred,
            "path_confidence": result.metadata.path_confidence,
            "generated_at": result.metadata.generated_at.to_rfc3339(),
            "duration_ms": result.metadata.duration_ms,
        }
    });

    Json(response).into_response()
}
3271
3272async fn list_rule_explanations(
3274 State(state): State<ManagementState>,
3275 Query(params): Query<std::collections::HashMap<String, String>>,
3276) -> impl IntoResponse {
3277 use mockforge_core::intelligent_behavior::RuleType;
3278
3279 let explanations = state.rule_explanations.read().await;
3280 let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3281
3282 if let Some(rule_type_str) = params.get("rule_type") {
3284 if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3285 explanations_vec.retain(|e| e.rule_type == rule_type);
3286 }
3287 }
3288
3289 if let Some(min_confidence_str) = params.get("min_confidence") {
3291 if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3292 explanations_vec.retain(|e| e.confidence >= min_confidence);
3293 }
3294 }
3295
3296 explanations_vec.sort_by(|a, b| {
3298 b.confidence
3299 .partial_cmp(&a.confidence)
3300 .unwrap_or(std::cmp::Ordering::Equal)
3301 .then_with(|| b.generated_at.cmp(&a.generated_at))
3302 });
3303
3304 Json(serde_json::json!({
3305 "explanations": explanations_vec,
3306 "total": explanations_vec.len(),
3307 }))
3308 .into_response()
3309}
3310
3311async fn get_rule_explanation(
3313 State(state): State<ManagementState>,
3314 Path(rule_id): Path<String>,
3315) -> impl IntoResponse {
3316 let explanations = state.rule_explanations.read().await;
3317
3318 match explanations.get(&rule_id) {
3319 Some(explanation) => Json(serde_json::json!({
3320 "explanation": explanation,
3321 }))
3322 .into_response(),
3323 None => (
3324 StatusCode::NOT_FOUND,
3325 Json(serde_json::json!({
3326 "error": "Rule explanation not found",
3327 "message": format!("No explanation found for rule ID: {}", rule_id)
3328 })),
3329 )
3330 .into_response(),
3331 }
3332}
3333
/// Request body for learning behavior rules from example exchanges.
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example request/response pairs to learn from; must be non-empty.
    pub examples: Vec<ExamplePairRequest>,
    /// Optional behavior configuration override as raw JSON
    /// (deserialized as `IntelligentBehaviorConfig`; invalid values fall
    /// back to defaults).
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3343
/// One request/response example used for rule learning.
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request as loose JSON; recognized fields: "method", "path",
    /// "body", "query_params", "headers".
    pub request: serde_json::Value,
    /// Response as loose JSON; recognized fields: "status_code" (or
    /// "status") and "body".
    pub response: serde_json::Value,
}
3352
3353async fn learn_from_examples(
3358 State(state): State<ManagementState>,
3359 Json(request): Json<LearnFromExamplesRequest>,
3360) -> impl IntoResponse {
3361 use mockforge_core::intelligent_behavior::{
3362 config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3363 rule_generator::{ExamplePair, RuleGenerator},
3364 };
3365
3366 if request.examples.is_empty() {
3367 return (
3368 StatusCode::BAD_REQUEST,
3369 Json(serde_json::json!({
3370 "error": "No examples provided",
3371 "message": "At least one example pair is required"
3372 })),
3373 )
3374 .into_response();
3375 }
3376
3377 let example_pairs: Result<Vec<ExamplePair>, String> = request
3379 .examples
3380 .into_iter()
3381 .enumerate()
3382 .map(|(idx, ex)| {
3383 let method = ex
3385 .request
3386 .get("method")
3387 .and_then(|v| v.as_str())
3388 .map(|s| s.to_string())
3389 .unwrap_or_else(|| "GET".to_string());
3390 let path = ex
3391 .request
3392 .get("path")
3393 .and_then(|v| v.as_str())
3394 .map(|s| s.to_string())
3395 .unwrap_or_else(|| "/".to_string());
3396 let request_body = ex.request.get("body").cloned();
3397 let query_params = ex
3398 .request
3399 .get("query_params")
3400 .and_then(|v| v.as_object())
3401 .map(|obj| {
3402 obj.iter()
3403 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3404 .collect()
3405 })
3406 .unwrap_or_default();
3407 let headers = ex
3408 .request
3409 .get("headers")
3410 .and_then(|v| v.as_object())
3411 .map(|obj| {
3412 obj.iter()
3413 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3414 .collect()
3415 })
3416 .unwrap_or_default();
3417
3418 let status = ex
3420 .response
3421 .get("status_code")
3422 .or_else(|| ex.response.get("status"))
3423 .and_then(|v| v.as_u64())
3424 .map(|n| n as u16)
3425 .unwrap_or(200);
3426 let response_body = ex.response.get("body").cloned();
3427
3428 Ok(ExamplePair {
3429 method,
3430 path,
3431 request: request_body,
3432 status,
3433 response: response_body,
3434 query_params,
3435 headers,
3436 metadata: {
3437 let mut meta = std::collections::HashMap::new();
3438 meta.insert("source".to_string(), "api".to_string());
3439 meta.insert("example_index".to_string(), idx.to_string());
3440 meta
3441 },
3442 })
3443 })
3444 .collect();
3445
3446 let example_pairs = match example_pairs {
3447 Ok(pairs) => pairs,
3448 Err(e) => {
3449 return (
3450 StatusCode::BAD_REQUEST,
3451 Json(serde_json::json!({
3452 "error": "Invalid examples",
3453 "message": e
3454 })),
3455 )
3456 .into_response();
3457 }
3458 };
3459
3460 let behavior_config = if let Some(config_json) = request.config {
3462 serde_json::from_value(config_json)
3464 .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3465 .behavior_model
3466 } else {
3467 BehaviorModelConfig::default()
3468 };
3469
3470 let generator = RuleGenerator::new(behavior_config);
3472
3473 let (rules, explanations) =
3475 match generator.generate_rules_with_explanations(example_pairs).await {
3476 Ok(result) => result,
3477 Err(e) => {
3478 return (
3479 StatusCode::INTERNAL_SERVER_ERROR,
3480 Json(serde_json::json!({
3481 "error": "Rule generation failed",
3482 "message": format!("Failed to generate rules: {}", e)
3483 })),
3484 )
3485 .into_response();
3486 }
3487 };
3488
3489 {
3491 let mut stored_explanations = state.rule_explanations.write().await;
3492 for explanation in &explanations {
3493 stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3494 }
3495 }
3496
3497 let response = serde_json::json!({
3499 "success": true,
3500 "rules_generated": {
3501 "consistency_rules": rules.consistency_rules.len(),
3502 "schemas": rules.schemas.len(),
3503 "state_machines": rules.state_transitions.len(),
3504 "system_prompt": !rules.system_prompt.is_empty(),
3505 },
3506 "explanations": explanations.iter().map(|e| serde_json::json!({
3507 "rule_id": e.rule_id,
3508 "rule_type": e.rule_type,
3509 "confidence": e.confidence,
3510 "reasoning": e.reasoning,
3511 })).collect::<Vec<_>>(),
3512 "total_explanations": explanations.len(),
3513 });
3514
3515 Json(response).into_response()
3516}
3517
/// Extract a YAML specification from LLM output, stripping Markdown code
/// fences (```yaml / ```yml / bare ```) and surrounding prose. Falls
/// back to the trimmed input when no closed fence is found.
fn extract_yaml_spec(text: &str) -> String {
    // Trimmed contents of the first closed block opened by `fence`, if any.
    fn fenced_block(text: &str, fence: &str) -> Option<String> {
        let start = text.find(fence)?;
        let body = text[start + fence.len()..].trim_start();
        let end = body.find("```")?;
        Some(body[..end].trim().to_string())
    }

    // Prefer explicitly YAML-tagged fences; also accept the common
    // "yml" tag (the original only handled "yaml"); then fall back to
    // the first generic fence, and finally to the raw trimmed text.
    fenced_block(text, "```yaml")
        .or_else(|| fenced_block(text, "```yml"))
        .or_else(|| fenced_block(text, "```"))
        .unwrap_or_else(|| text.trim().to_string())
}
3541
/// Extract a GraphQL SDL schema from LLM output by stripping Markdown
/// code fences (```graphql or bare ```) and surrounding prose; returns
/// the trimmed input when no closed fence is present.
fn extract_graphql_schema(text: &str) -> String {
    // Trimmed contents of the first closed block opened by `fence`, if any.
    fn fenced_block(text: &str, fence: &str) -> Option<String> {
        let start = text.find(fence)?;
        let body = text[start + fence.len()..].trim_start();
        let end = body.find("```")?;
        Some(body[..end].trim().to_string())
    }

    // Prefer a graphql-tagged fence, then any fence, then the raw text.
    fenced_block(text, "```graphql")
        .or_else(|| fenced_block(text, "```"))
        .unwrap_or_else(|| text.trim().to_string())
}
3565
/// Return the current chaos-engineering configuration.
///
/// When the `chaos` feature is compiled out, or no chaos API state is
/// configured at runtime, responds with a disabled/all-null shape
/// instead of an error so clients can always render the same structure.
async fn get_chaos_config(State(state): State<ManagementState>) -> impl IntoResponse {
    #[cfg(feature = "chaos")]
    {
        if let Some(chaos_state) = &state.chaos_api_state {
            let config = chaos_state.config.read().await;
            // Each optional sub-config is serialized independently;
            // serialization failures degrade to JSON null rather than
            // failing the whole request.
            Json(serde_json::json!({
                "enabled": config.enabled,
                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
            }))
            .into_response()
        } else {
            // Chaos feature compiled in but not configured at runtime.
            Json(serde_json::json!({
                "enabled": false,
                "latency": null,
                "fault_injection": null,
                "rate_limit": null,
                "traffic_shaping": null,
            }))
            .into_response()
        }
    }
    #[cfg(not(feature = "chaos"))]
    {
        // Feature compiled out: always report a disabled configuration.
        Json(serde_json::json!({
            "enabled": false,
            "latency": null,
            "fault_injection": null,
            "rate_limit": null,
            "traffic_shaping": null,
        }))
        .into_response()
    }
}
3608
/// Partial update payload for the chaos configuration; `None` fields
/// leave the corresponding setting unchanged.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Master on/off switch for chaos injection.
    pub enabled: Option<bool>,
    /// Latency settings (deserialized as `LatencyConfig`; invalid
    /// values are silently ignored by the update handler).
    pub latency: Option<serde_json::Value>,
    /// Fault injection settings (deserialized as `FaultInjectionConfig`).
    pub fault_injection: Option<serde_json::Value>,
    /// Rate limiting settings (deserialized as `RateLimitConfig`).
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic shaping settings (deserialized as `TrafficShapingConfig`).
    pub traffic_shaping: Option<serde_json::Value>,
}
3623
3624async fn update_chaos_config(
3626 State(state): State<ManagementState>,
3627 Json(config_update): Json<ChaosConfigUpdate>,
3628) -> impl IntoResponse {
3629 #[cfg(feature = "chaos")]
3630 {
3631 if let Some(chaos_state) = &state.chaos_api_state {
3632 use mockforge_chaos::config::{
3633 ChaosConfig, FaultInjectionConfig, LatencyConfig, RateLimitConfig,
3634 TrafficShapingConfig,
3635 };
3636
3637 let mut config = chaos_state.config.write().await;
3638
3639 if let Some(enabled) = config_update.enabled {
3641 config.enabled = enabled;
3642 }
3643
3644 if let Some(latency_json) = config_update.latency {
3646 if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3647 config.latency = Some(latency);
3648 }
3649 }
3650
3651 if let Some(fault_json) = config_update.fault_injection {
3653 if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3654 config.fault_injection = Some(fault);
3655 }
3656 }
3657
3658 if let Some(rate_json) = config_update.rate_limit {
3660 if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3661 config.rate_limit = Some(rate);
3662 }
3663 }
3664
3665 if let Some(traffic_json) = config_update.traffic_shaping {
3667 if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3668 config.traffic_shaping = Some(traffic);
3669 }
3670 }
3671
3672 drop(config);
3675
3676 info!("Chaos configuration updated successfully");
3677 Json(serde_json::json!({
3678 "success": true,
3679 "message": "Chaos configuration updated and applied"
3680 }))
3681 .into_response()
3682 } else {
3683 (
3684 StatusCode::SERVICE_UNAVAILABLE,
3685 Json(serde_json::json!({
3686 "success": false,
3687 "error": "Chaos API not available",
3688 "message": "Chaos engineering is not enabled or configured"
3689 })),
3690 )
3691 .into_response()
3692 }
3693 }
3694 #[cfg(not(feature = "chaos"))]
3695 {
3696 (
3697 StatusCode::NOT_IMPLEMENTED,
3698 Json(serde_json::json!({
3699 "success": false,
3700 "error": "Chaos feature not enabled",
3701 "message": "Chaos engineering feature is not compiled into this build"
3702 })),
3703 )
3704 .into_response()
3705 }
3706}
3707
3708async fn list_network_profiles() -> impl IntoResponse {
3712 use mockforge_core::network_profiles::NetworkProfileCatalog;
3713
3714 let catalog = NetworkProfileCatalog::default();
3715 let profiles: Vec<serde_json::Value> = catalog
3716 .list_profiles_with_description()
3717 .iter()
3718 .map(|(name, description)| {
3719 serde_json::json!({
3720 "name": name,
3721 "description": description,
3722 })
3723 })
3724 .collect();
3725
3726 Json(serde_json::json!({
3727 "profiles": profiles
3728 }))
3729 .into_response()
3730}
3731
/// Request body for applying a named network profile.
#[derive(Debug, Deserialize)]
pub struct ApplyNetworkProfileRequest {
    /// Name of a profile from the `NetworkProfileCatalog`
    /// (as returned by the profile-listing endpoint).
    pub profile_name: String,
}
3738
3739async fn apply_network_profile(
3741 State(state): State<ManagementState>,
3742 Json(request): Json<ApplyNetworkProfileRequest>,
3743) -> impl IntoResponse {
3744 use mockforge_core::network_profiles::NetworkProfileCatalog;
3745
3746 let catalog = NetworkProfileCatalog::default();
3747 if let Some(profile) = catalog.get(&request.profile_name) {
3748 if let Some(server_config) = &state.server_config {
3751 let mut config = server_config.write().await;
3752
3753 use mockforge_core::config::NetworkShapingConfig;
3755
3756 let network_shaping = NetworkShapingConfig {
3760 enabled: profile.traffic_shaping.bandwidth.enabled
3761 || profile.traffic_shaping.burst_loss.enabled,
3762 bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3764 max_connections: 1000, };
3766
3767 if let Some(ref mut chaos) = config.observability.chaos {
3770 chaos.traffic_shaping = Some(network_shaping);
3771 } else {
3772 use mockforge_core::config::ChaosEngConfig;
3774 config.observability.chaos = Some(ChaosEngConfig {
3775 enabled: true,
3776 latency: None,
3777 fault_injection: None,
3778 rate_limit: None,
3779 traffic_shaping: Some(network_shaping),
3780 scenario: None,
3781 });
3782 }
3783
3784 info!("Network profile '{}' applied to server configuration", request.profile_name);
3785 } else {
3786 warn!("Server configuration not available in ManagementState - profile applied but not persisted");
3787 }
3788
3789 #[cfg(feature = "chaos")]
3791 {
3792 if let Some(chaos_state) = &state.chaos_api_state {
3793 use mockforge_chaos::config::TrafficShapingConfig;
3794
3795 let mut chaos_config = chaos_state.config.write().await;
3796 let chaos_traffic_shaping = TrafficShapingConfig {
3798 enabled: profile.traffic_shaping.bandwidth.enabled
3799 || profile.traffic_shaping.burst_loss.enabled,
3800 bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3802 max_connections: 0,
3803 connection_timeout_ms: 30000,
3804 };
3805 chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
3806 chaos_config.enabled = true; drop(chaos_config);
3808 info!("Network profile '{}' applied to chaos API state", request.profile_name);
3809 }
3810 }
3811
3812 Json(serde_json::json!({
3813 "success": true,
3814 "message": format!("Network profile '{}' applied", request.profile_name),
3815 "profile": {
3816 "name": profile.name,
3817 "description": profile.description,
3818 }
3819 }))
3820 .into_response()
3821 } else {
3822 (
3823 StatusCode::NOT_FOUND,
3824 Json(serde_json::json!({
3825 "error": "Profile not found",
3826 "message": format!("Network profile '{}' not found", request.profile_name)
3827 })),
3828 )
3829 .into_response()
3830 }
3831}
3832
3833pub fn management_router_with_ui_builder(
3835 state: ManagementState,
3836 server_config: mockforge_core::config::ServerConfig,
3837) -> Router {
3838 use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
3839
3840 let management = management_router(state);
3842
3843 let ui_builder_state = UIBuilderState::new(server_config);
3845 let ui_builder = create_ui_builder_router(ui_builder_state);
3846
3847 management.nest("/ui-builder", ui_builder)
3849}
3850
3851pub fn management_router_with_spec_import(state: ManagementState) -> Router {
3853 use crate::spec_import::{spec_import_router, SpecImportState};
3854
3855 let management = management_router(state);
3857
3858 Router::new()
3860 .merge(management)
3861 .merge(spec_import_router(SpecImportState::new()))
3862}
3863
#[cfg(test)]
mod tests {
    use super::*;

    /// A mock inserted into state can be looked up by id afterwards.
    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        let mock = MockConfig {
            id: "test-1".to_string(),
            name: "Test Mock".to_string(),
            method: "GET".to_string(),
            path: "/test".to_string(),
            response: MockResponse {
                body: serde_json::json!({"message": "test"}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        };

        // Write-lock guard is a temporary, released at the end of the statement.
        state.mocks.write().await.push(mock.clone());

        let stored = state.mocks.read().await;
        let found = stored.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    /// Total and enabled counts reflect the mocks pushed into state.
    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        {
            // Hold a single write lock while inserting both fixtures.
            let mut guard = state.mocks.write().await;
            guard.push(MockConfig {
                id: "1".to_string(),
                name: "Mock 1".to_string(),
                method: "GET".to_string(),
                path: "/test1".to_string(),
                response: MockResponse {
                    body: serde_json::json!({}),
                    headers: None,
                },
                enabled: true,
                latency_ms: None,
                status_code: Some(200),
                request_match: None,
                priority: None,
                scenario: None,
                required_scenario_state: None,
                new_scenario_state: None,
            });
            guard.push(MockConfig {
                id: "2".to_string(),
                name: "Mock 2".to_string(),
                method: "POST".to_string(),
                path: "/test2".to_string(),
                response: MockResponse {
                    body: serde_json::json!({}),
                    headers: None,
                },
                enabled: false,
                latency_ms: None,
                status_code: Some(201),
                request_match: None,
                priority: None,
                scenario: None,
                required_scenario_state: None,
                new_scenario_state: None,
            });
        }

        let stored = state.mocks.read().await;
        assert_eq!(stored.len(), 2);
        let enabled_count = stored.iter().filter(|m| m.enabled).count();
        assert_eq!(enabled_count, 1);
    }
}