1use axum::{
6 extract::{Path, Query, State},
7 http::StatusCode,
8 response::{
9 sse::{Event, Sse},
10 IntoResponse, Json, Response,
11 },
12 routing::{delete, get, post, put},
13 Router,
14};
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18 BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23use std::convert::Infallible;
24use std::sync::Arc;
25use tokio::sync::{broadcast, RwLock};
26use tracing::*;
27
/// Protocol-tagged message event fanned out via `ManagementState::message_events`.
///
/// Serialized as `{"protocol": "mqtt"|"kafka", "data": { ... }}` thanks to the
/// adjacently-tagged serde representation below.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    /// An MQTT publish handled by the mock broker.
    #[cfg(feature = "mqtt")]
    Mqtt(MqttMessageEvent),
    /// A Kafka record handled by the mock broker.
    #[cfg(feature = "kafka")]
    Kafka(KafkaMessageEvent),
}
40
/// Details of a single MQTT publish, as carried inside [`MessageEvent::Mqtt`].
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// Topic the message was published to.
    pub topic: String,
    /// Message payload as text (publishers in this file send UTF-8 strings).
    pub payload: String,
    /// MQTT quality-of-service level (0, 1, or 2).
    pub qos: u8,
    /// Whether the broker retains the message for future subscribers.
    pub retain: bool,
    /// Publish time as an RFC 3339 string (see `chrono::Utc::now().to_rfc3339()`).
    pub timestamp: String,
}
56
/// Details of a single Kafka record, as carried inside [`MessageEvent::Kafka`].
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Topic the record was produced to.
    pub topic: String,
    /// Optional record key.
    pub key: Option<String>,
    /// Record value as text.
    pub value: String,
    /// Partition the record landed on.
    pub partition: i32,
    /// Offset within the partition.
    pub offset: i64,
    /// Optional record headers.
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// Produce time as a string — presumably RFC 3339 like the MQTT variant; verify at the producer.
    pub timestamp: String,
}
68
/// A single HTTP mock definition managed through the management API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockConfig {
    /// Unique identifier; may be empty on creation requests (the server then assigns a UUID).
    #[serde(skip_serializing_if = "String::is_empty")]
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// HTTP method to match (compared case-insensitively in `mock_matches_request`).
    pub method: String,
    /// Request path or pattern; `{param}` segments and `*` globs are supported.
    pub path: String,
    /// Response returned when this mock matches.
    pub response: MockResponse,
    /// Whether the mock participates in matching (defaults to true when omitted).
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Artificial delay in milliseconds — presumably applied before responding; verify at the serving path.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_ms: Option<u64>,
    /// Optional HTTP status code override.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status_code: Option<u16>,
    /// Extra matching criteria beyond method + path.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_match: Option<RequestMatchCriteria>,
    /// Relative priority among matching mocks — NOTE(review): ordering semantics enforced elsewhere; verify.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub priority: Option<i32>,
    /// Scenario this mock belongs to, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scenario: Option<String>,
    /// Scenario state required for this mock to apply — NOTE(review): checked by the state machine elsewhere; verify.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub required_scenario_state: Option<String>,
    /// Scenario state to transition to after serving — NOTE(review): applied elsewhere; verify.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub new_scenario_state: Option<String>,
}
108
/// Serde default helper: mocks are enabled unless explicitly disabled.
fn default_true() -> bool {
    true
}
112
/// Response payload a mock serves when it matches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Response body as arbitrary JSON.
    pub body: serde_json::Value,
    /// Optional response headers to set.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
122
/// Optional fine-grained request matching rules attached to a [`MockConfig`].
///
/// All populated criteria must pass for the mock to match (see
/// `mock_matches_request`).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RequestMatchCriteria {
    /// Required headers; keys are case-insensitive, values are tried as regexes
    /// first with a literal-equality fallback.
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub headers: std::collections::HashMap<String, String>,
    /// Required query parameters; exact (case-sensitive) value equality.
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub query_params: std::collections::HashMap<String, String>,
    /// Regex (or literal fallback) the request body must match.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub body_pattern: Option<String>,
    /// Simple `$.a.b.c` JSONPath that must exist in a JSON body.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub json_path: Option<String>,
    /// XPath expression — currently only logged, not evaluated.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub xpath: Option<String>,
    /// Mini-DSL expression (`==`, `=~`, `contains`) evaluated by `evaluate_custom_matcher`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub custom_matcher: Option<String>,
}
145
146pub fn mock_matches_request(
155 mock: &MockConfig,
156 method: &str,
157 path: &str,
158 headers: &std::collections::HashMap<String, String>,
159 query_params: &std::collections::HashMap<String, String>,
160 body: Option<&[u8]>,
161) -> bool {
162 use regex::Regex;
163
164 if !mock.enabled {
166 return false;
167 }
168
169 if mock.method.to_uppercase() != method.to_uppercase() {
171 return false;
172 }
173
174 if !path_matches_pattern(&mock.path, path) {
176 return false;
177 }
178
179 if let Some(criteria) = &mock.request_match {
181 for (key, expected_value) in &criteria.headers {
183 let header_key_lower = key.to_lowercase();
184 let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
185
186 if let Some((_, actual_value)) = found {
187 if let Ok(re) = Regex::new(expected_value) {
189 if !re.is_match(actual_value) {
190 return false;
191 }
192 } else if actual_value != expected_value {
193 return false;
194 }
195 } else {
196 return false; }
198 }
199
200 for (key, expected_value) in &criteria.query_params {
202 if let Some(actual_value) = query_params.get(key) {
203 if actual_value != expected_value {
204 return false;
205 }
206 } else {
207 return false; }
209 }
210
211 if let Some(pattern) = &criteria.body_pattern {
213 if let Some(body_bytes) = body {
214 let body_str = String::from_utf8_lossy(body_bytes);
215 if let Ok(re) = Regex::new(pattern) {
217 if !re.is_match(&body_str) {
218 return false;
219 }
220 } else if body_str.as_ref() != pattern {
221 return false;
222 }
223 } else {
224 return false; }
226 }
227
228 if let Some(json_path) = &criteria.json_path {
230 if let Some(body_bytes) = body {
231 if let Ok(body_str) = std::str::from_utf8(body_bytes) {
232 if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
233 if !json_path_exists(&json_value, json_path) {
235 return false;
236 }
237 }
238 }
239 }
240 }
241
242 if let Some(_xpath) = &criteria.xpath {
244 tracing::warn!("XPath matching not yet fully implemented");
247 }
248
249 if let Some(custom) = &criteria.custom_matcher {
251 if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
252 return false;
253 }
254 }
255 }
256
257 true
258}
259
260fn path_matches_pattern(pattern: &str, path: &str) -> bool {
262 if pattern == path {
264 return true;
265 }
266
267 if pattern == "*" {
269 return true;
270 }
271
272 let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
274 let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
275
276 if pattern_parts.len() != path_parts.len() {
277 if pattern.contains('*') {
279 return matches_wildcard_pattern(pattern, path);
280 }
281 return false;
282 }
283
284 for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
285 if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
287 continue; }
289
290 if pattern_part != path_part {
291 return false;
292 }
293 }
294
295 true
296}
297
298fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
300 use regex::Regex;
301
302 let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
304
305 if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
306 return re.is_match(path);
307 }
308
309 false
310}
311
312fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
314 if json_path.starts_with("$.") {
317 let path = &json_path[2..];
318 let parts: Vec<&str> = path.split('.').collect();
319
320 let mut current = json;
321 for part in parts {
322 if let Some(obj) = current.as_object() {
323 if let Some(value) = obj.get(part) {
324 current = value;
325 } else {
326 return false;
327 }
328 } else {
329 return false;
330 }
331 }
332 true
333 } else {
334 tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
336 false
337 }
338}
339
340fn evaluate_custom_matcher(
342 expression: &str,
343 method: &str,
344 path: &str,
345 headers: &std::collections::HashMap<String, String>,
346 query_params: &std::collections::HashMap<String, String>,
347 body: Option<&[u8]>,
348) -> bool {
349 use regex::Regex;
350
351 let expr = expression.trim();
352
353 if expr.contains("==") {
355 let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
356 if parts.len() != 2 {
357 return false;
358 }
359
360 let field = parts[0];
361 let expected_value = parts[1].trim_matches('"').trim_matches('\'');
362
363 match field {
364 "method" => method == expected_value,
365 "path" => path == expected_value,
366 _ if field.starts_with("headers.") => {
367 let header_name = &field[8..];
368 headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
369 }
370 _ if field.starts_with("query.") => {
371 let param_name = &field[6..];
372 query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
373 }
374 _ => false,
375 }
376 }
377 else if expr.contains("=~") {
379 let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
380 if parts.len() != 2 {
381 return false;
382 }
383
384 let field = parts[0];
385 let pattern = parts[1].trim_matches('"').trim_matches('\'');
386
387 if let Ok(re) = Regex::new(pattern) {
388 match field {
389 "method" => re.is_match(method),
390 "path" => re.is_match(path),
391 _ if field.starts_with("headers.") => {
392 let header_name = &field[8..];
393 headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
394 }
395 _ if field.starts_with("query.") => {
396 let param_name = &field[6..];
397 query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
398 }
399 _ => false,
400 }
401 } else {
402 false
403 }
404 }
405 else if expr.contains("contains") {
407 let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
408 if parts.len() != 2 {
409 return false;
410 }
411
412 let field = parts[0];
413 let search_value = parts[1].trim_matches('"').trim_matches('\'');
414
415 match field {
416 "path" => path.contains(search_value),
417 _ if field.starts_with("headers.") => {
418 let header_name = &field[8..];
419 headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
420 }
421 _ if field.starts_with("body") => {
422 if let Some(body_bytes) = body {
423 let body_str = String::from_utf8_lossy(body_bytes);
424 body_str.contains(search_value)
425 } else {
426 false
427 }
428 }
429 _ => false,
430 }
431 } else {
432 tracing::warn!("Unknown custom matcher expression format: {}", expr);
434 false
435 }
436}
437
/// Runtime statistics reported by `get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Seconds since the management state was created.
    pub uptime_seconds: u64,
    /// Total requests counted via `ManagementState::request_counter`.
    pub total_requests: u64,
    /// Number of mocks currently stored (enabled or not).
    pub active_mocks: usize,
    /// Number of stored mocks with `enabled == true`.
    pub enabled_mocks: usize,
    /// Currently mirrors `active_mocks` — no separate route registry is tracked here.
    pub registered_routes: usize,
}
452
/// Server configuration snapshot reported by `get_config`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// Crate version (compile-time `CARGO_PKG_VERSION`).
    pub version: String,
    /// Port the server listens on.
    pub port: u16,
    /// Whether an OpenAPI spec was loaded.
    pub has_openapi_spec: bool,
    /// Path the spec was loaded from, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
