1use axum::{
6 extract::{Path, Query, State},
7 http::StatusCode,
8 response::{
9 sse::{Event, Sse},
10 IntoResponse, Json,
11 },
12 routing::{delete, get, post, put},
13 Router,
14};
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18 BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23use std::convert::Infallible;
24use std::sync::Arc;
25use tokio::sync::{broadcast, RwLock};
26use tracing::*;
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
30#[serde(tag = "protocol", content = "data")]
31#[serde(rename_all = "lowercase")]
32pub enum MessageEvent {
33 #[cfg(feature = "mqtt")]
34 Mqtt(MqttMessageEvent),
36 #[cfg(feature = "kafka")]
37 Kafka(KafkaMessageEvent),
39}
40
/// Payload of [`MessageEvent::Mqtt`]: one published MQTT message.
#[cfg(feature = "mqtt")]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct MqttMessageEvent {
    /// Topic the message was published to.
    pub topic: String,
    /// Message payload as text.
    pub payload: String,
    /// MQTT quality-of-service level of the message.
    pub qos: u8,
    /// Whether the message was published with the retain flag.
    pub retain: bool,
    /// Event time as a string; the publish handler in this file fills it with
    /// an RFC 3339 UTC timestamp.
    pub timestamp: String,
}
56
/// Payload of [`MessageEvent::Kafka`]: one Kafka record.
#[cfg(feature = "kafka")]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct KafkaMessageEvent {
    /// Topic the record belongs to.
    pub topic: String,
    /// Optional record key.
    pub key: Option<String>,
    /// Record value as text.
    pub value: String,
    /// Partition number of the record.
    pub partition: i32,
    /// Offset of the record.
    pub offset: i64,
    /// Optional record headers (name to value).
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// Event time as a string (presumably RFC 3339, like the MQTT events).
    pub timestamp: String,
}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct MockConfig {
72 #[serde(skip_serializing_if = "String::is_empty")]
74 pub id: String,
75 pub name: String,
77 pub method: String,
79 pub path: String,
81 pub response: MockResponse,
83 #[serde(default = "default_true")]
85 pub enabled: bool,
86 #[serde(skip_serializing_if = "Option::is_none")]
88 pub latency_ms: Option<u64>,
89 #[serde(skip_serializing_if = "Option::is_none")]
91 pub status_code: Option<u16>,
92 #[serde(skip_serializing_if = "Option::is_none")]
94 pub request_match: Option<RequestMatchCriteria>,
95 #[serde(skip_serializing_if = "Option::is_none")]
97 pub priority: Option<i32>,
98 #[serde(skip_serializing_if = "Option::is_none")]
100 pub scenario: Option<String>,
101 #[serde(skip_serializing_if = "Option::is_none")]
103 pub required_scenario_state: Option<String>,
104 #[serde(skip_serializing_if = "Option::is_none")]
106 pub new_scenario_state: Option<String>,
107}
108
/// Serde default provider for boolean fields that should start out enabled.
fn default_true() -> bool {
    true
}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct MockResponse {
116 pub body: serde_json::Value,
118 #[serde(skip_serializing_if = "Option::is_none")]
120 pub headers: Option<std::collections::HashMap<String, String>>,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize, Default)]
125pub struct RequestMatchCriteria {
126 #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
128 pub headers: std::collections::HashMap<String, String>,
129 #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
131 pub query_params: std::collections::HashMap<String, String>,
132 #[serde(skip_serializing_if = "Option::is_none")]
134 pub body_pattern: Option<String>,
135 #[serde(skip_serializing_if = "Option::is_none")]
137 pub json_path: Option<String>,
138 #[serde(skip_serializing_if = "Option::is_none")]
140 pub xpath: Option<String>,
141 #[serde(skip_serializing_if = "Option::is_none")]
143 pub custom_matcher: Option<String>,
144}
145
146pub fn mock_matches_request(
155 mock: &MockConfig,
156 method: &str,
157 path: &str,
158 headers: &std::collections::HashMap<String, String>,
159 query_params: &std::collections::HashMap<String, String>,
160 body: Option<&[u8]>,
161) -> bool {
162 use regex::Regex;
163
164 if !mock.enabled {
166 return false;
167 }
168
169 if mock.method.to_uppercase() != method.to_uppercase() {
171 return false;
172 }
173
174 if !path_matches_pattern(&mock.path, path) {
176 return false;
177 }
178
179 if let Some(criteria) = &mock.request_match {
181 for (key, expected_value) in &criteria.headers {
183 let header_key_lower = key.to_lowercase();
184 let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
185
186 if let Some((_, actual_value)) = found {
187 if let Ok(re) = Regex::new(expected_value) {
189 if !re.is_match(actual_value) {
190 return false;
191 }
192 } else if actual_value != expected_value {
193 return false;
194 }
195 } else {
196 return false; }
198 }
199
200 for (key, expected_value) in &criteria.query_params {
202 if let Some(actual_value) = query_params.get(key) {
203 if actual_value != expected_value {
204 return false;
205 }
206 } else {
207 return false; }
209 }
210
211 if let Some(pattern) = &criteria.body_pattern {
213 if let Some(body_bytes) = body {
214 let body_str = String::from_utf8_lossy(body_bytes);
215 if let Ok(re) = Regex::new(pattern) {
217 if !re.is_match(&body_str) {
218 return false;
219 }
220 } else if body_str.as_ref() != pattern {
221 return false;
222 }
223 } else {
224 return false; }
226 }
227
228 if let Some(json_path) = &criteria.json_path {
230 if let Some(body_bytes) = body {
231 if let Ok(body_str) = std::str::from_utf8(body_bytes) {
232 if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
233 if !json_path_exists(&json_value, json_path) {
235 return false;
236 }
237 }
238 }
239 }
240 }
241
242 if let Some(_xpath) = &criteria.xpath {
244 tracing::warn!("XPath matching not yet fully implemented");
247 }
248
249 if let Some(custom) = &criteria.custom_matcher {
251 if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
252 return false;
253 }
254 }
255 }
256
257 true
258}
259
260fn path_matches_pattern(pattern: &str, path: &str) -> bool {
262 if pattern == path {
264 return true;
265 }
266
267 if pattern == "*" {
269 return true;
270 }
271
272 let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
274 let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
275
276 if pattern_parts.len() != path_parts.len() {
277 if pattern.contains('*') {
279 return matches_wildcard_pattern(pattern, path);
280 }
281 return false;
282 }
283
284 for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
285 if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
287 continue; }
289
290 if pattern_part != path_part {
291 return false;
292 }
293 }
294
295 true
296}
297
298fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
300 use regex::Regex;
301
302 let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
304
305 if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
306 return re.is_match(path);
307 }
308
309 false
310}
311
312fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
314 if let Some(path) = json_path.strip_prefix("$.") {
317 let parts: Vec<&str> = path.split('.').collect();
318
319 let mut current = json;
320 for part in parts {
321 if let Some(obj) = current.as_object() {
322 if let Some(value) = obj.get(part) {
323 current = value;
324 } else {
325 return false;
326 }
327 } else {
328 return false;
329 }
330 }
331 true
332 } else {
333 tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
335 false
336 }
337}
338
339fn evaluate_custom_matcher(
341 expression: &str,
342 method: &str,
343 path: &str,
344 headers: &std::collections::HashMap<String, String>,
345 query_params: &std::collections::HashMap<String, String>,
346 body: Option<&[u8]>,
347) -> bool {
348 use regex::Regex;
349
350 let expr = expression.trim();
351
352 if expr.contains("==") {
354 let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
355 if parts.len() != 2 {
356 return false;
357 }
358
359 let field = parts[0];
360 let expected_value = parts[1].trim_matches('"').trim_matches('\'');
361
362 match field {
363 "method" => method == expected_value,
364 "path" => path == expected_value,
365 _ if field.starts_with("headers.") => {
366 let header_name = &field[8..];
367 headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
368 }
369 _ if field.starts_with("query.") => {
370 let param_name = &field[6..];
371 query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
372 }
373 _ => false,
374 }
375 }
376 else if expr.contains("=~") {
378 let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
379 if parts.len() != 2 {
380 return false;
381 }
382
383 let field = parts[0];
384 let pattern = parts[1].trim_matches('"').trim_matches('\'');
385
386 if let Ok(re) = Regex::new(pattern) {
387 match field {
388 "method" => re.is_match(method),
389 "path" => re.is_match(path),
390 _ if field.starts_with("headers.") => {
391 let header_name = &field[8..];
392 headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
393 }
394 _ if field.starts_with("query.") => {
395 let param_name = &field[6..];
396 query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
397 }
398 _ => false,
399 }
400 } else {
401 false
402 }
403 }
404 else if expr.contains("contains") {
406 let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
407 if parts.len() != 2 {
408 return false;
409 }
410
411 let field = parts[0];
412 let search_value = parts[1].trim_matches('"').trim_matches('\'');
413
414 match field {
415 "path" => path.contains(search_value),
416 _ if field.starts_with("headers.") => {
417 let header_name = &field[8..];
418 headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
419 }
420 _ if field.starts_with("body") => {
421 if let Some(body_bytes) = body {
422 let body_str = String::from_utf8_lossy(body_bytes);
423 body_str.contains(search_value)
424 } else {
425 false
426 }
427 }
428 _ => false,
429 }
430 } else {
431 tracing::warn!("Unknown custom matcher expression format: {}", expr);
433 false
434 }
435}
436
437#[derive(Debug, Clone, Serialize, Deserialize)]
439pub struct ServerStats {
440 pub uptime_seconds: u64,
442 pub total_requests: u64,
444 pub active_mocks: usize,
446 pub enabled_mocks: usize,
448 pub registered_routes: usize,
450}
451
452#[derive(Debug, Clone, Serialize, Deserialize)]
454pub struct ServerConfig {
455 pub version: String,
457 pub port: u16,
459 pub has_openapi_spec: bool,
461 #[serde(skip_serializing_if = "Option::is_none")]
463 pub spec_path: Option<String>,
464}
465
466#[derive(Clone)]
468pub struct ManagementState {
469 pub mocks: Arc<RwLock<Vec<MockConfig>>>,
471 pub spec: Option<Arc<OpenApiSpec>>,
473 pub spec_path: Option<String>,
475 pub port: u16,
477 pub start_time: std::time::Instant,
479 pub request_counter: Arc<RwLock<u64>>,
481 pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
483 #[cfg(feature = "smtp")]
485 pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
486 #[cfg(feature = "mqtt")]
488 pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
489 #[cfg(feature = "kafka")]
491 pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
492 #[cfg(any(feature = "mqtt", feature = "kafka"))]
494 pub message_events: Arc<broadcast::Sender<MessageEvent>>,
495 pub state_machine_manager:
497 Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
498 pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
500 pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
502 pub rule_explanations: Arc<
504 RwLock<
505 std::collections::HashMap<
506 String,
507 mockforge_core::intelligent_behavior::RuleExplanation,
508 >,
509 >,
510 >,
511 #[cfg(feature = "chaos")]
513 pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
514 pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
516}
517
518impl ManagementState {
519 pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
526 Self {
527 mocks: Arc::new(RwLock::new(Vec::new())),
528 spec,
529 spec_path,
530 port,
531 start_time: std::time::Instant::now(),
532 request_counter: Arc::new(RwLock::new(0)),
533 proxy_config: None,
534 #[cfg(feature = "smtp")]
535 smtp_registry: None,
536 #[cfg(feature = "mqtt")]
537 mqtt_broker: None,
538 #[cfg(feature = "kafka")]
539 kafka_broker: None,
540 #[cfg(any(feature = "mqtt", feature = "kafka"))]
541 message_events: {
542 let (tx, _) = broadcast::channel(1000);
543 Arc::new(tx)
544 },
545 state_machine_manager: Arc::new(RwLock::new(
546 mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
547 )),
548 ws_broadcast: None,
549 lifecycle_hooks: None,
550 rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
551 #[cfg(feature = "chaos")]
552 chaos_api_state: None,
553 server_config: None,
554 }
555 }
556
557 pub fn with_lifecycle_hooks(
559 mut self,
560 hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
561 ) -> Self {
562 self.lifecycle_hooks = Some(hooks);
563 self
564 }
565
566 pub fn with_ws_broadcast(
568 mut self,
569 ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
570 ) -> Self {
571 self.ws_broadcast = Some(ws_broadcast);
572 self
573 }
574
575 pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
577 self.proxy_config = Some(proxy_config);
578 self
579 }
580
581 #[cfg(feature = "smtp")]
582 pub fn with_smtp_registry(
584 mut self,
585 smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
586 ) -> Self {
587 self.smtp_registry = Some(smtp_registry);
588 self
589 }
590
591 #[cfg(feature = "mqtt")]
592 pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
594 self.mqtt_broker = Some(mqtt_broker);
595 self
596 }
597
598 #[cfg(feature = "kafka")]
599 pub fn with_kafka_broker(
601 mut self,
602 kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
603 ) -> Self {
604 self.kafka_broker = Some(kafka_broker);
605 self
606 }
607
608 #[cfg(feature = "chaos")]
609 pub fn with_chaos_api_state(
611 mut self,
612 chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
613 ) -> Self {
614 self.chaos_api_state = Some(chaos_api_state);
615 self
616 }
617
618 pub fn with_server_config(
620 mut self,
621 server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
622 ) -> Self {
623 self.server_config = Some(server_config);
624 self
625 }
626}
627
628async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
630 let mocks = state.mocks.read().await;
631 Json(serde_json::json!({
632 "mocks": *mocks,
633 "total": mocks.len(),
634 "enabled": mocks.iter().filter(|m| m.enabled).count()
635 }))
636}
637
638async fn get_mock(
640 State(state): State<ManagementState>,
641 Path(id): Path<String>,
642) -> Result<Json<MockConfig>, StatusCode> {
643 let mocks = state.mocks.read().await;
644 mocks
645 .iter()
646 .find(|m| m.id == id)
647 .cloned()
648 .map(Json)
649 .ok_or(StatusCode::NOT_FOUND)
650}
651
652async fn create_mock(
654 State(state): State<ManagementState>,
655 Json(mut mock): Json<MockConfig>,
656) -> Result<Json<MockConfig>, StatusCode> {
657 let mut mocks = state.mocks.write().await;
658
659 if mock.id.is_empty() {
661 mock.id = uuid::Uuid::new_v4().to_string();
662 }
663
664 if mocks.iter().any(|m| m.id == mock.id) {
666 return Err(StatusCode::CONFLICT);
667 }
668
669 info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
670
671 if let Some(hooks) = &state.lifecycle_hooks {
673 let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
674 id: mock.id.clone(),
675 name: mock.name.clone(),
676 config: serde_json::to_value(&mock).unwrap_or_default(),
677 };
678 hooks.invoke_mock_created(&event).await;
679 }
680
681 mocks.push(mock.clone());
682
683 if let Some(tx) = &state.ws_broadcast {
685 let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
686 }
687
688 Ok(Json(mock))
689}
690
691async fn update_mock(
693 State(state): State<ManagementState>,
694 Path(id): Path<String>,
695 Json(updated_mock): Json<MockConfig>,
696) -> Result<Json<MockConfig>, StatusCode> {
697 let mut mocks = state.mocks.write().await;
698
699 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
700
701 let old_mock = mocks[position].clone();
703
704 info!("Updating mock: {}", id);
705 mocks[position] = updated_mock.clone();
706
707 if let Some(hooks) = &state.lifecycle_hooks {
709 let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
710 id: updated_mock.id.clone(),
711 name: updated_mock.name.clone(),
712 config: serde_json::to_value(&updated_mock).unwrap_or_default(),
713 };
714 hooks.invoke_mock_updated(&event).await;
715
716 if old_mock.enabled != updated_mock.enabled {
718 let state_event = if updated_mock.enabled {
719 mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
720 id: updated_mock.id.clone(),
721 }
722 } else {
723 mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
724 id: updated_mock.id.clone(),
725 }
726 };
727 hooks.invoke_mock_state_changed(&state_event).await;
728 }
729 }
730
731 if let Some(tx) = &state.ws_broadcast {
733 let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
734 }
735
736 Ok(Json(updated_mock))
737}
738
739async fn delete_mock(
741 State(state): State<ManagementState>,
742 Path(id): Path<String>,
743) -> Result<StatusCode, StatusCode> {
744 let mut mocks = state.mocks.write().await;
745
746 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
747
748 let deleted_mock = mocks[position].clone();
750
751 info!("Deleting mock: {}", id);
752 mocks.remove(position);
753
754 if let Some(hooks) = &state.lifecycle_hooks {
756 let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
757 id: deleted_mock.id.clone(),
758 name: deleted_mock.name.clone(),
759 };
760 hooks.invoke_mock_deleted(&event).await;
761 }
762
763 if let Some(tx) = &state.ws_broadcast {
765 let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
766 }
767
768 Ok(StatusCode::NO_CONTENT)
769}
770
771#[derive(Debug, Deserialize)]
773pub struct ValidateConfigRequest {
774 pub config: serde_json::Value,
776 #[serde(default = "default_format")]
778 pub format: String,
779}
780
/// Serde default for [`ValidateConfigRequest::format`]: `"json"`.
fn default_format() -> String {
    String::from("json")
}
784
785async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
787 use mockforge_core::config::ServerConfig;
788
789 let config_result: Result<ServerConfig, String> = match request.format.as_str() {
790 "yaml" | "yml" => {
791 let yaml_str = match serde_json::to_string(&request.config) {
792 Ok(s) => s,
793 Err(e) => {
794 return (
795 StatusCode::BAD_REQUEST,
796 Json(serde_json::json!({
797 "valid": false,
798 "error": format!("Failed to convert to string: {}", e),
799 "message": "Configuration validation failed"
800 })),
801 )
802 .into_response();
803 }
804 };
805 serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
806 }
807 _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
808 };
809
810 match config_result {
811 Ok(_) => Json(serde_json::json!({
812 "valid": true,
813 "message": "Configuration is valid"
814 }))
815 .into_response(),
816 Err(e) => (
817 StatusCode::BAD_REQUEST,
818 Json(serde_json::json!({
819 "valid": false,
820 "error": format!("Invalid configuration: {}", e),
821 "message": "Configuration validation failed"
822 })),
823 )
824 .into_response(),
825 }
826}
827
828#[derive(Debug, Deserialize)]
830pub struct BulkConfigUpdateRequest {
831 pub updates: serde_json::Value,
833}
834
835async fn bulk_update_config(
843 State(state): State<ManagementState>,
844 Json(request): Json<BulkConfigUpdateRequest>,
845) -> impl IntoResponse {
846 if !request.updates.is_object() {
848 return (
849 StatusCode::BAD_REQUEST,
850 Json(serde_json::json!({
851 "error": "Invalid request",
852 "message": "Updates must be a JSON object"
853 })),
854 )
855 .into_response();
856 }
857
858 use mockforge_core::config::ServerConfig;
860
861 let base_config = ServerConfig::default();
863 let base_json = match serde_json::to_value(&base_config) {
864 Ok(v) => v,
865 Err(e) => {
866 return (
867 StatusCode::INTERNAL_SERVER_ERROR,
868 Json(serde_json::json!({
869 "error": "Internal error",
870 "message": format!("Failed to serialize base config: {}", e)
871 })),
872 )
873 .into_response();
874 }
875 };
876
877 let mut merged = base_json.clone();
879 if let (Some(merged_obj), Some(updates_obj)) =
880 (merged.as_object_mut(), request.updates.as_object())
881 {
882 for (key, value) in updates_obj {
883 merged_obj.insert(key.clone(), value.clone());
884 }
885 }
886
887 match serde_json::from_value::<ServerConfig>(merged) {
889 Ok(_) => {
890 Json(serde_json::json!({
897 "success": true,
898 "message": "Bulk configuration update validated successfully. Note: Runtime application requires ServerConfig in ManagementState and hot-reload support.",
899 "updates_received": request.updates,
900 "validated": true
901 }))
902 .into_response()
903 }
904 Err(e) => (
905 StatusCode::BAD_REQUEST,
906 Json(serde_json::json!({
907 "error": "Invalid configuration",
908 "message": format!("Configuration validation failed: {}", e),
909 "validated": false
910 })),
911 )
912 .into_response(),
913 }
914}
915
916async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
918 let mocks = state.mocks.read().await;
919 let request_count = *state.request_counter.read().await;
920
921 Json(ServerStats {
922 uptime_seconds: state.start_time.elapsed().as_secs(),
923 total_requests: request_count,
924 active_mocks: mocks.len(),
925 enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
926 registered_routes: mocks.len(), })
928}
929
930async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
932 Json(ServerConfig {
933 version: env!("CARGO_PKG_VERSION").to_string(),
934 port: state.port,
935 has_openapi_spec: state.spec.is_some(),
936 spec_path: state.spec_path.clone(),
937 })
938}
939
940async fn health_check() -> Json<serde_json::Value> {
942 Json(serde_json::json!({
943 "status": "healthy",
944 "service": "mockforge-management",
945 "timestamp": chrono::Utc::now().to_rfc3339()
946 }))
947}
948
949#[derive(Debug, Clone, Serialize, Deserialize)]
951#[serde(rename_all = "lowercase")]
952pub enum ExportFormat {
953 Json,
955 Yaml,
957}
958
959async fn export_mocks(
961 State(state): State<ManagementState>,
962 axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
963) -> Result<(StatusCode, String), StatusCode> {
964 let mocks = state.mocks.read().await;
965
966 let format = params
967 .get("format")
968 .map(|f| match f.as_str() {
969 "yaml" | "yml" => ExportFormat::Yaml,
970 _ => ExportFormat::Json,
971 })
972 .unwrap_or(ExportFormat::Json);
973
974 match format {
975 ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
976 .map(|json| (StatusCode::OK, json))
977 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
978 ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
979 .map(|yaml| (StatusCode::OK, yaml))
980 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
981 }
982}
983
984async fn import_mocks(
986 State(state): State<ManagementState>,
987 Json(mocks): Json<Vec<MockConfig>>,
988) -> impl IntoResponse {
989 let mut current_mocks = state.mocks.write().await;
990 current_mocks.clear();
991 current_mocks.extend(mocks);
992 Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
993}
994
/// Lists all emails held by the SMTP registry.
///
/// Responds with `501 Not Implemented` when no registry is attached and with
/// `500 Internal Server Error` when the registry fails.
#[cfg(feature = "smtp")]
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.smtp_registry.as_ref().map(|registry| registry.get_emails()) {
        Some(Ok(emails)) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Some(Err(e)) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to retrieve emails",
                "message": e.to_string()
            })),
        ),
        None => (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        ),
    }
}
1019
/// Returns a single email by id.
///
/// Responds with `404 Not Found` when the id is unknown, `500` when the
/// registry fails and `501 Not Implemented` when no registry is attached.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let lookup = state.smtp_registry.as_ref().map(|registry| registry.get_email_by_id(&id));

    match lookup {
        Some(Ok(Some(email))) => (StatusCode::OK, Json(serde_json::json!(email))),
        Some(Ok(None)) => (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Email not found",
                "id": id
            })),
        ),
        Some(Err(e)) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to retrieve email",
                "message": e.to_string()
            })),
        ),
        None => (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        ),
    }
}
1054
/// Clears the SMTP mailbox.
///
/// Responds with `500` when the registry fails and `501 Not Implemented` when
/// no registry is attached.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.smtp_registry.as_ref().map(|registry| registry.clear_mailbox()) {
        Some(Ok(())) => (
            StatusCode::OK,
            Json(serde_json::json!({
                "message": "Mailbox cleared successfully"
            })),
        ),
        Some(Err(e)) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to clear mailbox",
                "message": e.to_string()
            })),
        ),
        None => (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        ),
    }
}
1084
/// Placeholder for mailbox export: always answers `501 Not Implemented` and
/// echoes the requested `format` (default `json`).
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let requested_format =
        params.get("format").cloned().unwrap_or_else(|| "json".to_string());

    let body = serde_json::json!({
        "error": "SMTP mailbox management not available via HTTP API",
        "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
        "requested_format": requested_format
    });

    (StatusCode::NOT_IMPLEMENTED, Json(body))
}
1100
/// Searches the SMTP mailbox.
///
/// Query parameters: `sender`, `recipient`, `subject`, `body` (text filters),
/// `since`/`until` (RFC 3339 timestamps; unparsable values are ignored) and
/// `regex`/`case_sensitive` (enabled only by the exact value `true`).
///
/// Responds with `500` when the registry fails and `501 Not Implemented` when
/// no registry is attached.
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    let text = |key: &str| params.get(key).cloned();
    let instant = |key: &str| {
        params
            .get(key)
            .and_then(|raw| chrono::DateTime::parse_from_rfc3339(raw).ok())
            .map(|parsed| parsed.with_timezone(&chrono::Utc))
    };
    let flag = |key: &str| params.get(key).map(|raw| raw == "true").unwrap_or(false);

    let filters = EmailSearchFilters {
        sender: text("sender"),
        recipient: text("recipient"),
        subject: text("subject"),
        body: text("body"),
        since: instant("since"),
        until: instant("until"),
        use_regex: flag("regex"),
        case_sensitive: flag("case_sensitive"),
    };

    match registry.search_emails(filters) {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to search emails",
                "message": e.to_string()
            })),
        ),
    }
}
1145
/// Statistics of the MQTT broker as reported by the stats endpoint.
#[cfg(feature = "mqtt")]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct MqttBrokerStats {
    /// Number of currently connected clients.
    pub connected_clients: usize,
    /// Number of active topics.
    pub active_topics: usize,
    /// Number of retained messages (from the broker's topic stats).
    pub retained_messages: usize,
    /// Total number of subscriptions (from the broker's topic stats).
    pub total_subscriptions: usize,
}
1159
/// Reports MQTT broker statistics, or `503 Service Unavailable` when no broker
/// is attached.
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let broker = match state.mqtt_broker.as_ref() {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let connected_clients = broker.get_connected_clients().await.len();
    let active_topics = broker.get_active_topics().await.len();
    let topic_stats = broker.get_topic_stats().await;

    Json(MqttBrokerStats {
        connected_clients,
        active_topics,
        retained_messages: topic_stats.retained_messages,
        total_subscriptions: topic_stats.total_subscriptions,
    })
    .into_response()
}
1180
/// Lists the clients connected to the MQTT broker, or answers
/// `503 Service Unavailable` when no broker is attached.
#[cfg(feature = "mqtt")]
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.mqtt_broker.as_ref() {
        Some(broker) => {
            let clients = broker.get_connected_clients().await;
            Json(serde_json::json!({ "clients": clients })).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1193
/// Lists the active topics of the MQTT broker, or answers
/// `503 Service Unavailable` when no broker is attached.
#[cfg(feature = "mqtt")]
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.mqtt_broker.as_ref() {
        Some(broker) => {
            let topics = broker.get_active_topics().await;
            Json(serde_json::json!({ "topics": topics })).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1206
/// Disconnects the MQTT client with the given id.
///
/// Answers `503 Service Unavailable` when no broker is attached and `500`
/// when the broker reports an error.
#[cfg(feature = "mqtt")]
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let broker = match state.mqtt_broker.as_ref() {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let (status, message) = match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)),
        Err(e) => {
            (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
        }
    };
    (status, message).into_response()
}
1226
/// A single MQTT message to publish.
#[cfg(feature = "mqtt")]
#[derive(Deserialize, Debug)]
pub struct MqttPublishRequest {
    /// Topic to publish to.
    pub topic: String,
    /// Message payload as text.
    pub payload: String,
    /// Quality-of-service level; defaults to 0.
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Retain flag; defaults to `false`.
    #[serde(default)]
    pub retain: bool,
}
1244
/// Serde default for [`MqttPublishRequest::qos`]: QoS level 0.
#[cfg(feature = "mqtt")]
fn default_qos() -> u8 {
    0
}
1249
/// Publishes a single message through the MQTT broker.
///
/// The JSON body must contain string fields `topic` and `payload`; `qos`
/// (default 0) and `retain` (default `false`) are optional.
///
/// Responses:
/// * `400 Bad Request` - `topic` or `payload` missing, or `qos` not 0, 1 or 2;
/// * `503 Service Unavailable` - no broker attached;
/// * `500 Internal Server Error` - the broker rejected the publish;
/// * `200 OK` - published; a [`MessageEvent::Mqtt`] is also broadcast.
#[cfg(feature = "mqtt")]
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let text_field =
        |name: &str| request.get(name).and_then(|v| v.as_str()).map(|s| s.to_string());

    let (topic, payload) = match (text_field("topic"), text_field("payload")) {
        (Some(topic), Some(payload)) => (topic, payload),
        _ => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid request",
                    "message": "Missing required fields: topic and payload"
                })),
            );
        }
    };
    // Keep the full-width value until it has been range-checked: casting to
    // `u8` first would wrap e.g. 256 to 0 and let it pass validation.
    let requested_qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0);
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    let broker = match &state.mqtt_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "MQTT broker not available",
                    "message": "MQTT broker is not enabled or not available."
                })),
            );
        }
    };

    if requested_qos > 2 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid QoS",
                "message": "QoS must be 0, 1, or 2"
            })),
        );
    }
    // Lossless: the value was just checked to be at most 2.
    let qos = requested_qos as u8;

    let payload_bytes = payload.as_bytes().to_vec();
    let client_id = "mockforge-management-api".to_string();

    let publish_result = broker
        .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
        .await
        .map_err(|e| e.to_string());

    match publish_result {
        Ok(_) => {
            let event = MessageEvent::Mqtt(MqttMessageEvent {
                topic: topic.clone(),
                payload,
                qos,
                retain,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            // The send result is deliberately ignored (best-effort notification).
            let _ = state.message_events.send(event);

            (
                StatusCode::OK,
                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message published to topic '{}'", topic),
                    "topic": topic,
                    "qos": qos,
                    "retain": retain
                })),
            )
        }
        Err(error_msg) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to publish message",
                "message": error_msg
            })),
        ),
    }
}
1337
1338#[cfg(not(feature = "mqtt"))]
1339async fn publish_mqtt_message_handler(
1341 State(_state): State<ManagementState>,
1342 Json(_request): Json<serde_json::Value>,
1343) -> impl IntoResponse {
1344 (
1345 StatusCode::SERVICE_UNAVAILABLE,
1346 Json(serde_json::json!({
1347 "error": "MQTT feature not enabled",
1348 "message": "MQTT support is not compiled into this build"
1349 })),
1350 )
1351}
1352
/// Typed description of a batch of MQTT messages to publish.
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// Messages to publish.
    pub messages: Vec<MqttPublishRequest>,
    /// Delay in milliseconds; filled in by `default_delay` when omitted.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1363
