1use axum::{
6 extract::{Path, Query, State},
7 http::StatusCode,
8 response::{
9 sse::{Event, Sse},
10 IntoResponse, Json, Response,
11 },
12 routing::{delete, get, post, put},
13 Router,
14};
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18 BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23use std::convert::Infallible;
24use std::sync::Arc;
25use tokio::sync::{broadcast, RwLock};
26use tracing::*;
27
/// A message observed on one of the brokered messaging protocols, fanned out
/// on the management event channel (see `ManagementState::message_events`).
///
/// Serialized adjacently tagged: `{"protocol": "mqtt"|"kafka", "data": {...}}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    /// An MQTT publish (only present when the `mqtt` feature is enabled).
    #[cfg(feature = "mqtt")]
    Mqtt(MqttMessageEvent),
    /// A Kafka message (only present when the `kafka` feature is enabled).
    #[cfg(feature = "kafka")]
    Kafka(KafkaMessageEvent),
}
40
/// Payload of a single MQTT publish as exposed to management-API consumers.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// Topic the message was published to.
    pub topic: String,
    /// Message payload as a UTF-8 string.
    pub payload: String,
    /// Quality-of-service level (0, 1, or 2).
    pub qos: u8,
    /// Whether the broker retains the message for future subscribers.
    pub retain: bool,
    /// Publish time, RFC 3339 formatted (set via `chrono::Utc::now()` at publish).
    pub timestamp: String,
}
56
/// Payload of a single Kafka message as exposed to management-API consumers.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Topic the message was produced to.
    pub topic: String,
    /// Optional message key.
    pub key: Option<String>,
    /// Message value as a UTF-8 string.
    pub value: String,
    /// Partition the message landed on.
    pub partition: i32,
    /// Offset within the partition.
    pub offset: i64,
    /// Optional record headers.
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// Message timestamp, RFC 3339 formatted — TODO confirm against producer.
    pub timestamp: String,
}
68
/// A single HTTP mock definition managed through the management API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockConfig {
    /// Unique identifier; generated server-side (UUID v4) when created empty.
    #[serde(skip_serializing_if = "String::is_empty")]
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// HTTP method to match (compared case-insensitively).
    pub method: String,
    /// Path pattern: exact, `{param}` segments, or `*` wildcards
    /// (see `path_matches_pattern`).
    pub path: String,
    /// Response returned when the mock matches.
    pub response: MockResponse,
    /// Whether the mock participates in matching (defaults to `true`).
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Optional artificial latency in milliseconds — presumably applied by
    /// the serving layer; not visible in this module.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_ms: Option<u64>,
    /// Optional HTTP status-code override for the response.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status_code: Option<u16>,
    /// Optional fine-grained request matching criteria.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_match: Option<RequestMatchCriteria>,
    /// Optional priority for resolving overlapping mocks.
    /// NOTE(review): not consulted by the matching code visible here.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub priority: Option<i32>,
    /// Optional scenario name for stateful (state-machine driven) mocking.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scenario: Option<String>,
    /// State the scenario must be in for this mock to apply — TODO confirm
    /// against the state-machine manager's usage.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub required_scenario_state: Option<String>,
    /// State the scenario transitions to after this mock serves a response —
    /// TODO confirm against the state-machine manager's usage.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub new_scenario_state: Option<String>,
}
108
/// Serde default for [`MockConfig::enabled`]: mocks are enabled unless the
/// payload says otherwise.
fn default_true() -> bool {
    true
}
112
/// The canned response served when a [`MockConfig`] matches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Response body as arbitrary JSON.
    pub body: serde_json::Value,
    /// Optional response headers to set.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
122
/// Optional fine-grained matching criteria evaluated by
/// [`mock_matches_request`] after method/path matching succeeds.
///
/// All present criteria must pass for the mock to match (logical AND).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RequestMatchCriteria {
    /// Required headers: name (case-insensitive) → expected value, where the
    /// value is tried as a regex first, then as an exact string.
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub headers: std::collections::HashMap<String, String>,
    /// Required query parameters (exact, case-sensitive equality).
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub query_params: std::collections::HashMap<String, String>,
    /// Body pattern: regex when it compiles, otherwise exact string match.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub body_pattern: Option<String>,
    /// Simple dot-form JSONPath (`$.a.b`) that must resolve in a JSON body.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub json_path: Option<String>,
    /// XPath expression — accepted but not enforced yet (see warning in
    /// `mock_matches_request`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub xpath: Option<String>,
    /// Mini-DSL expression evaluated by `evaluate_custom_matcher`
    /// (`field == "v"`, `field =~ "re"`, `field contains "v"`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub custom_matcher: Option<String>,
}
145
146pub fn mock_matches_request(
155 mock: &MockConfig,
156 method: &str,
157 path: &str,
158 headers: &std::collections::HashMap<String, String>,
159 query_params: &std::collections::HashMap<String, String>,
160 body: Option<&[u8]>,
161) -> bool {
162 use regex::Regex;
163
164 if !mock.enabled {
166 return false;
167 }
168
169 if mock.method.to_uppercase() != method.to_uppercase() {
171 return false;
172 }
173
174 if !path_matches_pattern(&mock.path, path) {
176 return false;
177 }
178
179 if let Some(criteria) = &mock.request_match {
181 for (key, expected_value) in &criteria.headers {
183 let header_key_lower = key.to_lowercase();
184 let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
185
186 if let Some((_, actual_value)) = found {
187 if let Ok(re) = Regex::new(expected_value) {
189 if !re.is_match(actual_value) {
190 return false;
191 }
192 } else if actual_value != expected_value {
193 return false;
194 }
195 } else {
196 return false; }
198 }
199
200 for (key, expected_value) in &criteria.query_params {
202 if let Some(actual_value) = query_params.get(key) {
203 if actual_value != expected_value {
204 return false;
205 }
206 } else {
207 return false; }
209 }
210
211 if let Some(pattern) = &criteria.body_pattern {
213 if let Some(body_bytes) = body {
214 let body_str = String::from_utf8_lossy(body_bytes);
215 if let Ok(re) = Regex::new(pattern) {
217 if !re.is_match(&body_str) {
218 return false;
219 }
220 } else if body_str.as_ref() != pattern {
221 return false;
222 }
223 } else {
224 return false; }
226 }
227
228 if let Some(json_path) = &criteria.json_path {
230 if let Some(body_bytes) = body {
231 if let Ok(body_str) = std::str::from_utf8(body_bytes) {
232 if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
233 if !json_path_exists(&json_value, json_path) {
235 return false;
236 }
237 }
238 }
239 }
240 }
241
242 if let Some(_xpath) = &criteria.xpath {
244 tracing::warn!("XPath matching not yet fully implemented");
247 }
248
249 if let Some(custom) = &criteria.custom_matcher {
251 if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
252 return false;
253 }
254 }
255 }
256
257 true
258}
259
260fn path_matches_pattern(pattern: &str, path: &str) -> bool {
262 if pattern == path {
264 return true;
265 }
266
267 if pattern == "*" {
269 return true;
270 }
271
272 let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
274 let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
275
276 if pattern_parts.len() != path_parts.len() {
277 if pattern.contains('*') {
279 return matches_wildcard_pattern(pattern, path);
280 }
281 return false;
282 }
283
284 for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
285 if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
287 continue; }
289
290 if pattern_part != path_part {
291 return false;
292 }
293 }
294
295 true
296}
297
298fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
300 use regex::Regex;
301
302 let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
304
305 if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
306 return re.is_match(path);
307 }
308
309 false
310}
311
312fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
314 if json_path.starts_with("$.") {
317 let path = &json_path[2..];
318 let parts: Vec<&str> = path.split('.').collect();
319
320 let mut current = json;
321 for part in parts {
322 if let Some(obj) = current.as_object() {
323 if let Some(value) = obj.get(part) {
324 current = value;
325 } else {
326 return false;
327 }
328 } else {
329 return false;
330 }
331 }
332 true
333 } else {
334 tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
336 false
337 }
338}
339
340fn evaluate_custom_matcher(
342 expression: &str,
343 method: &str,
344 path: &str,
345 headers: &std::collections::HashMap<String, String>,
346 query_params: &std::collections::HashMap<String, String>,
347 body: Option<&[u8]>,
348) -> bool {
349 use regex::Regex;
350
351 let expr = expression.trim();
352
353 if expr.contains("==") {
355 let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
356 if parts.len() != 2 {
357 return false;
358 }
359
360 let field = parts[0];
361 let expected_value = parts[1].trim_matches('"').trim_matches('\'');
362
363 match field {
364 "method" => method == expected_value,
365 "path" => path == expected_value,
366 _ if field.starts_with("headers.") => {
367 let header_name = &field[8..];
368 headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
369 }
370 _ if field.starts_with("query.") => {
371 let param_name = &field[6..];
372 query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
373 }
374 _ => false,
375 }
376 }
377 else if expr.contains("=~") {
379 let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
380 if parts.len() != 2 {
381 return false;
382 }
383
384 let field = parts[0];
385 let pattern = parts[1].trim_matches('"').trim_matches('\'');
386
387 if let Ok(re) = Regex::new(pattern) {
388 match field {
389 "method" => re.is_match(method),
390 "path" => re.is_match(path),
391 _ if field.starts_with("headers.") => {
392 let header_name = &field[8..];
393 headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
394 }
395 _ if field.starts_with("query.") => {
396 let param_name = &field[6..];
397 query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
398 }
399 _ => false,
400 }
401 } else {
402 false
403 }
404 }
405 else if expr.contains("contains") {
407 let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
408 if parts.len() != 2 {
409 return false;
410 }
411
412 let field = parts[0];
413 let search_value = parts[1].trim_matches('"').trim_matches('\'');
414
415 match field {
416 "path" => path.contains(search_value),
417 _ if field.starts_with("headers.") => {
418 let header_name = &field[8..];
419 headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
420 }
421 _ if field.starts_with("body") => {
422 if let Some(body_bytes) = body {
423 let body_str = String::from_utf8_lossy(body_bytes);
424 body_str.contains(search_value)
425 } else {
426 false
427 }
428 }
429 _ => false,
430 }
431 } else {
432 tracing::warn!("Unknown custom matcher expression format: {}", expr);
434 false
435 }
436}
437
/// Runtime statistics reported by the management `GET /stats` endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Seconds since `ManagementState::start_time`.
    pub uptime_seconds: u64,
    /// Total requests counted via `ManagementState::request_counter`.
    pub total_requests: u64,
    /// Number of registered mocks (enabled or not).
    pub active_mocks: usize,
    /// Number of mocks with `enabled == true`.
    pub enabled_mocks: usize,
    /// Number of registered routes; currently mirrors `active_mocks`
    /// (see `get_stats`).
    pub registered_routes: usize,
}
452
/// Static configuration snapshot reported by the management config endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// Crate version (from `CARGO_PKG_VERSION`).
    pub version: String,
    /// Port the management server reports itself on.
    pub port: u16,
    /// Whether an OpenAPI spec is loaded.
    pub has_openapi_spec: bool,
    /// Filesystem path of the loaded spec, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