466
/// Shared state for the management API.
///
/// Cheap to clone: every owned field is behind an `Arc` (or `Option<Arc>`),
/// so clones share the same underlying data.
#[derive(Clone)]
pub struct ManagementState {
    /// In-memory store of mock definitions.
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// Loaded OpenAPI spec, if the server was started with one.
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Filesystem path of the loaded spec, when known.
    pub spec_path: Option<String>,
    /// Port reported by `get_config`.
    pub port: u16,
    /// Creation instant, used to compute uptime.
    pub start_time: std::time::Instant,
    /// Total request count — NOTE(review): incremented outside this chunk; verify.
    pub request_counter: Arc<RwLock<u64>>,
    /// Mutable proxy configuration, when proxying is enabled.
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// SMTP mailbox/spec registry, when the SMTP feature + server are active.
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// Mock MQTT broker handle.
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Mock Kafka broker handle.
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel for protocol message events (see [`MessageEvent`]).
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// Scenario state machines driving stateful mocks.
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// Channel for pushing mock CRUD events to WebSocket listeners.
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Hooks invoked on mock create/update/delete/enable/disable.
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Rule explanations keyed by string id — NOTE(review): key semantics defined elsewhere; verify.
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
    /// Chaos-engineering API state, when the chaos feature is enabled.
    #[cfg(feature = "chaos")]
    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
    /// Full server configuration, when shared for validation/bulk updates.
    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
}
518
impl ManagementState {
    /// Creates a fresh state with an empty mock store, zeroed counters, and
    /// every optional integration unset. Use the `with_*` builders to attach
    /// integrations afterwards.
    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
        Self {
            mocks: Arc::new(RwLock::new(Vec::new())),
            spec,
            spec_path,
            port,
            start_time: std::time::Instant::now(),
            request_counter: Arc::new(RwLock::new(0)),
            proxy_config: None,
            #[cfg(feature = "smtp")]
            smtp_registry: None,
            #[cfg(feature = "mqtt")]
            mqtt_broker: None,
            #[cfg(feature = "kafka")]
            kafka_broker: None,
            // Capacity 1000: slow receivers lag rather than block senders.
            #[cfg(any(feature = "mqtt", feature = "kafka"))]
            message_events: {
                let (tx, _) = broadcast::channel(1000);
                Arc::new(tx)
            },
            state_machine_manager: Arc::new(RwLock::new(
                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
            )),
            ws_broadcast: None,
            lifecycle_hooks: None,
            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
            #[cfg(feature = "chaos")]
            chaos_api_state: None,
            server_config: None,
        }
    }

    /// Builder: attaches lifecycle hooks; consumes and returns `self`.
    pub fn with_lifecycle_hooks(
        mut self,
        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
    ) -> Self {
        self.lifecycle_hooks = Some(hooks);
        self
    }

    /// Builder: attaches the WebSocket broadcast channel.
    pub fn with_ws_broadcast(
        mut self,
        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
    ) -> Self {
        self.ws_broadcast = Some(ws_broadcast);
        self
    }

    /// Builder: attaches a shared proxy configuration.
    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
        self.proxy_config = Some(proxy_config);
        self
    }

    /// Builder: attaches the SMTP registry.
    #[cfg(feature = "smtp")]
    pub fn with_smtp_registry(
        mut self,
        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
    ) -> Self {
        self.smtp_registry = Some(smtp_registry);
        self
    }

    /// Builder: attaches the mock MQTT broker.
    #[cfg(feature = "mqtt")]
    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
        self.mqtt_broker = Some(mqtt_broker);
        self
    }

    /// Builder: attaches the mock Kafka broker.
    #[cfg(feature = "kafka")]
    pub fn with_kafka_broker(
        mut self,
        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
    ) -> Self {
        self.kafka_broker = Some(kafka_broker);
        self
    }

    /// Builder: attaches the chaos API state.
    #[cfg(feature = "chaos")]
    pub fn with_chaos_api_state(
        mut self,
        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
    ) -> Self {
        self.chaos_api_state = Some(chaos_api_state);
        self
    }

    /// Builder: attaches the shared server configuration.
    pub fn with_server_config(
        mut self,
        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
    ) -> Self {
        self.server_config = Some(server_config);
        self
    }
}
628
629async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
631 let mocks = state.mocks.read().await;
632 Json(serde_json::json!({
633 "mocks": *mocks,
634 "total": mocks.len(),
635 "enabled": mocks.iter().filter(|m| m.enabled).count()
636 }))
637}
638
639async fn get_mock(
641 State(state): State<ManagementState>,
642 Path(id): Path<String>,
643) -> Result<Json<MockConfig>, StatusCode> {
644 let mocks = state.mocks.read().await;
645 mocks
646 .iter()
647 .find(|m| m.id == id)
648 .cloned()
649 .map(Json)
650 .ok_or(StatusCode::NOT_FOUND)
651}
652
/// Creates a new mock.
///
/// Assigns a UUID when the client omits an id, rejects duplicate ids with
/// 409 CONFLICT, fires the `Created` lifecycle hook, and broadcasts a
/// `mock_created` event when a WebSocket channel is attached.
async fn create_mock(
    State(state): State<ManagementState>,
    Json(mut mock): Json<MockConfig>,
) -> Result<Json<MockConfig>, StatusCode> {
    let mut mocks = state.mocks.write().await;

    // Empty id => server-assigned UUID.
    if mock.id.is_empty() {
        mock.id = uuid::Uuid::new_v4().to_string();
    }

    // Ids must be unique across the store.
    if mocks.iter().any(|m| m.id == mock.id) {
        return Err(StatusCode::CONFLICT);
    }

    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);

    // NOTE(review): the Created hook fires before the mock is pushed, so a
    // hook inspecting the store will not yet see the new entry — confirm
    // whether that ordering is intentional.
    if let Some(hooks) = &state.lifecycle_hooks {
        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
            id: mock.id.clone(),
            name: mock.name.clone(),
            config: serde_json::to_value(&mock).unwrap_or_default(),
        };
        hooks.invoke_mock_created(&event).await;
    }

    mocks.push(mock.clone());

    // Best-effort broadcast; a send error (no subscribers) is ignored.
    if let Some(tx) = &state.ws_broadcast {
        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
    }

    Ok(Json(mock))
}
691
692async fn update_mock(
694 State(state): State<ManagementState>,
695 Path(id): Path<String>,
696 Json(updated_mock): Json<MockConfig>,
697) -> Result<Json<MockConfig>, StatusCode> {
698 let mut mocks = state.mocks.write().await;
699
700 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
701
702 let old_mock = mocks[position].clone();
704
705 info!("Updating mock: {}", id);
706 mocks[position] = updated_mock.clone();
707
708 if let Some(hooks) = &state.lifecycle_hooks {
710 let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
711 id: updated_mock.id.clone(),
712 name: updated_mock.name.clone(),
713 config: serde_json::to_value(&updated_mock).unwrap_or_default(),
714 };
715 hooks.invoke_mock_updated(&event).await;
716
717 if old_mock.enabled != updated_mock.enabled {
719 let state_event = if updated_mock.enabled {
720 mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
721 id: updated_mock.id.clone(),
722 }
723 } else {
724 mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
725 id: updated_mock.id.clone(),
726 }
727 };
728 hooks.invoke_mock_state_changed(&state_event).await;
729 }
730 }
731
732 if let Some(tx) = &state.ws_broadcast {
734 let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
735 }
736
737 Ok(Json(updated_mock))
738}
739
740async fn delete_mock(
742 State(state): State<ManagementState>,
743 Path(id): Path<String>,
744) -> Result<StatusCode, StatusCode> {
745 let mut mocks = state.mocks.write().await;
746
747 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
748
749 let deleted_mock = mocks[position].clone();
751
752 info!("Deleting mock: {}", id);
753 mocks.remove(position);
754
755 if let Some(hooks) = &state.lifecycle_hooks {
757 let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
758 id: deleted_mock.id.clone(),
759 name: deleted_mock.name.clone(),
760 };
761 hooks.invoke_mock_deleted(&event).await;
762 }
763
764 if let Some(tx) = &state.ws_broadcast {
766 let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
767 }
768
769 Ok(StatusCode::NO_CONTENT)
770}
771
/// Request body for `validate_config`.
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// Configuration document to validate (arrives as JSON).
    pub config: serde_json::Value,
    /// Parser to use: "yaml"/"yml" or anything else for JSON (default "json").
    #[serde(default = "default_format")]
    pub format: String,
}
781
/// Serde default helper: configuration payloads are treated as JSON unless
/// the caller says otherwise.
fn default_format() -> String {
    String::from("json")
}
785
/// Validates a configuration document against `mockforge_core::config::ServerConfig`.
///
/// When `format` is "yaml"/"yml" the JSON payload is re-serialized to a string
/// and parsed with serde_yaml (every JSON document is valid YAML, so this is
/// lossless); otherwise the JSON value is deserialized directly. Responds 200
/// with `{"valid": true}` or 400 with the parse error.
async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
    use mockforge_core::config::ServerConfig;

    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
        "yaml" | "yml" => {
            // Render the JSON payload to text so serde_yaml can parse it.
            let yaml_str = match serde_json::to_string(&request.config) {
                Ok(s) => s,
                Err(e) => {
                    return (
                        StatusCode::BAD_REQUEST,
                        Json(serde_json::json!({
                            "valid": false,
                            "error": format!("Failed to convert to string: {}", e),
                            "message": "Configuration validation failed"
                        })),
                    )
                        .into_response();
                }
            };
            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
        }
        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
    };

    match config_result {
        Ok(_) => Json(serde_json::json!({
            "valid": true,
            "message": "Configuration is valid"
        }))
        .into_response(),
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "valid": false,
                "error": format!("Invalid configuration: {}", e),
                "message": "Configuration validation failed"
            })),
        )
            .into_response(),
    }
}
828
/// Request body for `bulk_update_config`: a JSON object of top-level
/// configuration keys to overlay onto the default config.
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    pub updates: serde_json::Value,
}
835
836async fn bulk_update_config(
844 State(state): State<ManagementState>,
845 Json(request): Json<BulkConfigUpdateRequest>,
846) -> impl IntoResponse {
847 if !request.updates.is_object() {
849 return (
850 StatusCode::BAD_REQUEST,
851 Json(serde_json::json!({
852 "error": "Invalid request",
853 "message": "Updates must be a JSON object"
854 })),
855 )
856 .into_response();
857 }
858
859 use mockforge_core::config::ServerConfig;
861
862 let base_config = ServerConfig::default();
864 let base_json = match serde_json::to_value(&base_config) {
865 Ok(v) => v,
866 Err(e) => {
867 return (
868 StatusCode::INTERNAL_SERVER_ERROR,
869 Json(serde_json::json!({
870 "error": "Internal error",
871 "message": format!("Failed to serialize base config: {}", e)
872 })),
873 )
874 .into_response();
875 }
876 };
877
878 let mut merged = base_json.clone();
880 if let (Some(merged_obj), Some(updates_obj)) =
881 (merged.as_object_mut(), request.updates.as_object())
882 {
883 for (key, value) in updates_obj {
884 merged_obj.insert(key.clone(), value.clone());
885 }
886 }
887
888 match serde_json::from_value::<ServerConfig>(merged) {
890 Ok(_) => {
891 Json(serde_json::json!({
898 "success": true,
899 "message": "Bulk configuration update validated successfully. Note: Runtime application requires ServerConfig in ManagementState and hot-reload support.",
900 "updates_received": request.updates,
901 "validated": true
902 }))
903 .into_response()
904 }
905 Err(e) => (
906 StatusCode::BAD_REQUEST,
907 Json(serde_json::json!({
908 "error": "Invalid configuration",
909 "message": format!("Configuration validation failed: {}", e),
910 "validated": false
911 })),
912 )
913 .into_response(),
914 }
915}
916
917async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
919 let mocks = state.mocks.read().await;
920 let request_count = *state.request_counter.read().await;
921
922 Json(ServerStats {
923 uptime_seconds: state.start_time.elapsed().as_secs(),
924 total_requests: request_count,
925 active_mocks: mocks.len(),
926 enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
927 registered_routes: mocks.len(), })
929}
930
931async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
933 Json(ServerConfig {
934 version: env!("CARGO_PKG_VERSION").to_string(),
935 port: state.port,
936 has_openapi_spec: state.spec.is_some(),
937 spec_path: state.spec_path.clone(),
938 })
939}
940
941async fn health_check() -> Json<serde_json::Value> {
943 Json(serde_json::json!({
944 "status": "healthy",
945 "service": "mockforge-management",
946 "timestamp": chrono::Utc::now().to_rfc3339()
947 }))
948}
949
/// Serialization format for `export_mocks` (`?format=json|yaml`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    Json,
    Yaml,
}
959
960async fn export_mocks(
962 State(state): State<ManagementState>,
963 axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
964) -> Result<(StatusCode, String), StatusCode> {
965 let mocks = state.mocks.read().await;
966
967 let format = params
968 .get("format")
969 .map(|f| match f.as_str() {
970 "yaml" | "yml" => ExportFormat::Yaml,
971 _ => ExportFormat::Json,
972 })
973 .unwrap_or(ExportFormat::Json);
974
975 match format {
976 ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
977 .map(|json| (StatusCode::OK, json))
978 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
979 ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
980 .map(|yaml| (StatusCode::OK, yaml))
981 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
982 }
983}
984
985async fn import_mocks(
987 State(state): State<ManagementState>,
988 Json(mocks): Json<Vec<MockConfig>>,
989) -> impl IntoResponse {
990 let mut current_mocks = state.mocks.write().await;
991 current_mocks.clear();
992 current_mocks.extend(mocks);
993 Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
994}
995
/// Lists all captured SMTP emails; 501 when no SMTP registry is attached.
#[cfg(feature = "smtp")]
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        match smtp_registry.get_emails() {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve emails",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1020
/// Fetches a single captured email by id.
///
/// 404 when the id is unknown, 500 on registry errors, 501 when no SMTP
/// registry is attached.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        match smtp_registry.get_email_by_id(&id) {
            Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
            Ok(None) => (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Email not found",
                    "id": id
                })),
            ),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve email",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1055