/// Serde default for `MqttBatchPublishRequest::delay_ms`: `100`.
#[cfg(feature = "mqtt")]
fn default_delay() -> u64 {
    const DEFAULT_DELAY_MS: u64 = 100;
    DEFAULT_DELAY_MS
}
1368
/// Publishes a batch of MQTT messages sequentially.
///
/// The JSON body must contain a `messages` array. Each element needs string
/// fields `topic` and `payload`, plus optional `qos` (default `0`) and
/// `retain` (default `false`). `delay_ms` (default `100`) is slept between
/// consecutive messages that reach the publish step.
///
/// Responses:
/// - `400 Bad Request` when `messages` is missing or not an array.
/// - `503 Service Unavailable` when no MQTT broker is attached to the state.
/// - `400 Bad Request` when `messages` is empty.
/// - `200 OK` otherwise, with a per-message `results` array and
///   `total`/`succeeded`/`failed` counters. Individual failures do not abort
///   the batch.
#[cfg(feature = "mqtt")]
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    let Some(messages_json) = request.get("messages").and_then(|v| v.as_array()) else {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required field: messages"
            })),
        );
    };

    let Some(broker) = &state.mqtt_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        );
    };

    if messages_json.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        );
    }

    let total = messages_json.len();
    let mut results = Vec::with_capacity(total);
    let mut succeeded = 0usize;
    let client_id = "mockforge-management-api".to_string();

    for (index, msg_json) in messages_json.iter().enumerate() {
        let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(str::to_owned);
        let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(str::to_owned);
        // Keep the full-width value: `as u8` would wrap 256 to 0 and let an
        // invalid QoS through the range check below.
        let requested_qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0);
        let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

        let (Some(topic), Some(payload)) = (topic, payload) else {
            results.push(serde_json::json!({
                "index": index,
                "success": false,
                "error": "Missing required fields: topic and payload"
            }));
            continue;
        };

        let qos = match u8::try_from(requested_qos) {
            Ok(level) if level <= 2 => level,
            _ => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Invalid QoS (must be 0, 1, or 2)"
                }));
                continue;
            }
        };

        let payload_bytes = payload.as_bytes().to_vec();

        // Stringify the error right away so only a plain `String` is carried on.
        let publish_result = broker
            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
            .await
            .map_err(|e| e.to_string());

        match publish_result {
            Ok(_) => {
                let event = MessageEvent::Mqtt(MqttMessageEvent {
                    topic: topic.clone(),
                    payload,
                    qos,
                    retain,
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                // Best effort: the send result is deliberately ignored.
                let _ = state.message_events.send(event);

                succeeded += 1;
                results.push(serde_json::json!({
                    "index": index,
                    "success": true,
                    "topic": topic,
                    "qos": qos
                }));
            }
            Err(error_msg) => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": error_msg
                }));
            }
        }

        // No pause after the final message. `total` is at least 1 here, so
        // the subtraction cannot underflow.
        if index < total - 1 && delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
        }
    }

    (
        StatusCode::OK,
        Json(serde_json::json!({
            "success": true,
            "total": total,
            "succeeded": succeeded,
            "failed": total - succeeded,
            "results": results
        })),
    )
}
1498
1499#[cfg(not(feature = "mqtt"))]
1500async fn publish_mqtt_batch_handler(
1502 State(_state): State<ManagementState>,
1503 Json(_request): Json<serde_json::Value>,
1504) -> impl IntoResponse {
1505 (
1506 StatusCode::SERVICE_UNAVAILABLE,
1507 Json(serde_json::json!({
1508 "error": "MQTT feature not enabled",
1509 "message": "MQTT support is not compiled into this build"
1510 })),
1511 )
1512}
1513
1514#[derive(Debug, Deserialize)]
1518struct SetMigrationModeRequest {
1519 mode: String,
1520}
1521
1522async fn get_migration_routes(
1524 State(state): State<ManagementState>,
1525) -> Result<Json<serde_json::Value>, StatusCode> {
1526 let proxy_config = match &state.proxy_config {
1527 Some(config) => config,
1528 None => {
1529 return Ok(Json(serde_json::json!({
1530 "error": "Migration not configured. Proxy config not available."
1531 })));
1532 }
1533 };
1534
1535 let config = proxy_config.read().await;
1536 let routes = config.get_migration_routes();
1537
1538 Ok(Json(serde_json::json!({
1539 "routes": routes
1540 })))
1541}
1542
1543async fn toggle_route_migration(
1545 State(state): State<ManagementState>,
1546 Path(pattern): Path<String>,
1547) -> Result<Json<serde_json::Value>, StatusCode> {
1548 let proxy_config = match &state.proxy_config {
1549 Some(config) => config,
1550 None => {
1551 return Ok(Json(serde_json::json!({
1552 "error": "Migration not configured. Proxy config not available."
1553 })));
1554 }
1555 };
1556
1557 let mut config = proxy_config.write().await;
1558 let new_mode = match config.toggle_route_migration(&pattern) {
1559 Some(mode) => mode,
1560 None => {
1561 return Ok(Json(serde_json::json!({
1562 "error": format!("Route pattern not found: {}", pattern)
1563 })));
1564 }
1565 };
1566
1567 Ok(Json(serde_json::json!({
1568 "pattern": pattern,
1569 "mode": format!("{:?}", new_mode).to_lowercase()
1570 })))
1571}
1572
1573async fn set_route_migration_mode(
1575 State(state): State<ManagementState>,
1576 Path(pattern): Path<String>,
1577 Json(request): Json<SetMigrationModeRequest>,
1578) -> Result<Json<serde_json::Value>, StatusCode> {
1579 let proxy_config = match &state.proxy_config {
1580 Some(config) => config,
1581 None => {
1582 return Ok(Json(serde_json::json!({
1583 "error": "Migration not configured. Proxy config not available."
1584 })));
1585 }
1586 };
1587
1588 use mockforge_core::proxy::config::MigrationMode;
1589 let mode = match request.mode.to_lowercase().as_str() {
1590 "mock" => MigrationMode::Mock,
1591 "shadow" => MigrationMode::Shadow,
1592 "real" => MigrationMode::Real,
1593 "auto" => MigrationMode::Auto,
1594 _ => {
1595 return Ok(Json(serde_json::json!({
1596 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1597 })));
1598 }
1599 };
1600
1601 let mut config = proxy_config.write().await;
1602 let updated = config.update_rule_migration_mode(&pattern, mode);
1603
1604 if !updated {
1605 return Ok(Json(serde_json::json!({
1606 "error": format!("Route pattern not found: {}", pattern)
1607 })));
1608 }
1609
1610 Ok(Json(serde_json::json!({
1611 "pattern": pattern,
1612 "mode": format!("{:?}", mode).to_lowercase()
1613 })))
1614}
1615
1616async fn toggle_group_migration(
1618 State(state): State<ManagementState>,
1619 Path(group): Path<String>,
1620) -> Result<Json<serde_json::Value>, StatusCode> {
1621 let proxy_config = match &state.proxy_config {
1622 Some(config) => config,
1623 None => {
1624 return Ok(Json(serde_json::json!({
1625 "error": "Migration not configured. Proxy config not available."
1626 })));
1627 }
1628 };
1629
1630 let mut config = proxy_config.write().await;
1631 let new_mode = config.toggle_group_migration(&group);
1632
1633 Ok(Json(serde_json::json!({
1634 "group": group,
1635 "mode": format!("{:?}", new_mode).to_lowercase()
1636 })))
1637}
1638
1639async fn set_group_migration_mode(
1641 State(state): State<ManagementState>,
1642 Path(group): Path<String>,
1643 Json(request): Json<SetMigrationModeRequest>,
1644) -> Result<Json<serde_json::Value>, StatusCode> {
1645 let proxy_config = match &state.proxy_config {
1646 Some(config) => config,
1647 None => {
1648 return Ok(Json(serde_json::json!({
1649 "error": "Migration not configured. Proxy config not available."
1650 })));
1651 }
1652 };
1653
1654 use mockforge_core::proxy::config::MigrationMode;
1655 let mode = match request.mode.to_lowercase().as_str() {
1656 "mock" => MigrationMode::Mock,
1657 "shadow" => MigrationMode::Shadow,
1658 "real" => MigrationMode::Real,
1659 "auto" => MigrationMode::Auto,
1660 _ => {
1661 return Ok(Json(serde_json::json!({
1662 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1663 })));
1664 }
1665 };
1666
1667 let mut config = proxy_config.write().await;
1668 config.update_group_migration_mode(&group, mode);
1669
1670 Ok(Json(serde_json::json!({
1671 "group": group,
1672 "mode": format!("{:?}", mode).to_lowercase()
1673 })))
1674}
1675
1676async fn get_migration_groups(
1678 State(state): State<ManagementState>,
1679) -> Result<Json<serde_json::Value>, StatusCode> {
1680 let proxy_config = match &state.proxy_config {
1681 Some(config) => config,
1682 None => {
1683 return Ok(Json(serde_json::json!({
1684 "error": "Migration not configured. Proxy config not available."
1685 })));
1686 }
1687 };
1688
1689 let config = proxy_config.read().await;
1690 let groups = config.get_migration_groups();
1691
1692 let groups_json: serde_json::Map<String, serde_json::Value> = groups
1694 .into_iter()
1695 .map(|(name, info)| {
1696 (
1697 name,
1698 serde_json::json!({
1699 "name": info.name,
1700 "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1701 "route_count": info.route_count
1702 }),
1703 )
1704 })
1705 .collect();
1706
1707 Ok(Json(serde_json::json!(groups_json)))
1708}
1709
1710async fn get_migration_status(
1712 State(state): State<ManagementState>,
1713) -> Result<Json<serde_json::Value>, StatusCode> {
1714 let proxy_config = match &state.proxy_config {
1715 Some(config) => config,
1716 None => {
1717 return Ok(Json(serde_json::json!({
1718 "error": "Migration not configured. Proxy config not available."
1719 })));
1720 }
1721 };
1722
1723 let config = proxy_config.read().await;
1724 let routes = config.get_migration_routes();
1725 let groups = config.get_migration_groups();
1726
1727 let mut mock_count = 0;
1728 let mut shadow_count = 0;
1729 let mut real_count = 0;
1730 let mut auto_count = 0;
1731
1732 for route in &routes {
1733 match route.migration_mode {
1734 mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1735 mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1736 mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1737 mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1738 }
1739 }
1740
1741 Ok(Json(serde_json::json!({
1742 "total_routes": routes.len(),
1743 "mock_routes": mock_count,
1744 "shadow_routes": shadow_count,
1745 "real_routes": real_count,
1746 "auto_routes": auto_count,
1747 "total_groups": groups.len(),
1748 "migration_enabled": config.migration_enabled
1749 })))
1750}
1751
1752#[derive(Debug, Deserialize, Serialize)]
1756pub struct ProxyRuleRequest {
1757 pub pattern: String,
1759 #[serde(rename = "type")]
1761 pub rule_type: String,
1762 #[serde(default)]
1764 pub status_codes: Vec<u16>,
1765 pub body_transforms: Vec<BodyTransformRequest>,
1767 #[serde(default = "default_true")]
1769 pub enabled: bool,
1770}
1771
1772#[derive(Debug, Deserialize, Serialize)]
1774pub struct BodyTransformRequest {
1775 pub path: String,
1777 pub replace: String,
1779 #[serde(default)]
1781 pub operation: String,
1782}
1783
1784#[derive(Debug, Serialize)]
1786pub struct ProxyRuleResponse {
1787 pub id: usize,
1789 pub pattern: String,
1791 #[serde(rename = "type")]
1793 pub rule_type: String,
1794 pub status_codes: Vec<u16>,
1796 pub body_transforms: Vec<BodyTransformRequest>,
1798 pub enabled: bool,
1800}
1801
1802async fn list_proxy_rules(
1804 State(state): State<ManagementState>,
1805) -> Result<Json<serde_json::Value>, StatusCode> {
1806 let proxy_config = match &state.proxy_config {
1807 Some(config) => config,
1808 None => {
1809 return Ok(Json(serde_json::json!({
1810 "error": "Proxy not configured. Proxy config not available."
1811 })));
1812 }
1813 };
1814
1815 let config = proxy_config.read().await;
1816
1817 let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1818
1819 for (idx, rule) in config.request_replacements.iter().enumerate() {
1821 rules.push(ProxyRuleResponse {
1822 id: idx,
1823 pattern: rule.pattern.clone(),
1824 rule_type: "request".to_string(),
1825 status_codes: Vec::new(),
1826 body_transforms: rule
1827 .body_transforms
1828 .iter()
1829 .map(|t| BodyTransformRequest {
1830 path: t.path.clone(),
1831 replace: t.replace.clone(),
1832 operation: format!("{:?}", t.operation).to_lowercase(),
1833 })
1834 .collect(),
1835 enabled: rule.enabled,
1836 });
1837 }
1838
1839 let request_count = config.request_replacements.len();
1841 for (idx, rule) in config.response_replacements.iter().enumerate() {
1842 rules.push(ProxyRuleResponse {
1843 id: request_count + idx,
1844 pattern: rule.pattern.clone(),
1845 rule_type: "response".to_string(),
1846 status_codes: rule.status_codes.clone(),
1847 body_transforms: rule
1848 .body_transforms
1849 .iter()
1850 .map(|t| BodyTransformRequest {
1851 path: t.path.clone(),
1852 replace: t.replace.clone(),
1853 operation: format!("{:?}", t.operation).to_lowercase(),
1854 })
1855 .collect(),
1856 enabled: rule.enabled,
1857 });
1858 }
1859
1860 Ok(Json(serde_json::json!({
1861 "rules": rules
1862 })))
1863}
1864
1865async fn create_proxy_rule(
1867 State(state): State<ManagementState>,
1868 Json(request): Json<ProxyRuleRequest>,
1869) -> Result<Json<serde_json::Value>, StatusCode> {
1870 let proxy_config = match &state.proxy_config {
1871 Some(config) => config,
1872 None => {
1873 return Ok(Json(serde_json::json!({
1874 "error": "Proxy not configured. Proxy config not available."
1875 })));
1876 }
1877 };
1878
1879 if request.body_transforms.is_empty() {
1881 return Ok(Json(serde_json::json!({
1882 "error": "At least one body transform is required"
1883 })));
1884 }
1885
1886 let body_transforms: Vec<BodyTransform> = request
1887 .body_transforms
1888 .iter()
1889 .map(|t| {
1890 let op = match t.operation.as_str() {
1891 "replace" => TransformOperation::Replace,
1892 "add" => TransformOperation::Add,
1893 "remove" => TransformOperation::Remove,
1894 _ => TransformOperation::Replace,
1895 };
1896 BodyTransform {
1897 path: t.path.clone(),
1898 replace: t.replace.clone(),
1899 operation: op,
1900 }
1901 })
1902 .collect();
1903
1904 let new_rule = BodyTransformRule {
1905 pattern: request.pattern.clone(),
1906 status_codes: request.status_codes.clone(),
1907 body_transforms,
1908 enabled: request.enabled,
1909 };
1910
1911 let mut config = proxy_config.write().await;
1912
1913 let rule_id = if request.rule_type == "request" {
1914 config.request_replacements.push(new_rule);
1915 config.request_replacements.len() - 1
1916 } else if request.rule_type == "response" {
1917 config.response_replacements.push(new_rule);
1918 config.request_replacements.len() + config.response_replacements.len() - 1
1919 } else {
1920 return Ok(Json(serde_json::json!({
1921 "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
1922 })));
1923 };
1924
1925 Ok(Json(serde_json::json!({
1926 "id": rule_id,
1927 "message": "Rule created successfully"
1928 })))
1929}
1930
1931async fn get_proxy_rule(
1933 State(state): State<ManagementState>,
1934 Path(id): Path<String>,
1935) -> Result<Json<serde_json::Value>, StatusCode> {
1936 let proxy_config = match &state.proxy_config {
1937 Some(config) => config,
1938 None => {
1939 return Ok(Json(serde_json::json!({
1940 "error": "Proxy not configured. Proxy config not available."
1941 })));
1942 }
1943 };
1944
1945 let config = proxy_config.read().await;
1946 let rule_id: usize = match id.parse() {
1947 Ok(id) => id,
1948 Err(_) => {
1949 return Ok(Json(serde_json::json!({
1950 "error": format!("Invalid rule ID: {}", id)
1951 })));
1952 }
1953 };
1954
1955 let request_count = config.request_replacements.len();
1956
1957 if rule_id < request_count {
1958 let rule = &config.request_replacements[rule_id];
1960 Ok(Json(serde_json::json!({
1961 "id": rule_id,
1962 "pattern": rule.pattern,
1963 "type": "request",
1964 "status_codes": [],
1965 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1966 "path": t.path,
1967 "replace": t.replace,
1968 "operation": format!("{:?}", t.operation).to_lowercase()
1969 })).collect::<Vec<_>>(),
1970 "enabled": rule.enabled
1971 })))
1972 } else if rule_id < request_count + config.response_replacements.len() {
1973 let response_idx = rule_id - request_count;
1975 let rule = &config.response_replacements[response_idx];
1976 Ok(Json(serde_json::json!({
1977 "id": rule_id,
1978 "pattern": rule.pattern,
1979 "type": "response",
1980 "status_codes": rule.status_codes,
1981 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1982 "path": t.path,
1983 "replace": t.replace,
1984 "operation": format!("{:?}", t.operation).to_lowercase()
1985 })).collect::<Vec<_>>(),
1986 "enabled": rule.enabled
1987 })))
1988 } else {
1989 Ok(Json(serde_json::json!({
1990 "error": format!("Rule ID {} not found", rule_id)
1991 })))
1992 }
1993}
1994
1995async fn update_proxy_rule(
1997 State(state): State<ManagementState>,
1998 Path(id): Path<String>,
1999 Json(request): Json<ProxyRuleRequest>,
2000) -> Result<Json<serde_json::Value>, StatusCode> {
2001 let proxy_config = match &state.proxy_config {
2002 Some(config) => config,
2003 None => {
2004 return Ok(Json(serde_json::json!({
2005 "error": "Proxy not configured. Proxy config not available."
2006 })));
2007 }
2008 };
2009
2010 let mut config = proxy_config.write().await;
2011 let rule_id: usize = match id.parse() {
2012 Ok(id) => id,
2013 Err(_) => {
2014 return Ok(Json(serde_json::json!({
2015 "error": format!("Invalid rule ID: {}", id)
2016 })));
2017 }
2018 };
2019
2020 let body_transforms: Vec<BodyTransform> = request
2021 .body_transforms
2022 .iter()
2023 .map(|t| {
2024 let op = match t.operation.as_str() {
2025 "replace" => TransformOperation::Replace,
2026 "add" => TransformOperation::Add,
2027 "remove" => TransformOperation::Remove,
2028 _ => TransformOperation::Replace,
2029 };
2030 BodyTransform {
2031 path: t.path.clone(),
2032 replace: t.replace.clone(),
2033 operation: op,
2034 }
2035 })
2036 .collect();
2037
2038 let updated_rule = BodyTransformRule {
2039 pattern: request.pattern.clone(),
2040 status_codes: request.status_codes.clone(),
2041 body_transforms,
2042 enabled: request.enabled,
2043 };
2044
2045 let request_count = config.request_replacements.len();
2046
2047 if rule_id < request_count {
2048 config.request_replacements[rule_id] = updated_rule;
2050 } else if rule_id < request_count + config.response_replacements.len() {
2051 let response_idx = rule_id - request_count;
2053 config.response_replacements[response_idx] = updated_rule;
2054 } else {
2055 return Ok(Json(serde_json::json!({
2056 "error": format!("Rule ID {} not found", rule_id)
2057 })));
2058 }
2059
2060 Ok(Json(serde_json::json!({
2061 "id": rule_id,
2062 "message": "Rule updated successfully"
2063 })))
2064}
2065
2066async fn delete_proxy_rule(
2068 State(state): State<ManagementState>,
2069 Path(id): Path<String>,
2070) -> Result<Json<serde_json::Value>, StatusCode> {
2071 let proxy_config = match &state.proxy_config {
2072 Some(config) => config,
2073 None => {
2074 return Ok(Json(serde_json::json!({
2075 "error": "Proxy not configured. Proxy config not available."
2076 })));
2077 }
2078 };
2079
2080 let mut config = proxy_config.write().await;
2081 let rule_id: usize = match id.parse() {
2082 Ok(id) => id,
2083 Err(_) => {
2084 return Ok(Json(serde_json::json!({
2085 "error": format!("Invalid rule ID: {}", id)
2086 })));
2087 }
2088 };
2089
2090 let request_count = config.request_replacements.len();
2091
2092 if rule_id < request_count {
2093 config.request_replacements.remove(rule_id);
2095 } else if rule_id < request_count + config.response_replacements.len() {
2096 let response_idx = rule_id - request_count;
2098 config.response_replacements.remove(response_idx);
2099 } else {
2100 return Ok(Json(serde_json::json!({
2101 "error": format!("Rule ID {} not found", rule_id)
2102 })));
2103 }
2104
2105 Ok(Json(serde_json::json!({
2106 "id": rule_id,
2107 "message": "Rule deleted successfully"
2108 })))
2109}
2110
2111async fn get_proxy_inspect(
2114 State(_state): State<ManagementState>,
2115 Query(params): Query<std::collections::HashMap<String, String>>,
2116) -> Result<Json<serde_json::Value>, StatusCode> {
2117 let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2118
2119 Ok(Json(serde_json::json!({
2125 "requests": [],
2126 "responses": [],
2127 "limit": limit,
2128 "total": 0,
2129 "message": "Request/response inspection not yet implemented. This endpoint will return intercepted traffic when proxy inspection is fully integrated."
2130 })))
2131}
2132
2133pub fn management_router(state: ManagementState) -> Router {
2135 let router = Router::new()
2136 .route("/health", get(health_check))
2137 .route("/stats", get(get_stats))
2138 .route("/config", get(get_config))
2139 .route("/config/validate", post(validate_config))
2140 .route("/config/bulk", post(bulk_update_config))
2141 .route("/mocks", get(list_mocks))
2142 .route("/mocks", post(create_mock))
2143 .route("/mocks/{id}", get(get_mock))
2144 .route("/mocks/{id}", put(update_mock))
2145 .route("/mocks/{id}", delete(delete_mock))
2146 .route("/export", get(export_mocks))
2147 .route("/import", post(import_mocks));
2148
2149 #[cfg(feature = "smtp")]
2150 let router = router
2151 .route("/smtp/mailbox", get(list_smtp_emails))
2152 .route("/smtp/mailbox", delete(clear_smtp_mailbox))
2153 .route("/smtp/mailbox/{id}", get(get_smtp_email))
2154 .route("/smtp/mailbox/export", get(export_smtp_mailbox))
2155 .route("/smtp/mailbox/search", get(search_smtp_emails));
2156
2157 #[cfg(not(feature = "smtp"))]
2158 let router = router;
2159
2160 #[cfg(feature = "mqtt")]
2162 let router = router
2163 .route("/mqtt/stats", get(get_mqtt_stats))
2164 .route("/mqtt/clients", get(get_mqtt_clients))
2165 .route("/mqtt/topics", get(get_mqtt_topics))
2166 .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
2167 .route("/mqtt/messages/stream", get(mqtt_messages_stream))
2168 .route("/mqtt/publish", post(publish_mqtt_message_handler))
2169 .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2170
2171 #[cfg(not(feature = "mqtt"))]
2172 let router = router
2173 .route("/mqtt/publish", post(publish_mqtt_message_handler))
2174 .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2175
2176 #[cfg(feature = "kafka")]
2177 let router = router
2178 .route("/kafka/stats", get(get_kafka_stats))
2179 .route("/kafka/topics", get(get_kafka_topics))
2180 .route("/kafka/topics/{topic}", get(get_kafka_topic))
2181 .route("/kafka/groups", get(get_kafka_groups))
2182 .route("/kafka/groups/{group_id}", get(get_kafka_group))
2183 .route("/kafka/produce", post(produce_kafka_message))
2184 .route("/kafka/produce/batch", post(produce_kafka_batch))
2185 .route("/kafka/messages/stream", get(kafka_messages_stream));
2186
2187 #[cfg(not(feature = "kafka"))]
2188 let router = router;
2189
2190 let router = router
2192 .route("/migration/routes", get(get_migration_routes))
2193 .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
2194 .route("/migration/routes/{pattern}", put(set_route_migration_mode))
2195 .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
2196 .route("/migration/groups/{group}", put(set_group_migration_mode))
2197 .route("/migration/groups", get(get_migration_groups))
2198 .route("/migration/status", get(get_migration_status));
2199
2200 let router = router
2202 .route("/proxy/rules", get(list_proxy_rules))
2203 .route("/proxy/rules", post(create_proxy_rule))
2204 .route("/proxy/rules/{id}", get(get_proxy_rule))
2205 .route("/proxy/rules/{id}", put(update_proxy_rule))
2206 .route("/proxy/rules/{id}", delete(delete_proxy_rule))
2207 .route("/proxy/inspect", get(get_proxy_inspect));
2208
2209 let router = router.route("/ai/generate-spec", post(generate_ai_spec));
2211
2212 let router = router.nest(
2214 "/snapshot-diff",
2215 crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
2216 );
2217
2218 #[cfg(feature = "behavioral-cloning")]
2219 let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));
2220
2221 let router = router
2222 .route("/mockai/learn", post(learn_from_examples))
2223 .route("/mockai/rules/explanations", get(list_rule_explanations))
2224 .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
2225 .route("/chaos/config", get(get_chaos_config))
2226 .route("/chaos/config", post(update_chaos_config))
2227 .route("/network/profiles", get(list_network_profiles))
2228 .route("/network/profile/apply", post(apply_network_profile));
2229
2230 let router =
2232 router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());
2233
2234 router.with_state(state)
2235}
2236
/// Aggregate counters reported by the Kafka stats endpoint.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// Number of topics known to the broker.
    pub topics: usize,
    /// Partition count summed over all topics.
    pub partitions: usize,
    /// Number of consumer groups.
    pub consumer_groups: usize,
    /// Total messages produced, taken from the broker metrics snapshot.
    pub messages_produced: u64,
    /// Total messages consumed, taken from the broker metrics snapshot.
    pub messages_consumed: u64,
}
2251
/// Summary of one Kafka topic as returned by the topic listing endpoint.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name.
    pub name: String,
    /// Number of partitions in the topic.
    pub partitions: usize,
    /// Replication factor from the topic's config.
    pub replication_factor: i32,
}
2259
/// Summary of one Kafka consumer group as returned by the group listing
/// endpoint.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group identifier.
    pub group_id: String,
    /// Number of members in the group.
    pub members: usize,
    /// Group state label.
    pub state: String,
}
2267
/// Reports aggregate Kafka broker statistics as a [`KafkaBrokerStats`] JSON
/// document.
///
/// Answers `503 Service Unavailable` when no Kafka broker is attached to the
/// state.
#[cfg(feature = "kafka")]
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        let unavailable = serde_json::json!({
            "error": "Kafka broker not available",
            "message": "Kafka broker is not enabled or not available."
        });
        return (StatusCode::SERVICE_UNAVAILABLE, Json(unavailable)).into_response();
    };

    let topic_map = broker.topics.read().await;
    let group_registry = broker.consumer_groups.read().await;

    let partition_total: usize = topic_map.values().map(|topic| topic.partitions.len()).sum();
    let snapshot = broker.metrics().snapshot();

    let summary = KafkaBrokerStats {
        topics: topic_map.len(),
        partitions: partition_total,
        consumer_groups: group_registry.groups().len(),
        messages_produced: snapshot.messages_produced_total,
        messages_consumed: snapshot.messages_consumed_total,
    };

    Json(summary).into_response()
}
2300
/// Lists all Kafka topics as `{"topics": [...]}`.
///
/// Answers `503 Service Unavailable` when no Kafka broker is attached to the
/// state.
#[cfg(feature = "kafka")]
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        let unavailable = serde_json::json!({
            "error": "Kafka broker not available",
            "message": "Kafka broker is not enabled or not available."
        });
        return (StatusCode::SERVICE_UNAVAILABLE, Json(unavailable)).into_response();
    };

    let topic_map = broker.topics.read().await;

    let mut summaries: Vec<KafkaTopicInfo> = Vec::new();
    for (name, topic) in topic_map.iter() {
        summaries.push(KafkaTopicInfo {
            name: name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor as i32,
        });
    }

    Json(serde_json::json!({
        "topics": summaries
    }))
    .into_response()
}
2330
/// Describes a single Kafka topic, including per-partition message counts.
///
/// Answers `503 Service Unavailable` when no Kafka broker is attached to the
/// state, and `404 Not Found` when the topic does not exist.
#[cfg(feature = "kafka")]
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        let unavailable = serde_json::json!({
            "error": "Kafka broker not available",
            "message": "Kafka broker is not enabled or not available."
        });
        return (StatusCode::SERVICE_UNAVAILABLE, Json(unavailable)).into_response();
    };

    let topic_map = broker.topics.read().await;
    let Some(topic) = topic_map.get(&topic_name) else {
        let missing = serde_json::json!({
            "error": "Topic not found",
            "topic": topic_name
        });
        return (StatusCode::NOT_FOUND, Json(missing)).into_response();
    };

    // Leader and replica ids are reported as the fixed value 0.
    let partitions_detail: Vec<serde_json::Value> = topic
        .partitions
        .iter()
        .enumerate()
        .map(|(idx, partition)| {
            serde_json::json!({
                "id": idx as i32,
                "leader": 0,
                "replicas": vec![0],
                "message_count": partition.messages.len()
            })
        })
        .collect();

    Json(serde_json::json!({
        "name": topic_name,
        "partitions": topic.partitions.len(),
        "replication_factor": topic.config.replication_factor,
        "partitions_detail": partitions_detail
    }))
    .into_response()
}
2372
/// Lists all Kafka consumer groups as `{"groups": [...]}`.
///
/// Every group is reported with the fixed state label `Stable`. Answers
/// `503 Service Unavailable` when no Kafka broker is attached to the state.
#[cfg(feature = "kafka")]
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        let unavailable = serde_json::json!({
            "error": "Kafka broker not available",
            "message": "Kafka broker is not enabled or not available."
        });
        return (StatusCode::SERVICE_UNAVAILABLE, Json(unavailable)).into_response();
    };

    let group_registry = broker.consumer_groups.read().await;

    let mut summaries: Vec<KafkaConsumerGroupInfo> = Vec::new();
    for (group_id, group) in group_registry.groups().iter() {
        summaries.push(KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            state: "Stable".to_string(),
        });
    }

    Json(serde_json::json!({
        "groups": summaries
    }))
    .into_response()
}
2403
/// Describes a single Kafka consumer group: its members, their assignments,
/// and the group's stored offsets.
///
/// The state is always reported as `Stable`. Answers `503 Service
/// Unavailable` when no Kafka broker is attached to the state, and
/// `404 Not Found` when the group does not exist.
#[cfg(feature = "kafka")]
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        let unavailable = serde_json::json!({
            "error": "Kafka broker not available",
            "message": "Kafka broker is not enabled or not available."
        });
        return (StatusCode::SERVICE_UNAVAILABLE, Json(unavailable)).into_response();
    };

    let group_registry = broker.consumer_groups.read().await;
    let Some(group) = group_registry.groups().get(&group_id) else {
        let missing = serde_json::json!({
            "error": "Consumer group not found",
            "group_id": group_id
        });
        return (StatusCode::NOT_FOUND, Json(missing)).into_response();
    };

    let members_detail: Vec<serde_json::Value> = group
        .members
        .iter()
        .map(|(member_id, member)| {
            let assignments: Vec<serde_json::Value> = member
                .assignment
                .iter()
                .map(|a| {
                    serde_json::json!({
                        "topic": a.topic,
                        "partitions": a.partitions
                    })
                })
                .collect();
            serde_json::json!({
                "member_id": member_id,
                "client_id": member.client_id,
                "assignments": assignments
            })
        })
        .collect();

    let offsets: Vec<serde_json::Value> = group
        .offsets
        .iter()
        .map(|((topic, partition), offset)| {
            serde_json::json!({
                "topic": topic,
                "partition": partition,
                "offset": offset
            })
        })
        .collect();

    Json(serde_json::json!({
        "group_id": group_id,
        "members": group.members.len(),
        "state": "Stable",
        "members_detail": members_detail,
        "offsets": offsets
    }))
    .into_response()
}
2452
/// Request body for producing a single Kafka message through the management API.
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Destination topic. The produce handlers create it with the default
    /// `TopicConfig` when it does not exist yet.
    pub topic: String,
    /// Optional message key. When `partition` is omitted, the key is handed to the
    /// topic's `assign_partition` to choose the target partition.
    #[serde(default)]
    pub key: Option<String>,
    /// Message payload; stored as the bytes of this string.
    pub value: String,
    /// Explicit target partition. Must be a valid partition index of the topic,
    /// otherwise the message is rejected by the handlers.
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional message headers; header values are stored as bytes.
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2472
/// Produces one message to a Kafka topic on the embedded broker.
///
/// The topic is created with the default `TopicConfig` if it does not exist. When the
/// request carries no explicit partition, the topic's `assign_partition` picks one from
/// the message key. On success the produced-messages metric is bumped, a
/// [`MessageEvent::Kafka`] is broadcast to stream subscribers (send errors are ignored
/// because having no subscribers is not a failure), and the assigned offset is returned.
///
/// Responses:
/// - `200` with `success`, `topic`, `partition` and `offset` on success
/// - `400` when the partition index is outside the topic's partition range
/// - `500` when the topic fails to store the message
/// - `503` when no Kafka broker is attached to the management state
#[cfg(feature = "kafka")]
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    let broker = match &state.kafka_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "Kafka broker not available",
                    "message": "Kafka broker is not enabled or not available."
                })),
            )
                .into_response();
        }
    };

    let mut topics = broker.topics.write().await;

    // Auto-create the topic on first use.
    let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
        mockforge_kafka::topics::Topic::new(
            request.topic.clone(),
            mockforge_kafka::topics::TopicConfig::default(),
        )
    });

    let partition_id = match request.partition {
        Some(partition) => partition,
        None => topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes())),
    };

    let partition_count = topic_entry.partitions.len();
    if partition_id < 0 || partition_id >= partition_count as i32 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, partition_count)
            })),
        )
            .into_response();
    }

    // Borrow the request while building the broker message so the owned key, value
    // and headers can be moved into the broadcast event afterwards without cloning.
    let message = mockforge_kafka::partitions::KafkaMessage {
        // Placeholder; the real offset is the one returned by `produce`.
        offset: 0,
        timestamp: chrono::Utc::now().timestamp_millis(),
        key: request.key.as_ref().map(|k| k.as_bytes().to_vec()),
        value: request.value.as_bytes().to_vec(),
        headers: request
            .headers
            .iter()
            .flatten()
            .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
            .collect(),
    };

    match topic_entry.produce(partition_id, message).await {
        Ok(offset) => {
            broker.metrics().record_messages_produced(1);

            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: request.topic.clone(),
                key: request.key,
                value: request.value,
                partition: partition_id,
                offset,
                headers: request.headers,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            // A send error only means nobody is subscribed right now.
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", request.topic),
                "topic": request.topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2577
/// Request body for producing several Kafka messages in one call.
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// Messages to produce, processed in order. The batch handler rejects an empty list.
    pub messages: Vec<KafkaProduceRequest>,
    /// Pause in milliseconds inserted between consecutive messages; `0` disables it.
    /// When omitted, the value comes from `default_delay` (defined elsewhere in this file).
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2587
/// Produces a batch of messages to the embedded Kafka broker, one after another.
///
/// Each message is handled independently: missing topics are created with the default
/// `TopicConfig`, the partition is either the requested one or chosen by the topic's
/// `assign_partition`, and a per-message result (`index`, `success`, plus either
/// `topic`/`partition`/`offset` or `error`) is recorded. Successful messages bump the
/// produced-messages metric and are broadcast as [`MessageEvent::Kafka`].
///
/// When `delay_ms` is non-zero, the handler pauses between messages. No pause follows
/// the last message or a message rejected for an invalid partition. The topics write
/// lock is released before each pause so other broker users are not blocked while
/// the handler sleeps.
///
/// Responses:
/// - `200` with `total`, `succeeded`, `failed` and the per-message `results`
///   (the top-level `success` flag is `true` even when individual messages failed)
/// - `400` when the batch is empty
/// - `503` when no Kafka broker is attached to the management state
#[cfg(feature = "kafka")]
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    let broker = match &state.kafka_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "Kafka broker not available",
                    "message": "Kafka broker is not enabled or not available."
                })),
            )
                .into_response();
        }
    };

    if request.messages.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        )
            .into_response();
    }

    let total = request.messages.len();
    let mut results = Vec::with_capacity(total);
    let mut succeeded = 0usize;

    for (index, msg_request) in request.messages.iter().enumerate() {
        // The write guard lives only inside this block, so it is dropped before the
        // inter-message delay below. `attempted` is false when the message was
        // rejected up front because of an invalid partition.
        let attempted = {
            let mut topics = broker.topics.write().await;

            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                mockforge_kafka::topics::Topic::new(
                    msg_request.topic.clone(),
                    mockforge_kafka::topics::TopicConfig::default(),
                )
            });

            let partition_id = match msg_request.partition {
                Some(partition) => partition,
                None => {
                    topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
                }
            };

            let partition_count = topic_entry.partitions.len();
            if partition_id < 0 || partition_id >= partition_count as i32 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, partition_count)
                }));
                false
            } else {
                let message = mockforge_kafka::partitions::KafkaMessage {
                    // Placeholder; the real offset is the one returned by `produce`.
                    offset: 0,
                    timestamp: chrono::Utc::now().timestamp_millis(),
                    key: msg_request.key.as_ref().map(|k| k.as_bytes().to_vec()),
                    value: msg_request.value.as_bytes().to_vec(),
                    headers: msg_request
                        .headers
                        .iter()
                        .flatten()
                        .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
                        .collect(),
                };

                match topic_entry.produce(partition_id, message).await {
                    Ok(offset) => {
                        broker.metrics().record_messages_produced(1);

                        let event = MessageEvent::Kafka(KafkaMessageEvent {
                            topic: msg_request.topic.clone(),
                            key: msg_request.key.clone(),
                            value: msg_request.value.clone(),
                            partition: partition_id,
                            offset,
                            headers: msg_request.headers.clone(),
                            timestamp: chrono::Utc::now().to_rfc3339(),
                        });
                        // A send error only means nobody is subscribed right now.
                        let _ = state.message_events.send(event);

                        succeeded += 1;
                        results.push(serde_json::json!({
                            "index": index,
                            "success": true,
                            "topic": msg_request.topic,
                            "partition": partition_id,
                            "offset": offset
                        }));
                    }
                    Err(e) => {
                        results.push(serde_json::json!({
                            "index": index,
                            "success": false,
                            "error": e.to_string()
                        }));
                    }
                }
                true
            }
        };

        // Pace the batch, but never after the final message.
        if attempted && index + 1 < total && request.delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
        }
    }

    Json(serde_json::json!({
        "success": true,
        "total": total,
        "succeeded": succeeded,
        "failed": total - succeeded,
        "results": results
    }))
    .into_response()
}
2716
/// Streams MQTT message events to the client as Server-Sent Events.
///
/// Subscribes to the shared message-event broadcast channel and forwards every MQTT
/// event as an SSE event named `mqtt_message`. An optional `topic` query parameter
/// restricts the stream to events whose topic contains that string. Events of other
/// protocols are skipped, lagged receivers log a warning and keep going, and the
/// stream ends when the channel is closed. A keep-alive is sent every 15 seconds.
#[cfg(feature = "mqtt")]
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let receiver = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let stream = stream::unfold(receiver, move |mut receiver| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                // Narrow the incoming item down to an MQTT event or move on.
                let event = match receiver.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => event,
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => continue,
                    Err(broadcast::error::RecvError::Closed) => return None,
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                };

                let matches_filter = match &topic_filter {
                    Some(filter) => event.topic.contains(filter.as_str()),
                    None => true,
                };
                if !matches_filter {
                    continue;
                }

                let payload = serde_json::json!({
                    "protocol": "mqtt",
                    "topic": event.topic,
                    "payload": event.payload,
                    "qos": event.qos,
                    "retain": event.retain,
                    "timestamp": event.timestamp,
                });

                match serde_json::to_string(&payload) {
                    Ok(data) => {
                        let sse_event = Event::default().event("mqtt_message").data(data);
                        return Some((Ok(sse_event), receiver));
                    }
                    // An event that cannot be serialized is dropped; wait for the next one.
                    Err(_) => continue,
                }
            }
        }
    });

    let keep_alive = axum::response::sse::KeepAlive::new()
        .interval(std::time::Duration::from_secs(15))
        .text("keep-alive-text");
    Sse::new(stream).keep_alive(keep_alive)
}
2779
/// Streams Kafka message events to the client as Server-Sent Events.
///
/// Subscribes to the shared message-event broadcast channel and forwards every Kafka
/// event as an SSE event named `kafka_message`. An optional `topic` query parameter
/// restricts the stream to events whose topic contains that string. Events of other
/// protocols are skipped, lagged receivers log a warning and keep going, and the
/// stream ends when the channel is closed. A keep-alive is sent every 15 seconds.
#[cfg(feature = "kafka")]
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // The receiver is moved into `unfold`, which rebinds it mutably itself, so this
    // binding does not need `mut`.
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                let event = match rx.recv().await {
                    Ok(MessageEvent::Kafka(event)) => event,
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => continue,
                    Err(broadcast::error::RecvError::Closed) => return None,
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                };

                if let Some(filter) = &topic_filter {
                    if !event.topic.contains(filter.as_str()) {
                        continue;
                    }
                }

                let event_json = serde_json::json!({
                    "protocol": "kafka",
                    "topic": event.topic,
                    "key": event.key,
                    "value": event.value,
                    "partition": event.partition,
                    "offset": event.offset,
                    "headers": event.headers,
                    "timestamp": event.timestamp,
                });

                // An event that cannot be serialized is dropped; wait for the next one.
                if let Ok(event_data) = serde_json::to_string(&event_json) {
                    let sse_event = Event::default().event("kafka_message").data(event_data);
                    return Some((Ok(sse_event), rx));
                }
            }
        }
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
2843
/// Request body for AI-assisted API specification generation.
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Free-text description of the desired API; embedded in the prompt as the
    /// user requirements.
    pub query: String,
    /// Kind of specification to generate: `"openapi"`, `"graphql"` or `"asyncapi"`.
    /// Any other value is treated like `"openapi"` by the generator.
    pub spec_type: String,
    /// Specification version to request from the model; the generator falls back to
    /// `"3.0.0"` when omitted.
    pub api_version: Option<String>,
}
2856
/// Request body for generating an OpenAPI specification from recorded HTTP traffic.
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to the recorder database; the handler defaults to `recordings.db` in the
    /// current working directory when omitted.
    #[serde(default)]
    pub database_path: Option<String>,
    /// Lower time bound for the recorded exchanges, as an RFC 3339 timestamp.
    #[serde(default)]
    pub since: Option<String>,
    /// Upper time bound for the recorded exchanges, as an RFC 3339 timestamp.
    #[serde(default)]
    pub until: Option<String>,
    /// Optional path pattern passed through to the recorder query filters.
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence passed to the OpenAPI generator; defaults to `0.7`.
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
2876
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`.
fn default_min_confidence() -> f64 {
    const DEFAULT_MIN_CONFIDENCE: f64 = 0.7;
    DEFAULT_MIN_CONFIDENCE
}
2880
/// Generates an API specification from a natural-language description using the
/// configured LLM provider.
///
/// Configuration is read from the environment:
/// - `MOCKFORGE_RAG_API_KEY` (falling back to `OPENAI_API_KEY`): required
/// - `MOCKFORGE_RAG_PROVIDER`: `openai` (default), `anthropic`, `ollama`,
///   `openai-compatible`/`openai_compatible`; unknown values fall back to OpenAI
/// - `MOCKFORGE_RAG_API_ENDPOINT`, `MOCKFORGE_RAG_MODEL`: default per provider
/// - `MOCKFORGE_RAG_MAX_TOKENS` (4096), `MOCKFORGE_RAG_TEMPERATURE` (0.3),
///   `MOCKFORGE_RAG_TIMEOUT` (60), `MOCKFORGE_RAG_CONTEXT_WINDOW` (4000);
///   unparsable values fall back to the listed defaults
///
/// Responses:
/// - `200` with `success`, the extracted `spec` text and the echoed `spec_type`
/// - `500` when the RAG engine cannot be created or generation fails
/// - `503` when no API key is configured
#[cfg(feature = "data-faker")]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::{DocumentStorage, InMemoryStorage},
    };
    use std::sync::Arc;

    /// Reads `name` from the environment and parses it, using `fallback` when the
    /// variable is unset or does not parse.
    fn env_or<T: std::str::FromStr>(name: &str, fallback: T) -> T {
        std::env::var(name)
            .ok()
            .and_then(|raw| raw.parse().ok())
            .unwrap_or(fallback)
    }

    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    }

    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        // Unknown provider names fall back to OpenAI.
        _ => LlmProvider::OpenAI,
    };

    // Per-provider defaults, used only when the corresponding variable is unset.
    let (default_endpoint, default_model) = match provider {
        LlmProvider::OpenAI => ("https://api.openai.com/v1", "gpt-3.5-turbo"),
        LlmProvider::Anthropic => ("https://api.anthropic.com/v1", "claude-3-sonnet-20240229"),
        LlmProvider::Ollama => ("http://localhost:11434/api", "llama2"),
        LlmProvider::OpenAICompatible => ("http://localhost:8000/v1", "gpt-3.5-turbo"),
    };

    let api_endpoint = std::env::var("MOCKFORGE_RAG_API_ENDPOINT")
        .unwrap_or_else(|_| default_endpoint.to_string());
    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| default_model.to_string());

    let mut rag_config = RagConfig::default();
    rag_config.provider = provider;
    rag_config.api_endpoint = api_endpoint;
    rag_config.api_key = api_key;
    rag_config.model = model;
    rag_config.max_tokens = env_or("MOCKFORGE_RAG_MAX_TOKENS", 4096);
    rag_config.temperature = env_or("MOCKFORGE_RAG_TEMPERATURE", 0.3);
    rag_config.timeout_secs = env_or("MOCKFORGE_RAG_TIMEOUT", 60);
    rag_config.max_context_length = env_or("MOCKFORGE_RAG_CONTEXT_WINDOW", 4000);

    let is_graphql = request.spec_type == "graphql";
    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };
    let output_format = if is_graphql { "GraphQL SDL" } else { "YAML" };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        output_format
    );

    // A fresh in-memory store is created per request and handed to the engine.
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    let mut rag_engine = match RagEngine::new(rag_config, storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Models often wrap the answer in a Markdown fence; strip it.
            let spec = if is_graphql {
                extract_graphql_schema(&generated_text)
            } else {
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
3051
3052#[cfg(not(feature = "data-faker"))]
3053async fn generate_ai_spec(
3054 State(_state): State<ManagementState>,
3055 Json(_request): Json<GenerateSpecRequest>,
3056) -> impl IntoResponse {
3057 (
3058 StatusCode::NOT_IMPLEMENTED,
3059 Json(serde_json::json!({
3060 "error": "AI features not enabled",
3061 "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3062 })),
3063 )
3064 .into_response()
3065}
3066
/// Generates an OpenAPI specification from HTTP exchanges stored by the recorder.
///
/// Opens the recorder database (request path, or `recordings.db` in the current working
/// directory), queries at most 1000 exchanges matching the optional time window and
/// path pattern, and feeds them to the OpenAPI spec generator.
///
/// Responses:
/// - `200` with the generated `spec` and generation `metadata`
/// - `400` when the database cannot be opened or `since`/`until` is not RFC 3339
/// - `404` when no exchanges match the filters
/// - `500` when querying, generation or serialization fails
#[cfg(feature = "behavioral-cloning")]
async fn generate_openapi_from_traffic(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
) -> impl IntoResponse {
    use chrono::{DateTime, Utc};
    use mockforge_core::intelligent_behavior::{
        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
        IntelligentBehaviorConfig,
    };
    use mockforge_recorder::{
        database::RecorderDatabase,
        openapi_export::{QueryFilters, RecordingsToOpenApi},
    };
    use std::path::PathBuf;

    /// Parses an optional RFC 3339 time bound into UTC. On failure returns the
    /// ready-made `400` response; `flag` names the offending field in the message.
    fn parse_time_bound(
        flag: &str,
        raw: Option<&str>,
    ) -> Result<Option<chrono::DateTime<chrono::Utc>>, axum::response::Response> {
        let raw = match raw {
            Some(raw) => raw,
            None => return Ok(None),
        };

        match chrono::DateTime::parse_from_rfc3339(raw) {
            Ok(dt) => Ok(Some(dt.with_timezone(&chrono::Utc))),
            Err(e) => Err((
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid date format",
                    "message": format!("Invalid --{} format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", flag, e)
                })),
            )
                .into_response()),
        }
    }

    // NOTE(review): `database_path` comes straight from the request body, so a client
    // can point the recorder at any file path the process can reach. Confirm this
    // endpoint is only exposed to trusted callers.
    let db_path = match request.database_path {
        Some(ref path) => PathBuf::from(path),
        None => std::env::current_dir()
            .unwrap_or_else(|_| PathBuf::from("."))
            .join("recordings.db"),
    };

    let db = match RecorderDatabase::new(&db_path).await {
        Ok(db) => db,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Database error",
                    "message": format!("Failed to open recorder database: {}", e)
                })),
            )
                .into_response();
        }
    };

    let since_dt: Option<DateTime<Utc>> =
        match parse_time_bound("since", request.since.as_deref()) {
            Ok(bound) => bound,
            Err(response) => return response,
        };
    let until_dt: Option<DateTime<Utc>> =
        match parse_time_bound("until", request.until.as_deref()) {
            Ok(bound) => bound,
            Err(response) => return response,
        };

    let query_filters = QueryFilters {
        since: since_dt,
        until: until_dt,
        path_pattern: request.path_pattern,
        min_status_code: None,
        // Cap the number of exchanges fed to the generator.
        max_requests: Some(1000),
    };

    let exchanges_from_recorder =
        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
            Ok(exchanges) => exchanges,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Query error",
                        "message": format!("Failed to query HTTP exchanges: {}", e)
                    })),
                )
                    .into_response();
            }
        };

    if exchanges_from_recorder.is_empty() {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "No exchanges found",
                "message": "No HTTP exchanges found matching the specified filters"
            })),
        )
            .into_response();
    }

    // The recorder and the generator each declare their own exchange type with the
    // same fields; convert field by field.
    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
        .into_iter()
        .map(|e| LocalHttpExchange {
            method: e.method,
            path: e.path,
            query_params: e.query_params,
            headers: e.headers,
            body: e.body,
            body_encoding: e.body_encoding,
            status_code: e.status_code,
            response_headers: e.response_headers,
            response_body: e.response_body,
            response_body_encoding: e.response_body_encoding,
            timestamp: e.timestamp,
        })
        .collect();

    let gen_config = OpenApiGenerationConfig {
        min_confidence: request.min_confidence,
        behavior_model: Some(IntelligentBehaviorConfig::default().behavior_model),
    };

    let generator = OpenApiSpecGenerator::new(gen_config);
    let result = match generator.generate_from_exchanges(exchanges).await {
        Ok(result) => result,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Generation error",
                    "message": format!("Failed to generate OpenAPI spec: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Prefer the raw document when the generator kept one; otherwise serialize the
    // typed spec.
    let spec_json = match result.spec.raw_document {
        Some(ref raw) => raw.clone(),
        None => match serde_json::to_value(&result.spec.spec) {
            Ok(json) => json,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Serialization error",
                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
                    })),
                )
                    .into_response();
            }
        },
    };

    let response = serde_json::json!({
        "spec": spec_json,
        "metadata": {
            "requests_analyzed": result.metadata.requests_analyzed,
            "paths_inferred": result.metadata.paths_inferred,
            "path_confidence": result.metadata.path_confidence,
            "generated_at": result.metadata.generated_at.to_rfc3339(),
            "duration_ms": result.metadata.duration_ms,
        }
    });

    Json(response).into_response()
}
3259
3260async fn list_rule_explanations(
3262 State(state): State<ManagementState>,
3263 Query(params): Query<std::collections::HashMap<String, String>>,
3264) -> impl IntoResponse {
3265 use mockforge_core::intelligent_behavior::RuleType;
3266
3267 let explanations = state.rule_explanations.read().await;
3268 let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3269
3270 if let Some(rule_type_str) = params.get("rule_type") {
3272 if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3273 explanations_vec.retain(|e| e.rule_type == rule_type);
3274 }
3275 }
3276
3277 if let Some(min_confidence_str) = params.get("min_confidence") {
3279 if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3280 explanations_vec.retain(|e| e.confidence >= min_confidence);
3281 }
3282 }
3283
3284 explanations_vec.sort_by(|a, b| {
3286 b.confidence
3287 .partial_cmp(&a.confidence)
3288 .unwrap_or(std::cmp::Ordering::Equal)
3289 .then_with(|| b.generated_at.cmp(&a.generated_at))
3290 });
3291
3292 Json(serde_json::json!({
3293 "explanations": explanations_vec,
3294 "total": explanations_vec.len(),
3295 }))
3296 .into_response()
3297}
3298
3299async fn get_rule_explanation(
3301 State(state): State<ManagementState>,
3302 Path(rule_id): Path<String>,
3303) -> impl IntoResponse {
3304 let explanations = state.rule_explanations.read().await;
3305
3306 match explanations.get(&rule_id) {
3307 Some(explanation) => Json(serde_json::json!({
3308 "explanation": explanation,
3309 }))
3310 .into_response(),
3311 None => (
3312 StatusCode::NOT_FOUND,
3313 Json(serde_json::json!({
3314 "error": "Rule explanation not found",
3315 "message": format!("No explanation found for rule ID: {}", rule_id)
3316 })),
3317 )
3318 .into_response(),
3319 }
3320}
3321
/// Request body for learning behavior rules from example request/response pairs.
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example pairs to learn from; the handler rejects an empty list.
    pub examples: Vec<ExamplePairRequest>,
    /// Optional JSON that the handler deserializes as an `IntelligentBehaviorConfig`,
    /// of which only the `behavior_model` part is used.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3331
/// One example request/response pair, given as loosely structured JSON.
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request JSON. The handler reads the keys `method` (default `"GET"`),
    /// `path` (default `"/"`), `body`, `query_params` and `headers`; in the two maps
    /// only string values are kept.
    pub request: serde_json::Value,
    /// Response JSON. The handler reads `status_code` (or `status`, default `200`)
    /// and `body`.
    pub response: serde_json::Value,
}
3340
3341async fn learn_from_examples(
3346 State(state): State<ManagementState>,
3347 Json(request): Json<LearnFromExamplesRequest>,
3348) -> impl IntoResponse {
3349 use mockforge_core::intelligent_behavior::{
3350 config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3351 rule_generator::{ExamplePair, RuleGenerator},
3352 };
3353
3354 if request.examples.is_empty() {
3355 return (
3356 StatusCode::BAD_REQUEST,
3357 Json(serde_json::json!({
3358 "error": "No examples provided",
3359 "message": "At least one example pair is required"
3360 })),
3361 )
3362 .into_response();
3363 }
3364
3365 let example_pairs: Result<Vec<ExamplePair>, String> = request
3367 .examples
3368 .into_iter()
3369 .enumerate()
3370 .map(|(idx, ex)| {
3371 let method = ex
3373 .request
3374 .get("method")
3375 .and_then(|v| v.as_str())
3376 .map(|s| s.to_string())
3377 .unwrap_or_else(|| "GET".to_string());
3378 let path = ex
3379 .request
3380 .get("path")
3381 .and_then(|v| v.as_str())
3382 .map(|s| s.to_string())
3383 .unwrap_or_else(|| "/".to_string());
3384 let request_body = ex.request.get("body").cloned();
3385 let query_params = ex
3386 .request
3387 .get("query_params")
3388 .and_then(|v| v.as_object())
3389 .map(|obj| {
3390 obj.iter()
3391 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3392 .collect()
3393 })
3394 .unwrap_or_default();
3395 let headers = ex
3396 .request
3397 .get("headers")
3398 .and_then(|v| v.as_object())
3399 .map(|obj| {
3400 obj.iter()
3401 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3402 .collect()
3403 })
3404 .unwrap_or_default();
3405
3406 let status = ex
3408 .response
3409 .get("status_code")
3410 .or_else(|| ex.response.get("status"))
3411 .and_then(|v| v.as_u64())
3412 .map(|n| n as u16)
3413 .unwrap_or(200);
3414 let response_body = ex.response.get("body").cloned();
3415
3416 Ok(ExamplePair {
3417 method,
3418 path,
3419 request: request_body,
3420 status,
3421 response: response_body,
3422 query_params,
3423 headers,
3424 metadata: {
3425 let mut meta = std::collections::HashMap::new();
3426 meta.insert("source".to_string(), "api".to_string());
3427 meta.insert("example_index".to_string(), idx.to_string());
3428 meta
3429 },
3430 })
3431 })
3432 .collect();
3433
3434 let example_pairs = match example_pairs {
3435 Ok(pairs) => pairs,
3436 Err(e) => {
3437 return (
3438 StatusCode::BAD_REQUEST,
3439 Json(serde_json::json!({
3440 "error": "Invalid examples",
3441 "message": e
3442 })),
3443 )
3444 .into_response();
3445 }
3446 };
3447
3448 let behavior_config = if let Some(config_json) = request.config {
3450 serde_json::from_value(config_json)
3452 .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3453 .behavior_model
3454 } else {
3455 BehaviorModelConfig::default()
3456 };
3457
3458 let generator = RuleGenerator::new(behavior_config);
3460
3461 let (rules, explanations) =
3463 match generator.generate_rules_with_explanations(example_pairs).await {
3464 Ok(result) => result,
3465 Err(e) => {
3466 return (
3467 StatusCode::INTERNAL_SERVER_ERROR,
3468 Json(serde_json::json!({
3469 "error": "Rule generation failed",
3470 "message": format!("Failed to generate rules: {}", e)
3471 })),
3472 )
3473 .into_response();
3474 }
3475 };
3476
3477 {
3479 let mut stored_explanations = state.rule_explanations.write().await;
3480 for explanation in &explanations {
3481 stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3482 }
3483 }
3484
3485 let response = serde_json::json!({
3487 "success": true,
3488 "rules_generated": {
3489 "consistency_rules": rules.consistency_rules.len(),
3490 "schemas": rules.schemas.len(),
3491 "state_machines": rules.state_transitions.len(),
3492 "system_prompt": !rules.system_prompt.is_empty(),
3493 },
3494 "explanations": explanations.iter().map(|e| serde_json::json!({
3495 "rule_id": e.rule_id,
3496 "rule_type": e.rule_type,
3497 "confidence": e.confidence,
3498 "reasoning": e.reasoning,
3499 })).collect::<Vec<_>>(),
3500 "total_explanations": explanations.len(),
3501 });
3502
3503 Json(response).into_response()
3504}
3505
/// Extracts the body of the first Markdown code fence in `text`.
///
/// A fence opened with `preferred_tag` (e.g. "```yaml") is tried first. Otherwise the
/// first fence of any kind is used, and a language tag on its opening line (such as
/// `yml`, `json` or `gql`) is dropped so it does not leak into the result. When no
/// complete fence exists, the whole input is returned. The result is always trimmed.
fn extract_fenced_or_trimmed(text: &str, preferred_tag: &str) -> String {
    const FENCE: &str = "```";

    /// True when `info` looks like a fence language tag: one token that starts with
    /// an ASCII letter and contains no whitespace or YAML/GraphQL punctuation.
    fn is_language_tag(info: &str) -> bool {
        let mut chars = info.chars();
        match chars.next() {
            Some(first) if first.is_ascii_alphabetic() => chars
                .all(|c| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '+' | '.' | '#')),
            _ => false,
        }
    }

    let tagged_open = format!("{}{}", FENCE, preferred_tag);
    if let Some(start) = text.find(&tagged_open) {
        let body = text[start + tagged_open.len()..].trim_start();
        if let Some(end) = body.find(FENCE) {
            return body[..end].trim().to_string();
        }
    }

    if let Some(start) = text.find(FENCE) {
        let after_open = &text[start + FENCE.len()..];
        if let Some(end) = after_open.find(FENCE) {
            let inner = &after_open[..end];
            // Whatever follows the opening fence on the same line is the info string.
            // Drop it only when it is a language tag followed by real content.
            let body = match inner.split_once('\n') {
                Some((info, rest)) if is_language_tag(info.trim()) => rest,
                _ => inner,
            };
            return body.trim().to_string();
        }
    }

    text.trim().to_string()
}