466
/// Shared state for the management API; cheap to clone (all fields are `Arc`s
/// or small copies) and passed to every axum handler via `State`.
#[derive(Clone)]
pub struct ManagementState {
    /// Registered mocks, guarded for concurrent handler access.
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// Optional loaded OpenAPI spec.
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Filesystem path the spec was loaded from, when known.
    pub spec_path: Option<String>,
    /// Port reported by the config endpoint.
    pub port: u16,
    /// Server start instant, used for uptime reporting.
    pub start_time: std::time::Instant,
    /// Total request counter surfaced by `get_stats`.
    pub request_counter: Arc<RwLock<u64>>,
    /// Optional mutable proxy configuration.
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// Optional SMTP capture registry (feature `smtp`).
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// Optional mock MQTT broker (feature `mqtt`).
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Optional mock Kafka broker (feature `kafka`).
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel fanning out [`MessageEvent`]s to subscribers
    /// (SSE/WebSocket consumers).
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// Scenario state machines backing stateful mocks.
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// Optional broadcast channel for mock CRUD events over WebSocket.
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Optional lifecycle hooks fired on mock create/update/delete.
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Rule explanations keyed by id — presumably rule/mock identifiers;
    /// TODO confirm against the intelligent-behavior module.
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
    /// Optional chaos-engineering API state (feature `chaos`).
    #[cfg(feature = "chaos")]
    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
    /// Optional full server configuration for runtime inspection/update.
    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
}
518
impl ManagementState {
    /// Creates a fresh management state with no mocks registered.
    ///
    /// `spec`/`spec_path` describe an optional OpenAPI document backing the
    /// server; `port` is the port reported by the config endpoint. Optional
    /// integrations (proxy, SMTP, MQTT, Kafka, chaos, hooks, WebSocket
    /// broadcast) start unset and are attached via the `with_*` builders.
    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
        Self {
            mocks: Arc::new(RwLock::new(Vec::new())),
            spec,
            spec_path,
            port,
            start_time: std::time::Instant::now(),
            request_counter: Arc::new(RwLock::new(0)),
            proxy_config: None,
            #[cfg(feature = "smtp")]
            smtp_registry: None,
            #[cfg(feature = "mqtt")]
            mqtt_broker: None,
            #[cfg(feature = "kafka")]
            kafka_broker: None,
            #[cfg(any(feature = "mqtt", feature = "kafka"))]
            message_events: {
                // Broadcast channel for protocol message events; the receiver
                // is dropped here — subscribers call `subscribe()` later.
                let (tx, _) = broadcast::channel(1000);
                Arc::new(tx)
            },
            state_machine_manager: Arc::new(RwLock::new(
                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
            )),
            ws_broadcast: None,
            lifecycle_hooks: None,
            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
            #[cfg(feature = "chaos")]
            chaos_api_state: None,
            server_config: None,
        }
    }

    /// Attaches a lifecycle-hook registry (builder style).
    pub fn with_lifecycle_hooks(
        mut self,
        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
    ) -> Self {
        self.lifecycle_hooks = Some(hooks);
        self
    }

    /// Attaches a WebSocket broadcast channel for mock CRUD events.
    pub fn with_ws_broadcast(
        mut self,
        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
    ) -> Self {
        self.ws_broadcast = Some(ws_broadcast);
        self
    }

    /// Attaches a mutable proxy configuration.
    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
        self.proxy_config = Some(proxy_config);
        self
    }

    /// Attaches the SMTP capture registry (feature `smtp`).
    #[cfg(feature = "smtp")]
    pub fn with_smtp_registry(
        mut self,
        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
    ) -> Self {
        self.smtp_registry = Some(smtp_registry);
        self
    }

    /// Attaches the mock MQTT broker (feature `mqtt`).
    #[cfg(feature = "mqtt")]
    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
        self.mqtt_broker = Some(mqtt_broker);
        self
    }

    /// Attaches the mock Kafka broker (feature `kafka`).
    #[cfg(feature = "kafka")]
    pub fn with_kafka_broker(
        mut self,
        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
    ) -> Self {
        self.kafka_broker = Some(kafka_broker);
        self
    }

    /// Attaches the chaos-engineering API state (feature `chaos`).
    #[cfg(feature = "chaos")]
    pub fn with_chaos_api_state(
        mut self,
        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
    ) -> Self {
        self.chaos_api_state = Some(chaos_api_state);
        self
    }

    /// Attaches the full server configuration.
    pub fn with_server_config(
        mut self,
        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
    ) -> Self {
        self.server_config = Some(server_config);
        self
    }
}
628
629async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
631 let mocks = state.mocks.read().await;
632 Json(serde_json::json!({
633 "mocks": *mocks,
634 "total": mocks.len(),
635 "enabled": mocks.iter().filter(|m| m.enabled).count()
636 }))
637}
638
639async fn get_mock(
641 State(state): State<ManagementState>,
642 Path(id): Path<String>,
643) -> Result<Json<MockConfig>, StatusCode> {
644 let mocks = state.mocks.read().await;
645 mocks
646 .iter()
647 .find(|m| m.id == id)
648 .cloned()
649 .map(Json)
650 .ok_or(StatusCode::NOT_FOUND)
651}
652
653async fn create_mock(
655 State(state): State<ManagementState>,
656 Json(mut mock): Json<MockConfig>,
657) -> Result<Json<MockConfig>, StatusCode> {
658 let mut mocks = state.mocks.write().await;
659
660 if mock.id.is_empty() {
662 mock.id = uuid::Uuid::new_v4().to_string();
663 }
664
665 if mocks.iter().any(|m| m.id == mock.id) {
667 return Err(StatusCode::CONFLICT);
668 }
669
670 info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
671
672 if let Some(hooks) = &state.lifecycle_hooks {
674 let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
675 id: mock.id.clone(),
676 name: mock.name.clone(),
677 config: serde_json::to_value(&mock).unwrap_or_default(),
678 };
679 hooks.invoke_mock_created(&event).await;
680 }
681
682 mocks.push(mock.clone());
683
684 if let Some(tx) = &state.ws_broadcast {
686 let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
687 }
688
689 Ok(Json(mock))
690}
691
692async fn update_mock(
694 State(state): State<ManagementState>,
695 Path(id): Path<String>,
696 Json(updated_mock): Json<MockConfig>,
697) -> Result<Json<MockConfig>, StatusCode> {
698 let mut mocks = state.mocks.write().await;
699
700 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
701
702 let old_mock = mocks[position].clone();
704
705 info!("Updating mock: {}", id);
706 mocks[position] = updated_mock.clone();
707
708 if let Some(hooks) = &state.lifecycle_hooks {
710 let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
711 id: updated_mock.id.clone(),
712 name: updated_mock.name.clone(),
713 config: serde_json::to_value(&updated_mock).unwrap_or_default(),
714 };
715 hooks.invoke_mock_updated(&event).await;
716
717 if old_mock.enabled != updated_mock.enabled {
719 let state_event = if updated_mock.enabled {
720 mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
721 id: updated_mock.id.clone(),
722 }
723 } else {
724 mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
725 id: updated_mock.id.clone(),
726 }
727 };
728 hooks.invoke_mock_state_changed(&state_event).await;
729 }
730 }
731
732 if let Some(tx) = &state.ws_broadcast {
734 let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
735 }
736
737 Ok(Json(updated_mock))
738}
739
740async fn delete_mock(
742 State(state): State<ManagementState>,
743 Path(id): Path<String>,
744) -> Result<StatusCode, StatusCode> {
745 let mut mocks = state.mocks.write().await;
746
747 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
748
749 let deleted_mock = mocks[position].clone();
751
752 info!("Deleting mock: {}", id);
753 mocks.remove(position);
754
755 if let Some(hooks) = &state.lifecycle_hooks {
757 let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
758 id: deleted_mock.id.clone(),
759 name: deleted_mock.name.clone(),
760 };
761 hooks.invoke_mock_deleted(&event).await;
762 }
763
764 if let Some(tx) = &state.ws_broadcast {
766 let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
767 }
768
769 Ok(StatusCode::NO_CONTENT)
770}
771
/// Request body for the config-validation endpoint.
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// The configuration document to validate, as JSON.
    pub config: serde_json::Value,
    /// Source format hint: "json" (default), "yaml", or "yml".
    #[serde(default = "default_format")]
    pub format: String,
}
781
/// Serde default for [`ValidateConfigRequest::format`].
fn default_format() -> String {
    String::from("json")
}
785
/// POST handler: validates a configuration document against `ServerConfig`.
///
/// The body carries the config as a JSON value plus a `format` hint ("json"
/// by default). For "yaml"/"yml" the JSON value is re-serialized to a string
/// and parsed with serde_yaml — this works because every JSON document is
/// also valid YAML — so both formats exercise the same `ServerConfig` schema.
async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
    use mockforge_core::config::ServerConfig;

    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
        "yaml" | "yml" => {
            // Round-trip the JSON value through a string so serde_yaml can
            // parse it (JSON is a syntactic subset of YAML).
            let yaml_str = match serde_json::to_string(&request.config) {
                Ok(s) => s,
                Err(e) => {
                    return (
                        StatusCode::BAD_REQUEST,
                        Json(serde_json::json!({
                            "valid": false,
                            "error": format!("Failed to convert to string: {}", e),
                            "message": "Configuration validation failed"
                        })),
                    )
                        .into_response();
                }
            };
            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
        }
        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
    };

    match config_result {
        Ok(_) => Json(serde_json::json!({
            "valid": true,
            "message": "Configuration is valid"
        }))
        .into_response(),
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "valid": false,
                "error": format!("Invalid configuration: {}", e),
                "message": "Configuration validation failed"
            })),
        )
            .into_response(),
    }
}
828
/// Request body for the bulk configuration-update endpoint.
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    /// JSON object of top-level config keys to overwrite (shallow merge).
    pub updates: serde_json::Value,
}
835
836async fn bulk_update_config(
844 State(state): State<ManagementState>,
845 Json(request): Json<BulkConfigUpdateRequest>,
846) -> impl IntoResponse {
847 if !request.updates.is_object() {
849 return (
850 StatusCode::BAD_REQUEST,
851 Json(serde_json::json!({
852 "error": "Invalid request",
853 "message": "Updates must be a JSON object"
854 })),
855 )
856 .into_response();
857 }
858
859 use mockforge_core::config::ServerConfig;
861
862 let base_config = ServerConfig::default();
864 let base_json = match serde_json::to_value(&base_config) {
865 Ok(v) => v,
866 Err(e) => {
867 return (
868 StatusCode::INTERNAL_SERVER_ERROR,
869 Json(serde_json::json!({
870 "error": "Internal error",
871 "message": format!("Failed to serialize base config: {}", e)
872 })),
873 )
874 .into_response();
875 }
876 };
877
878 let mut merged = base_json.clone();
880 if let (Some(merged_obj), Some(updates_obj)) =
881 (merged.as_object_mut(), request.updates.as_object())
882 {
883 for (key, value) in updates_obj {
884 merged_obj.insert(key.clone(), value.clone());
885 }
886 }
887
888 match serde_json::from_value::<ServerConfig>(merged) {
890 Ok(_) => {
891 Json(serde_json::json!({
894 "success": true,
895 "message": "Bulk configuration update validated successfully. Note: Runtime application requires ServerConfig in ManagementState.",
896 "updates_received": request.updates,
897 "validated": true
898 }))
899 .into_response()
900 }
901 Err(e) => (
902 StatusCode::BAD_REQUEST,
903 Json(serde_json::json!({
904 "error": "Invalid configuration",
905 "message": format!("Configuration validation failed: {}", e),
906 "validated": false
907 })),
908 )
909 .into_response(),
910 }
911}
912
913async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
915 let mocks = state.mocks.read().await;
916 let request_count = *state.request_counter.read().await;
917
918 Json(ServerStats {
919 uptime_seconds: state.start_time.elapsed().as_secs(),
920 total_requests: request_count,
921 active_mocks: mocks.len(),
922 enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
923 registered_routes: mocks.len(), })
925}
926
927async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
929 Json(ServerConfig {
930 version: env!("CARGO_PKG_VERSION").to_string(),
931 port: state.port,
932 has_openapi_spec: state.spec.is_some(),
933 spec_path: state.spec_path.clone(),
934 })
935}
936
937async fn health_check() -> Json<serde_json::Value> {
939 Json(serde_json::json!({
940 "status": "healthy",
941 "service": "mockforge-management",
942 "timestamp": chrono::Utc::now().to_rfc3339()
943 }))
944}
945
/// Serialization format for mock export (`?format=` query parameter).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// Pretty-printed JSON (the default).
    Json,
    /// YAML.
    Yaml,
}
955
956async fn export_mocks(
958 State(state): State<ManagementState>,
959 axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
960) -> Result<(StatusCode, String), StatusCode> {
961 let mocks = state.mocks.read().await;
962
963 let format = params
964 .get("format")
965 .map(|f| match f.as_str() {
966 "yaml" | "yml" => ExportFormat::Yaml,
967 _ => ExportFormat::Json,
968 })
969 .unwrap_or(ExportFormat::Json);
970
971 match format {
972 ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
973 .map(|json| (StatusCode::OK, json))
974 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
975 ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
976 .map(|yaml| (StatusCode::OK, yaml))
977 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
978 }
979}
980
981async fn import_mocks(
983 State(state): State<ManagementState>,
984 Json(mocks): Json<Vec<MockConfig>>,
985) -> impl IntoResponse {
986 let mut current_mocks = state.mocks.write().await;
987 current_mocks.clear();
988 current_mocks.extend(mocks);
989 Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
990}
991
/// GET handler: lists all captured SMTP emails.
///
/// Answers 501 when SMTP capture is unavailable, 500 on registry errors.
#[cfg(feature = "smtp")]
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match &state.smtp_registry {
        Some(r) => r,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    match registry.get_emails() {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to retrieve emails",
                "message": e.to_string()
            })),
        ),
    }
}
1016
/// GET handler: fetches one captured email by id.
///
/// 404 when no such email exists, 501 when SMTP capture is unavailable.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let registry = match &state.smtp_registry {
        Some(r) => r,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    match registry.get_email_by_id(&id) {
        Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Email not found",
                "id": id
            })),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to retrieve email",
                "message": e.to_string()
            })),
        ),
    }
}
1051
/// DELETE-style handler: clears all captured SMTP emails.
///
/// Answers 501 when SMTP capture is unavailable, 500 on registry errors.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match &state.smtp_registry {
        Some(r) => r,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    match registry.clear_mailbox() {
        Ok(()) => (
            StatusCode::OK,
            Json(serde_json::json!({
                "message": "Mailbox cleared successfully"
            })),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to clear mailbox",
                "message": e.to_string()
            })),
        ),
    }
}
1081
/// GET handler: mailbox export is not served over HTTP.
///
/// Always answers 501; the requested format is echoed back for clarity.
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    // Borrow the parameter when present instead of allocating a default
    // String unconditionally (the old `unwrap_or(&"json".to_string()).clone()`
    // allocated even when the parameter was supplied).
    let format = params.get("format").map(String::as_str).unwrap_or("json");
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "SMTP mailbox management not available via HTTP API",
            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
            "requested_format": format
        })),
    )
}
1097
/// GET handler: searches captured SMTP emails with optional query filters.
///
/// Recognized query parameters: `sender`, `recipient`, `subject`, `body`,
/// `since`/`until` (RFC 3339; invalid values are silently ignored), and the
/// boolean flags `regex` and `case_sensitive` (string "true" enables them).
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    match &state.smtp_registry {
        Some(smtp_registry) => {
            // RFC 3339 timestamp parameter -> UTC, ignoring unparseable input.
            let parse_utc = |key: &str| {
                params
                    .get(key)
                    .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
                    .map(|dt| dt.with_timezone(&chrono::Utc))
            };
            // Boolean flag parameter: only the exact string "true" enables it.
            let flag = |key: &str| params.get(key).map(|s| s == "true").unwrap_or(false);

            let filters = EmailSearchFilters {
                sender: params.get("sender").cloned(),
                recipient: params.get("recipient").cloned(),
                subject: params.get("subject").cloned(),
                body: params.get("body").cloned(),
                since: parse_utc("since"),
                until: parse_utc("until"),
                use_regex: flag("regex"),
                case_sensitive: flag("case_sensitive"),
            };

            match smtp_registry.search_emails(filters) {
                Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
                Err(e) => (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Failed to search emails",
                        "message": e.to_string()
                    })),
                ),
            }
        }
        None => (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        ),
    }
}
1142
/// Aggregate MQTT broker statistics for the management stats endpoint.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of currently connected clients.
    pub connected_clients: usize,
    /// Number of topics with activity.
    pub active_topics: usize,
    /// Number of retained messages held by the broker.
    pub retained_messages: usize,
    /// Total subscriptions across all clients.
    pub total_subscriptions: usize,
}
1156
/// GET handler: aggregated MQTT broker statistics; 503 when no broker is set.
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let connected_clients = broker.get_connected_clients().await.len();
            let active_topics = broker.get_active_topics().await.len();
            let topic_stats = broker.get_topic_stats().await;

            Json(MqttBrokerStats {
                connected_clients,
                active_topics,
                retained_messages: topic_stats.retained_messages,
                total_subscriptions: topic_stats.total_subscriptions,
            })
            .into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1177