/// Empties the captured-email mailbox; 501 when no SMTP registry is attached.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        match smtp_registry.clear_mailbox() {
            Ok(()) => (
                StatusCode::OK,
                Json(serde_json::json!({
                    "message": "Mailbox cleared successfully"
                })),
            ),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to clear mailbox",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1085
/// Placeholder mailbox-export endpoint: always responds 501, echoing the
/// requested format back to the caller.
///
/// Uses `cloned().unwrap_or_else(...)` so the "json" default String is only
/// allocated when the `format` query parameter is absent (the previous
/// `unwrap_or(&"json".to_string())` allocated unconditionally).
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let format = params.get("format").cloned().unwrap_or_else(|| "json".to_string());
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "SMTP mailbox management not available via HTTP API",
            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
            "requested_format": format
        })),
    )
}
1101
/// Searches captured emails with filters taken from query parameters.
///
/// Recognized parameters: `sender`, `recipient`, `subject`, `body` (text
/// filters), `since`/`until` (RFC 3339 timestamps; unparseable values are
/// silently dropped), and the boolean flags `regex` and `case_sensitive`
/// (true only for the literal string "true"). 501 when no SMTP registry is
/// attached.
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        let filters = EmailSearchFilters {
            sender: params.get("sender").cloned(),
            recipient: params.get("recipient").cloned(),
            subject: params.get("subject").cloned(),
            body: params.get("body").cloned(),
            since: params
                .get("since")
                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
                .map(|dt| dt.with_timezone(&chrono::Utc)),
            until: params
                .get("until")
                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
                .map(|dt| dt.with_timezone(&chrono::Utc)),
            use_regex: params.get("regex").map(|s| s == "true").unwrap_or(false),
            case_sensitive: params.get("case_sensitive").map(|s| s == "true").unwrap_or(false),
        };

        match smtp_registry.search_emails(filters) {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to search emails",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1146
/// Aggregate MQTT broker statistics reported by `get_mqtt_stats`.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of currently connected clients.
    pub connected_clients: usize,
    /// Number of topics with activity.
    pub active_topics: usize,
    /// Number of retained messages held by the broker.
    pub retained_messages: usize,
    /// Total subscription count across all clients.
    pub total_subscriptions: usize,
}
1160
/// Collects broker statistics; 503 when no MQTT broker is attached.
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    if let Some(broker) = &state.mqtt_broker {
        let connected_clients = broker.get_connected_clients().await.len();
        let active_topics = broker.get_active_topics().await.len();
        let stats = broker.get_topic_stats().await;

        let broker_stats = MqttBrokerStats {
            connected_clients,
            active_topics,
            retained_messages: stats.retained_messages,
            total_subscriptions: stats.total_subscriptions,
        };

        Json(broker_stats).into_response()
    } else {
        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
    }
}
1181
/// Lists currently connected MQTT client ids; 503 when no broker is attached.
#[cfg(feature = "mqtt")]
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.mqtt_broker else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };
    let clients = broker.get_connected_clients().await;
    Json(serde_json::json!({
        "clients": clients
    }))
    .into_response()
}
1194
/// Lists topics with activity on the mock broker; 503 when no broker is attached.
#[cfg(feature = "mqtt")]
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.mqtt_broker else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };
    let topics = broker.get_active_topics().await;
    Json(serde_json::json!({
        "topics": topics
    }))
    .into_response()
}
1207
/// Forcibly disconnects an MQTT client by id.
///
/// 200 on success, 500 when the broker reports an error, 503 when no broker
/// is attached.
#[cfg(feature = "mqtt")]
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.mqtt_broker else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response(),
        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
            .into_response(),
    }
}
1227
/// A single MQTT publish request (also the element type of batch publishes).
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Topic to publish to.
    pub topic: String,
    /// Message payload as text.
    pub payload: String,
    /// Quality-of-service level (defaults to 0).
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Whether the broker should retain the message (defaults to false).
    #[serde(default)]
    pub retain: bool,
}
1245
1246#[cfg(feature = "mqtt")]
1247fn default_qos() -> u8 {
1248 0
1249}
1250
/// Publishes a message through the mock MQTT broker.
///
/// Accepts a loose JSON body (`topic`, `payload` required; `qos` defaults to
/// 0, `retain` to false) rather than the typed `MqttPublishRequest`. Rejects
/// missing fields with 400, QoS > 2 with 400 (only when a broker is attached),
/// and answers 503 when no broker is configured. On success the publish is
/// also mirrored onto the `message_events` broadcast channel.
#[cfg(feature = "mqtt")]
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Manual field extraction keeps the error shape uniform for bad bodies.
    let topic = request.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
    let payload = request.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    if topic.is_none() || payload.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required fields: topic and payload"
            })),
        );
    }

    // Safe: both checked for None just above.
    let topic = topic.unwrap();
    let payload = payload.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        // NOTE(review): QoS validation only happens when a broker is present;
        // without one, an out-of-range QoS still yields the 503 below.
        if qos > 2 {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid QoS",
                    "message": "QoS must be 0, 1, or 2"
                })),
            );
        }

        let payload_bytes = payload.as_bytes().to_vec();
        // Synthetic client id identifying publishes made through this API.
        let client_id = "mockforge-management-api".to_string();

        let publish_result = broker
            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
            .await
            .map_err(|e| format!("{}", e));

        match publish_result {
            Ok(_) => {
                // Mirror the publish onto the event stream; send errors
                // (no subscribers) are ignored.
                let event = MessageEvent::Mqtt(MqttMessageEvent {
                    topic: topic.clone(),
                    payload: payload.clone(),
                    qos,
                    retain,
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                (
                    StatusCode::OK,
                    Json(serde_json::json!({
                        "success": true,
                        "message": format!("Message published to topic '{}'", topic),
                        "topic": topic,
                        "qos": qos,
                        "retain": retain
                    })),
                )
            }
            Err(error_msg) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to publish message",
                    "message": error_msg
                })),
            ),
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1338
1339#[cfg(not(feature = "mqtt"))]
1340async fn publish_mqtt_message_handler(
1342 State(_state): State<ManagementState>,
1343 Json(_request): Json<serde_json::Value>,
1344) -> impl IntoResponse {
1345 (
1346 StatusCode::SERVICE_UNAVAILABLE,
1347 Json(serde_json::json!({
1348 "error": "MQTT feature not enabled",
1349 "message": "MQTT support is not compiled into this build"
1350 })),
1351 )
1352}
1353
#[cfg(feature = "mqtt")]
/// Request body for POST /mqtt/publish/batch.
///
/// NOTE(review): the batch handler currently parses the raw JSON body
/// itself rather than deserializing into this struct — keep them in sync.
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// Messages to publish, in order.
    pub messages: Vec<MqttPublishRequest>,
    /// Pause between consecutive publishes, in milliseconds (default 100).
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1364
#[cfg(feature = "mqtt")]
/// Serde default for `MqttBatchPublishRequest::delay_ms` (100 ms).
fn default_delay() -> u64 {
    100
}
1369
#[cfg(feature = "mqtt")]
/// POST /mqtt/publish/batch — publish several MQTT messages sequentially.
///
/// Body: `messages` (required, non-empty array of `{topic, payload, qos?,
/// retain?}`) and `delay_ms` (default 100), the pause inserted between
/// entries that reach the broker. Per-message failures are reported in the
/// `results` array while the endpoint itself still answers 200.
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    let Some(batch) = request.get("messages").and_then(|v| v.as_array()) else {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required field: messages"
            })),
        );
    };

    let Some(broker) = &state.mqtt_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        );
    };

    if batch.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        );
    }

    let client_id = "mockforge-management-api".to_string();
    let total = batch.len();
    let mut results = Vec::with_capacity(total);

    for (index, entry) in batch.iter().enumerate() {
        let qos = entry.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
        let retain = entry.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

        // Invalid entries are recorded and skipped without the inter-message
        // delay (the original flow used `continue` before the sleep).
        let Some((topic, payload)) = entry
            .get("topic")
            .and_then(|v| v.as_str())
            .zip(entry.get("payload").and_then(|v| v.as_str()))
        else {
            results.push(serde_json::json!({
                "index": index,
                "success": false,
                "error": "Missing required fields: topic and payload"
            }));
            continue;
        };

        if qos > 2 {
            results.push(serde_json::json!({
                "index": index,
                "success": false,
                "error": "Invalid QoS (must be 0, 1, or 2)"
            }));
            continue;
        }

        match broker
            .handle_publish(&client_id, topic, payload.as_bytes().to_vec(), qos, retain)
            .await
        {
            Ok(_) => {
                // Echo the publish onto the SSE broadcast channel; ignore
                // send errors (no active subscribers).
                let _ = state.message_events.send(MessageEvent::Mqtt(MqttMessageEvent {
                    topic: topic.to_string(),
                    payload: payload.to_string(),
                    qos,
                    retain,
                    timestamp: chrono::Utc::now().to_rfc3339(),
                }));
                results.push(serde_json::json!({
                    "index": index,
                    "success": true,
                    "topic": topic,
                    "qos": qos
                }));
            }
            Err(e) => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": format!("{}", e)
                }));
            }
        }

        // Throttle between messages, but not after the last one.
        if delay_ms > 0 && index + 1 < total {
            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
        }
    }

    let succeeded = results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

    (
        StatusCode::OK,
        Json(serde_json::json!({
            "success": true,
            "total": total,
            "succeeded": succeeded,
            "failed": total - succeeded,
            "results": results
        })),
    )
}
1499
1500#[cfg(not(feature = "mqtt"))]
1501async fn publish_mqtt_batch_handler(
1503 State(_state): State<ManagementState>,
1504 Json(_request): Json<serde_json::Value>,
1505) -> impl IntoResponse {
1506 (
1507 StatusCode::SERVICE_UNAVAILABLE,
1508 Json(serde_json::json!({
1509 "error": "MQTT feature not enabled",
1510 "message": "MQTT support is not compiled into this build"
1511 })),
1512 )
1513}
1514
/// Request body for the migration-mode PUT endpoints.
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    // One of "mock", "shadow", "real", "auto" (matched case-insensitively).
    mode: String,
}
1522
1523async fn get_migration_routes(
1525 State(state): State<ManagementState>,
1526) -> Result<Json<serde_json::Value>, StatusCode> {
1527 let proxy_config = match &state.proxy_config {
1528 Some(config) => config,
1529 None => {
1530 return Ok(Json(serde_json::json!({
1531 "error": "Migration not configured. Proxy config not available."
1532 })));
1533 }
1534 };
1535
1536 let config = proxy_config.read().await;
1537 let routes = config.get_migration_routes();
1538
1539 Ok(Json(serde_json::json!({
1540 "routes": routes
1541 })))
1542}
1543
1544async fn toggle_route_migration(
1546 State(state): State<ManagementState>,
1547 Path(pattern): Path<String>,
1548) -> Result<Json<serde_json::Value>, StatusCode> {
1549 let proxy_config = match &state.proxy_config {
1550 Some(config) => config,
1551 None => {
1552 return Ok(Json(serde_json::json!({
1553 "error": "Migration not configured. Proxy config not available."
1554 })));
1555 }
1556 };
1557
1558 let mut config = proxy_config.write().await;
1559 let new_mode = match config.toggle_route_migration(&pattern) {
1560 Some(mode) => mode,
1561 None => {
1562 return Ok(Json(serde_json::json!({
1563 "error": format!("Route pattern not found: {}", pattern)
1564 })));
1565 }
1566 };
1567
1568 Ok(Json(serde_json::json!({
1569 "pattern": pattern,
1570 "mode": format!("{:?}", new_mode).to_lowercase()
1571 })))
1572}
1573
1574async fn set_route_migration_mode(
1576 State(state): State<ManagementState>,
1577 Path(pattern): Path<String>,
1578 Json(request): Json<SetMigrationModeRequest>,
1579) -> Result<Json<serde_json::Value>, StatusCode> {
1580 let proxy_config = match &state.proxy_config {
1581 Some(config) => config,
1582 None => {
1583 return Ok(Json(serde_json::json!({
1584 "error": "Migration not configured. Proxy config not available."
1585 })));
1586 }
1587 };
1588
1589 use mockforge_core::proxy::config::MigrationMode;
1590 let mode = match request.mode.to_lowercase().as_str() {
1591 "mock" => MigrationMode::Mock,
1592 "shadow" => MigrationMode::Shadow,
1593 "real" => MigrationMode::Real,
1594 "auto" => MigrationMode::Auto,
1595 _ => {
1596 return Ok(Json(serde_json::json!({
1597 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1598 })));
1599 }
1600 };
1601
1602 let mut config = proxy_config.write().await;
1603 let updated = config.update_rule_migration_mode(&pattern, mode);
1604
1605 if !updated {
1606 return Ok(Json(serde_json::json!({
1607 "error": format!("Route pattern not found: {}", pattern)
1608 })));
1609 }
1610
1611 Ok(Json(serde_json::json!({
1612 "pattern": pattern,
1613 "mode": format!("{:?}", mode).to_lowercase()
1614 })))
1615}
1616
1617async fn toggle_group_migration(
1619 State(state): State<ManagementState>,
1620 Path(group): Path<String>,
1621) -> Result<Json<serde_json::Value>, StatusCode> {
1622 let proxy_config = match &state.proxy_config {
1623 Some(config) => config,
1624 None => {
1625 return Ok(Json(serde_json::json!({
1626 "error": "Migration not configured. Proxy config not available."
1627 })));
1628 }
1629 };
1630
1631 let mut config = proxy_config.write().await;
1632 let new_mode = config.toggle_group_migration(&group);
1633
1634 Ok(Json(serde_json::json!({
1635 "group": group,
1636 "mode": format!("{:?}", new_mode).to_lowercase()
1637 })))
1638}
1639
1640async fn set_group_migration_mode(
1642 State(state): State<ManagementState>,
1643 Path(group): Path<String>,
1644 Json(request): Json<SetMigrationModeRequest>,
1645) -> Result<Json<serde_json::Value>, StatusCode> {
1646 let proxy_config = match &state.proxy_config {
1647 Some(config) => config,
1648 None => {
1649 return Ok(Json(serde_json::json!({
1650 "error": "Migration not configured. Proxy config not available."
1651 })));
1652 }
1653 };
1654
1655 use mockforge_core::proxy::config::MigrationMode;
1656 let mode = match request.mode.to_lowercase().as_str() {
1657 "mock" => MigrationMode::Mock,
1658 "shadow" => MigrationMode::Shadow,
1659 "real" => MigrationMode::Real,
1660 "auto" => MigrationMode::Auto,
1661 _ => {
1662 return Ok(Json(serde_json::json!({
1663 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1664 })));
1665 }
1666 };
1667
1668 let mut config = proxy_config.write().await;
1669 config.update_group_migration_mode(&group, mode);
1670
1671 Ok(Json(serde_json::json!({
1672 "group": group,
1673 "mode": format!("{:?}", mode).to_lowercase()
1674 })))
1675}
1676
1677async fn get_migration_groups(
1679 State(state): State<ManagementState>,
1680) -> Result<Json<serde_json::Value>, StatusCode> {
1681 let proxy_config = match &state.proxy_config {
1682 Some(config) => config,
1683 None => {
1684 return Ok(Json(serde_json::json!({
1685 "error": "Migration not configured. Proxy config not available."
1686 })));
1687 }
1688 };
1689
1690 let config = proxy_config.read().await;
1691 let groups = config.get_migration_groups();
1692
1693 let groups_json: serde_json::Map<String, serde_json::Value> = groups
1695 .into_iter()
1696 .map(|(name, info)| {
1697 (
1698 name,
1699 serde_json::json!({
1700 "name": info.name,
1701 "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1702 "route_count": info.route_count
1703 }),
1704 )
1705 })
1706 .collect();
1707
1708 Ok(Json(serde_json::json!(groups_json)))
1709}
1710
1711async fn get_migration_status(
1713 State(state): State<ManagementState>,
1714) -> Result<Json<serde_json::Value>, StatusCode> {
1715 let proxy_config = match &state.proxy_config {
1716 Some(config) => config,
1717 None => {
1718 return Ok(Json(serde_json::json!({
1719 "error": "Migration not configured. Proxy config not available."
1720 })));
1721 }
1722 };
1723
1724 let config = proxy_config.read().await;
1725 let routes = config.get_migration_routes();
1726 let groups = config.get_migration_groups();
1727
1728 let mut mock_count = 0;
1729 let mut shadow_count = 0;
1730 let mut real_count = 0;
1731 let mut auto_count = 0;
1732
1733 for route in &routes {
1734 match route.migration_mode {
1735 mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1736 mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1737 mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1738 mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1739 }
1740 }
1741
1742 Ok(Json(serde_json::json!({
1743 "total_routes": routes.len(),
1744 "mock_routes": mock_count,
1745 "shadow_routes": shadow_count,
1746 "real_routes": real_count,
1747 "auto_routes": auto_count,
1748 "total_groups": groups.len(),
1749 "migration_enabled": config.migration_enabled
1750 })))
1751}
1752
/// Request body for creating or updating a proxy body-transform rule.
#[derive(Debug, Deserialize, Serialize)]
pub struct ProxyRuleRequest {
    /// URL pattern the rule applies to.
    pub pattern: String,
    /// Rule kind: "request" or "response" (serialized as "type").
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status codes the rule is restricted to (used by response rules;
    /// ignored for request rules). Defaults to empty.
    #[serde(default)]
    pub status_codes: Vec<u16>,
    /// Body transforms to apply; create requires at least one.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active; defaults to true.
    #[serde(default = "default_true")]
    pub enabled: bool,
}
1772
/// A single body transform within a proxy rule request/response payload.
#[derive(Debug, Deserialize, Serialize)]
pub struct BodyTransformRequest {
    /// Path of the field to transform within the body.
    pub path: String,
    /// Replacement value used by the transform.
    pub replace: String,
    /// Operation name: "replace", "add" or "remove"; the handlers treat any
    /// other value (including the empty-string default) as "replace".
    #[serde(default)]
    pub operation: String,
}
1784
/// Serialized view of a proxy rule as returned by the /proxy/rules endpoints.
#[derive(Debug, Serialize)]
pub struct ProxyRuleResponse {
    /// Combined index: request rules occupy 0..request_count, response
    /// rules follow after them.
    pub id: usize,
    /// URL pattern the rule applies to.
    pub pattern: String,
    /// Rule kind: "request" or "response" (serialized as "type").
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status-code filter (empty for request rules).
    pub status_codes: Vec<u16>,
    /// Transforms applied by the rule.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active.
    pub enabled: bool,
}
1802
1803async fn list_proxy_rules(
1805 State(state): State<ManagementState>,
1806) -> Result<Json<serde_json::Value>, StatusCode> {
1807 let proxy_config = match &state.proxy_config {
1808 Some(config) => config,
1809 None => {
1810 return Ok(Json(serde_json::json!({
1811 "error": "Proxy not configured. Proxy config not available."
1812 })));
1813 }
1814 };
1815
1816 let config = proxy_config.read().await;
1817
1818 let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1819
1820 for (idx, rule) in config.request_replacements.iter().enumerate() {
1822 rules.push(ProxyRuleResponse {
1823 id: idx,
1824 pattern: rule.pattern.clone(),
1825 rule_type: "request".to_string(),
1826 status_codes: Vec::new(),
1827 body_transforms: rule
1828 .body_transforms
1829 .iter()
1830 .map(|t| BodyTransformRequest {
1831 path: t.path.clone(),
1832 replace: t.replace.clone(),
1833 operation: format!("{:?}", t.operation).to_lowercase(),
1834 })
1835 .collect(),
1836 enabled: rule.enabled,
1837 });
1838 }
1839
1840 let request_count = config.request_replacements.len();
1842 for (idx, rule) in config.response_replacements.iter().enumerate() {
1843 rules.push(ProxyRuleResponse {
1844 id: request_count + idx,
1845 pattern: rule.pattern.clone(),
1846 rule_type: "response".to_string(),
1847 status_codes: rule.status_codes.clone(),
1848 body_transforms: rule
1849 .body_transforms
1850 .iter()
1851 .map(|t| BodyTransformRequest {
1852 path: t.path.clone(),
1853 replace: t.replace.clone(),
1854 operation: format!("{:?}", t.operation).to_lowercase(),
1855 })
1856 .collect(),
1857 enabled: rule.enabled,
1858 });
1859 }
1860
1861 Ok(Json(serde_json::json!({
1862 "rules": rules
1863 })))
1864}
1865
1866async fn create_proxy_rule(
1868 State(state): State<ManagementState>,
1869 Json(request): Json<ProxyRuleRequest>,
1870) -> Result<Json<serde_json::Value>, StatusCode> {
1871 let proxy_config = match &state.proxy_config {
1872 Some(config) => config,
1873 None => {
1874 return Ok(Json(serde_json::json!({
1875 "error": "Proxy not configured. Proxy config not available."
1876 })));
1877 }
1878 };
1879
1880 if request.body_transforms.is_empty() {
1882 return Ok(Json(serde_json::json!({
1883 "error": "At least one body transform is required"
1884 })));
1885 }
1886
1887 let body_transforms: Vec<BodyTransform> = request
1888 .body_transforms
1889 .iter()
1890 .map(|t| {
1891 let op = match t.operation.as_str() {
1892 "replace" => TransformOperation::Replace,
1893 "add" => TransformOperation::Add,
1894 "remove" => TransformOperation::Remove,
1895 _ => TransformOperation::Replace,
1896 };
1897 BodyTransform {
1898 path: t.path.clone(),
1899 replace: t.replace.clone(),
1900 operation: op,
1901 }
1902 })
1903 .collect();
1904
1905 let new_rule = BodyTransformRule {
1906 pattern: request.pattern.clone(),
1907 status_codes: request.status_codes.clone(),
1908 body_transforms,
1909 enabled: request.enabled,
1910 };
1911
1912 let mut config = proxy_config.write().await;
1913
1914 let rule_id = if request.rule_type == "request" {
1915 config.request_replacements.push(new_rule);
1916 config.request_replacements.len() - 1
1917 } else if request.rule_type == "response" {
1918 config.response_replacements.push(new_rule);
1919 config.request_replacements.len() + config.response_replacements.len() - 1
1920 } else {
1921 return Ok(Json(serde_json::json!({
1922 "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
1923 })));
1924 };
1925
1926 Ok(Json(serde_json::json!({
1927 "id": rule_id,
1928 "message": "Rule created successfully"
1929 })))
1930}
1931
1932async fn get_proxy_rule(
1934 State(state): State<ManagementState>,
1935 Path(id): Path<String>,
1936) -> Result<Json<serde_json::Value>, StatusCode> {
1937 let proxy_config = match &state.proxy_config {
1938 Some(config) => config,
1939 None => {
1940 return Ok(Json(serde_json::json!({
1941 "error": "Proxy not configured. Proxy config not available."
1942 })));
1943 }
1944 };
1945
1946 let config = proxy_config.read().await;
1947 let rule_id: usize = match id.parse() {
1948 Ok(id) => id,
1949 Err(_) => {
1950 return Ok(Json(serde_json::json!({
1951 "error": format!("Invalid rule ID: {}", id)
1952 })));
1953 }
1954 };
1955
1956 let request_count = config.request_replacements.len();
1957
1958 if rule_id < request_count {
1959 let rule = &config.request_replacements[rule_id];
1961 Ok(Json(serde_json::json!({
1962 "id": rule_id,
1963 "pattern": rule.pattern,
1964 "type": "request",
1965 "status_codes": [],
1966 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1967 "path": t.path,
1968 "replace": t.replace,
1969 "operation": format!("{:?}", t.operation).to_lowercase()
1970 })).collect::<Vec<_>>(),
1971 "enabled": rule.enabled
1972 })))
1973 } else if rule_id < request_count + config.response_replacements.len() {
1974 let response_idx = rule_id - request_count;
1976 let rule = &config.response_replacements[response_idx];
1977 Ok(Json(serde_json::json!({
1978 "id": rule_id,
1979 "pattern": rule.pattern,
1980 "type": "response",
1981 "status_codes": rule.status_codes,
1982 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1983 "path": t.path,
1984 "replace": t.replace,
1985 "operation": format!("{:?}", t.operation).to_lowercase()
1986 })).collect::<Vec<_>>(),
1987 "enabled": rule.enabled
1988 })))
1989 } else {
1990 Ok(Json(serde_json::json!({
1991 "error": format!("Rule ID {} not found", rule_id)
1992 })))
1993 }
1994}
1995
1996async fn update_proxy_rule(
1998 State(state): State<ManagementState>,
1999 Path(id): Path<String>,
2000 Json(request): Json<ProxyRuleRequest>,
2001) -> Result<Json<serde_json::Value>, StatusCode> {
2002 let proxy_config = match &state.proxy_config {
2003 Some(config) => config,
2004 None => {
2005 return Ok(Json(serde_json::json!({
2006 "error": "Proxy not configured. Proxy config not available."
2007 })));
2008 }
2009 };
2010
2011 let mut config = proxy_config.write().await;
2012 let rule_id: usize = match id.parse() {
2013 Ok(id) => id,
2014 Err(_) => {
2015 return Ok(Json(serde_json::json!({
2016 "error": format!("Invalid rule ID: {}", id)
2017 })));
2018 }
2019 };
2020
2021 let body_transforms: Vec<BodyTransform> = request
2022 .body_transforms
2023 .iter()
2024 .map(|t| {
2025 let op = match t.operation.as_str() {
2026 "replace" => TransformOperation::Replace,
2027 "add" => TransformOperation::Add,
2028 "remove" => TransformOperation::Remove,
2029 _ => TransformOperation::Replace,
2030 };
2031 BodyTransform {
2032 path: t.path.clone(),
2033 replace: t.replace.clone(),
2034 operation: op,
2035 }
2036 })
2037 .collect();
2038
2039 let updated_rule = BodyTransformRule {
2040 pattern: request.pattern.clone(),
2041 status_codes: request.status_codes.clone(),
2042 body_transforms,
2043 enabled: request.enabled,
2044 };
2045
2046 let request_count = config.request_replacements.len();
2047
2048 if rule_id < request_count {
2049 config.request_replacements[rule_id] = updated_rule;
2051 } else if rule_id < request_count + config.response_replacements.len() {
2052 let response_idx = rule_id - request_count;
2054 config.response_replacements[response_idx] = updated_rule;
2055 } else {
2056 return Ok(Json(serde_json::json!({
2057 "error": format!("Rule ID {} not found", rule_id)
2058 })));
2059 }
2060
2061 Ok(Json(serde_json::json!({
2062 "id": rule_id,
2063 "message": "Rule updated successfully"
2064 })))
2065}
2066
2067async fn delete_proxy_rule(
2069 State(state): State<ManagementState>,
2070 Path(id): Path<String>,
2071) -> Result<Json<serde_json::Value>, StatusCode> {
2072 let proxy_config = match &state.proxy_config {
2073 Some(config) => config,
2074 None => {
2075 return Ok(Json(serde_json::json!({
2076 "error": "Proxy not configured. Proxy config not available."
2077 })));
2078 }
2079 };
2080
2081 let mut config = proxy_config.write().await;
2082 let rule_id: usize = match id.parse() {
2083 Ok(id) => id,
2084 Err(_) => {
2085 return Ok(Json(serde_json::json!({
2086 "error": format!("Invalid rule ID: {}", id)
2087 })));
2088 }
2089 };
2090
2091 let request_count = config.request_replacements.len();
2092
2093 if rule_id < request_count {
2094 config.request_replacements.remove(rule_id);
2096 } else if rule_id < request_count + config.response_replacements.len() {
2097 let response_idx = rule_id - request_count;
2099 config.response_replacements.remove(response_idx);
2100 } else {
2101 return Ok(Json(serde_json::json!({
2102 "error": format!("Rule ID {} not found", rule_id)
2103 })));
2104 }
2105
2106 Ok(Json(serde_json::json!({
2107 "id": rule_id,
2108 "message": "Rule deleted successfully"
2109 })))
2110}
2111
2112async fn get_proxy_inspect(
2115 State(_state): State<ManagementState>,
2116 Query(params): Query<std::collections::HashMap<String, String>>,
2117) -> Result<Json<serde_json::Value>, StatusCode> {
2118 let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2119
2120 Ok(Json(serde_json::json!({
2126 "requests": [],
2127 "responses": [],
2128 "limit": limit,
2129 "total": 0,
2130 "message": "Request/response inspection not yet implemented. This endpoint will return intercepted traffic when proxy inspection is fully integrated."
2131 })))
2132}
2133
/// Build the management API `Router` and attach `state` as shared state.
///
/// Routes are added in feature-gated layers via a shadowed `let router`
/// chain: each `#[cfg]` pair must keep a `let router` binding in both the
/// enabled and disabled branch so the chain stays well-formed. Note the
/// MQTT publish endpoints are registered even without the `mqtt` feature
/// (the stub handlers answer 503), while SMTP and Kafka routes disappear
/// entirely when their features are off.
pub fn management_router(state: ManagementState) -> Router {
    // Core endpoints: health, stats, config and mock CRUD plus import/export.
    let router = Router::new()
        .route("/health", get(health_check))
        .route("/stats", get(get_stats))
        .route("/config", get(get_config))
        .route("/config/validate", post(validate_config))
        .route("/config/bulk", post(bulk_update_config))
        .route("/mocks", get(list_mocks))
        .route("/mocks", post(create_mock))
        .route("/mocks/{id}", get(get_mock))
        .route("/mocks/{id}", put(update_mock))
        .route("/mocks/{id}", delete(delete_mock))
        .route("/export", get(export_mocks))
        .route("/import", post(import_mocks));

    // SMTP mailbox inspection (feature-gated; no stub routes when disabled).
    #[cfg(feature = "smtp")]
    let router = router
        .route("/smtp/mailbox", get(list_smtp_emails))
        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
        .route("/smtp/mailbox/{id}", get(get_smtp_email))
        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
        .route("/smtp/mailbox/search", get(search_smtp_emails));

    // Keep the shadowing chain intact when smtp is off.
    #[cfg(not(feature = "smtp"))]
    let router = router;

    // MQTT broker management; publish endpoints also exist without the
    // feature (below) so clients get a JSON 503 instead of a 404.
    #[cfg(feature = "mqtt")]
    let router = router
        .route("/mqtt/stats", get(get_mqtt_stats))
        .route("/mqtt/clients", get(get_mqtt_clients))
        .route("/mqtt/topics", get(get_mqtt_topics))
        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Stub publish endpoints when mqtt is compiled out.
    #[cfg(not(feature = "mqtt"))]
    let router = router
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Kafka broker inspection and produce endpoints (feature-gated).
    #[cfg(feature = "kafka")]
    let router = router
        .route("/kafka/stats", get(get_kafka_stats))
        .route("/kafka/topics", get(get_kafka_topics))
        .route("/kafka/topics/{topic}", get(get_kafka_topic))
        .route("/kafka/groups", get(get_kafka_groups))
        .route("/kafka/groups/{group_id}", get(get_kafka_group))
        .route("/kafka/produce", post(produce_kafka_message))
        .route("/kafka/produce/batch", post(produce_kafka_batch))
        .route("/kafka/messages/stream", get(kafka_messages_stream));

    // Keep the shadowing chain intact when kafka is off.
    #[cfg(not(feature = "kafka"))]
    let router = router;

    // Migration-mode management (always available; handlers report errors
    // when no proxy config is attached).
    let router = router
        .route("/migration/routes", get(get_migration_routes))
        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
        .route("/migration/groups/{group}", put(set_group_migration_mode))
        .route("/migration/groups", get(get_migration_groups))
        .route("/migration/status", get(get_migration_status));

    // Proxy transform-rule CRUD and (stubbed) traffic inspection.
    let router = router
        .route("/proxy/rules", get(list_proxy_rules))
        .route("/proxy/rules", post(create_proxy_rule))
        .route("/proxy/rules/{id}", get(get_proxy_rule))
        .route("/proxy/rules/{id}", put(update_proxy_rule))
        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
        .route("/proxy/inspect", get(get_proxy_inspect));

    // AI/chaos/network helper endpoints.
    let router = router
        .route("/ai/generate-spec", post(generate_ai_spec))
        .route("/mockai/generate-openapi", post(generate_openapi_from_traffic))
        .route("/mockai/learn", post(learn_from_examples))
        .route("/mockai/rules/explanations", get(list_rule_explanations))
        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
        .route("/chaos/config", get(get_chaos_config))
        .route("/chaos/config", post(update_chaos_config))
        .route("/network/profiles", get(list_network_profiles))
        .route("/network/profile/apply", post(apply_network_profile));

    // State-machine sub-API mounted under its own prefix.
    let router =
        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());

    router.with_state(state)
}
2228
#[cfg(feature = "kafka")]
/// Broker-wide Kafka statistics returned by GET /kafka/stats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// Number of topics currently known to the broker.
    pub topics: usize,
    /// Total partition count across all topics.
    pub partitions: usize,
    /// Number of registered consumer groups.
    pub consumer_groups: usize,
    /// Total messages produced (from the broker's metrics snapshot).
    pub messages_produced: u64,
    /// Total messages consumed (from the broker's metrics snapshot).
    pub messages_consumed: u64,
}
2243
#[cfg(feature = "kafka")]
/// Summary of a single Kafka topic for the /kafka/topics listing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name.
    pub name: String,
    /// Number of partitions in the topic.
    pub partitions: usize,
    /// Configured replication factor.
    pub replication_factor: i32,
}
2251
#[cfg(feature = "kafka")]
/// Summary of a consumer group for the /kafka/groups listing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group id.
    pub group_id: String,
    /// Number of members currently in the group.
    pub members: usize,
    /// Group state as reported by the API (currently always "Stable").
    pub state: String,
}
2259
#[cfg(feature = "kafka")]
/// GET /kafka/stats — snapshot of broker-wide Kafka statistics, or 503 when
/// the broker is not attached to the management state.
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let consumer_groups = broker.consumer_groups.read().await;
    let snapshot = broker.metrics.clone().snapshot();

    let stats = KafkaBrokerStats {
        topics: topics.len(),
        partitions: topics.values().map(|t| t.partitions.len()).sum(),
        consumer_groups: consumer_groups.groups().len(),
        messages_produced: snapshot.messages_produced_total,
        messages_consumed: snapshot.messages_consumed_total,
    };

    Json(stats).into_response()
}
2293
#[cfg(feature = "kafka")]
/// GET /kafka/topics — list all topics known to the mock broker.
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let topic_list: Vec<KafkaTopicInfo> = topics
        .iter()
        .map(|(name, topic)| KafkaTopicInfo {
            name: name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor,
        })
        .collect();

    Json(serde_json::json!({ "topics": topic_list })).into_response()
}
2323
#[cfg(feature = "kafka")]
/// GET /kafka/topics/{topic} — details for one topic including per-partition
/// message counts; 404 if the topic does not exist, 503 without a broker.
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let Some(topic) = topics.get(&topic_name) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Topic not found",
                "topic": topic_name
            })),
        )
            .into_response();
    };

    let partitions_detail: Vec<serde_json::Value> = topic
        .partitions
        .iter()
        .enumerate()
        .map(|(idx, partition)| {
            serde_json::json!({
                "id": idx as i32,
                // Single-broker mock: broker 0 leads and holds every replica.
                "leader": 0,
                "replicas": vec![0],
                "message_count": partition.messages.len()
            })
        })
        .collect();

    Json(serde_json::json!({
        "name": topic_name,
        "partitions": topic.partitions.len(),
        "replication_factor": topic.config.replication_factor,
        "partitions_detail": partitions_detail
    }))
    .into_response()
}
2365
#[cfg(feature = "kafka")]
/// GET /kafka/groups — list consumer groups known to the mock broker.
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;
    let groups: Vec<KafkaConsumerGroupInfo> = consumer_groups
        .groups()
        .iter()
        .map(|(group_id, group)| KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            // Group state is hard-coded here; the broker does not expose one.
            state: "Stable".to_string(),
        })
        .collect();

    Json(serde_json::json!({ "groups": groups })).into_response()
}
2396
#[cfg(feature = "kafka")]
/// GET /kafka/groups/{group_id} — details for one consumer group including
/// member assignments and committed offsets; 404 if the group is unknown.
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;
    let Some(group) = consumer_groups.groups().get(&group_id) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Consumer group not found",
                "group_id": group_id
            })),
        )
            .into_response();
    };

    let members_detail: Vec<serde_json::Value> = group
        .members
        .iter()
        .map(|(member_id, member)| {
            serde_json::json!({
                "member_id": member_id,
                "client_id": member.client_id,
                "assignments": member
                    .assignment
                    .iter()
                    .map(|a| serde_json::json!({
                        "topic": a.topic,
                        "partitions": a.partitions
                    }))
                    .collect::<Vec<_>>()
            })
        })
        .collect();

    let offsets: Vec<serde_json::Value> = group
        .offsets
        .iter()
        .map(|((topic, partition), offset)| {
            serde_json::json!({
                "topic": topic,
                "partition": partition,
                "offset": offset
            })
        })
        .collect();

    Json(serde_json::json!({
        "group_id": group_id,
        "members": group.members.len(),
        // Group state is hard-coded here; the broker does not expose one.
        "state": "Stable",
        "members_detail": members_detail,
        "offsets": offsets
    }))
    .into_response()
}
2445
/// Request body for producing a single message to the mock Kafka broker.
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Target topic; auto-created with default config if it does not exist.
    pub topic: String,
    /// Optional message key, used for partition assignment when no explicit
    /// partition is given.
    #[serde(default)]
    pub key: Option<String>,
    /// Message payload as UTF-8 text.
    pub value: String,
    /// Explicit target partition; assigned from the key when omitted.
    /// Out-of-range values are rejected with 400.
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional string headers attached to the message.
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2465
/// POST handler that produces a single message to a topic on the mock Kafka
/// broker.
///
/// The topic is auto-created with default configuration on first use. When no
/// partition is specified, one is assigned from the message key. Responds 400
/// for an out-of-range partition, 503 when no broker is running, and 500 when
/// the underlying produce fails. Successful produces are also broadcast to
/// SSE subscribers.
#[cfg(feature = "kafka")]
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let mut topics = broker.topics.write().await;

        // Auto-create the topic with defaults on first use.
        let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
            crate::topics::Topic::new(request.topic.clone(), crate::topics::TopicConfig::default())
        });

        // Explicit partition wins; otherwise let the topic assign from the key.
        let partition_id = if let Some(partition) = request.partition {
            partition
        } else {
            topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes()))
        };

        if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid partition",
                    "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                })),
            )
                .into_response();
        }

        // Clone key/headers here (as the batch handler does) instead of moving
        // them out of `request` — they are needed again below for the SSE event.
        // Offset 0 is a placeholder; produce() returns the real offset.
        let message = crate::partitions::KafkaMessage {
            offset: 0,
            timestamp: chrono::Utc::now().timestamp_millis(),
            key: request.key.clone().map(|k| k.as_bytes().to_vec()),
            value: request.value.as_bytes().to_vec(),
            headers: request
                .headers
                .clone()
                .unwrap_or_default()
                .into_iter()
                .map(|(k, v)| (k, v.as_bytes().to_vec()))
                .collect(),
        };

        match topic_entry.produce(partition_id, message).await {
            Ok(offset) => {
                broker.metrics.record_messages_produced(1);

                // Broadcast to SSE subscribers; a send error only means there
                // are currently no listeners, so it is ignored.
                // (The inner #[cfg(feature = "kafka")] block was redundant:
                // this whole function is already gated on that feature.)
                let event = MessageEvent::Kafka(KafkaMessageEvent {
                    topic: request.topic.clone(),
                    key: request.key.clone(),
                    value: request.value.clone(),
                    partition: partition_id,
                    offset,
                    headers: request.headers.clone(),
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message produced to topic '{}'", request.topic),
                    "topic": request.topic,
                    "partition": partition_id,
                    "offset": offset
                }))
                .into_response()
            }
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to produce message",
                    "message": e.to_string()
                })),
            )
                .into_response(),
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2563
/// Request body for producing an ordered batch of messages.
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// Messages produced in order; each entry has the same shape as
    /// `KafkaProduceRequest`.
    pub messages: Vec<KafkaProduceRequest>,
    /// Delay between consecutive messages, in milliseconds. Defaults via
    /// `default_delay` (defined elsewhere in this file). A delay is not
    /// applied after the final message.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2573