/// Extracts a YAML (OpenAPI/AsyncAPI) specification from model output.
///
/// Strips a surrounding Markdown code fence if present (preferring a "```yaml" fence);
/// otherwise returns the trimmed text unchanged.
fn extract_yaml_spec(text: &str) -> String {
    extract_fenced_or_trimmed(text, "yaml")
}

/// Extracts a GraphQL schema from model output.
///
/// Strips a surrounding Markdown code fence if present (preferring a "```graphql"
/// fence); otherwise returns the trimmed text unchanged.
fn extract_graphql_schema(text: &str) -> String {
    extract_fenced_or_trimmed(text, "graphql")
}
3553
3554async fn get_chaos_config(State(state): State<ManagementState>) -> impl IntoResponse {
3558 #[cfg(feature = "chaos")]
3559 {
3560 if let Some(chaos_state) = &state.chaos_api_state {
3561 let config = chaos_state.config.read().await;
3562 Json(serde_json::json!({
3564 "enabled": config.enabled,
3565 "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
3566 "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
3567 "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
3568 "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
3569 }))
3570 .into_response()
3571 } else {
3572 Json(serde_json::json!({
3574 "enabled": false,
3575 "latency": null,
3576 "fault_injection": null,
3577 "rate_limit": null,
3578 "traffic_shaping": null,
3579 }))
3580 .into_response()
3581 }
3582 }
3583 #[cfg(not(feature = "chaos"))]
3584 {
3585 Json(serde_json::json!({
3587 "enabled": false,
3588 "latency": null,
3589 "fault_injection": null,
3590 "rate_limit": null,
3591 "traffic_shaping": null,
3592 }))
3593 .into_response()
3594 }
3595}
3596
/// Partial update of the chaos-engineering configuration.
///
/// Every field is optional; the update handler leaves settings untouched for fields
/// that are absent.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// New value for the global chaos on/off switch.
    pub enabled: Option<bool>,
    /// New latency section, as JSON that the handler deserializes into `LatencyConfig`.
    pub latency: Option<serde_json::Value>,
    /// New fault-injection section, as JSON that the handler deserializes into
    /// `FaultInjectionConfig`.
    pub fault_injection: Option<serde_json::Value>,
    /// New rate-limit section, as JSON that the handler deserializes into
    /// `RateLimitConfig`.
    pub rate_limit: Option<serde_json::Value>,
    /// New traffic-shaping section, as JSON that the handler deserializes into
    /// `TrafficShapingConfig`.
    pub traffic_shaping: Option<serde_json::Value>,
}
3611
3612async fn update_chaos_config(
3614 State(state): State<ManagementState>,
3615 Json(config_update): Json<ChaosConfigUpdate>,
3616) -> impl IntoResponse {
3617 #[cfg(feature = "chaos")]
3618 {
3619 if let Some(chaos_state) = &state.chaos_api_state {
3620 use mockforge_chaos::config::{
3621 ChaosConfig, FaultInjectionConfig, LatencyConfig, RateLimitConfig,
3622 TrafficShapingConfig,
3623 };
3624
3625 let mut config = chaos_state.config.write().await;
3626
3627 if let Some(enabled) = config_update.enabled {
3629 config.enabled = enabled;
3630 }
3631
3632 if let Some(latency_json) = config_update.latency {
3634 if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3635 config.latency = Some(latency);
3636 }
3637 }
3638
3639 if let Some(fault_json) = config_update.fault_injection {
3641 if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3642 config.fault_injection = Some(fault);
3643 }
3644 }
3645
3646 if let Some(rate_json) = config_update.rate_limit {
3648 if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3649 config.rate_limit = Some(rate);
3650 }
3651 }
3652
3653 if let Some(traffic_json) = config_update.traffic_shaping {
3655 if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3656 config.traffic_shaping = Some(traffic);
3657 }
3658 }
3659
3660 drop(config);
3663
3664 info!("Chaos configuration updated successfully");
3665 Json(serde_json::json!({
3666 "success": true,
3667 "message": "Chaos configuration updated and applied"
3668 }))
3669 .into_response()
3670 } else {
3671 (
3672 StatusCode::SERVICE_UNAVAILABLE,
3673 Json(serde_json::json!({
3674 "success": false,
3675 "error": "Chaos API not available",
3676 "message": "Chaos engineering is not enabled or configured"
3677 })),
3678 )
3679 .into_response()
3680 }
3681 }
3682 #[cfg(not(feature = "chaos"))]
3683 {
3684 (
3685 StatusCode::NOT_IMPLEMENTED,
3686 Json(serde_json::json!({
3687 "success": false,
3688 "error": "Chaos feature not enabled",
3689 "message": "Chaos engineering feature is not compiled into this build"
3690 })),
3691 )
3692 .into_response()
3693 }
3694}
3695
3696async fn list_network_profiles() -> impl IntoResponse {
3700 use mockforge_core::network_profiles::NetworkProfileCatalog;
3701
3702 let catalog = NetworkProfileCatalog::default();
3703 let profiles: Vec<serde_json::Value> = catalog
3704 .list_profiles_with_description()
3705 .iter()
3706 .map(|(name, description)| {
3707 serde_json::json!({
3708 "name": name,
3709 "description": description,
3710 })
3711 })
3712 .collect();
3713
3714 Json(serde_json::json!({
3715 "profiles": profiles
3716 }))
3717 .into_response()
3718}
3719
3720#[derive(Debug, Deserialize)]
3721pub struct ApplyNetworkProfileRequest {
3723 pub profile_name: String,
3725}
3726
3727async fn apply_network_profile(
3729 State(state): State<ManagementState>,
3730 Json(request): Json<ApplyNetworkProfileRequest>,
3731) -> impl IntoResponse {
3732 use mockforge_core::network_profiles::NetworkProfileCatalog;
3733
3734 let catalog = NetworkProfileCatalog::default();
3735 if let Some(profile) = catalog.get(&request.profile_name) {
3736 if let Some(server_config) = &state.server_config {
3739 let mut config = server_config.write().await;
3740
3741 use mockforge_core::config::NetworkShapingConfig;
3743
3744 let network_shaping = NetworkShapingConfig {
3748 enabled: profile.traffic_shaping.bandwidth.enabled
3749 || profile.traffic_shaping.burst_loss.enabled,
3750 bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3752 max_connections: 1000, };
3754
3755 if let Some(ref mut chaos) = config.observability.chaos {
3758 chaos.traffic_shaping = Some(network_shaping);
3759 } else {
3760 use mockforge_core::config::ChaosEngConfig;
3762 config.observability.chaos = Some(ChaosEngConfig {
3763 enabled: true,
3764 latency: None,
3765 fault_injection: None,
3766 rate_limit: None,
3767 traffic_shaping: Some(network_shaping),
3768 scenario: None,
3769 });
3770 }
3771
3772 info!("Network profile '{}' applied to server configuration", request.profile_name);
3773 } else {
3774 warn!("Server configuration not available in ManagementState - profile applied but not persisted");
3775 }
3776
3777 #[cfg(feature = "chaos")]
3779 {
3780 if let Some(chaos_state) = &state.chaos_api_state {
3781 use mockforge_chaos::config::TrafficShapingConfig;
3782
3783 let mut chaos_config = chaos_state.config.write().await;
3784 let chaos_traffic_shaping = TrafficShapingConfig {
3786 enabled: profile.traffic_shaping.bandwidth.enabled
3787 || profile.traffic_shaping.burst_loss.enabled,
3788 bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3790 max_connections: 0,
3791 connection_timeout_ms: 30000,
3792 };
3793 chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
3794 chaos_config.enabled = true; drop(chaos_config);
3796 info!("Network profile '{}' applied to chaos API state", request.profile_name);
3797 }
3798 }
3799
3800 Json(serde_json::json!({
3801 "success": true,
3802 "message": format!("Network profile '{}' applied", request.profile_name),
3803 "profile": {
3804 "name": profile.name,
3805 "description": profile.description,
3806 }
3807 }))
3808 .into_response()
3809 } else {
3810 (
3811 StatusCode::NOT_FOUND,
3812 Json(serde_json::json!({
3813 "error": "Profile not found",
3814 "message": format!("Network profile '{}' not found", request.profile_name)
3815 })),
3816 )
3817 .into_response()
3818 }
3819}
3820
3821pub fn management_router_with_ui_builder(
3823 state: ManagementState,
3824 server_config: mockforge_core::config::ServerConfig,
3825) -> Router {
3826 use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
3827
3828 let management = management_router(state);
3830
3831 let ui_builder_state = UIBuilderState::new(server_config);
3833 let ui_builder = create_ui_builder_router(ui_builder_state);
3834
3835 management.nest("/ui-builder", ui_builder)
3837}
3838
3839pub fn management_router_with_spec_import(state: ManagementState) -> Router {
3841 use crate::spec_import::{spec_import_router, SpecImportState};
3842
3843 let management = management_router(state);
3845
3846 Router::new()
3848 .merge(management)
3849 .merge(spec_import_router(SpecImportState::new()))
3850}
3851
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `MockConfig` fixture with no response headers, latency,
    /// request matching, priority or scenario data.
    fn fixture(
        id: &str,
        name: &str,
        method: &str,
        path: &str,
        body: serde_json::Value,
        enabled: bool,
        status: u16,
    ) -> MockConfig {
        MockConfig {
            id: id.to_string(),
            name: name.to_string(),
            method: method.to_string(),
            path: path.to_string(),
            response: MockResponse {
                body,
                headers: None,
            },
            enabled,
            latency_ms: None,
            status_code: Some(status),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        }
    }

    /// A mock pushed into the shared state can be found again by its id.
    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        let mock = fixture(
            "test-1",
            "Test Mock",
            "GET",
            "/test",
            serde_json::json!({"message": "test"}),
            true,
            200,
        );

        // The write guard is a temporary and is released at the end of this statement.
        state.mocks.write().await.push(mock);

        let stored = state.mocks.read().await;
        let found = stored.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    /// The stored mocks can be counted in total and by their enabled flag.
    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        {
            let mut writer = state.mocks.write().await;
            writer.push(fixture("1", "Mock 1", "GET", "/test1", serde_json::json!({}), true, 200));
            writer.push(fixture("2", "Mock 2", "POST", "/test2", serde_json::json!({}), false, 201));
        }

        let stored = state.mocks.read().await;
        assert_eq!(stored.len(), 2);
        let enabled_count = stored.iter().filter(|m| m.enabled).count();
        assert_eq!(enabled_count, 1);
    }
}