/// GET handler: lists connected MQTT clients; 503 when no broker is set.
#[cfg(feature = "mqtt")]
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let clients = broker.get_connected_clients().await;
            Json(serde_json::json!({
                "clients": clients
            }))
            .into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1190
/// GET handler: lists active MQTT topics; 503 when no broker is set.
#[cfg(feature = "mqtt")]
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let topics = broker.get_active_topics().await;
            Json(serde_json::json!({
                "topics": topics
            }))
            .into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1203
/// POST/DELETE-style handler: force-disconnects an MQTT client by id.
///
/// 503 when no broker is set; 500 when the broker reports a failure.
#[cfg(feature = "mqtt")]
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => match broker.disconnect_client(&client_id).await {
            Ok(_) => {
                (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response()
            }
            Err(e) => {
                (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
                    .into_response()
            }
        },
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1223
/// A single MQTT publish request (used by the batch-publish endpoint).
///
/// NOTE(review): the single-message handler below parses a raw
/// `serde_json::Value` instead of this type — consider unifying.
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Topic to publish to.
    pub topic: String,
    /// Message payload (UTF-8 text).
    pub payload: String,
    /// Quality-of-service level, defaulting to 0.
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Whether the broker should retain the message (defaults to false).
    #[serde(default)]
    pub retain: bool,
}
1241
/// Serde default for [`MqttPublishRequest::qos`]: QoS 0 (at most once).
#[cfg(feature = "mqtt")]
fn default_qos() -> u8 {
    0
}
1246
/// POST handler: publishes a message through the mock MQTT broker.
///
/// Expects a JSON body with required `topic` and `payload` strings plus
/// optional `qos` (0-2, default 0) and `retain` (default false). On success
/// the publish is also mirrored onto the management message-event channel.
/// Answers 400 for missing fields or invalid QoS, 503 when no broker is set.
#[cfg(feature = "mqtt")]
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let topic = request.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
    let payload = request.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    // Destructure both required fields at once instead of the previous
    // `is_none()` check followed by `unwrap()` calls.
    let (topic, payload) = match (topic, payload) {
        (Some(topic), Some(payload)) => (topic, payload),
        _ => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid request",
                    "message": "Missing required fields: topic and payload"
                })),
            );
        }
    };

    if let Some(broker) = &state.mqtt_broker {
        if qos > 2 {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid QoS",
                    "message": "QoS must be 0, 1, or 2"
                })),
            );
        }

        let payload_bytes = payload.as_bytes().to_vec();
        // Synthetic client id identifying the management API as the publisher.
        let client_id = "mockforge-management-api".to_string();

        let publish_result = broker
            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
            .await
            .map_err(|e| e.to_string());

        match publish_result {
            Ok(_) => {
                // Mirror the publish onto the management event channel; a send
                // error only means there are currently no subscribers.
                let event = MessageEvent::Mqtt(MqttMessageEvent {
                    topic: topic.clone(),
                    payload: payload.clone(),
                    qos,
                    retain,
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                (
                    StatusCode::OK,
                    Json(serde_json::json!({
                        "success": true,
                        "message": format!("Message published to topic '{}'", topic),
                        "topic": topic,
                        "qos": qos,
                        "retain": retain
                    })),
                )
            }
            Err(error_msg) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to publish message",
                    "message": error_msg
                })),
            ),
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1334
1335#[cfg(not(feature = "mqtt"))]
1336async fn publish_mqtt_message_handler(
1338 State(_state): State<ManagementState>,
1339 Json(_request): Json<serde_json::Value>,
1340) -> impl IntoResponse {
1341 (
1342 StatusCode::SERVICE_UNAVAILABLE,
1343 Json(serde_json::json!({
1344 "error": "MQTT feature not enabled",
1345 "message": "MQTT support is not compiled into this build"
1346 })),
1347 )
1348}
1349
#[cfg(feature = "mqtt")]
/// Body for `POST /mqtt/publish/batch`.
///
/// NOTE(review): the batch handler parses raw JSON by hand instead of
/// deserializing into this struct; keep the two in sync.
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// Messages to publish, in order.
    pub messages: Vec<MqttPublishRequest>,
    /// Delay inserted between consecutive publishes; defaults to 100 ms.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1360