/// POST handler that produces a batch of messages to the mock Kafka broker.
///
/// Messages are produced in order with an optional per-message delay
/// (`delay_ms`, skipped after the last message). Individual failures do not
/// abort the batch; `results` carries a success/error entry per index.
/// Responds 400 for an empty batch and 503 when no broker is running.
#[cfg(feature = "kafka")]
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        if request.messages.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            )
                .into_response();
        }

        let mut results = Vec::with_capacity(request.messages.len());

        for (index, msg_request) in request.messages.iter().enumerate() {
            // Scope the topics write lock so it is released before the
            // inter-message sleep below. Previously the guard lived until the
            // end of the loop body, which kept the whole broker locked during
            // artificial delays and blocked every other handler.
            let (partition_id, produce_result) = {
                let mut topics = broker.topics.write().await;

                // Auto-create the topic with defaults on first use.
                let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                    crate::topics::Topic::new(
                        msg_request.topic.clone(),
                        crate::topics::TopicConfig::default(),
                    )
                });

                // Explicit partition wins; otherwise assign from the key.
                let partition_id = if let Some(partition) = msg_request.partition {
                    partition
                } else {
                    topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
                };

                if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                    }));
                    // Matches the original behavior: an invalid partition also
                    // skips the inter-message delay.
                    continue;
                }

                // Offset 0 is a placeholder; produce() returns the real offset.
                let message = crate::partitions::KafkaMessage {
                    offset: 0,
                    timestamp: chrono::Utc::now().timestamp_millis(),
                    key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
                    value: msg_request.value.as_bytes().to_vec(),
                    headers: msg_request
                        .headers
                        .clone()
                        .unwrap_or_default()
                        .into_iter()
                        .map(|(k, v)| (k, v.as_bytes().to_vec()))
                        .collect(),
                };

                (partition_id, topic_entry.produce(partition_id, message).await)
            };

            match produce_result {
                Ok(offset) => {
                    broker.metrics.record_messages_produced(1);

                    // Notify SSE subscribers; a send error only means there
                    // are no listeners, so it is ignored.
                    let event = MessageEvent::Kafka(KafkaMessageEvent {
                        topic: msg_request.topic.clone(),
                        key: msg_request.key.clone(),
                        value: msg_request.value.clone(),
                        partition: partition_id,
                        offset,
                        headers: msg_request.headers.clone(),
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": msg_request.topic,
                        "partition": partition_id,
                        "offset": offset
                    }));
                }
                Err(e) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": e.to_string()
                    }));
                }
            }

            // Pace the batch, but never sleep after the final message.
            if index < request.messages.len() - 1 && request.delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        Json(serde_json::json!({
            "success": true,
            "total": request.messages.len(),
            "succeeded": success_count,
            "failed": request.messages.len() - success_count,
            "results": results
        }))
        .into_response()
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2699
/// SSE endpoint streaming live MQTT publish events.
///
/// Subscribes to the shared broadcast channel and forwards only MQTT events,
/// optionally filtered by `?topic=<substring>`. Lagged receivers log a warning
/// and skip missed messages; the stream ends when the channel closes.
#[cfg(feature = "mqtt")]
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // The receiver is moved straight into the unfold state, so no `mut`
    // binding is needed here (the previous `let mut rx` triggered an
    // unused_mut warning).
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => {
                        // Substring filter on the topic, when requested.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "mqtt",
                            "topic": event.topic,
                            "payload": event.payload,
                            "qos": event.qos,
                            "retain": event.retain,
                            "timestamp": event.timestamp,
                        });

                        // Skip (rather than fail) events that cannot serialize.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event = Event::default().event("mqtt_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    // Kafka events are served by the dedicated Kafka stream.
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => {
                        continue;
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Periodic keep-alive comments stop proxies from closing the connection.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
2762
/// SSE endpoint streaming live Kafka produce events.
///
/// Subscribes to the shared broadcast channel and forwards only Kafka events,
/// optionally filtered by `?topic=<substring>`. Lagged receivers log a warning
/// and skip missed messages; the stream ends when the channel closes.
#[cfg(feature = "kafka")]
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // The receiver is moved straight into the unfold state, so no `mut`
    // binding is needed here (the previous `let mut rx` triggered an
    // unused_mut warning).
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    // MQTT events are served by the dedicated MQTT stream.
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => {
                        continue;
                    }
                    Ok(MessageEvent::Kafka(event)) => {
                        // Substring filter on the topic, when requested.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "kafka",
                            "topic": event.topic,
                            "key": event.key,
                            "value": event.value,
                            "partition": event.partition,
                            "offset": event.offset,
                            "headers": event.headers,
                            "timestamp": event.timestamp,
                        });

                        // Skip (rather than fail) events that cannot serialize.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event =
                                Event::default().event("kafka_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Periodic keep-alive comments stop proxies from closing the connection.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
2826
/// Request body for AI-assisted API specification generation.
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Natural-language description of the desired API.
    pub query: String,
    /// Target format: "openapi", "graphql", or "asyncapi"; any other value
    /// falls back to OpenAPI 3.0.
    pub spec_type: String,
    /// OpenAPI version to target; defaults to "3.0.0" when omitted.
    pub api_version: Option<String>,
}
2839
/// Request body for generating an OpenAPI spec from recorded HTTP traffic.
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to the recorder database; defaults to ./recordings.db in the
    /// current working directory when omitted.
    #[serde(default)]
    pub database_path: Option<String>,
    /// Lower-bound timestamp filter, RFC 3339 (e.g. 2025-01-01T00:00:00Z).
    #[serde(default)]
    pub since: Option<String>,
    /// Upper-bound timestamp filter, RFC 3339.
    #[serde(default)]
    pub until: Option<String>,
    /// Optional path pattern used to filter recorded exchanges.
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence for inferred paths; defaults to 0.7.
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
2859
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`:
/// accept path inferences at or above 70% confidence.
fn default_min_confidence() -> f64 {
    const DEFAULT: f64 = 0.7;
    DEFAULT
}
2863
/// POST handler generating an API specification (OpenAPI / GraphQL /
/// AsyncAPI) from a natural-language description via the RAG engine.
///
/// Provider, endpoint, model, and generation parameters are read from
/// `MOCKFORGE_RAG_*` environment variables, with OpenAI-flavored defaults.
/// Returns 503 when no API key is configured and 500 on engine errors.
#[cfg(feature = "data-faker")]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{EmbeddingProvider, LlmProvider, RagConfig},
        engine::RagEngine,
        storage::{DocumentStorage, StorageFactory},
    };
    use std::sync::Arc;

    // API key: dedicated variable first, generic OpenAI variable as fallback.
    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
        .into_response();
    }

    // Provider selection; unknown values fall back to OpenAI.
    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    // Per-provider default endpoints, overridable via environment.
    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    // Per-provider default models, overridable via environment.
    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    // Tuning knobs: unset or unparsable env values silently fall back to the
    // defaults given to unwrap_or below.
    let mut rag_config = RagConfig::default();
    rag_config.provider = provider;
    rag_config.api_endpoint = api_endpoint;
    rag_config.api_key = api_key;
    rag_config.model = model;
    rag_config.max_tokens = std::env::var("MOCKFORGE_RAG_MAX_TOKENS")
        .unwrap_or_else(|_| "4096".to_string())
        .parse()
        .unwrap_or(4096);
    rag_config.temperature = std::env::var("MOCKFORGE_RAG_TEMPERATURE")
        .unwrap_or_else(|_| "0.3".to_string())
        .parse()
        .unwrap_or(0.3);
    rag_config.timeout_secs = std::env::var("MOCKFORGE_RAG_TIMEOUT")
        .unwrap_or_else(|_| "60".to_string())
        .parse()
        .unwrap_or(60);
    rag_config.max_context_length = std::env::var("MOCKFORGE_RAG_CONTEXT_WINDOW")
        .unwrap_or_else(|_| "4000".to_string())
        .parse()
        .unwrap_or(4000);

    // Human-readable label used inside the prompt text.
    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        if request.spec_type == "graphql" {
            "GraphQL SDL"
        } else {
            "YAML"
        }
    );

    // The engine constructor takes a document store; an empty in-memory store
    // is supplied since no documents are retrieved for this call.
    use mockforge_data::rag::storage::InMemoryStorage;
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    let mut rag_engine = match RagEngine::new(rag_config.clone(), storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
            .into_response();
        }
    };

    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Strip code fences / surrounding prose from the model output.
            let spec = if request.spec_type == "graphql" {
                extract_graphql_schema(&generated_text)
            } else {
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
        .into_response(),
    }
}
3034
3035#[cfg(not(feature = "data-faker"))]
3036async fn generate_ai_spec(
3037 State(_state): State<ManagementState>,
3038 Json(_request): Json<GenerateSpecRequest>,
3039) -> impl IntoResponse {
3040 (
3041 StatusCode::NOT_IMPLEMENTED,
3042 Json(serde_json::json!({
3043 "error": "AI features not enabled",
3044 "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3045 })),
3046 )
3047 .into_response()
3048}
3049
3050async fn generate_openapi_from_traffic(
3052 State(_state): State<ManagementState>,
3053 Json(request): Json<GenerateOpenApiFromTrafficRequest>,
3054) -> impl IntoResponse {
3055 use chrono::{DateTime, Utc};
3056 use mockforge_core::intelligent_behavior::{
3057 openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
3058 IntelligentBehaviorConfig,
3059 };
3060 use mockforge_recorder::{
3061 database::RecorderDatabase,
3062 openapi_export::{QueryFilters, RecordingsToOpenApi},
3063 };
3064 use std::path::PathBuf;
3065
3066 let db_path = if let Some(ref path) = request.database_path {
3068 PathBuf::from(path)
3069 } else {
3070 std::env::current_dir()
3071 .unwrap_or_else(|_| PathBuf::from("."))
3072 .join("recordings.db")
3073 };
3074
3075 let db = match RecorderDatabase::new(&db_path).await {
3077 Ok(db) => db,
3078 Err(e) => {
3079 return (
3080 StatusCode::BAD_REQUEST,
3081 Json(serde_json::json!({
3082 "error": "Database error",
3083 "message": format!("Failed to open recorder database: {}", e)
3084 })),
3085 )
3086 .into_response();
3087 }
3088 };
3089
3090 let since_dt = if let Some(ref since_str) = request.since {
3092 match DateTime::parse_from_rfc3339(since_str) {
3093 Ok(dt) => Some(dt.with_timezone(&Utc)),
3094 Err(e) => {
3095 return (
3096 StatusCode::BAD_REQUEST,
3097 Json(serde_json::json!({
3098 "error": "Invalid date format",
3099 "message": format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3100 })),
3101 )
3102 .into_response();
3103 }
3104 }
3105 } else {
3106 None
3107 };
3108
3109 let until_dt = if let Some(ref until_str) = request.until {
3110 match DateTime::parse_from_rfc3339(until_str) {
3111 Ok(dt) => Some(dt.with_timezone(&Utc)),
3112 Err(e) => {
3113 return (
3114 StatusCode::BAD_REQUEST,
3115 Json(serde_json::json!({
3116 "error": "Invalid date format",
3117 "message": format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3118 })),
3119 )
3120 .into_response();
3121 }
3122 }
3123 } else {
3124 None
3125 };
3126
3127 let query_filters = QueryFilters {
3129 since: since_dt,
3130 until: until_dt,
3131 path_pattern: request.path_pattern.clone(),
3132 min_status_code: None,
3133 max_requests: Some(1000),
3134 };
3135
3136 let exchanges_from_recorder =
3141 match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
3142 Ok(exchanges) => exchanges,
3143 Err(e) => {
3144 return (
3145 StatusCode::INTERNAL_SERVER_ERROR,
3146 Json(serde_json::json!({
3147 "error": "Query error",
3148 "message": format!("Failed to query HTTP exchanges: {}", e)
3149 })),
3150 )
3151 .into_response();
3152 }
3153 };
3154
3155 if exchanges_from_recorder.is_empty() {
3156 return (
3157 StatusCode::NOT_FOUND,
3158 Json(serde_json::json!({
3159 "error": "No exchanges found",
3160 "message": "No HTTP exchanges found matching the specified filters"
3161 })),
3162 )
3163 .into_response();
3164 }
3165
3166 use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
3168 let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
3169 .into_iter()
3170 .map(|e| LocalHttpExchange {
3171 method: e.method,
3172 path: e.path,
3173 query_params: e.query_params,
3174 headers: e.headers,
3175 body: e.body,
3176 body_encoding: e.body_encoding,
3177 status_code: e.status_code,
3178 response_headers: e.response_headers,
3179 response_body: e.response_body,
3180 response_body_encoding: e.response_body_encoding,
3181 timestamp: e.timestamp,
3182 })
3183 .collect();
3184
3185 let behavior_config = IntelligentBehaviorConfig::default();
3187 let gen_config = OpenApiGenerationConfig {
3188 min_confidence: request.min_confidence,
3189 behavior_model: Some(behavior_config.behavior_model),
3190 };
3191
3192 let generator = OpenApiSpecGenerator::new(gen_config);
3194 let result = match generator.generate_from_exchanges(exchanges).await {
3195 Ok(result) => result,
3196 Err(e) => {
3197 return (
3198 StatusCode::INTERNAL_SERVER_ERROR,
3199 Json(serde_json::json!({
3200 "error": "Generation error",
3201 "message": format!("Failed to generate OpenAPI spec: {}", e)
3202 })),
3203 )
3204 .into_response();
3205 }
3206 };
3207
3208 let spec_json = if let Some(ref raw) = result.spec.raw_document {
3210 raw.clone()
3211 } else {
3212 match serde_json::to_value(&result.spec.spec) {
3213 Ok(json) => json,
3214 Err(e) => {
3215 return (
3216 StatusCode::INTERNAL_SERVER_ERROR,
3217 Json(serde_json::json!({
3218 "error": "Serialization error",
3219 "message": format!("Failed to serialize OpenAPI spec: {}", e)
3220 })),
3221 )
3222 .into_response();
3223 }
3224 }
3225 };
3226
3227 let response = serde_json::json!({
3229 "spec": spec_json,
3230 "metadata": {
3231 "requests_analyzed": result.metadata.requests_analyzed,
3232 "paths_inferred": result.metadata.paths_inferred,
3233 "path_confidence": result.metadata.path_confidence,
3234 "generated_at": result.metadata.generated_at.to_rfc3339(),
3235 "duration_ms": result.metadata.duration_ms,
3236 }
3237 });
3238
3239 Json(response).into_response()
3240}
3241
3242async fn list_rule_explanations(
3244 State(state): State<ManagementState>,
3245 Query(params): Query<std::collections::HashMap<String, String>>,
3246) -> impl IntoResponse {
3247 use mockforge_core::intelligent_behavior::RuleType;
3248
3249 let explanations = state.rule_explanations.read().await;
3250 let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3251
3252 if let Some(rule_type_str) = params.get("rule_type") {
3254 if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3255 explanations_vec.retain(|e| e.rule_type == rule_type);
3256 }
3257 }
3258
3259 if let Some(min_confidence_str) = params.get("min_confidence") {
3261 if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3262 explanations_vec.retain(|e| e.confidence >= min_confidence);
3263 }
3264 }
3265
3266 explanations_vec.sort_by(|a, b| {
3268 b.confidence
3269 .partial_cmp(&a.confidence)
3270 .unwrap_or(std::cmp::Ordering::Equal)
3271 .then_with(|| b.generated_at.cmp(&a.generated_at))
3272 });
3273
3274 Json(serde_json::json!({
3275 "explanations": explanations_vec,
3276 "total": explanations_vec.len(),
3277 }))
3278 .into_response()
3279}
3280
3281async fn get_rule_explanation(
3283 State(state): State<ManagementState>,
3284 Path(rule_id): Path<String>,
3285) -> impl IntoResponse {
3286 let explanations = state.rule_explanations.read().await;
3287
3288 match explanations.get(&rule_id) {
3289 Some(explanation) => Json(serde_json::json!({
3290 "explanation": explanation,
3291 }))
3292 .into_response(),
3293 None => (
3294 StatusCode::NOT_FOUND,
3295 Json(serde_json::json!({
3296 "error": "Rule explanation not found",
3297 "message": format!("No explanation found for rule ID: {}", rule_id)
3298 })),
3299 )
3300 .into_response(),
3301 }
3302}
3303
/// Request body for learning behavior rules from example exchanges.
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example request/response pairs to learn from; must be non-empty.
    pub examples: Vec<ExamplePairRequest>,
    /// Optional behavior configuration, deserialized as
    /// `IntelligentBehaviorConfig`; defaults are used when absent or invalid.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3313
/// One example exchange used for rule learning.
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request side; recognized keys: "method", "path", "body",
    /// "query_params", "headers" (all optional, defaults applied).
    pub request: serde_json::Value,
    /// Response side; recognized keys: "status_code" (or "status") and "body".
    pub response: serde_json::Value,
}
3322
/// POST handler that learns intelligent-behavior rules from example
/// request/response pairs and stores the generated explanations in state.
///
/// Responds 400 for an empty example list, 500 when rule generation fails,
/// and otherwise returns a summary of generated rules plus explanations.
async fn learn_from_examples(
    State(state): State<ManagementState>,
    Json(request): Json<LearnFromExamplesRequest>,
) -> impl IntoResponse {
    use mockforge_core::intelligent_behavior::{
        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
        rule_generator::{ExamplePair, RuleGenerator},
    };

    // An empty example set cannot produce rules; reject early.
    if request.examples.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "No examples provided",
                "message": "At least one example pair is required"
            })),
        )
        .into_response();
    }

    // Convert the loosely-typed JSON examples into `ExamplePair`s, applying
    // defaults for missing fields (GET, "/", status 200, empty maps).
    // NOTE(review): this closure always returns Ok, so the Err arm of the
    // match below is currently unreachable dead code.
    let example_pairs: Result<Vec<ExamplePair>, String> = request
        .examples
        .into_iter()
        .enumerate()
        .map(|(idx, ex)| {
            let method = ex
                .request
                .get("method")
                .and_then(|v| v.as_str())
                .map(|s| s.to_string())
                .unwrap_or_else(|| "GET".to_string());
            let path = ex
                .request
                .get("path")
                .and_then(|v| v.as_str())
                .map(|s| s.to_string())
                .unwrap_or_else(|| "/".to_string());
            let request_body = ex.request.get("body").cloned();
            // Only string-valued entries are kept; non-string values are dropped.
            let query_params = ex
                .request
                .get("query_params")
                .and_then(|v| v.as_object())
                .map(|obj| {
                    obj.iter()
                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
                        .collect()
                })
                .unwrap_or_default();
            let headers = ex
                .request
                .get("headers")
                .and_then(|v| v.as_object())
                .map(|obj| {
                    obj.iter()
                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
                        .collect()
                })
                .unwrap_or_default();

            // Accept either "status_code" or the shorter "status" key.
            let status = ex
                .response
                .get("status_code")
                .or_else(|| ex.response.get("status"))
                .and_then(|v| v.as_u64())
                .map(|n| n as u16)
                .unwrap_or(200);
            let response_body = ex.response.get("body").cloned();

            Ok(ExamplePair {
                method,
                path,
                request: request_body,
                status,
                response: response_body,
                query_params,
                headers,
                // Provenance metadata recorded alongside each learned pair.
                metadata: {
                    let mut meta = std::collections::HashMap::new();
                    meta.insert("source".to_string(), "api".to_string());
                    meta.insert("example_index".to_string(), idx.to_string());
                    meta
                },
            })
        })
        .collect();

    let example_pairs = match example_pairs {
        Ok(pairs) => pairs,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid examples",
                    "message": e
                })),
            )
            .into_response();
        }
    };

    // Use the caller-supplied behavior config when it deserializes; fall back
    // to defaults otherwise (invalid config is silently replaced, not rejected).
    let behavior_config = if let Some(config_json) = request.config {
        serde_json::from_value(config_json)
            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
            .behavior_model
    } else {
        BehaviorModelConfig::default()
    };

    let generator = RuleGenerator::new(behavior_config);

    let (rules, explanations) =
        match generator.generate_rules_with_explanations(example_pairs).await {
            Ok(result) => result,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Rule generation failed",
                        "message": format!("Failed to generate rules: {}", e)
                    })),
                )
                .into_response();
            }
        };

    // Persist explanations in shared state, keyed by rule ID, so they can be
    // fetched later via the explanation endpoints.
    {
        let mut stored_explanations = state.rule_explanations.write().await;
        for explanation in &explanations {
            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
        }
    }

    let response = serde_json::json!({
        "success": true,
        "rules_generated": {
            "consistency_rules": rules.consistency_rules.len(),
            "schemas": rules.schemas.len(),
            "state_machines": rules.state_transitions.len(),
            "system_prompt": !rules.system_prompt.is_empty(),
        },
        "explanations": explanations.iter().map(|e| serde_json::json!({
            "rule_id": e.rule_id,
            "rule_type": e.rule_type,
            "confidence": e.confidence,
            "reasoning": e.reasoning,
        })).collect::<Vec<_>>(),
        "total_explanations": explanations.len(),
    });

    Json(response).into_response()
}
3487
/// Extract a YAML specification from an LLM response.
///
/// Prefers a closed ```yaml fence, then any closed generic ``` fence, and
/// finally falls back to the trimmed raw text. (The previous explicit
/// `starts_with("openapi:")`/`asyncapi:` check was redundant — it returned
/// exactly what the fallback returns — and has been folded into the fallback.)
fn extract_yaml_spec(text: &str) -> String {
    // Try the language-tagged fence first, then a bare fence; the search
    // order and slicing mirror the original behavior exactly.
    for fence in ["```yaml", "```"] {
        if let Some(start) = text.find(fence) {
            let body = text[start + fence.len()..].trim_start();
            if let Some(end) = body.find("```") {
                return body[..end].trim().to_string();
            }
        }
    }

    // No closed fence found: return the trimmed text as-is (covers responses
    // that are already bare YAML).
    text.trim().to_string()
}
3511
/// Extract a GraphQL SDL schema from an LLM response.
///
/// Prefers a closed ```graphql fence, then any closed generic ``` fence, and
/// finally falls back to the trimmed raw text. (The previous explicit
/// `starts_with("type ")`/`"schema "` check was redundant — it returned
/// exactly what the fallback returns — and has been folded into the fallback.)
fn extract_graphql_schema(text: &str) -> String {
    // Try the language-tagged fence first, then a bare fence; the search
    // order and slicing mirror the original behavior exactly.
    for fence in ["```graphql", "```"] {
        if let Some(start) = text.find(fence) {
            let body = text[start + fence.len()..].trim_start();
            if let Some(end) = body.find("```") {
                return body[..end].trim().to_string();
            }
        }
    }

    // No closed fence found: return the trimmed text as-is (covers responses
    // that are already bare SDL).
    text.trim().to_string()
}
3535
/// GET handler returning the current chaos-engineering configuration.
///
/// With the `chaos` feature enabled, reads the live config from the chaos API
/// state; each section serializes independently and falls back to `null` on
/// serialization failure. Without the feature, or when chaos is not
/// configured at runtime, a disabled/all-null config is returned.
async fn get_chaos_config(State(state): State<ManagementState>) -> impl IntoResponse {
    #[cfg(feature = "chaos")]
    {
        if let Some(chaos_state) = &state.chaos_api_state {
            let config = chaos_state.config.read().await;
            Json(serde_json::json!({
                "enabled": config.enabled,
                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
            }))
            .into_response()
        } else {
            // Feature compiled in but no chaos state configured at runtime.
            Json(serde_json::json!({
                "enabled": false,
                "latency": null,
                "fault_injection": null,
                "rate_limit": null,
                "traffic_shaping": null,
            }))
            .into_response()
        }
    }
    #[cfg(not(feature = "chaos"))]
    {
        // Feature compiled out entirely: report a disabled configuration.
        Json(serde_json::json!({
            "enabled": false,
            "latency": null,
            "fault_injection": null,
            "rate_limit": null,
            "traffic_shaping": null,
        }))
        .into_response()
    }
}
3578
/// Partial chaos-configuration update; `None` fields leave the current
/// values unchanged.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Master on/off switch for chaos injection.
    pub enabled: Option<bool>,
    /// Latency settings; deserialized as `LatencyConfig` when applied.
    pub latency: Option<serde_json::Value>,
    /// Fault settings; deserialized as `FaultInjectionConfig` when applied.
    pub fault_injection: Option<serde_json::Value>,
    /// Rate-limit settings; deserialized as `RateLimitConfig` when applied.
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic-shaping settings; deserialized as `TrafficShapingConfig`.
    pub traffic_shaping: Option<serde_json::Value>,
}
3593
/// Handler applying a partial chaos-configuration update.
///
/// Only fields present in the request are applied. Responds 503 when the
/// chaos API state is missing and 501 when the `chaos` feature is compiled
/// out.
async fn update_chaos_config(
    State(state): State<ManagementState>,
    Json(config_update): Json<ChaosConfigUpdate>,
) -> impl IntoResponse {
    #[cfg(feature = "chaos")]
    {
        if let Some(chaos_state) = &state.chaos_api_state {
            use mockforge_chaos::config::{
                ChaosConfig, FaultInjectionConfig, LatencyConfig, RateLimitConfig,
                TrafficShapingConfig,
            };

            let mut config = chaos_state.config.write().await;

            if let Some(enabled) = config_update.enabled {
                config.enabled = enabled;
            }

            // NOTE(review): each section below is applied only when it
            // deserializes into its typed config; malformed JSON for a
            // section is silently ignored and the request still reports
            // success. Confirm this best-effort behavior is intended.
            if let Some(latency_json) = config_update.latency {
                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
                    config.latency = Some(latency);
                }
            }

            if let Some(fault_json) = config_update.fault_injection {
                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
                    config.fault_injection = Some(fault);
                }
            }

            if let Some(rate_json) = config_update.rate_limit {
                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
                    config.rate_limit = Some(rate);
                }
            }

            if let Some(traffic_json) = config_update.traffic_shaping {
                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
                    config.traffic_shaping = Some(traffic);
                }
            }

            // Release the write lock before building the response.
            drop(config);

            info!("Chaos configuration updated successfully");
            Json(serde_json::json!({
                "success": true,
                "message": "Chaos configuration updated and applied"
            }))
            .into_response()
        } else {
            (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "success": false,
                    "error": "Chaos API not available",
                    "message": "Chaos engineering is not enabled or configured"
                })),
            )
                .into_response()
        }
    }
    #[cfg(not(feature = "chaos"))]
    {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "success": false,
                "error": "Chaos feature not enabled",
                "message": "Chaos engineering feature is not compiled into this build"
            })),
        )
            .into_response()
    }
}
3677
3678async fn list_network_profiles() -> impl IntoResponse {
3682 use mockforge_core::network_profiles::NetworkProfileCatalog;
3683
3684 let catalog = NetworkProfileCatalog::default();
3685 let profiles: Vec<serde_json::Value> = catalog
3686 .list_profiles_with_description()
3687 .iter()
3688 .map(|(name, description)| {
3689 serde_json::json!({
3690 "name": name,
3691 "description": description,
3692 })
3693 })
3694 .collect();
3695
3696 Json(serde_json::json!({
3697 "profiles": profiles
3698 }))
3699 .into_response()
3700}
3701
/// Request body for applying a built-in network profile.
#[derive(Debug, Deserialize)]
pub struct ApplyNetworkProfileRequest {
    /// Name of the profile to apply, as returned by the profile listing
    /// endpoint (looked up in `NetworkProfileCatalog`).
    pub profile_name: String,
}
3708
3709async fn apply_network_profile(
3711 State(state): State<ManagementState>,
3712 Json(request): Json<ApplyNetworkProfileRequest>,
3713) -> impl IntoResponse {
3714 use mockforge_core::network_profiles::NetworkProfileCatalog;
3715
3716 let catalog = NetworkProfileCatalog::default();
3717 if let Some(profile) = catalog.get(&request.profile_name) {
3718 if let Some(server_config) = &state.server_config {
3721 let mut config = server_config.write().await;
3722
3723 use mockforge_core::config::NetworkShapingConfig;
3725
3726 let network_shaping = NetworkShapingConfig {
3730 enabled: profile.traffic_shaping.bandwidth.enabled
3731 || profile.traffic_shaping.burst_loss.enabled,
3732 bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3734 max_connections: 1000, };
3736
3737 if let Some(ref mut chaos) = config.observability.chaos {
3740 chaos.traffic_shaping = Some(network_shaping);
3741 } else {
3742 use mockforge_core::config::ChaosEngConfig;
3744 config.observability.chaos = Some(ChaosEngConfig {
3745 enabled: true,
3746 latency: None,
3747 fault_injection: None,
3748 rate_limit: None,
3749 traffic_shaping: Some(network_shaping),
3750 scenario: None,
3751 });
3752 }
3753
3754 info!("Network profile '{}' applied to server configuration", request.profile_name);
3755 } else {
3756 warn!("Server configuration not available in ManagementState - profile applied but not persisted");
3757 }
3758
3759 #[cfg(feature = "chaos")]
3761 {
3762 if let Some(chaos_state) = &state.chaos_api_state {
3763 use mockforge_chaos::config::TrafficShapingConfig;
3764
3765 let mut chaos_config = chaos_state.config.write().await;
3766 let chaos_traffic_shaping = TrafficShapingConfig {
3768 enabled: profile.traffic_shaping.bandwidth.enabled
3769 || profile.traffic_shaping.burst_loss.enabled,
3770 bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3772 max_connections: 0,
3773 connection_timeout_ms: 30000,
3774 };
3775 chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
3776 chaos_config.enabled = true; drop(chaos_config);
3778 info!("Network profile '{}' applied to chaos API state", request.profile_name);
3779 }
3780 }
3781
3782 Json(serde_json::json!({
3783 "success": true,
3784 "message": format!("Network profile '{}' applied", request.profile_name),
3785 "profile": {
3786 "name": profile.name,
3787 "description": profile.description,
3788 }
3789 }))
3790 .into_response()
3791 } else {
3792 (
3793 StatusCode::NOT_FOUND,
3794 Json(serde_json::json!({
3795 "error": "Profile not found",
3796 "message": format!("Network profile '{}' not found", request.profile_name)
3797 })),
3798 )
3799 .into_response()
3800 }
3801}
3802
3803pub fn management_router_with_ui_builder(
3805 state: ManagementState,
3806 server_config: mockforge_core::config::ServerConfig,
3807) -> Router {
3808 use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
3809
3810 let management = management_router(state);
3812
3813 let ui_builder_state = UIBuilderState::new(server_config);
3815 let ui_builder = create_ui_builder_router(ui_builder_state);
3816
3817 management.nest("/ui-builder", ui_builder)
3819}
3820
3821pub fn management_router_with_spec_import(state: ManagementState) -> Router {
3823 use crate::spec_import::{spec_import_router, SpecImportState};
3824
3825 let management = management_router(state);
3827
3828 Router::new()
3830 .merge(management)
3831 .merge(spec_import_router(SpecImportState::new()))
3832}
3833
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a minimal `MockConfig` fixture.
    ///
    /// Centralizes the long struct literal (previously duplicated three
    /// times) so each test only spells out the fields it asserts on;
    /// everything else gets a neutral default.
    fn sample_mock(
        id: &str,
        name: &str,
        method: &str,
        path: &str,
        enabled: bool,
        status: u16,
    ) -> MockConfig {
        MockConfig {
            id: id.to_string(),
            name: name.to_string(),
            method: method.to_string(),
            path: path.to_string(),
            response: MockResponse {
                body: serde_json::json!({}),
                headers: None,
            },
            enabled,
            latency_ms: None,
            status_code: Some(status),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        }
    }

    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        // Insert a mock directly into shared state, then read it back by id.
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(sample_mock("test-1", "Test Mock", "GET", "/test", true, 200));
        }

        let mocks = state.mocks.read().await;
        let found = mocks.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        // Two mocks with only one enabled, so the enabled-count is meaningful.
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(sample_mock("1", "Mock 1", "GET", "/test1", true, 200));
            mocks.push(sample_mock("2", "Mock 2", "POST", "/test2", false, 201));
        }

        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }
}