#[cfg(feature = "mqtt")]
/// Serde default for `MqttBatchPublishRequest::delay_ms`: 100 ms between messages.
fn default_delay() -> u64 {
    const DEFAULT_BATCH_DELAY_MS: u64 = 100;
    DEFAULT_BATCH_DELAY_MS
}
1365
#[cfg(feature = "mqtt")]
/// POST /mqtt/publish/batch — publish several MQTT messages sequentially.
///
/// The body is parsed manually from raw JSON (rather than via
/// `MqttBatchPublishRequest`) so per-message problems can be reported as
/// structured entries in `results` instead of a single 422 rejection. Each
/// message is validated independently; a failure is recorded and does not
/// abort the batch. An optional `delay_ms` (default 100) is slept between
/// consecutive messages.
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let messages_json = request.get("messages").and_then(|v| v.as_array());
    // Same default (100 ms) as `default_delay` on the typed request struct.
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    if messages_json.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required field: messages"
            })),
        );
    }

    let messages_json = messages_json.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        if messages_json.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            );
        }

        let mut results = Vec::new();
        // All management-API publishes share one synthetic client id.
        let client_id = "mockforge-management-api".to_string();

        for (index, msg_json) in messages_json.iter().enumerate() {
            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

            // Per-message validation: record the failure and continue with the rest.
            if topic.is_none() || payload.is_none() {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }

            let topic = topic.unwrap();
            let payload = payload.unwrap();

            if qos > 2 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Invalid QoS (must be 0, 1, or 2)"
                }));
                continue;
            }

            let payload_bytes = payload.as_bytes().to_vec();

            let publish_result = broker
                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
                .await
                .map_err(|e| format!("{}", e));

            match publish_result {
                Ok(_) => {
                    // Mirror the published message onto the SSE event stream;
                    // send errors (e.g. no subscribers) are deliberately ignored.
                    let event = MessageEvent::Mqtt(MqttMessageEvent {
                        topic: topic.clone(),
                        payload: payload.clone(),
                        qos,
                        retain,
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": topic,
                        "qos": qos
                    }));
                }
                Err(error_msg) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": error_msg
                    }));
                }
            }

            // Throttle between messages, but not after the last one. The
            // `len() - 1` cannot underflow: the empty batch was rejected above.
            if index < messages_json.len() - 1 && delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        // NOTE(review): the top-level "success" is always true once the batch
        // shape is valid — even if every individual message failed; callers
        // must inspect "results"/"failed" for per-message outcomes.
        (
            StatusCode::OK,
            Json(serde_json::json!({
                "success": true,
                "total": messages_json.len(),
                "succeeded": success_count,
                "failed": messages_json.len() - success_count,
                "results": results
            })),
        )
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1495
1496#[cfg(not(feature = "mqtt"))]
1497async fn publish_mqtt_batch_handler(
1499 State(_state): State<ManagementState>,
1500 Json(_request): Json<serde_json::Value>,
1501) -> impl IntoResponse {
1502 (
1503 StatusCode::SERVICE_UNAVAILABLE,
1504 Json(serde_json::json!({
1505 "error": "MQTT feature not enabled",
1506 "message": "MQTT support is not compiled into this build"
1507 })),
1508 )
1509}
1510
/// Body for the `PUT /migration/routes/{pattern}` and
/// `PUT /migration/groups/{group}` endpoints.
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    // Desired mode as a case-insensitive string: "mock", "shadow", "real" or "auto".
    mode: String,
}
1518
1519async fn get_migration_routes(
1521 State(state): State<ManagementState>,
1522) -> Result<Json<serde_json::Value>, StatusCode> {
1523 let proxy_config = match &state.proxy_config {
1524 Some(config) => config,
1525 None => {
1526 return Ok(Json(serde_json::json!({
1527 "error": "Migration not configured. Proxy config not available."
1528 })));
1529 }
1530 };
1531
1532 let config = proxy_config.read().await;
1533 let routes = config.get_migration_routes();
1534
1535 Ok(Json(serde_json::json!({
1536 "routes": routes
1537 })))
1538}
1539
1540async fn toggle_route_migration(
1542 State(state): State<ManagementState>,
1543 Path(pattern): Path<String>,
1544) -> Result<Json<serde_json::Value>, StatusCode> {
1545 let proxy_config = match &state.proxy_config {
1546 Some(config) => config,
1547 None => {
1548 return Ok(Json(serde_json::json!({
1549 "error": "Migration not configured. Proxy config not available."
1550 })));
1551 }
1552 };
1553
1554 let mut config = proxy_config.write().await;
1555 let new_mode = match config.toggle_route_migration(&pattern) {
1556 Some(mode) => mode,
1557 None => {
1558 return Ok(Json(serde_json::json!({
1559 "error": format!("Route pattern not found: {}", pattern)
1560 })));
1561 }
1562 };
1563
1564 Ok(Json(serde_json::json!({
1565 "pattern": pattern,
1566 "mode": format!("{:?}", new_mode).to_lowercase()
1567 })))
1568}
1569
1570async fn set_route_migration_mode(
1572 State(state): State<ManagementState>,
1573 Path(pattern): Path<String>,
1574 Json(request): Json<SetMigrationModeRequest>,
1575) -> Result<Json<serde_json::Value>, StatusCode> {
1576 let proxy_config = match &state.proxy_config {
1577 Some(config) => config,
1578 None => {
1579 return Ok(Json(serde_json::json!({
1580 "error": "Migration not configured. Proxy config not available."
1581 })));
1582 }
1583 };
1584
1585 use mockforge_core::proxy::config::MigrationMode;
1586 let mode = match request.mode.to_lowercase().as_str() {
1587 "mock" => MigrationMode::Mock,
1588 "shadow" => MigrationMode::Shadow,
1589 "real" => MigrationMode::Real,
1590 "auto" => MigrationMode::Auto,
1591 _ => {
1592 return Ok(Json(serde_json::json!({
1593 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1594 })));
1595 }
1596 };
1597
1598 let mut config = proxy_config.write().await;
1599 let updated = config.update_rule_migration_mode(&pattern, mode);
1600
1601 if !updated {
1602 return Ok(Json(serde_json::json!({
1603 "error": format!("Route pattern not found: {}", pattern)
1604 })));
1605 }
1606
1607 Ok(Json(serde_json::json!({
1608 "pattern": pattern,
1609 "mode": format!("{:?}", mode).to_lowercase()
1610 })))
1611}
1612
1613async fn toggle_group_migration(
1615 State(state): State<ManagementState>,
1616 Path(group): Path<String>,
1617) -> Result<Json<serde_json::Value>, StatusCode> {
1618 let proxy_config = match &state.proxy_config {
1619 Some(config) => config,
1620 None => {
1621 return Ok(Json(serde_json::json!({
1622 "error": "Migration not configured. Proxy config not available."
1623 })));
1624 }
1625 };
1626
1627 let mut config = proxy_config.write().await;
1628 let new_mode = config.toggle_group_migration(&group);
1629
1630 Ok(Json(serde_json::json!({
1631 "group": group,
1632 "mode": format!("{:?}", new_mode).to_lowercase()
1633 })))
1634}
1635
1636async fn set_group_migration_mode(
1638 State(state): State<ManagementState>,
1639 Path(group): Path<String>,
1640 Json(request): Json<SetMigrationModeRequest>,
1641) -> Result<Json<serde_json::Value>, StatusCode> {
1642 let proxy_config = match &state.proxy_config {
1643 Some(config) => config,
1644 None => {
1645 return Ok(Json(serde_json::json!({
1646 "error": "Migration not configured. Proxy config not available."
1647 })));
1648 }
1649 };
1650
1651 use mockforge_core::proxy::config::MigrationMode;
1652 let mode = match request.mode.to_lowercase().as_str() {
1653 "mock" => MigrationMode::Mock,
1654 "shadow" => MigrationMode::Shadow,
1655 "real" => MigrationMode::Real,
1656 "auto" => MigrationMode::Auto,
1657 _ => {
1658 return Ok(Json(serde_json::json!({
1659 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1660 })));
1661 }
1662 };
1663
1664 let mut config = proxy_config.write().await;
1665 config.update_group_migration_mode(&group, mode);
1666
1667 Ok(Json(serde_json::json!({
1668 "group": group,
1669 "mode": format!("{:?}", mode).to_lowercase()
1670 })))
1671}
1672
1673async fn get_migration_groups(
1675 State(state): State<ManagementState>,
1676) -> Result<Json<serde_json::Value>, StatusCode> {
1677 let proxy_config = match &state.proxy_config {
1678 Some(config) => config,
1679 None => {
1680 return Ok(Json(serde_json::json!({
1681 "error": "Migration not configured. Proxy config not available."
1682 })));
1683 }
1684 };
1685
1686 let config = proxy_config.read().await;
1687 let groups = config.get_migration_groups();
1688
1689 let groups_json: serde_json::Map<String, serde_json::Value> = groups
1691 .into_iter()
1692 .map(|(name, info)| {
1693 (
1694 name,
1695 serde_json::json!({
1696 "name": info.name,
1697 "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1698 "route_count": info.route_count
1699 }),
1700 )
1701 })
1702 .collect();
1703
1704 Ok(Json(serde_json::json!(groups_json)))
1705}
1706
1707async fn get_migration_status(
1709 State(state): State<ManagementState>,
1710) -> Result<Json<serde_json::Value>, StatusCode> {
1711 let proxy_config = match &state.proxy_config {
1712 Some(config) => config,
1713 None => {
1714 return Ok(Json(serde_json::json!({
1715 "error": "Migration not configured. Proxy config not available."
1716 })));
1717 }
1718 };
1719
1720 let config = proxy_config.read().await;
1721 let routes = config.get_migration_routes();
1722 let groups = config.get_migration_groups();
1723
1724 let mut mock_count = 0;
1725 let mut shadow_count = 0;
1726 let mut real_count = 0;
1727 let mut auto_count = 0;
1728
1729 for route in &routes {
1730 match route.migration_mode {
1731 mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1732 mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1733 mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1734 mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1735 }
1736 }
1737
1738 Ok(Json(serde_json::json!({
1739 "total_routes": routes.len(),
1740 "mock_routes": mock_count,
1741 "shadow_routes": shadow_count,
1742 "real_routes": real_count,
1743 "auto_routes": auto_count,
1744 "total_groups": groups.len(),
1745 "migration_enabled": config.migration_enabled
1746 })))
1747}
1748
/// Body for creating or updating a proxy body-transform rule
/// (`POST /proxy/rules`, `PUT /proxy/rules/{id}`).
#[derive(Debug, Deserialize, Serialize)]
pub struct ProxyRuleRequest {
    /// URL pattern the rule applies to.
    pub pattern: String,
    /// Direction the rule transforms: "request" or "response".
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status codes the rule is limited to (only used for response rules);
    /// defaults to empty. NOTE(review): assumed to mean "no restriction" when
    /// empty — confirm against the proxy matching logic.
    #[serde(default)]
    pub status_codes: Vec<u16>,
    /// JSON body transforms to apply; the create handler requires at least one.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active; defaults to true.
    #[serde(default = "default_true")]
    pub enabled: bool,
}
1768
/// One JSON body transform within a proxy rule, in wire form.
#[derive(Debug, Deserialize, Serialize)]
pub struct BodyTransformRequest {
    /// Location of the value to transform. NOTE(review): presumably a JSON
    /// path/pointer — confirm against `BodyTransform` in the core crate.
    pub path: String,
    /// Replacement value.
    pub replace: String,
    /// Operation name: "replace", "add" or "remove". Defaults to "" and any
    /// unrecognized value is treated as "replace" by the handlers.
    #[serde(default)]
    pub operation: String,
}
1780
/// Wire representation of one proxy rule as returned by `list_proxy_rules`.
#[derive(Debug, Serialize)]
pub struct ProxyRuleResponse {
    /// Positional ID: request rules occupy 0..request_count, response rules follow.
    pub id: usize,
    /// URL pattern the rule applies to.
    pub pattern: String,
    /// "request" or "response".
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status-code filter; always empty for request rules.
    pub status_codes: Vec<u16>,
    /// Transforms applied by this rule.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active.
    pub enabled: bool,
}
1798
1799async fn list_proxy_rules(
1801 State(state): State<ManagementState>,
1802) -> Result<Json<serde_json::Value>, StatusCode> {
1803 let proxy_config = match &state.proxy_config {
1804 Some(config) => config,
1805 None => {
1806 return Ok(Json(serde_json::json!({
1807 "error": "Proxy not configured. Proxy config not available."
1808 })));
1809 }
1810 };
1811
1812 let config = proxy_config.read().await;
1813
1814 let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1815
1816 for (idx, rule) in config.request_replacements.iter().enumerate() {
1818 rules.push(ProxyRuleResponse {
1819 id: idx,
1820 pattern: rule.pattern.clone(),
1821 rule_type: "request".to_string(),
1822 status_codes: Vec::new(),
1823 body_transforms: rule
1824 .body_transforms
1825 .iter()
1826 .map(|t| BodyTransformRequest {
1827 path: t.path.clone(),
1828 replace: t.replace.clone(),
1829 operation: format!("{:?}", t.operation).to_lowercase(),
1830 })
1831 .collect(),
1832 enabled: rule.enabled,
1833 });
1834 }
1835
1836 let request_count = config.request_replacements.len();
1838 for (idx, rule) in config.response_replacements.iter().enumerate() {
1839 rules.push(ProxyRuleResponse {
1840 id: request_count + idx,
1841 pattern: rule.pattern.clone(),
1842 rule_type: "response".to_string(),
1843 status_codes: rule.status_codes.clone(),
1844 body_transforms: rule
1845 .body_transforms
1846 .iter()
1847 .map(|t| BodyTransformRequest {
1848 path: t.path.clone(),
1849 replace: t.replace.clone(),
1850 operation: format!("{:?}", t.operation).to_lowercase(),
1851 })
1852 .collect(),
1853 enabled: rule.enabled,
1854 });
1855 }
1856
1857 Ok(Json(serde_json::json!({
1858 "rules": rules
1859 })))
1860}
1861
1862async fn create_proxy_rule(
1864 State(state): State<ManagementState>,
1865 Json(request): Json<ProxyRuleRequest>,
1866) -> Result<Json<serde_json::Value>, StatusCode> {
1867 let proxy_config = match &state.proxy_config {
1868 Some(config) => config,
1869 None => {
1870 return Ok(Json(serde_json::json!({
1871 "error": "Proxy not configured. Proxy config not available."
1872 })));
1873 }
1874 };
1875
1876 if request.body_transforms.is_empty() {
1878 return Ok(Json(serde_json::json!({
1879 "error": "At least one body transform is required"
1880 })));
1881 }
1882
1883 let body_transforms: Vec<BodyTransform> = request
1884 .body_transforms
1885 .iter()
1886 .map(|t| {
1887 let op = match t.operation.as_str() {
1888 "replace" => TransformOperation::Replace,
1889 "add" => TransformOperation::Add,
1890 "remove" => TransformOperation::Remove,
1891 _ => TransformOperation::Replace,
1892 };
1893 BodyTransform {
1894 path: t.path.clone(),
1895 replace: t.replace.clone(),
1896 operation: op,
1897 }
1898 })
1899 .collect();
1900
1901 let new_rule = BodyTransformRule {
1902 pattern: request.pattern.clone(),
1903 status_codes: request.status_codes.clone(),
1904 body_transforms,
1905 enabled: request.enabled,
1906 };
1907
1908 let mut config = proxy_config.write().await;
1909
1910 let rule_id = if request.rule_type == "request" {
1911 config.request_replacements.push(new_rule);
1912 config.request_replacements.len() - 1
1913 } else if request.rule_type == "response" {
1914 config.response_replacements.push(new_rule);
1915 config.request_replacements.len() + config.response_replacements.len() - 1
1916 } else {
1917 return Ok(Json(serde_json::json!({
1918 "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
1919 })));
1920 };
1921
1922 Ok(Json(serde_json::json!({
1923 "id": rule_id,
1924 "message": "Rule created successfully"
1925 })))
1926}
1927
1928async fn get_proxy_rule(
1930 State(state): State<ManagementState>,
1931 Path(id): Path<String>,
1932) -> Result<Json<serde_json::Value>, StatusCode> {
1933 let proxy_config = match &state.proxy_config {
1934 Some(config) => config,
1935 None => {
1936 return Ok(Json(serde_json::json!({
1937 "error": "Proxy not configured. Proxy config not available."
1938 })));
1939 }
1940 };
1941
1942 let config = proxy_config.read().await;
1943 let rule_id: usize = match id.parse() {
1944 Ok(id) => id,
1945 Err(_) => {
1946 return Ok(Json(serde_json::json!({
1947 "error": format!("Invalid rule ID: {}", id)
1948 })));
1949 }
1950 };
1951
1952 let request_count = config.request_replacements.len();
1953
1954 if rule_id < request_count {
1955 let rule = &config.request_replacements[rule_id];
1957 Ok(Json(serde_json::json!({
1958 "id": rule_id,
1959 "pattern": rule.pattern,
1960 "type": "request",
1961 "status_codes": [],
1962 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1963 "path": t.path,
1964 "replace": t.replace,
1965 "operation": format!("{:?}", t.operation).to_lowercase()
1966 })).collect::<Vec<_>>(),
1967 "enabled": rule.enabled
1968 })))
1969 } else if rule_id < request_count + config.response_replacements.len() {
1970 let response_idx = rule_id - request_count;
1972 let rule = &config.response_replacements[response_idx];
1973 Ok(Json(serde_json::json!({
1974 "id": rule_id,
1975 "pattern": rule.pattern,
1976 "type": "response",
1977 "status_codes": rule.status_codes,
1978 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1979 "path": t.path,
1980 "replace": t.replace,
1981 "operation": format!("{:?}", t.operation).to_lowercase()
1982 })).collect::<Vec<_>>(),
1983 "enabled": rule.enabled
1984 })))
1985 } else {
1986 Ok(Json(serde_json::json!({
1987 "error": format!("Rule ID {} not found", rule_id)
1988 })))
1989 }
1990}
1991
1992async fn update_proxy_rule(
1994 State(state): State<ManagementState>,
1995 Path(id): Path<String>,
1996 Json(request): Json<ProxyRuleRequest>,
1997) -> Result<Json<serde_json::Value>, StatusCode> {
1998 let proxy_config = match &state.proxy_config {
1999 Some(config) => config,
2000 None => {
2001 return Ok(Json(serde_json::json!({
2002 "error": "Proxy not configured. Proxy config not available."
2003 })));
2004 }
2005 };
2006
2007 let mut config = proxy_config.write().await;
2008 let rule_id: usize = match id.parse() {
2009 Ok(id) => id,
2010 Err(_) => {
2011 return Ok(Json(serde_json::json!({
2012 "error": format!("Invalid rule ID: {}", id)
2013 })));
2014 }
2015 };
2016
2017 let body_transforms: Vec<BodyTransform> = request
2018 .body_transforms
2019 .iter()
2020 .map(|t| {
2021 let op = match t.operation.as_str() {
2022 "replace" => TransformOperation::Replace,
2023 "add" => TransformOperation::Add,
2024 "remove" => TransformOperation::Remove,
2025 _ => TransformOperation::Replace,
2026 };
2027 BodyTransform {
2028 path: t.path.clone(),
2029 replace: t.replace.clone(),
2030 operation: op,
2031 }
2032 })
2033 .collect();
2034
2035 let updated_rule = BodyTransformRule {
2036 pattern: request.pattern.clone(),
2037 status_codes: request.status_codes.clone(),
2038 body_transforms,
2039 enabled: request.enabled,
2040 };
2041
2042 let request_count = config.request_replacements.len();
2043
2044 if rule_id < request_count {
2045 config.request_replacements[rule_id] = updated_rule;
2047 } else if rule_id < request_count + config.response_replacements.len() {
2048 let response_idx = rule_id - request_count;
2050 config.response_replacements[response_idx] = updated_rule;
2051 } else {
2052 return Ok(Json(serde_json::json!({
2053 "error": format!("Rule ID {} not found", rule_id)
2054 })));
2055 }
2056
2057 Ok(Json(serde_json::json!({
2058 "id": rule_id,
2059 "message": "Rule updated successfully"
2060 })))
2061}
2062
2063async fn delete_proxy_rule(
2065 State(state): State<ManagementState>,
2066 Path(id): Path<String>,
2067) -> Result<Json<serde_json::Value>, StatusCode> {
2068 let proxy_config = match &state.proxy_config {
2069 Some(config) => config,
2070 None => {
2071 return Ok(Json(serde_json::json!({
2072 "error": "Proxy not configured. Proxy config not available."
2073 })));
2074 }
2075 };
2076
2077 let mut config = proxy_config.write().await;
2078 let rule_id: usize = match id.parse() {
2079 Ok(id) => id,
2080 Err(_) => {
2081 return Ok(Json(serde_json::json!({
2082 "error": format!("Invalid rule ID: {}", id)
2083 })));
2084 }
2085 };
2086
2087 let request_count = config.request_replacements.len();
2088
2089 if rule_id < request_count {
2090 config.request_replacements.remove(rule_id);
2092 } else if rule_id < request_count + config.response_replacements.len() {
2093 let response_idx = rule_id - request_count;
2095 config.response_replacements.remove(response_idx);
2096 } else {
2097 return Ok(Json(serde_json::json!({
2098 "error": format!("Rule ID {} not found", rule_id)
2099 })));
2100 }
2101
2102 Ok(Json(serde_json::json!({
2103 "id": rule_id,
2104 "message": "Rule deleted successfully"
2105 })))
2106}
2107
/// GET /proxy/inspect — placeholder for request/response traffic inspection.
///
/// Always returns empty `requests`/`responses` arrays plus the requested
/// `limit`; interception is not implemented yet (see the returned message).
async fn get_proxy_inspect(
    State(_state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Result<Json<serde_json::Value>, StatusCode> {
    // `limit` falls back to 50 when missing or unparsable.
    let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);

    Ok(Json(serde_json::json!({
        "requests": [],
        "responses": [],
        "limit": limit,
        "message": "Request/response inspection not yet implemented. This endpoint will return intercepted traffic in a future version."
    })))
}
2125
/// Assemble the management API [`Router`] with the given shared state.
///
/// Routes are registered in groups: core health/config/mock CRUD, then
/// feature-gated SMTP/MQTT/Kafka sections, then migration, proxy-rule,
/// AI/chaos/network endpoints, and finally the nested state-machine API.
/// Each `#[cfg(not(...))]` arm re-binds `router` so the variable stays defined
/// for the chain below regardless of enabled features. The MQTT publish routes
/// are registered in both arms: without the feature they hit the 503 stub
/// handlers.
pub fn management_router(state: ManagementState) -> Router {
    let router = Router::new()
        .route("/health", get(health_check))
        .route("/stats", get(get_stats))
        .route("/config", get(get_config))
        .route("/config/validate", post(validate_config))
        .route("/config/bulk", post(bulk_update_config))
        .route("/mocks", get(list_mocks))
        .route("/mocks", post(create_mock))
        .route("/mocks/{id}", get(get_mock))
        .route("/mocks/{id}", put(update_mock))
        .route("/mocks/{id}", delete(delete_mock))
        .route("/export", get(export_mocks))
        .route("/import", post(import_mocks));

    // SMTP mailbox endpoints (feature-gated).
    #[cfg(feature = "smtp")]
    let router = router
        .route("/smtp/mailbox", get(list_smtp_emails))
        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
        .route("/smtp/mailbox/{id}", get(get_smtp_email))
        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
        .route("/smtp/mailbox/search", get(search_smtp_emails));

    // Keep `router` bound when SMTP is compiled out.
    #[cfg(not(feature = "smtp"))]
    let router = router;

    // MQTT broker endpoints (feature-gated).
    #[cfg(feature = "mqtt")]
    let router = router
        .route("/mqtt/stats", get(get_mqtt_stats))
        .route("/mqtt/clients", get(get_mqtt_clients))
        .route("/mqtt/topics", get(get_mqtt_topics))
        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Without the feature, the publish routes resolve to the 503 stub handlers.
    #[cfg(not(feature = "mqtt"))]
    let router = router
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Kafka broker endpoints (feature-gated).
    #[cfg(feature = "kafka")]
    let router = router
        .route("/kafka/stats", get(get_kafka_stats))
        .route("/kafka/topics", get(get_kafka_topics))
        .route("/kafka/topics/{topic}", get(get_kafka_topic))
        .route("/kafka/groups", get(get_kafka_groups))
        .route("/kafka/groups/{group_id}", get(get_kafka_group))
        .route("/kafka/produce", post(produce_kafka_message))
        .route("/kafka/produce/batch", post(produce_kafka_batch))
        .route("/kafka/messages/stream", get(kafka_messages_stream));

    // Keep `router` bound when Kafka is compiled out.
    #[cfg(not(feature = "kafka"))]
    let router = router;

    // Mock→real migration management.
    let router = router
        .route("/migration/routes", get(get_migration_routes))
        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
        .route("/migration/groups/{group}", put(set_group_migration_mode))
        .route("/migration/groups", get(get_migration_groups))
        .route("/migration/status", get(get_migration_status));

    // Proxy body-transform rule CRUD + (stub) traffic inspection.
    let router = router
        .route("/proxy/rules", get(list_proxy_rules))
        .route("/proxy/rules", post(create_proxy_rule))
        .route("/proxy/rules/{id}", get(get_proxy_rule))
        .route("/proxy/rules/{id}", put(update_proxy_rule))
        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
        .route("/proxy/inspect", get(get_proxy_inspect));

    // AI/learning, chaos and network-profile endpoints.
    let router = router
        .route("/ai/generate-spec", post(generate_ai_spec))
        .route("/mockai/generate-openapi", post(generate_openapi_from_traffic))
        .route("/mockai/learn", post(learn_from_examples))
        .route("/mockai/rules/explanations", get(list_rule_explanations))
        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
        .route("/chaos/config", get(get_chaos_config))
        .route("/chaos/config", post(update_chaos_config))
        .route("/network/profiles", get(list_network_profiles))
        .route("/network/profile/apply", post(apply_network_profile));

    // State-machine API is nested under its own prefix.
    let router =
        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());

    router.with_state(state)
}
2220
#[cfg(feature = "kafka")]
/// Aggregate broker statistics returned by `GET /kafka/stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// Number of topics known to the broker.
    pub topics: usize,
    /// Total partition count across all topics.
    pub partitions: usize,
    /// Number of consumer groups.
    pub consumer_groups: usize,
    /// Total messages produced (from the broker metrics snapshot).
    pub messages_produced: u64,
    /// Total messages consumed (from the broker metrics snapshot).
    pub messages_consumed: u64,
}
2235
#[cfg(feature = "kafka")]
/// Summary of one topic as returned by `GET /kafka/topics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name.
    pub name: String,
    /// Number of partitions.
    pub partitions: usize,
    /// Replication factor from the topic config.
    pub replication_factor: i32,
}
2243
#[cfg(feature = "kafka")]
/// Summary of one consumer group as returned by `GET /kafka/groups`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group ID.
    pub group_id: String,
    /// Number of members currently in the group.
    pub members: usize,
    /// Group state; the handlers hard-code this to "Stable".
    pub state: String,
}
2251
#[cfg(feature = "kafka")]
/// GET /kafka/stats — topic/partition/group counts plus produced/consumed
/// totals from the metrics snapshot; 503 when no broker is configured.
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let consumer_groups = broker.consumer_groups.read().await;
    let snapshot = broker.metrics.clone().snapshot();

    let stats = KafkaBrokerStats {
        topics: topics.len(),
        partitions: topics.values().map(|t| t.partitions.len()).sum(),
        consumer_groups: consumer_groups.groups().len(),
        messages_produced: snapshot.messages_produced_total,
        messages_consumed: snapshot.messages_consumed_total,
    };

    Json(stats).into_response()
}
2285
#[cfg(feature = "kafka")]
/// GET /kafka/topics — list topic summaries; 503 when no broker is configured.
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let mut topic_list = Vec::with_capacity(topics.len());
    for (name, topic) in topics.iter() {
        topic_list.push(KafkaTopicInfo {
            name: name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor,
        });
    }

    Json(serde_json::json!({ "topics": topic_list })).into_response()
}
2315
#[cfg(feature = "kafka")]
/// GET /kafka/topics/{topic} — detailed view of one topic, including a
/// per-partition breakdown; 404 for an unknown topic, 503 without a broker.
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let topics = broker.topics.read().await;
        if let Some(topic) = topics.get(&topic_name) {
            // Partition IDs are positional; leader/replicas are hard-coded to
            // broker 0 in this mock implementation.
            Json(serde_json::json!({
                "name": topic_name,
                "partitions": topic.partitions.len(),
                "replication_factor": topic.config.replication_factor,
                "partitions_detail": topic.partitions.iter().enumerate().map(|(idx, partition)| serde_json::json!({
                    "id": idx as i32,
                    "leader": 0,
                    "replicas": vec![0],
                    "message_count": partition.messages.len()
                })).collect::<Vec<_>>()
            })).into_response()
        } else {
            (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Topic not found",
                    "topic": topic_name
                })),
            )
                .into_response()
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2357
#[cfg(feature = "kafka")]
/// GET /kafka/groups — list consumer-group summaries; 503 without a broker.
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;
    let mut groups = Vec::new();
    for (group_id, group) in consumer_groups.groups().iter() {
        groups.push(KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            // Group state is hard-coded to "Stable" in this mock implementation.
            state: "Stable".to_string(),
        });
    }

    Json(serde_json::json!({ "groups": groups })).into_response()
}
2388
#[cfg(feature = "kafka")]
/// GET /kafka/groups/{group_id} — detailed view of one consumer group:
/// members with their partition assignments plus committed offsets.
/// 404 for an unknown group, 503 without a broker.
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let consumer_groups = broker.consumer_groups.read().await;
        if let Some(group) = consumer_groups.groups().get(&group_id) {
            // "state" is hard-coded to "Stable" in this mock implementation.
            // Offsets are stored keyed by (topic, partition) tuples and are
            // flattened here into a list of objects.
            Json(serde_json::json!({
                "group_id": group_id,
                "members": group.members.len(),
                "state": "Stable",
                "members_detail": group.members.iter().map(|(member_id, member)| serde_json::json!({
                    "member_id": member_id,
                    "client_id": member.client_id,
                    "assignments": member.assignment.iter().map(|a| serde_json::json!({
                        "topic": a.topic,
                        "partitions": a.partitions
                    })).collect::<Vec<_>>()
                })).collect::<Vec<_>>(),
                "offsets": group.offsets.iter().map(|((topic, partition), offset)| serde_json::json!({
                    "topic": topic,
                    "partition": partition,
                    "offset": offset
                })).collect::<Vec<_>>()
            })).into_response()
        } else {
            (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Consumer group not found",
                    "group_id": group_id
                })),
            )
                .into_response()
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2437
/// Request body for producing a single message to a Kafka topic via the
/// management API.
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Target topic; the produce handler creates it on demand if missing.
    pub topic: String,
    /// Optional message key, used for partition assignment when `partition`
    /// is not given.
    #[serde(default)]
    pub key: Option<String>,
    /// Message payload; stored as raw bytes by the broker.
    pub value: String,
    /// Explicit target partition; when omitted the broker assigns one
    /// (derived from the key, if any).
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional string-valued headers attached to the message.
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2457
/// Produce a single Kafka message through the mock broker.
///
/// Creates the topic on demand, validates/assigns the partition, appends the
/// message, publishes a [`MessageEvent::Kafka`] for SSE subscribers, and
/// returns the assigned partition and offset. Responds 400 for an
/// out-of-range partition and 503 when the broker is not running.
#[cfg(feature = "kafka")]
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let mut topics = broker.topics.write().await;

        // Auto-create the topic with default settings on first use.
        let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
            crate::topics::Topic::new(request.topic.clone(), crate::topics::TopicConfig::default())
        });

        // Use the explicit partition when given; otherwise derive one from the key.
        let partition_id = if let Some(partition) = request.partition {
            partition
        } else {
            topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes()))
        };

        if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid partition",
                    "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                })),
            )
                .into_response();
        }

        let message = crate::partitions::KafkaMessage {
            // Placeholder; the real offset is assigned by `produce` below.
            offset: 0,
            timestamp: chrono::Utc::now().timestamp_millis(),
            // BUGFIX: clone before converting — `request.key`/`request.headers`
            // are still needed below for the SSE event (matches the batch
            // handler). Without the clones these moves made the later
            // `.clone()` calls a use-after-move.
            key: request.key.clone().map(|k| k.as_bytes().to_vec()),
            value: request.value.as_bytes().to_vec(),
            headers: request
                .headers
                .clone()
                .unwrap_or_default()
                .into_iter()
                .map(|(k, v)| (k, v.as_bytes().to_vec()))
                .collect(),
        };

        match topic_entry.produce(partition_id, message).await {
            Ok(offset) => {
                broker.metrics.record_messages_produced(1);

                // Notify SSE stream subscribers about the new message.
                // (No nested cfg needed: this fn is already kafka-gated.)
                let event = MessageEvent::Kafka(KafkaMessageEvent {
                    topic: request.topic.clone(),
                    key: request.key.clone(),
                    value: request.value.clone(),
                    partition: partition_id,
                    offset,
                    headers: request.headers.clone(),
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                // A send error only means there are no subscribers; ignore it.
                let _ = state.message_events.send(event);

                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message produced to topic '{}'", request.topic),
                    "topic": request.topic,
                    "partition": partition_id,
                    "offset": offset
                }))
                .into_response()
            }
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to produce message",
                    "message": e.to_string()
                })),
            )
                .into_response(),
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2555
/// Request body for producing a batch of Kafka messages, optionally with a
/// fixed delay between consecutive messages.
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// Messages to produce, processed in order.
    pub messages: Vec<KafkaProduceRequest>,
    /// Delay between consecutive messages in milliseconds; defaults via
    /// `default_delay` when omitted.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2565
/// Produce a batch of Kafka messages, optionally pausing between entries.
///
/// Each message is processed independently: invalid partitions and produce
/// errors are reported per entry in `results` instead of aborting the batch.
/// Returns per-message outcomes plus aggregate succeeded/failed counts.
/// Responds 400 for an empty batch and 503 when the broker is not running.
#[cfg(feature = "kafka")]
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        if request.messages.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            )
                .into_response();
        }

        let mut results = Vec::new();

        for (index, msg_request) in request.messages.iter().enumerate() {
            // Acquire the lock per message so other producers/consumers can
            // make progress between batch entries.
            let mut topics = broker.topics.write().await;

            // Auto-create the topic with default settings on first use.
            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                crate::topics::Topic::new(
                    msg_request.topic.clone(),
                    crate::topics::TopicConfig::default(),
                )
            });

            // Use the explicit partition when given; otherwise derive one from the key.
            let partition_id = if let Some(partition) = msg_request.partition {
                partition
            } else {
                topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
            };

            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                }));
                continue;
            }

            let message = crate::partitions::KafkaMessage {
                // Placeholder; the real offset is assigned by `produce` below.
                offset: 0,
                timestamp: chrono::Utc::now().timestamp_millis(),
                key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
                value: msg_request.value.as_bytes().to_vec(),
                headers: msg_request
                    .headers
                    .clone()
                    .unwrap_or_default()
                    .into_iter()
                    .map(|(k, v)| (k, v.as_bytes().to_vec()))
                    .collect(),
            };

            match topic_entry.produce(partition_id, message).await {
                Ok(offset) => {
                    broker.metrics.record_messages_produced(1);

                    // Notify SSE stream subscribers about the new message.
                    let event = MessageEvent::Kafka(KafkaMessageEvent {
                        topic: msg_request.topic.clone(),
                        key: msg_request.key.clone(),
                        value: msg_request.value.clone(),
                        partition: partition_id,
                        offset,
                        headers: msg_request.headers.clone(),
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    // A send error only means there are no subscribers; ignore it.
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": msg_request.topic,
                        "partition": partition_id,
                        "offset": offset
                    }));
                }
                Err(e) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": e.to_string()
                    }));
                }
            }

            // BUGFIX: release the topics write lock before sleeping so the
            // artificial inter-message delay does not block every other
            // producer/consumer on the broker for its duration.
            drop(topics);

            // No delay after the final message.
            if index < request.messages.len() - 1 && request.delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        Json(serde_json::json!({
            "success": true,
            "total": request.messages.len(),
            "succeeded": success_count,
            "failed": request.messages.len() - success_count,
            "results": results
        }))
        .into_response()
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2691
/// SSE endpoint streaming MQTT messages as they are published.
///
/// Accepts an optional `?topic=<substring>` query parameter; only messages
/// whose topic contains that substring are forwarded. Kafka events on the
/// shared broadcast channel are skipped. A lagged receiver logs a warning
/// and keeps streaming; a closed channel ends the stream.
#[cfg(feature = "mqtt")]
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // FIX: no `mut` needed — the receiver is moved into `unfold`, which
    // rebinds it mutably inside the closure.
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => {
                        // Substring topic filter, when provided.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "mqtt",
                            "topic": event.topic,
                            "payload": event.payload,
                            "qos": event.qos,
                            "retain": event.retain,
                            "timestamp": event.timestamp,
                        });

                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event = Event::default().event("mqtt_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    // This stream only forwards MQTT traffic.
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => {
                        continue;
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Periodic keep-alive so proxies do not drop the idle connection.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
2754
/// SSE endpoint streaming Kafka messages as they are produced.
///
/// Accepts an optional `?topic=<substring>` query parameter; only messages
/// whose topic contains that substring are forwarded. MQTT events on the
/// shared broadcast channel are skipped. A lagged receiver logs a warning
/// and keeps streaming; a closed channel ends the stream.
#[cfg(feature = "kafka")]
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // FIX: no `mut` needed — the receiver is moved into `unfold`, which
    // rebinds it mutably inside the closure.
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    // This stream only forwards Kafka traffic.
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => {
                        continue;
                    }
                    Ok(MessageEvent::Kafka(event)) => {
                        // Substring topic filter, when provided.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "kafka",
                            "topic": event.topic,
                            "key": event.key,
                            "value": event.value,
                            "partition": event.partition,
                            "offset": event.offset,
                            "headers": event.headers,
                            "timestamp": event.timestamp,
                        });

                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event =
                                Event::default().event("kafka_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Periodic keep-alive so proxies do not drop the idle connection.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
2818
/// Request body for AI-assisted API specification generation.
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Natural-language description of the desired API.
    pub query: String,
    /// Target format: "openapi", "graphql", or "asyncapi"; any other value
    /// falls back to OpenAPI 3.0.
    pub spec_type: String,
    /// OpenAPI version to target; defaults to "3.0.0" when omitted.
    pub api_version: Option<String>,
}
2831
/// Request body for generating an OpenAPI spec from recorded HTTP traffic.
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to the recorder database; defaults to `./recordings.db` when omitted.
    #[serde(default)]
    pub database_path: Option<String>,
    /// Lower time bound (RFC 3339) for recorded exchanges to include.
    #[serde(default)]
    pub since: Option<String>,
    /// Upper time bound (RFC 3339) for recorded exchanges to include.
    #[serde(default)]
    pub until: Option<String>,
    /// Optional path pattern used to filter recorded exchanges.
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence for inferred paths; defaults to 0.7.
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
2851
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`:
/// inferred paths below this confidence are excluded from the generated spec.
fn default_min_confidence() -> f64 {
    0.7
}
2855
/// Generate an API specification (OpenAPI, GraphQL, or AsyncAPI) from a
/// natural-language description using the configured LLM provider.
///
/// Provider, endpoint, model, and generation parameters are read from
/// `MOCKFORGE_RAG_*` environment variables (the API key also falls back to
/// `OPENAI_API_KEY`). Responds 503 when no API key is configured and 500 when
/// engine construction or generation fails.
#[cfg(feature = "data-faker")]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    // FIX: dropped unused imports (EmbeddingProvider, StorageFactory).
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::DocumentStorage,
    };
    use std::sync::Arc;

    // API key: prefer the MockForge-specific variable, fall back to OpenAI's.
    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    }

    // Resolve the LLM provider; unrecognized names fall back to OpenAI.
    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    // Endpoint and model default per provider unless overridden by env vars.
    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    // Generation parameters, each with a sane fallback when unset/unparsable.
    let mut rag_config = RagConfig::default();
    rag_config.provider = provider;
    rag_config.api_endpoint = api_endpoint;
    rag_config.api_key = api_key;
    rag_config.model = model;
    rag_config.max_tokens = std::env::var("MOCKFORGE_RAG_MAX_TOKENS")
        .unwrap_or_else(|_| "4096".to_string())
        .parse()
        .unwrap_or(4096);
    rag_config.temperature = std::env::var("MOCKFORGE_RAG_TEMPERATURE")
        .unwrap_or_else(|_| "0.3".to_string())
        .parse()
        .unwrap_or(0.3);
    rag_config.timeout_secs = std::env::var("MOCKFORGE_RAG_TIMEOUT")
        .unwrap_or_else(|_| "60".to_string())
        .parse()
        .unwrap_or(60);
    rag_config.max_context_length = std::env::var("MOCKFORGE_RAG_CONTEXT_WINDOW")
        .unwrap_or_else(|_| "4000".to_string())
        .parse()
        .unwrap_or(4000);

    // Human-readable label used inside the prompt.
    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        if request.spec_type == "graphql" {
            "GraphQL SDL"
        } else {
            "YAML"
        }
    );

    // Spec generation needs no document retrieval, so an empty in-memory
    // store is sufficient.
    use mockforge_data::rag::storage::InMemoryStorage;
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    // FIX: pass `rag_config` by value — it is not used again, so the
    // previous `.clone()` was a redundant allocation.
    let mut rag_engine = match RagEngine::new(rag_config, storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Strip code fences / surrounding chatter from the model output.
            let spec = if request.spec_type == "graphql" {
                extract_graphql_schema(&generated_text)
            } else {
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
3026
3027#[cfg(not(feature = "data-faker"))]
3028async fn generate_ai_spec(
3029 State(_state): State<ManagementState>,
3030 Json(_request): Json<GenerateSpecRequest>,
3031) -> impl IntoResponse {
3032 (
3033 StatusCode::NOT_IMPLEMENTED,
3034 Json(serde_json::json!({
3035 "error": "AI features not enabled",
3036 "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3037 })),
3038 )
3039 .into_response()
3040}
3041
3042async fn generate_openapi_from_traffic(
3044 State(_state): State<ManagementState>,
3045 Json(request): Json<GenerateOpenApiFromTrafficRequest>,
3046) -> impl IntoResponse {
3047 use chrono::{DateTime, Utc};
3048 use mockforge_core::intelligent_behavior::{
3049 openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
3050 IntelligentBehaviorConfig,
3051 };
3052 use mockforge_recorder::{
3053 database::RecorderDatabase,
3054 openapi_export::{QueryFilters, RecordingsToOpenApi},
3055 };
3056 use std::path::PathBuf;
3057
3058 let db_path = if let Some(ref path) = request.database_path {
3060 PathBuf::from(path)
3061 } else {
3062 std::env::current_dir()
3063 .unwrap_or_else(|_| PathBuf::from("."))
3064 .join("recordings.db")
3065 };
3066
3067 let db = match RecorderDatabase::new(&db_path).await {
3069 Ok(db) => db,
3070 Err(e) => {
3071 return (
3072 StatusCode::BAD_REQUEST,
3073 Json(serde_json::json!({
3074 "error": "Database error",
3075 "message": format!("Failed to open recorder database: {}", e)
3076 })),
3077 )
3078 .into_response();
3079 }
3080 };
3081
3082 let since_dt = if let Some(ref since_str) = request.since {
3084 match DateTime::parse_from_rfc3339(since_str) {
3085 Ok(dt) => Some(dt.with_timezone(&Utc)),
3086 Err(e) => {
3087 return (
3088 StatusCode::BAD_REQUEST,
3089 Json(serde_json::json!({
3090 "error": "Invalid date format",
3091 "message": format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3092 })),
3093 )
3094 .into_response();
3095 }
3096 }
3097 } else {
3098 None
3099 };
3100
3101 let until_dt = if let Some(ref until_str) = request.until {
3102 match DateTime::parse_from_rfc3339(until_str) {
3103 Ok(dt) => Some(dt.with_timezone(&Utc)),
3104 Err(e) => {
3105 return (
3106 StatusCode::BAD_REQUEST,
3107 Json(serde_json::json!({
3108 "error": "Invalid date format",
3109 "message": format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3110 })),
3111 )
3112 .into_response();
3113 }
3114 }
3115 } else {
3116 None
3117 };
3118
3119 let query_filters = QueryFilters {
3121 since: since_dt,
3122 until: until_dt,
3123 path_pattern: request.path_pattern.clone(),
3124 min_status_code: None,
3125 max_requests: Some(1000),
3126 };
3127
3128 let exchanges = match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await
3130 {
3131 Ok(exchanges) => exchanges,
3132 Err(e) => {
3133 return (
3134 StatusCode::INTERNAL_SERVER_ERROR,
3135 Json(serde_json::json!({
3136 "error": "Query error",
3137 "message": format!("Failed to query HTTP exchanges: {}", e)
3138 })),
3139 )
3140 .into_response();
3141 }
3142 };
3143
3144 if exchanges.is_empty() {
3145 return (
3146 StatusCode::NOT_FOUND,
3147 Json(serde_json::json!({
3148 "error": "No exchanges found",
3149 "message": "No HTTP exchanges found matching the specified filters"
3150 })),
3151 )
3152 .into_response();
3153 }
3154
3155 let behavior_config = IntelligentBehaviorConfig::default();
3157 let gen_config = OpenApiGenerationConfig {
3158 min_confidence: request.min_confidence,
3159 behavior_model: Some(behavior_config.behavior_model),
3160 };
3161
3162 let generator = OpenApiSpecGenerator::new(gen_config);
3164 let result = match generator.generate_from_exchanges(exchanges).await {
3165 Ok(result) => result,
3166 Err(e) => {
3167 return (
3168 StatusCode::INTERNAL_SERVER_ERROR,
3169 Json(serde_json::json!({
3170 "error": "Generation error",
3171 "message": format!("Failed to generate OpenAPI spec: {}", e)
3172 })),
3173 )
3174 .into_response();
3175 }
3176 };
3177
3178 let spec_json = if let Some(ref raw) = result.spec.raw_document {
3180 raw.clone()
3181 } else {
3182 match serde_json::to_value(&result.spec.spec) {
3183 Ok(json) => json,
3184 Err(e) => {
3185 return (
3186 StatusCode::INTERNAL_SERVER_ERROR,
3187 Json(serde_json::json!({
3188 "error": "Serialization error",
3189 "message": format!("Failed to serialize OpenAPI spec: {}", e)
3190 })),
3191 )
3192 .into_response();
3193 }
3194 }
3195 };
3196
3197 let response = serde_json::json!({
3199 "spec": spec_json,
3200 "metadata": {
3201 "requests_analyzed": result.metadata.requests_analyzed,
3202 "paths_inferred": result.metadata.paths_inferred,
3203 "path_confidence": result.metadata.path_confidence,
3204 "generated_at": result.metadata.generated_at.to_rfc3339(),
3205 "duration_ms": result.metadata.duration_ms,
3206 }
3207 });
3208
3209 Json(response).into_response()
3210}
3211
3212async fn list_rule_explanations(
3214 State(state): State<ManagementState>,
3215 Query(params): Query<std::collections::HashMap<String, String>>,
3216) -> impl IntoResponse {
3217 use mockforge_core::intelligent_behavior::RuleType;
3218
3219 let explanations = state.rule_explanations.read().await;
3220 let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3221
3222 if let Some(rule_type_str) = params.get("rule_type") {
3224 if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3225 explanations_vec.retain(|e| e.rule_type == rule_type);
3226 }
3227 }
3228
3229 if let Some(min_confidence_str) = params.get("min_confidence") {
3231 if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3232 explanations_vec.retain(|e| e.confidence >= min_confidence);
3233 }
3234 }
3235
3236 explanations_vec.sort_by(|a, b| {
3238 b.confidence
3239 .partial_cmp(&a.confidence)
3240 .unwrap_or(std::cmp::Ordering::Equal)
3241 .then_with(|| b.generated_at.cmp(&a.generated_at))
3242 });
3243
3244 Json(serde_json::json!({
3245 "explanations": explanations_vec,
3246 "total": explanations_vec.len(),
3247 }))
3248 .into_response()
3249}
3250
3251async fn get_rule_explanation(
3253 State(state): State<ManagementState>,
3254 Path(rule_id): Path<String>,
3255) -> impl IntoResponse {
3256 let explanations = state.rule_explanations.read().await;
3257
3258 match explanations.get(&rule_id) {
3259 Some(explanation) => Json(serde_json::json!({
3260 "explanation": explanation,
3261 }))
3262 .into_response(),
3263 None => (
3264 StatusCode::NOT_FOUND,
3265 Json(serde_json::json!({
3266 "error": "Rule explanation not found",
3267 "message": format!("No explanation found for rule ID: {}", rule_id)
3268 })),
3269 )
3270 .into_response(),
3271 }
3272}
3273
/// Request body for the learn-from-examples endpoint.
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example request/response pairs to learn rules from.
    pub examples: Vec<ExamplePairRequest>,
    /// Optional intelligent-behavior configuration override (JSON); the
    /// handler falls back to defaults when absent or malformed.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3283
/// One example HTTP exchange, as loosely-typed JSON.
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request JSON; keys read by the handler: "method", "path", "body",
    /// "query_params", "headers".
    pub request: serde_json::Value,
    /// Response JSON; keys read by the handler: "status_code" (or "status")
    /// and "body".
    pub response: serde_json::Value,
}
3292
3293async fn learn_from_examples(
3298 State(state): State<ManagementState>,
3299 Json(request): Json<LearnFromExamplesRequest>,
3300) -> impl IntoResponse {
3301 use mockforge_core::intelligent_behavior::{
3302 config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3303 rule_generator::{ExamplePair, RuleGenerator},
3304 };
3305
3306 if request.examples.is_empty() {
3307 return (
3308 StatusCode::BAD_REQUEST,
3309 Json(serde_json::json!({
3310 "error": "No examples provided",
3311 "message": "At least one example pair is required"
3312 })),
3313 )
3314 .into_response();
3315 }
3316
3317 let example_pairs: Result<Vec<ExamplePair>, String> = request
3319 .examples
3320 .into_iter()
3321 .enumerate()
3322 .map(|(idx, ex)| {
3323 let method = ex
3325 .request
3326 .get("method")
3327 .and_then(|v| v.as_str())
3328 .map(|s| s.to_string())
3329 .unwrap_or_else(|| "GET".to_string());
3330 let path = ex
3331 .request
3332 .get("path")
3333 .and_then(|v| v.as_str())
3334 .map(|s| s.to_string())
3335 .unwrap_or_else(|| "/".to_string());
3336 let request_body = ex.request.get("body").cloned();
3337 let query_params = ex
3338 .request
3339 .get("query_params")
3340 .and_then(|v| v.as_object())
3341 .map(|obj| {
3342 obj.iter()
3343 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3344 .collect()
3345 })
3346 .unwrap_or_default();
3347 let headers = ex
3348 .request
3349 .get("headers")
3350 .and_then(|v| v.as_object())
3351 .map(|obj| {
3352 obj.iter()
3353 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3354 .collect()
3355 })
3356 .unwrap_or_default();
3357
3358 let status = ex
3360 .response
3361 .get("status_code")
3362 .or_else(|| ex.response.get("status"))
3363 .and_then(|v| v.as_u64())
3364 .map(|n| n as u16)
3365 .unwrap_or(200);
3366 let response_body = ex.response.get("body").cloned();
3367
3368 Ok(ExamplePair {
3369 method,
3370 path,
3371 request: request_body,
3372 status,
3373 response: response_body,
3374 query_params,
3375 headers,
3376 metadata: {
3377 let mut meta = std::collections::HashMap::new();
3378 meta.insert("source".to_string(), "api".to_string());
3379 meta.insert("example_index".to_string(), idx.to_string());
3380 meta
3381 },
3382 })
3383 })
3384 .collect();
3385
3386 let example_pairs = match example_pairs {
3387 Ok(pairs) => pairs,
3388 Err(e) => {
3389 return (
3390 StatusCode::BAD_REQUEST,
3391 Json(serde_json::json!({
3392 "error": "Invalid examples",
3393 "message": e
3394 })),
3395 )
3396 .into_response();
3397 }
3398 };
3399
3400 let behavior_config = if let Some(config_json) = request.config {
3402 serde_json::from_value(config_json)
3404 .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3405 .behavior_model
3406 } else {
3407 BehaviorModelConfig::default()
3408 };
3409
3410 let generator = RuleGenerator::new(behavior_config);
3412
3413 let (rules, explanations) =
3415 match generator.generate_rules_with_explanations(example_pairs).await {
3416 Ok(result) => result,
3417 Err(e) => {
3418 return (
3419 StatusCode::INTERNAL_SERVER_ERROR,
3420 Json(serde_json::json!({
3421 "error": "Rule generation failed",
3422 "message": format!("Failed to generate rules: {}", e)
3423 })),
3424 )
3425 .into_response();
3426 }
3427 };
3428
3429 {
3431 let mut stored_explanations = state.rule_explanations.write().await;
3432 for explanation in &explanations {
3433 stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3434 }
3435 }
3436
3437 let response = serde_json::json!({
3439 "success": true,
3440 "rules_generated": {
3441 "consistency_rules": rules.consistency_rules.len(),
3442 "schemas": rules.schemas.len(),
3443 "state_machines": rules.state_transitions.len(),
3444 "system_prompt": !rules.system_prompt.is_empty(),
3445 },
3446 "explanations": explanations.iter().map(|e| serde_json::json!({
3447 "rule_id": e.rule_id,
3448 "rule_type": e.rule_type,
3449 "confidence": e.confidence,
3450 "reasoning": e.reasoning,
3451 })).collect::<Vec<_>>(),
3452 "total_explanations": explanations.len(),
3453 });
3454
3455 Json(response).into_response()
3456}
3457
/// Extract a YAML (OpenAPI/AsyncAPI) specification from LLM output.
///
/// Prefers the contents of a ```yaml fenced block, then any generic ```
/// fenced block, and finally falls back to the whole text, trimmed.
fn extract_yaml_spec(text: &str) -> String {
    // ```yaml fenced block.
    if let Some(start) = text.find("```yaml") {
        let yaml_start = text[start + 7..].trim_start();
        if let Some(end) = yaml_start.find("```") {
            return yaml_start[..end].trim().to_string();
        }
    }
    // Generic fenced block (note: any language tag on the fence line is kept).
    if let Some(start) = text.find("```") {
        let content_start = text[start + 3..].trim_start();
        if let Some(end) = content_start.find("```") {
            return content_start[..end].trim().to_string();
        }
    }

    // No usable fences: return the trimmed text as-is. (The previous
    // `starts_with("openapi:")` special case returned the same value as this
    // fallthrough and was removed as dead code.)
    text.trim().to_string()
}
3481
/// Extract a GraphQL SDL schema from LLM output.
///
/// Prefers the contents of a ```graphql fenced block, then any generic ```
/// fenced block, and finally falls back to the whole text, trimmed.
fn extract_graphql_schema(text: &str) -> String {
    // ```graphql fenced block.
    if let Some(start) = text.find("```graphql") {
        let schema_start = text[start + 10..].trim_start();
        if let Some(end) = schema_start.find("```") {
            return schema_start[..end].trim().to_string();
        }
    }
    // Generic fenced block (note: any language tag on the fence line is kept).
    if let Some(start) = text.find("```") {
        let content_start = text[start + 3..].trim_start();
        if let Some(end) = content_start.find("```") {
            return content_start[..end].trim().to_string();
        }
    }

    // No usable fences: return the trimmed text as-is. (The previous
    // `starts_with("type ")` special case returned the same value as this
    // fallthrough and was removed as dead code.)
    text.trim().to_string()
}
3504
/// Return the current chaos-engineering configuration.
///
/// When the `chaos` feature is compiled in and a chaos API state exists, the
/// live config is serialized section by section; otherwise (no chaos state,
/// or feature compiled out) a disabled/all-null snapshot is returned so the
/// response shape stays identical for clients.
async fn get_chaos_config(State(state): State<ManagementState>) -> impl IntoResponse {
    #[cfg(feature = "chaos")]
    {
        if let Some(chaos_state) = &state.chaos_api_state {
            let config = chaos_state.config.read().await;
            // Each optional section serializes to its JSON form, or null if
            // serialization fails (best-effort view, never an error response).
            Json(serde_json::json!({
                "enabled": config.enabled,
                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
            }))
            .into_response()
        } else {
            // Chaos feature compiled in but not configured at runtime.
            Json(serde_json::json!({
                "enabled": false,
                "latency": null,
                "fault_injection": null,
                "rate_limit": null,
                "traffic_shaping": null,
            }))
            .into_response()
        }
    }
    #[cfg(not(feature = "chaos"))]
    {
        // Feature compiled out: same disabled snapshot as above.
        Json(serde_json::json!({
            "enabled": false,
            "latency": null,
            "fault_injection": null,
            "rate_limit": null,
            "traffic_shaping": null,
        }))
        .into_response()
    }
}
3547
/// Partial update for the chaos-engineering configuration; `None` fields
/// leave the corresponding section unchanged.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Master on/off switch for chaos behavior.
    pub enabled: Option<bool>,
    /// Latency injection settings (deserialized into `LatencyConfig`).
    pub latency: Option<serde_json::Value>,
    /// Fault injection settings (deserialized into `FaultInjectionConfig`).
    pub fault_injection: Option<serde_json::Value>,
    /// Rate limiting settings (deserialized into `RateLimitConfig`).
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic shaping settings (deserialized into `TrafficShapingConfig`).
    pub traffic_shaping: Option<serde_json::Value>,
}
3562
3563async fn update_chaos_config(
3565 State(state): State<ManagementState>,
3566 Json(config_update): Json<ChaosConfigUpdate>,
3567) -> impl IntoResponse {
3568 #[cfg(feature = "chaos")]
3569 {
3570 if let Some(chaos_state) = &state.chaos_api_state {
3571 use mockforge_chaos::config::{ChaosConfig, FaultInjectionConfig, LatencyConfig, RateLimitConfig, TrafficShapingConfig};
3572
3573 let mut config = chaos_state.config.write().await;
3574
3575 if let Some(enabled) = config_update.enabled {
3577 config.enabled = enabled;
3578 }
3579
3580 if let Some(latency_json) = config_update.latency {
3582 if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3583 config.latency = Some(latency);
3584 }
3585 }
3586
3587 if let Some(fault_json) = config_update.fault_injection {
3589 if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3590 config.fault_injection = Some(fault);
3591 }
3592 }
3593
3594 if let Some(rate_json) = config_update.rate_limit {
3596 if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3597 config.rate_limit = Some(rate);
3598 }
3599 }
3600
3601 if let Some(traffic_json) = config_update.traffic_shaping {
3603 if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3604 config.traffic_shaping = Some(traffic);
3605 }
3606 }
3607
3608 drop(config);
3611
3612 info!("Chaos configuration updated successfully");
3613 Json(serde_json::json!({
3614 "success": true,
3615 "message": "Chaos configuration updated and applied"
3616 }))
3617 .into_response()
3618 } else {
3619 (
3620 StatusCode::SERVICE_UNAVAILABLE,
3621 Json(serde_json::json!({
3622 "success": false,
3623 "error": "Chaos API not available",
3624 "message": "Chaos engineering is not enabled or configured"
3625 })),
3626 )
3627 .into_response()
3628 }
3629 }
3630 #[cfg(not(feature = "chaos"))]
3631 {
3632 (
3633 StatusCode::NOT_IMPLEMENTED,
3634 Json(serde_json::json!({
3635 "success": false,
3636 "error": "Chaos feature not enabled",
3637 "message": "Chaos engineering feature is not compiled into this build"
3638 })),
3639 )
3640 .into_response()
3641 }
3642}
3643
3644async fn list_network_profiles() -> impl IntoResponse {
3648 use mockforge_core::network_profiles::NetworkProfileCatalog;
3649
3650 let catalog = NetworkProfileCatalog::default();
3651 let profiles: Vec<serde_json::Value> = catalog
3652 .list_profiles_with_description()
3653 .iter()
3654 .map(|(name, description)| {
3655 serde_json::json!({
3656 "name": name,
3657 "description": description,
3658 })
3659 })
3660 .collect();
3661
3662 Json(serde_json::json!({
3663 "profiles": profiles
3664 }))
3665 .into_response()
3666}
3667
/// Request body for applying a predefined network profile.
#[derive(Debug, Deserialize)]
pub struct ApplyNetworkProfileRequest {
    /// Name of the profile to look up in the `NetworkProfileCatalog`.
    pub profile_name: String,
}
3674
/// Apply a named network profile from the built-in catalog to the running server.
///
/// On a catalog hit the derived traffic-shaping settings are written into the
/// shared server configuration (when available in `ManagementState`) and, when
/// the `chaos` feature is compiled in, mirrored into the live chaos API state.
/// Returns 404 when the profile name is not in the catalog.
async fn apply_network_profile(
    State(state): State<ManagementState>,
    Json(request): Json<ApplyNetworkProfileRequest>,
) -> impl IntoResponse {
    use mockforge_core::network_profiles::NetworkProfileCatalog;

    let catalog = NetworkProfileCatalog::default();
    if let Some(profile) = catalog.get(&request.profile_name) {
        // Persist the shaping settings into the shared server config, if one
        // was wired into this state.
        if let Some(server_config) = &state.server_config {
            let mut config = server_config.write().await;

            use mockforge_core::config::NetworkShapingConfig;

            let network_shaping = NetworkShapingConfig {
                // Active when either bandwidth limiting or burst loss is
                // enabled in the profile.
                enabled: profile.traffic_shaping.bandwidth.enabled || profile.traffic_shaping.burst_loss.enabled,
                // Catalog value is bytes/sec; this field is bits/sec, hence * 8.
                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                max_connections: 1000, // fixed default; not part of the catalog profile
            };

            if let Some(ref mut chaos) = config.observability.chaos {
                // Chaos section already exists: only replace traffic shaping.
                chaos.traffic_shaping = Some(network_shaping);
            } else {
                // No chaos section yet: create one carrying just the
                // traffic-shaping settings.
                use mockforge_core::config::ChaosEngConfig;
                config.observability.chaos = Some(ChaosEngConfig {
                    enabled: true,
                    latency: None,
                    fault_injection: None,
                    rate_limit: None,
                    traffic_shaping: Some(network_shaping),
                    scenario: None,
                });
            }

            info!("Network profile '{}' applied to server configuration", request.profile_name);
        } else {
            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
        }

        // Mirror the profile into the live chaos engine state when the
        // feature is compiled in.
        #[cfg(feature = "chaos")]
        {
            if let Some(chaos_state) = &state.chaos_api_state {
                use mockforge_chaos::config::TrafficShapingConfig;

                let mut chaos_config = chaos_state.config.write().await;
                let chaos_traffic_shaping = TrafficShapingConfig {
                    enabled: profile.traffic_shaping.bandwidth.enabled || profile.traffic_shaping.burst_loss.enabled,
                    // Same bytes/sec -> bits/sec conversion as above.
                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                    max_connections: 0,
                    connection_timeout_ms: 30000,
                };
                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
                // Also flip the global switch so the new shaping is active;
                // release the write lock before logging.
                chaos_config.enabled = true; drop(chaos_config);
                info!("Network profile '{}' applied to chaos API state", request.profile_name);
            }
        }

        Json(serde_json::json!({
            "success": true,
            "message": format!("Network profile '{}' applied", request.profile_name),
            "profile": {
                "name": profile.name,
                "description": profile.description,
            }
        }))
        .into_response()
    } else {
        // Unknown profile name -> 404 with an explanatory body.
        (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Profile not found",
                "message": format!("Network profile '{}' not found", request.profile_name)
            })),
        )
        .into_response()
    }
}
3766
3767pub fn management_router_with_ui_builder(
3769 state: ManagementState,
3770 server_config: mockforge_core::config::ServerConfig,
3771) -> Router {
3772 use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
3773
3774 let management = management_router(state);
3776
3777 let ui_builder_state = UIBuilderState::new(server_config);
3779 let ui_builder = create_ui_builder_router(ui_builder_state);
3780
3781 management.nest("/ui-builder", ui_builder)
3783}
3784
3785pub fn management_router_with_spec_import(state: ManagementState) -> Router {
3787 use crate::spec_import::{spec_import_router, SpecImportState};
3788
3789 let management = management_router(state);
3791
3792 Router::new()
3794 .merge(management)
3795 .merge(spec_import_router(SpecImportState::new()))
3796}
3797
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a minimal `MockConfig` fixture.
    ///
    /// Only the fields the tests actually assert on are parameterized; all
    /// other fields take neutral defaults. This removes the duplicated
    /// 15-field struct literal that previously appeared in every test.
    fn sample_mock(
        id: &str,
        name: &str,
        method: &str,
        path: &str,
        enabled: bool,
        status_code: u16,
    ) -> MockConfig {
        MockConfig {
            id: id.to_string(),
            name: name.to_string(),
            method: method.to_string(),
            path: path.to_string(),
            response: MockResponse {
                body: serde_json::json!({}),
                headers: None,
            },
            enabled,
            latency_ms: None,
            status_code: Some(status_code),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        }
    }

    /// A mock pushed into the shared state can be found again by id.
    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);
        let mock = sample_mock("test-1", "Test Mock", "GET", "/test", true, 200);

        {
            let mut mocks = state.mocks.write().await;
            mocks.push(mock.clone());
        }

        let mocks = state.mocks.read().await;
        let found = mocks.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    /// Enabled/disabled mocks are counted correctly from the shared state.
    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        {
            let mut mocks = state.mocks.write().await;
            mocks.push(sample_mock("1", "Mock 1", "GET", "/test1", true, 200));
            mocks.push(sample_mock("2", "Mock 2", "POST", "/test2", false, 201));
        }

        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }
}