1use axum::{
6 extract::{Path, Query, State},
7 http::StatusCode,
8 response::{
9 sse::{Event, Sse},
10 IntoResponse, Json, Response,
11 },
12 routing::{delete, get, post, put},
13 Router,
14};
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18 BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23use std::convert::Infallible;
24use std::sync::Arc;
25use tokio::sync::{broadcast, RwLock};
26use tracing::*;
27
/// Protocol message event broadcast through `ManagementState::message_events`.
///
/// Serialized adjacently tagged: the lowercased variant name is written under
/// `"protocol"` and the variant payload under `"data"`. Each variant only
/// exists when the matching cargo feature is enabled.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    /// An MQTT message (requires the `mqtt` feature).
    #[cfg(feature = "mqtt")]
    Mqtt(MqttMessageEvent),
    /// A Kafka message (requires the `kafka` feature).
    #[cfg(feature = "kafka")]
    Kafka(KafkaMessageEvent),
}
40
/// Payload of [`MessageEvent::Mqtt`]: one MQTT message that was published.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// Topic the message was published to.
    pub topic: String,
    /// Message payload as text.
    pub payload: String,
    /// MQTT quality-of-service level of the publish.
    pub qos: u8,
    /// Whether the message was published with the retain flag.
    pub retain: bool,
    /// Time of the event as a string; the management publish handler fills
    /// this with an RFC 3339 UTC timestamp.
    pub timestamp: String,
}
56
/// Payload of [`MessageEvent::Kafka`]: one Kafka record.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Topic of the record.
    pub topic: String,
    /// Record key, if the record has one.
    pub key: Option<String>,
    /// Record value as text.
    pub value: String,
    /// Partition number of the record.
    pub partition: i32,
    /// Offset of the record.
    pub offset: i64,
    /// Record headers, if any.
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// Time of the event as a string.
    /// NOTE(review): presumably RFC 3339 like the MQTT event — confirm at the producer.
    pub timestamp: String,
}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct MockConfig {
72 #[serde(skip_serializing_if = "String::is_empty")]
74 pub id: String,
75 pub name: String,
77 pub method: String,
79 pub path: String,
81 pub response: MockResponse,
83 #[serde(default = "default_true")]
85 pub enabled: bool,
86 #[serde(skip_serializing_if = "Option::is_none")]
88 pub latency_ms: Option<u64>,
89 #[serde(skip_serializing_if = "Option::is_none")]
91 pub status_code: Option<u16>,
92 #[serde(skip_serializing_if = "Option::is_none")]
94 pub request_match: Option<RequestMatchCriteria>,
95 #[serde(skip_serializing_if = "Option::is_none")]
97 pub priority: Option<i32>,
98 #[serde(skip_serializing_if = "Option::is_none")]
100 pub scenario: Option<String>,
101 #[serde(skip_serializing_if = "Option::is_none")]
103 pub required_scenario_state: Option<String>,
104 #[serde(skip_serializing_if = "Option::is_none")]
106 pub new_scenario_state: Option<String>,
107}
108
/// Serde default helper: the value used for `MockConfig::enabled` when the
/// field is absent from the input.
fn default_true() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
112
/// Response definition of a mock.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Response body as an arbitrary JSON value.
    pub body: serde_json::Value,
    /// Optional response headers; omitted from the serialized form when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
122
123#[derive(Debug, Clone, Serialize, Deserialize, Default)]
125pub struct RequestMatchCriteria {
126 #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
128 pub headers: std::collections::HashMap<String, String>,
129 #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
131 pub query_params: std::collections::HashMap<String, String>,
132 #[serde(skip_serializing_if = "Option::is_none")]
134 pub body_pattern: Option<String>,
135 #[serde(skip_serializing_if = "Option::is_none")]
137 pub json_path: Option<String>,
138 #[serde(skip_serializing_if = "Option::is_none")]
140 pub xpath: Option<String>,
141 #[serde(skip_serializing_if = "Option::is_none")]
143 pub custom_matcher: Option<String>,
144}
145
146pub fn mock_matches_request(
155 mock: &MockConfig,
156 method: &str,
157 path: &str,
158 headers: &std::collections::HashMap<String, String>,
159 query_params: &std::collections::HashMap<String, String>,
160 body: Option<&[u8]>,
161) -> bool {
162 use regex::Regex;
163
164 if !mock.enabled {
166 return false;
167 }
168
169 if mock.method.to_uppercase() != method.to_uppercase() {
171 return false;
172 }
173
174 if !path_matches_pattern(&mock.path, path) {
176 return false;
177 }
178
179 if let Some(criteria) = &mock.request_match {
181 for (key, expected_value) in &criteria.headers {
183 let header_key_lower = key.to_lowercase();
184 let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
185
186 if let Some((_, actual_value)) = found {
187 if let Ok(re) = Regex::new(expected_value) {
189 if !re.is_match(actual_value) {
190 return false;
191 }
192 } else if actual_value != expected_value {
193 return false;
194 }
195 } else {
196 return false; }
198 }
199
200 for (key, expected_value) in &criteria.query_params {
202 if let Some(actual_value) = query_params.get(key) {
203 if actual_value != expected_value {
204 return false;
205 }
206 } else {
207 return false; }
209 }
210
211 if let Some(pattern) = &criteria.body_pattern {
213 if let Some(body_bytes) = body {
214 let body_str = String::from_utf8_lossy(body_bytes);
215 if let Ok(re) = Regex::new(pattern) {
217 if !re.is_match(&body_str) {
218 return false;
219 }
220 } else if body_str.as_ref() != pattern {
221 return false;
222 }
223 } else {
224 return false; }
226 }
227
228 if let Some(json_path) = &criteria.json_path {
230 if let Some(body_bytes) = body {
231 if let Ok(body_str) = std::str::from_utf8(body_bytes) {
232 if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
233 if !json_path_exists(&json_value, json_path) {
235 return false;
236 }
237 }
238 }
239 }
240 }
241
242 if let Some(_xpath) = &criteria.xpath {
244 tracing::warn!("XPath matching not yet fully implemented");
247 }
248
249 if let Some(custom) = &criteria.custom_matcher {
251 if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
252 return false;
253 }
254 }
255 }
256
257 true
258}
259
260fn path_matches_pattern(pattern: &str, path: &str) -> bool {
262 if pattern == path {
264 return true;
265 }
266
267 if pattern == "*" {
269 return true;
270 }
271
272 let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
274 let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
275
276 if pattern_parts.len() != path_parts.len() {
277 if pattern.contains('*') {
279 return matches_wildcard_pattern(pattern, path);
280 }
281 return false;
282 }
283
284 for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
285 if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
287 continue; }
289
290 if pattern_part != path_part {
291 return false;
292 }
293 }
294
295 true
296}
297
298fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
300 use regex::Regex;
301
302 let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
304
305 if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
306 return re.is_match(path);
307 }
308
309 false
310}
311
312fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
314 if json_path.starts_with("$.") {
317 let path = &json_path[2..];
318 let parts: Vec<&str> = path.split('.').collect();
319
320 let mut current = json;
321 for part in parts {
322 if let Some(obj) = current.as_object() {
323 if let Some(value) = obj.get(part) {
324 current = value;
325 } else {
326 return false;
327 }
328 } else {
329 return false;
330 }
331 }
332 true
333 } else {
334 tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
336 false
337 }
338}
339
340fn evaluate_custom_matcher(
342 expression: &str,
343 method: &str,
344 path: &str,
345 headers: &std::collections::HashMap<String, String>,
346 query_params: &std::collections::HashMap<String, String>,
347 body: Option<&[u8]>,
348) -> bool {
349 use regex::Regex;
350
351 let expr = expression.trim();
352
353 if expr.contains("==") {
355 let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
356 if parts.len() != 2 {
357 return false;
358 }
359
360 let field = parts[0];
361 let expected_value = parts[1].trim_matches('"').trim_matches('\'');
362
363 match field {
364 "method" => method == expected_value,
365 "path" => path == expected_value,
366 _ if field.starts_with("headers.") => {
367 let header_name = &field[8..];
368 headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
369 }
370 _ if field.starts_with("query.") => {
371 let param_name = &field[6..];
372 query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
373 }
374 _ => false,
375 }
376 }
377 else if expr.contains("=~") {
379 let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
380 if parts.len() != 2 {
381 return false;
382 }
383
384 let field = parts[0];
385 let pattern = parts[1].trim_matches('"').trim_matches('\'');
386
387 if let Ok(re) = Regex::new(pattern) {
388 match field {
389 "method" => re.is_match(method),
390 "path" => re.is_match(path),
391 _ if field.starts_with("headers.") => {
392 let header_name = &field[8..];
393 headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
394 }
395 _ if field.starts_with("query.") => {
396 let param_name = &field[6..];
397 query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
398 }
399 _ => false,
400 }
401 } else {
402 false
403 }
404 }
405 else if expr.contains("contains") {
407 let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
408 if parts.len() != 2 {
409 return false;
410 }
411
412 let field = parts[0];
413 let search_value = parts[1].trim_matches('"').trim_matches('\'');
414
415 match field {
416 "path" => path.contains(search_value),
417 _ if field.starts_with("headers.") => {
418 let header_name = &field[8..];
419 headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
420 }
421 _ if field.starts_with("body") => {
422 if let Some(body_bytes) = body {
423 let body_str = String::from_utf8_lossy(body_bytes);
424 body_str.contains(search_value)
425 } else {
426 false
427 }
428 }
429 _ => false,
430 }
431 } else {
432 tracing::warn!("Unknown custom matcher expression format: {}", expr);
434 false
435 }
436}
437
/// Runtime statistics reported by the management API's stats handler.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Seconds elapsed since the management state was created.
    pub uptime_seconds: u64,
    /// Value of the shared request counter.
    pub total_requests: u64,
    /// Number of mocks currently stored.
    pub active_mocks: usize,
    /// Number of stored mocks with `enabled == true`.
    pub enabled_mocks: usize,
    /// Number of registered routes; `get_stats` currently reports the mock count here.
    pub registered_routes: usize,
}
452
/// Server configuration summary returned by the management API's config handler.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// Crate version (taken from `CARGO_PKG_VERSION` by `get_config`).
    pub version: String,
    /// Port stored in the management state.
    pub port: u16,
    /// Whether an OpenAPI spec is loaded.
    pub has_openapi_spec: bool,
    /// Path of the OpenAPI spec, if known; omitted from the serialized form when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
466
/// Shared state handed to every management API handler.
///
/// Cloning is cheap for the shared parts: they are held behind `Arc`, so all
/// clones observe the same mocks, counters and channels.
#[derive(Clone)]
pub struct ManagementState {
    /// Mock configurations managed through the API.
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// Loaded OpenAPI specification, if any.
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Path the OpenAPI specification was loaded from, if any.
    pub spec_path: Option<String>,
    /// Port reported by the config endpoint.
    pub port: u16,
    /// Instant this state was created; used to compute uptime.
    pub start_time: std::time::Instant,
    /// Shared request counter reported by the stats endpoint.
    pub request_counter: Arc<RwLock<u64>>,
    /// Proxy configuration, if one was attached with `with_proxy_config`.
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// SMTP registry backing the mailbox endpoints (`smtp` feature).
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// MQTT broker backing the MQTT endpoints (`mqtt` feature).
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Kafka mock broker (`kafka` feature).
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel for protocol message events (`mqtt` or `kafka` feature).
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// Scenario state machine manager.
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// Optional broadcast channel notified when mocks are created, updated or deleted.
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Optional lifecycle hook registry invoked on mock changes.
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Rule explanations keyed by string.
    /// NOTE(review): key is presumably the rule id — confirm against the writer.
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
}
513
514impl ManagementState {
515 pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
522 Self {
523 mocks: Arc::new(RwLock::new(Vec::new())),
524 spec,
525 spec_path,
526 port,
527 start_time: std::time::Instant::now(),
528 request_counter: Arc::new(RwLock::new(0)),
529 proxy_config: None,
530 #[cfg(feature = "smtp")]
531 smtp_registry: None,
532 #[cfg(feature = "mqtt")]
533 mqtt_broker: None,
534 #[cfg(feature = "kafka")]
535 kafka_broker: None,
536 #[cfg(any(feature = "mqtt", feature = "kafka"))]
537 message_events: {
538 let (tx, _) = broadcast::channel(1000);
539 Arc::new(tx)
540 },
541 state_machine_manager: Arc::new(RwLock::new(
542 mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
543 )),
544 ws_broadcast: None,
545 lifecycle_hooks: None,
546 rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
547 }
548 }
549
550 pub fn with_lifecycle_hooks(
552 mut self,
553 hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
554 ) -> Self {
555 self.lifecycle_hooks = Some(hooks);
556 self
557 }
558
559 pub fn with_ws_broadcast(
561 mut self,
562 ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
563 ) -> Self {
564 self.ws_broadcast = Some(ws_broadcast);
565 self
566 }
567
568 pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
570 self.proxy_config = Some(proxy_config);
571 self
572 }
573
574 #[cfg(feature = "smtp")]
575 pub fn with_smtp_registry(
577 mut self,
578 smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
579 ) -> Self {
580 self.smtp_registry = Some(smtp_registry);
581 self
582 }
583
584 #[cfg(feature = "mqtt")]
585 pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
587 self.mqtt_broker = Some(mqtt_broker);
588 self
589 }
590
591 #[cfg(feature = "kafka")]
592 pub fn with_kafka_broker(
594 mut self,
595 kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
596 ) -> Self {
597 self.kafka_broker = Some(kafka_broker);
598 self
599 }
600}
601
602async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
604 let mocks = state.mocks.read().await;
605 Json(serde_json::json!({
606 "mocks": *mocks,
607 "total": mocks.len(),
608 "enabled": mocks.iter().filter(|m| m.enabled).count()
609 }))
610}
611
612async fn get_mock(
614 State(state): State<ManagementState>,
615 Path(id): Path<String>,
616) -> Result<Json<MockConfig>, StatusCode> {
617 let mocks = state.mocks.read().await;
618 mocks
619 .iter()
620 .find(|m| m.id == id)
621 .cloned()
622 .map(Json)
623 .ok_or(StatusCode::NOT_FOUND)
624}
625
626async fn create_mock(
628 State(state): State<ManagementState>,
629 Json(mut mock): Json<MockConfig>,
630) -> Result<Json<MockConfig>, StatusCode> {
631 let mut mocks = state.mocks.write().await;
632
633 if mock.id.is_empty() {
635 mock.id = uuid::Uuid::new_v4().to_string();
636 }
637
638 if mocks.iter().any(|m| m.id == mock.id) {
640 return Err(StatusCode::CONFLICT);
641 }
642
643 info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
644
645 if let Some(hooks) = &state.lifecycle_hooks {
647 let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
648 id: mock.id.clone(),
649 name: mock.name.clone(),
650 config: serde_json::to_value(&mock).unwrap_or_default(),
651 };
652 hooks.invoke_mock_created(&event).await;
653 }
654
655 mocks.push(mock.clone());
656
657 if let Some(tx) = &state.ws_broadcast {
659 let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
660 }
661
662 Ok(Json(mock))
663}
664
665async fn update_mock(
667 State(state): State<ManagementState>,
668 Path(id): Path<String>,
669 Json(updated_mock): Json<MockConfig>,
670) -> Result<Json<MockConfig>, StatusCode> {
671 let mut mocks = state.mocks.write().await;
672
673 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
674
675 let old_mock = mocks[position].clone();
677
678 info!("Updating mock: {}", id);
679 mocks[position] = updated_mock.clone();
680
681 if let Some(hooks) = &state.lifecycle_hooks {
683 let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
684 id: updated_mock.id.clone(),
685 name: updated_mock.name.clone(),
686 config: serde_json::to_value(&updated_mock).unwrap_or_default(),
687 };
688 hooks.invoke_mock_updated(&event).await;
689
690 if old_mock.enabled != updated_mock.enabled {
692 let state_event = if updated_mock.enabled {
693 mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
694 id: updated_mock.id.clone(),
695 }
696 } else {
697 mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
698 id: updated_mock.id.clone(),
699 }
700 };
701 hooks.invoke_mock_state_changed(&state_event).await;
702 }
703 }
704
705 if let Some(tx) = &state.ws_broadcast {
707 let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
708 }
709
710 Ok(Json(updated_mock))
711}
712
713async fn delete_mock(
715 State(state): State<ManagementState>,
716 Path(id): Path<String>,
717) -> Result<StatusCode, StatusCode> {
718 let mut mocks = state.mocks.write().await;
719
720 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
721
722 let deleted_mock = mocks[position].clone();
724
725 info!("Deleting mock: {}", id);
726 mocks.remove(position);
727
728 if let Some(hooks) = &state.lifecycle_hooks {
730 let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
731 id: deleted_mock.id.clone(),
732 name: deleted_mock.name.clone(),
733 };
734 hooks.invoke_mock_deleted(&event).await;
735 }
736
737 if let Some(tx) = &state.ws_broadcast {
739 let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
740 }
741
742 Ok(StatusCode::NO_CONTENT)
743}
744
/// Request body of the configuration validation endpoint.
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// Configuration to validate, as a JSON value.
    pub config: serde_json::Value,
    /// Format to validate as: `"yaml"`/`"yml"` select YAML parsing, anything
    /// else is treated as JSON. Defaults to `"json"` when omitted.
    #[serde(default = "default_format")]
    pub format: String,
}
754
/// Serde default helper: the format assumed by `ValidateConfigRequest` when
/// none is given.
fn default_format() -> String {
    String::from("json")
}
758
759async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
761 use mockforge_core::config::ServerConfig;
762
763 let config_result: Result<ServerConfig, String> = match request.format.as_str() {
764 "yaml" | "yml" => {
765 let yaml_str = match serde_json::to_string(&request.config) {
766 Ok(s) => s,
767 Err(e) => {
768 return (
769 StatusCode::BAD_REQUEST,
770 Json(serde_json::json!({
771 "valid": false,
772 "error": format!("Failed to convert to string: {}", e),
773 "message": "Configuration validation failed"
774 })),
775 )
776 .into_response();
777 }
778 };
779 serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
780 }
781 _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
782 };
783
784 match config_result {
785 Ok(_) => Json(serde_json::json!({
786 "valid": true,
787 "message": "Configuration is valid"
788 }))
789 .into_response(),
790 Err(e) => (
791 StatusCode::BAD_REQUEST,
792 Json(serde_json::json!({
793 "valid": false,
794 "error": format!("Invalid configuration: {}", e),
795 "message": "Configuration validation failed"
796 })),
797 )
798 .into_response(),
799 }
800}
801
/// Request body of the bulk configuration update endpoint.
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    /// Updates to apply; must be a JSON object whose top-level keys replace
    /// the corresponding keys of the base configuration.
    pub updates: serde_json::Value,
}
808
809async fn bulk_update_config(
819 State(_state): State<ManagementState>,
820 Json(request): Json<BulkConfigUpdateRequest>,
821) -> impl IntoResponse {
822 if !request.updates.is_object() {
824 return (
825 StatusCode::BAD_REQUEST,
826 Json(serde_json::json!({
827 "error": "Invalid request",
828 "message": "Updates must be a JSON object"
829 })),
830 )
831 .into_response();
832 }
833
834 use mockforge_core::config::ServerConfig;
836
837 let base_config = ServerConfig::default();
839 let base_json = match serde_json::to_value(&base_config) {
840 Ok(v) => v,
841 Err(e) => {
842 return (
843 StatusCode::INTERNAL_SERVER_ERROR,
844 Json(serde_json::json!({
845 "error": "Internal error",
846 "message": format!("Failed to serialize base config: {}", e)
847 })),
848 )
849 .into_response();
850 }
851 };
852
853 let mut merged = base_json.clone();
855 if let (Some(merged_obj), Some(updates_obj)) =
856 (merged.as_object_mut(), request.updates.as_object())
857 {
858 for (key, value) in updates_obj {
859 merged_obj.insert(key.clone(), value.clone());
860 }
861 }
862
863 match serde_json::from_value::<ServerConfig>(merged) {
865 Ok(_) => {
866 Json(serde_json::json!({
869 "success": true,
870 "message": "Bulk configuration update validated successfully. Note: Runtime application requires ServerConfig in ManagementState.",
871 "updates_received": request.updates,
872 "validated": true
873 }))
874 .into_response()
875 }
876 Err(e) => (
877 StatusCode::BAD_REQUEST,
878 Json(serde_json::json!({
879 "error": "Invalid configuration",
880 "message": format!("Configuration validation failed: {}", e),
881 "validated": false
882 })),
883 )
884 .into_response(),
885 }
886}
887
888async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
890 let mocks = state.mocks.read().await;
891 let request_count = *state.request_counter.read().await;
892
893 Json(ServerStats {
894 uptime_seconds: state.start_time.elapsed().as_secs(),
895 total_requests: request_count,
896 active_mocks: mocks.len(),
897 enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
898 registered_routes: mocks.len(), })
900}
901
902async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
904 Json(ServerConfig {
905 version: env!("CARGO_PKG_VERSION").to_string(),
906 port: state.port,
907 has_openapi_spec: state.spec.is_some(),
908 spec_path: state.spec_path.clone(),
909 })
910}
911
912async fn health_check() -> Json<serde_json::Value> {
914 Json(serde_json::json!({
915 "status": "healthy",
916 "service": "mockforge-management",
917 "timestamp": chrono::Utc::now().to_rfc3339()
918 }))
919}
920
/// Output format for exported mocks; serialized as `"json"` / `"yaml"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// Pretty-printed JSON.
    Json,
    /// YAML.
    Yaml,
}
930
931async fn export_mocks(
933 State(state): State<ManagementState>,
934 axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
935) -> Result<(StatusCode, String), StatusCode> {
936 let mocks = state.mocks.read().await;
937
938 let format = params
939 .get("format")
940 .map(|f| match f.as_str() {
941 "yaml" | "yml" => ExportFormat::Yaml,
942 _ => ExportFormat::Json,
943 })
944 .unwrap_or(ExportFormat::Json);
945
946 match format {
947 ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
948 .map(|json| (StatusCode::OK, json))
949 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
950 ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
951 .map(|yaml| (StatusCode::OK, yaml))
952 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
953 }
954}
955
956async fn import_mocks(
958 State(state): State<ManagementState>,
959 Json(mocks): Json<Vec<MockConfig>>,
960) -> impl IntoResponse {
961 let mut current_mocks = state.mocks.write().await;
962 current_mocks.clear();
963 current_mocks.extend(mocks);
964 Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
965}
966
/// Lists all emails held by the SMTP registry.
///
/// Responds with `501 Not Implemented` when no registry is attached and with
/// `500` when the registry reports an error.
#[cfg(feature = "smtp")]
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    let (status, body) = match registry.get_emails() {
        Ok(emails) => (StatusCode::OK, serde_json::json!(emails)),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            serde_json::json!({
                "error": "Failed to retrieve emails",
                "message": e.to_string()
            }),
        ),
    };
    (status, Json(body))
}
991
/// Returns a single email by id.
///
/// Responds with `404` when the id is unknown, `500` when the registry
/// reports an error, and `501 Not Implemented` when no registry is attached.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    let (status, body) = match registry.get_email_by_id(&id) {
        Ok(Some(email)) => (StatusCode::OK, serde_json::json!(email)),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            serde_json::json!({
                "error": "Email not found",
                "id": id
            }),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            serde_json::json!({
                "error": "Failed to retrieve email",
                "message": e.to_string()
            }),
        ),
    };
    (status, Json(body))
}
1026
/// Clears the SMTP mailbox.
///
/// Responds with `500` when the registry reports an error and with
/// `501 Not Implemented` when no registry is attached.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    if let Err(e) = registry.clear_mailbox() {
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to clear mailbox",
                "message": e.to_string()
            })),
        );
    }

    (
        StatusCode::OK,
        Json(serde_json::json!({
            "message": "Mailbox cleared successfully"
        })),
    )
}
1056
/// Placeholder for mailbox export: always responds with `501 Not Implemented`
/// and echoes the requested `format` query parameter (default `"json"`).
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let requested_format = params.get("format").cloned().unwrap_or_else(|| "json".to_string());

    let body = serde_json::json!({
        "error": "SMTP mailbox management not available via HTTP API",
        "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
        "requested_format": requested_format
    });
    (StatusCode::NOT_IMPLEMENTED, Json(body))
}
1072
/// Searches the SMTP mailbox using filters taken from the query string.
///
/// Recognised parameters: `sender`, `recipient`, `subject`, `body`,
/// `since` / `until` (RFC 3339; unparsable values are ignored), and the flags
/// `regex` and `case_sensitive`, which are on only when exactly `"true"`.
/// Responds with `501 Not Implemented` when no registry is attached.
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    // Reads an RFC 3339 timestamp parameter and converts it to UTC.
    let utc_param = |key: &str| {
        params
            .get(key)
            .and_then(|raw| chrono::DateTime::parse_from_rfc3339(raw).ok())
            .map(|parsed| parsed.with_timezone(&chrono::Utc))
    };
    // Reads a boolean flag parameter; only the literal "true" enables it.
    let flag_param = |key: &str| params.get(key).map(|raw| raw == "true").unwrap_or(false);

    let filters = EmailSearchFilters {
        sender: params.get("sender").cloned(),
        recipient: params.get("recipient").cloned(),
        subject: params.get("subject").cloned(),
        body: params.get("body").cloned(),
        since: utc_param("since"),
        until: utc_param("until"),
        use_regex: flag_param("regex"),
        case_sensitive: flag_param("case_sensitive"),
    };

    let (status, body) = match registry.search_emails(filters) {
        Ok(emails) => (StatusCode::OK, serde_json::json!(emails)),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            serde_json::json!({
                "error": "Failed to search emails",
                "message": e.to_string()
            }),
        ),
    };
    (status, Json(body))
}
1117
/// Statistics of the MQTT broker as reported by `get_mqtt_stats`.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of currently connected clients.
    pub connected_clients: usize,
    /// Number of active topics.
    pub active_topics: usize,
    /// Number of retained messages.
    pub retained_messages: usize,
    /// Total number of subscriptions.
    pub total_subscriptions: usize,
}
1131
/// Reports MQTT broker statistics, or `503 Service Unavailable` when no
/// broker is attached.
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let broker = match state.mqtt_broker.as_ref() {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let connected_clients = broker.get_connected_clients().await.len();
    let active_topics = broker.get_active_topics().await.len();
    let topic_stats = broker.get_topic_stats().await;

    Json(MqttBrokerStats {
        connected_clients,
        active_topics,
        retained_messages: topic_stats.retained_messages,
        total_subscriptions: topic_stats.total_subscriptions,
    })
    .into_response()
}
1152
/// Lists the clients connected to the MQTT broker, or responds with
/// `503 Service Unavailable` when no broker is attached.
#[cfg(feature = "mqtt")]
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.mqtt_broker.as_ref() {
        Some(broker) => {
            let clients = broker.get_connected_clients().await;
            let body = serde_json::json!({
                "clients": clients
            });
            Json(body).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1165
/// Lists the active topics of the MQTT broker, or responds with
/// `503 Service Unavailable` when no broker is attached.
#[cfg(feature = "mqtt")]
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.mqtt_broker.as_ref() {
        Some(broker) => {
            let topics = broker.get_active_topics().await;
            let body = serde_json::json!({
                "topics": topics
            });
            Json(body).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1178
/// Disconnects the MQTT client with the given id.
///
/// Responds with `500` when the broker reports an error and with
/// `503 Service Unavailable` when no broker is attached.
#[cfg(feature = "mqtt")]
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let broker = match state.mqtt_broker.as_ref() {
        Some(broker) => broker,
        None => {
            return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
        }
    };

    let (status, message) = match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("Failed to disconnect client: {}", e),
        ),
    };
    (status, message).into_response()
}
1198
/// A single MQTT publish request.
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Topic to publish to.
    pub topic: String,
    /// Message payload as text.
    pub payload: String,
    /// QoS level; defaults to `0` when omitted.
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Retain flag; defaults to `false` when omitted.
    #[serde(default)]
    pub retain: bool,
}
1216
/// Serde default helper: the QoS level used by `MqttPublishRequest` when
/// none is given.
#[cfg(feature = "mqtt")]
fn default_qos() -> u8 {
    const DEFAULT_QOS: u8 = 0;
    DEFAULT_QOS
}
1221
/// Publishes a single message to the MQTT broker.
///
/// The JSON body must contain string fields `topic` and `payload`; `qos`
/// (default 0) and `retain` (default false) are optional.
///
/// Responses:
/// * `400` when `topic` or `payload` is missing, or when QoS is not 0, 1 or 2;
/// * `503` when no broker is attached;
/// * `500` when the broker rejects the publish;
/// * `200` on success, after a `MessageEvent::Mqtt` has been sent on the
///   message event channel.
///
/// QoS is range-checked on the full integer before narrowing to `u8`.
/// Previously the value was truncated first, so 256 was accepted as QoS 0.
#[cfg(feature = "mqtt")]
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let topic = request.get("topic").and_then(|v| v.as_str());
    let payload = request.get("payload").and_then(|v| v.as_str());

    let (topic, payload) = match (topic, payload) {
        (Some(topic), Some(payload)) => (topic.to_string(), payload.to_string()),
        _ => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid request",
                    "message": "Missing required fields: topic and payload"
                })),
            );
        }
    };

    let requested_qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0);
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    let broker = match &state.mqtt_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "MQTT broker not available",
                    "message": "MQTT broker is not enabled or not available."
                })),
            );
        }
    };

    if requested_qos > 2 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid QoS",
                "message": "QoS must be 0, 1, or 2"
            })),
        );
    }
    // Lossless: the value was just checked to be at most 2.
    let qos = requested_qos as u8;

    let payload_bytes = payload.as_bytes().to_vec();
    let client_id = "mockforge-management-api".to_string();

    let publish_result = broker
        .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
        .await
        .map_err(|e| format!("{}", e));

    match publish_result {
        Ok(_) => {
            let event = MessageEvent::Mqtt(MqttMessageEvent {
                topic: topic.clone(),
                payload,
                qos,
                retain,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            // A send error only means there are no subscribers; ignore it.
            let _ = state.message_events.send(event);

            (
                StatusCode::OK,
                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message published to topic '{}'", topic),
                    "topic": topic,
                    "qos": qos,
                    "retain": retain
                })),
            )
        }
        Err(error_msg) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to publish message",
                "message": error_msg
            })),
        ),
    }
}
1309
1310#[cfg(not(feature = "mqtt"))]
1311async fn publish_mqtt_message_handler(
1313 State(_state): State<ManagementState>,
1314 Json(_request): Json<serde_json::Value>,
1315) -> impl IntoResponse {
1316 (
1317 StatusCode::SERVICE_UNAVAILABLE,
1318 Json(serde_json::json!({
1319 "error": "MQTT feature not enabled",
1320 "message": "MQTT support is not compiled into this build"
1321 })),
1322 )
1323}
1324
/// Typed shape of a batch publish request body.
///
/// NOTE(review): `publish_mqtt_batch_handler` in this file extracts the body as a raw
/// `serde_json::Value` rather than this struct — confirm where this type is consumed.
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// The messages that make up the batch.
    pub messages: Vec<MqttPublishRequest>,
    /// Delay in milliseconds; filled in by `default_delay` when the field is omitted.
    /// Presumably the pause between consecutive messages — TODO confirm.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1335
1336#[cfg(feature = "mqtt")]
/// Serde fallback for `MqttBatchPublishRequest::delay_ms` when the field is omitted.
fn default_delay() -> u64 {
    const FALLBACK_DELAY_MS: u64 = 100;
    FALLBACK_DELAY_MS
}
1340
/// Publishes a batch of messages through the MQTT broker attached to the state.
///
/// The JSON body must contain a `messages` array. Each element needs string fields
/// `topic` and `payload`, plus optional `qos` (defaults to `0`) and `retain` (defaults
/// to `false`). An optional top-level `delay_ms` (defaults to `100`) is slept after each
/// attempted publish except the last; elements rejected by validation skip that sleep.
///
/// Responses:
/// - `400 Bad Request` when `messages` is missing, not an array, or empty.
/// - `503 Service Unavailable` when `state.mqtt_broker` is `None`.
/// - `200 OK` otherwise, with a per-message `results` array and `total` / `succeeded` /
///   `failed` counters. Individual failures do not change the status code.
#[cfg(feature = "mqtt")]
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    let Some(messages_json) = request.get("messages").and_then(|v| v.as_array()) else {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required field: messages"
            })),
        );
    };

    let Some(broker) = &state.mqtt_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        );
    };

    if messages_json.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        );
    }

    let client_id = "mockforge-management-api".to_string();
    // Safe subtraction: the batch was just checked to be non-empty.
    let last_index = messages_json.len() - 1;
    let mut results = Vec::with_capacity(messages_json.len());

    for (index, msg_json) in messages_json.iter().enumerate() {
        let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(str::to_string);
        let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(str::to_string);
        // Keep the full-width integer until it has been range-checked: casting to `u8`
        // first would wrap out-of-range values (e.g. 256 -> 0) and accept them as valid.
        let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0);
        let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

        let (Some(topic), Some(payload)) = (topic, payload) else {
            results.push(serde_json::json!({
                "index": index,
                "success": false,
                "error": "Missing required fields: topic and payload"
            }));
            continue;
        };

        if qos > 2 {
            results.push(serde_json::json!({
                "index": index,
                "success": false,
                "error": "Invalid QoS (must be 0, 1, or 2)"
            }));
            continue;
        }
        // Lossless: `qos` is known to be within 0..=2 at this point.
        let qos = qos as u8;

        let publish_result = broker
            .handle_publish(&client_id, &topic, payload.as_bytes().to_vec(), qos, retain)
            .await;

        match publish_result {
            Ok(_) => {
                let event = MessageEvent::Mqtt(MqttMessageEvent {
                    topic: topic.clone(),
                    payload,
                    qos,
                    retain,
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                // The send result is deliberately ignored so that a failure to deliver
                // the event never marks a successful publish as failed.
                let _ = state.message_events.send(event);

                results.push(serde_json::json!({
                    "index": index,
                    "success": true,
                    "topic": topic,
                    "qos": qos
                }));
            }
            Err(err) => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": err.to_string()
                }));
            }
        }

        // Pause between messages, but not after the final one.
        if index < last_index && delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
        }
    }

    let success_count =
        results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

    (
        StatusCode::OK,
        Json(serde_json::json!({
            "success": true,
            "total": messages_json.len(),
            "succeeded": success_count,
            "failed": messages_json.len() - success_count,
            "results": results
        })),
    )
}
1470
1471#[cfg(not(feature = "mqtt"))]
1472async fn publish_mqtt_batch_handler(
1474 State(_state): State<ManagementState>,
1475 Json(_request): Json<serde_json::Value>,
1476) -> impl IntoResponse {
1477 (
1478 StatusCode::SERVICE_UNAVAILABLE,
1479 Json(serde_json::json!({
1480 "error": "MQTT feature not enabled",
1481 "message": "MQTT support is not compiled into this build"
1482 })),
1483 )
1484}
1485
/// Request body for the endpoints that set a migration mode explicitly
/// (`set_route_migration_mode` and `set_group_migration_mode`).
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    /// Name of the target mode. The handlers lowercase it and accept `mock`, `shadow`,
    /// `real` or `auto`; anything else yields an error body.
    mode: String,
}
1493
1494async fn get_migration_routes(
1496 State(state): State<ManagementState>,
1497) -> Result<Json<serde_json::Value>, StatusCode> {
1498 let proxy_config = match &state.proxy_config {
1499 Some(config) => config,
1500 None => {
1501 return Ok(Json(serde_json::json!({
1502 "error": "Migration not configured. Proxy config not available."
1503 })));
1504 }
1505 };
1506
1507 let config = proxy_config.read().await;
1508 let routes = config.get_migration_routes();
1509
1510 Ok(Json(serde_json::json!({
1511 "routes": routes
1512 })))
1513}
1514
1515async fn toggle_route_migration(
1517 State(state): State<ManagementState>,
1518 Path(pattern): Path<String>,
1519) -> Result<Json<serde_json::Value>, StatusCode> {
1520 let proxy_config = match &state.proxy_config {
1521 Some(config) => config,
1522 None => {
1523 return Ok(Json(serde_json::json!({
1524 "error": "Migration not configured. Proxy config not available."
1525 })));
1526 }
1527 };
1528
1529 let mut config = proxy_config.write().await;
1530 let new_mode = match config.toggle_route_migration(&pattern) {
1531 Some(mode) => mode,
1532 None => {
1533 return Ok(Json(serde_json::json!({
1534 "error": format!("Route pattern not found: {}", pattern)
1535 })));
1536 }
1537 };
1538
1539 Ok(Json(serde_json::json!({
1540 "pattern": pattern,
1541 "mode": format!("{:?}", new_mode).to_lowercase()
1542 })))
1543}
1544
1545async fn set_route_migration_mode(
1547 State(state): State<ManagementState>,
1548 Path(pattern): Path<String>,
1549 Json(request): Json<SetMigrationModeRequest>,
1550) -> Result<Json<serde_json::Value>, StatusCode> {
1551 let proxy_config = match &state.proxy_config {
1552 Some(config) => config,
1553 None => {
1554 return Ok(Json(serde_json::json!({
1555 "error": "Migration not configured. Proxy config not available."
1556 })));
1557 }
1558 };
1559
1560 use mockforge_core::proxy::config::MigrationMode;
1561 let mode = match request.mode.to_lowercase().as_str() {
1562 "mock" => MigrationMode::Mock,
1563 "shadow" => MigrationMode::Shadow,
1564 "real" => MigrationMode::Real,
1565 "auto" => MigrationMode::Auto,
1566 _ => {
1567 return Ok(Json(serde_json::json!({
1568 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1569 })));
1570 }
1571 };
1572
1573 let mut config = proxy_config.write().await;
1574 let updated = config.update_rule_migration_mode(&pattern, mode);
1575
1576 if !updated {
1577 return Ok(Json(serde_json::json!({
1578 "error": format!("Route pattern not found: {}", pattern)
1579 })));
1580 }
1581
1582 Ok(Json(serde_json::json!({
1583 "pattern": pattern,
1584 "mode": format!("{:?}", mode).to_lowercase()
1585 })))
1586}
1587
1588async fn toggle_group_migration(
1590 State(state): State<ManagementState>,
1591 Path(group): Path<String>,
1592) -> Result<Json<serde_json::Value>, StatusCode> {
1593 let proxy_config = match &state.proxy_config {
1594 Some(config) => config,
1595 None => {
1596 return Ok(Json(serde_json::json!({
1597 "error": "Migration not configured. Proxy config not available."
1598 })));
1599 }
1600 };
1601
1602 let mut config = proxy_config.write().await;
1603 let new_mode = config.toggle_group_migration(&group);
1604
1605 Ok(Json(serde_json::json!({
1606 "group": group,
1607 "mode": format!("{:?}", new_mode).to_lowercase()
1608 })))
1609}
1610
1611async fn set_group_migration_mode(
1613 State(state): State<ManagementState>,
1614 Path(group): Path<String>,
1615 Json(request): Json<SetMigrationModeRequest>,
1616) -> Result<Json<serde_json::Value>, StatusCode> {
1617 let proxy_config = match &state.proxy_config {
1618 Some(config) => config,
1619 None => {
1620 return Ok(Json(serde_json::json!({
1621 "error": "Migration not configured. Proxy config not available."
1622 })));
1623 }
1624 };
1625
1626 use mockforge_core::proxy::config::MigrationMode;
1627 let mode = match request.mode.to_lowercase().as_str() {
1628 "mock" => MigrationMode::Mock,
1629 "shadow" => MigrationMode::Shadow,
1630 "real" => MigrationMode::Real,
1631 "auto" => MigrationMode::Auto,
1632 _ => {
1633 return Ok(Json(serde_json::json!({
1634 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1635 })));
1636 }
1637 };
1638
1639 let mut config = proxy_config.write().await;
1640 config.update_group_migration_mode(&group, mode);
1641
1642 Ok(Json(serde_json::json!({
1643 "group": group,
1644 "mode": format!("{:?}", mode).to_lowercase()
1645 })))
1646}
1647
1648async fn get_migration_groups(
1650 State(state): State<ManagementState>,
1651) -> Result<Json<serde_json::Value>, StatusCode> {
1652 let proxy_config = match &state.proxy_config {
1653 Some(config) => config,
1654 None => {
1655 return Ok(Json(serde_json::json!({
1656 "error": "Migration not configured. Proxy config not available."
1657 })));
1658 }
1659 };
1660
1661 let config = proxy_config.read().await;
1662 let groups = config.get_migration_groups();
1663
1664 let groups_json: serde_json::Map<String, serde_json::Value> = groups
1666 .into_iter()
1667 .map(|(name, info)| {
1668 (
1669 name,
1670 serde_json::json!({
1671 "name": info.name,
1672 "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1673 "route_count": info.route_count
1674 }),
1675 )
1676 })
1677 .collect();
1678
1679 Ok(Json(serde_json::json!(groups_json)))
1680}
1681
1682async fn get_migration_status(
1684 State(state): State<ManagementState>,
1685) -> Result<Json<serde_json::Value>, StatusCode> {
1686 let proxy_config = match &state.proxy_config {
1687 Some(config) => config,
1688 None => {
1689 return Ok(Json(serde_json::json!({
1690 "error": "Migration not configured. Proxy config not available."
1691 })));
1692 }
1693 };
1694
1695 let config = proxy_config.read().await;
1696 let routes = config.get_migration_routes();
1697 let groups = config.get_migration_groups();
1698
1699 let mut mock_count = 0;
1700 let mut shadow_count = 0;
1701 let mut real_count = 0;
1702 let mut auto_count = 0;
1703
1704 for route in &routes {
1705 match route.migration_mode {
1706 mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1707 mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1708 mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1709 mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1710 }
1711 }
1712
1713 Ok(Json(serde_json::json!({
1714 "total_routes": routes.len(),
1715 "mock_routes": mock_count,
1716 "shadow_routes": shadow_count,
1717 "real_routes": real_count,
1718 "auto_routes": auto_count,
1719 "total_groups": groups.len(),
1720 "migration_enabled": config.migration_enabled
1721 })))
1722}
1723
/// Request body used to create or update a proxy body-transform rule.
#[derive(Debug, Deserialize, Serialize)]
pub struct ProxyRuleRequest {
    /// Pattern copied verbatim into the stored rule's `pattern` field.
    /// Presumably a URL/path pattern — TODO confirm against the proxy matcher.
    pub pattern: String,
    /// Rule kind, carried as `type` in JSON. `create_proxy_rule` accepts `"request"` or
    /// `"response"` and rejects any other value.
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status codes stored on the rule; empty when the field is omitted.
    #[serde(default)]
    pub status_codes: Vec<u16>,
    /// Transforms to apply. The field has no serde default, so it must be present in
    /// the JSON; `create_proxy_rule` additionally rejects an empty list.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is enabled; filled in by `default_true` when omitted.
    #[serde(default = "default_true")]
    pub enabled: bool,
}
1743
/// Wire representation of a single body transform inside a proxy rule.
#[derive(Debug, Deserialize, Serialize)]
pub struct BodyTransformRequest {
    /// Location in the body the transform targets; copied verbatim into the stored
    /// transform. Presumably a JSONPath-style expression — TODO confirm.
    pub path: String,
    /// Replacement value; copied verbatim into the stored transform.
    pub replace: String,
    /// Operation name; an empty string when omitted. The handlers in this file map
    /// `"replace"`, `"add"` and `"remove"` to the matching `TransformOperation`, and
    /// treat every other value (including the empty default) as `Replace`.
    #[serde(default)]
    pub operation: String,
}
1755
/// Serialized form of a stored proxy rule, as built by `list_proxy_rules`.
#[derive(Debug, Serialize)]
pub struct ProxyRuleResponse {
    /// Positional identifier: request rules use their index, response rules use their
    /// index offset by the number of request rules.
    pub id: usize,
    /// The rule's pattern.
    pub pattern: String,
    /// Rule kind, serialized as `type`; `list_proxy_rules` emits `"request"` or
    /// `"response"`.
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status codes of the rule; `list_proxy_rules` always reports an empty list for
    /// request rules.
    pub status_codes: Vec<u16>,
    /// The rule's transforms, with each operation rendered as a lowercase name.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is enabled.
    pub enabled: bool,
}
1773
1774async fn list_proxy_rules(
1776 State(state): State<ManagementState>,
1777) -> Result<Json<serde_json::Value>, StatusCode> {
1778 let proxy_config = match &state.proxy_config {
1779 Some(config) => config,
1780 None => {
1781 return Ok(Json(serde_json::json!({
1782 "error": "Proxy not configured. Proxy config not available."
1783 })));
1784 }
1785 };
1786
1787 let config = proxy_config.read().await;
1788
1789 let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1790
1791 for (idx, rule) in config.request_replacements.iter().enumerate() {
1793 rules.push(ProxyRuleResponse {
1794 id: idx,
1795 pattern: rule.pattern.clone(),
1796 rule_type: "request".to_string(),
1797 status_codes: Vec::new(),
1798 body_transforms: rule
1799 .body_transforms
1800 .iter()
1801 .map(|t| BodyTransformRequest {
1802 path: t.path.clone(),
1803 replace: t.replace.clone(),
1804 operation: format!("{:?}", t.operation).to_lowercase(),
1805 })
1806 .collect(),
1807 enabled: rule.enabled,
1808 });
1809 }
1810
1811 let request_count = config.request_replacements.len();
1813 for (idx, rule) in config.response_replacements.iter().enumerate() {
1814 rules.push(ProxyRuleResponse {
1815 id: request_count + idx,
1816 pattern: rule.pattern.clone(),
1817 rule_type: "response".to_string(),
1818 status_codes: rule.status_codes.clone(),
1819 body_transforms: rule
1820 .body_transforms
1821 .iter()
1822 .map(|t| BodyTransformRequest {
1823 path: t.path.clone(),
1824 replace: t.replace.clone(),
1825 operation: format!("{:?}", t.operation).to_lowercase(),
1826 })
1827 .collect(),
1828 enabled: rule.enabled,
1829 });
1830 }
1831
1832 Ok(Json(serde_json::json!({
1833 "rules": rules
1834 })))
1835}
1836
1837async fn create_proxy_rule(
1839 State(state): State<ManagementState>,
1840 Json(request): Json<ProxyRuleRequest>,
1841) -> Result<Json<serde_json::Value>, StatusCode> {
1842 let proxy_config = match &state.proxy_config {
1843 Some(config) => config,
1844 None => {
1845 return Ok(Json(serde_json::json!({
1846 "error": "Proxy not configured. Proxy config not available."
1847 })));
1848 }
1849 };
1850
1851 if request.body_transforms.is_empty() {
1853 return Ok(Json(serde_json::json!({
1854 "error": "At least one body transform is required"
1855 })));
1856 }
1857
1858 let body_transforms: Vec<BodyTransform> = request
1859 .body_transforms
1860 .iter()
1861 .map(|t| {
1862 let op = match t.operation.as_str() {
1863 "replace" => TransformOperation::Replace,
1864 "add" => TransformOperation::Add,
1865 "remove" => TransformOperation::Remove,
1866 _ => TransformOperation::Replace,
1867 };
1868 BodyTransform {
1869 path: t.path.clone(),
1870 replace: t.replace.clone(),
1871 operation: op,
1872 }
1873 })
1874 .collect();
1875
1876 let new_rule = BodyTransformRule {
1877 pattern: request.pattern.clone(),
1878 status_codes: request.status_codes.clone(),
1879 body_transforms,
1880 enabled: request.enabled,
1881 };
1882
1883 let mut config = proxy_config.write().await;
1884
1885 let rule_id = if request.rule_type == "request" {
1886 config.request_replacements.push(new_rule);
1887 config.request_replacements.len() - 1
1888 } else if request.rule_type == "response" {
1889 config.response_replacements.push(new_rule);
1890 config.request_replacements.len() + config.response_replacements.len() - 1
1891 } else {
1892 return Ok(Json(serde_json::json!({
1893 "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
1894 })));
1895 };
1896
1897 Ok(Json(serde_json::json!({
1898 "id": rule_id,
1899 "message": "Rule created successfully"
1900 })))
1901}
1902
1903async fn get_proxy_rule(
1905 State(state): State<ManagementState>,
1906 Path(id): Path<String>,
1907) -> Result<Json<serde_json::Value>, StatusCode> {
1908 let proxy_config = match &state.proxy_config {
1909 Some(config) => config,
1910 None => {
1911 return Ok(Json(serde_json::json!({
1912 "error": "Proxy not configured. Proxy config not available."
1913 })));
1914 }
1915 };
1916
1917 let config = proxy_config.read().await;
1918 let rule_id: usize = match id.parse() {
1919 Ok(id) => id,
1920 Err(_) => {
1921 return Ok(Json(serde_json::json!({
1922 "error": format!("Invalid rule ID: {}", id)
1923 })));
1924 }
1925 };
1926
1927 let request_count = config.request_replacements.len();
1928
1929 if rule_id < request_count {
1930 let rule = &config.request_replacements[rule_id];
1932 Ok(Json(serde_json::json!({
1933 "id": rule_id,
1934 "pattern": rule.pattern,
1935 "type": "request",
1936 "status_codes": [],
1937 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1938 "path": t.path,
1939 "replace": t.replace,
1940 "operation": format!("{:?}", t.operation).to_lowercase()
1941 })).collect::<Vec<_>>(),
1942 "enabled": rule.enabled
1943 })))
1944 } else if rule_id < request_count + config.response_replacements.len() {
1945 let response_idx = rule_id - request_count;
1947 let rule = &config.response_replacements[response_idx];
1948 Ok(Json(serde_json::json!({
1949 "id": rule_id,
1950 "pattern": rule.pattern,
1951 "type": "response",
1952 "status_codes": rule.status_codes,
1953 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1954 "path": t.path,
1955 "replace": t.replace,
1956 "operation": format!("{:?}", t.operation).to_lowercase()
1957 })).collect::<Vec<_>>(),
1958 "enabled": rule.enabled
1959 })))
1960 } else {
1961 Ok(Json(serde_json::json!({
1962 "error": format!("Rule ID {} not found", rule_id)
1963 })))
1964 }
1965}
1966
1967async fn update_proxy_rule(
1969 State(state): State<ManagementState>,
1970 Path(id): Path<String>,
1971 Json(request): Json<ProxyRuleRequest>,
1972) -> Result<Json<serde_json::Value>, StatusCode> {
1973 let proxy_config = match &state.proxy_config {
1974 Some(config) => config,
1975 None => {
1976 return Ok(Json(serde_json::json!({
1977 "error": "Proxy not configured. Proxy config not available."
1978 })));
1979 }
1980 };
1981
1982 let mut config = proxy_config.write().await;
1983 let rule_id: usize = match id.parse() {
1984 Ok(id) => id,
1985 Err(_) => {
1986 return Ok(Json(serde_json::json!({
1987 "error": format!("Invalid rule ID: {}", id)
1988 })));
1989 }
1990 };
1991
1992 let body_transforms: Vec<BodyTransform> = request
1993 .body_transforms
1994 .iter()
1995 .map(|t| {
1996 let op = match t.operation.as_str() {
1997 "replace" => TransformOperation::Replace,
1998 "add" => TransformOperation::Add,
1999 "remove" => TransformOperation::Remove,
2000 _ => TransformOperation::Replace,
2001 };
2002 BodyTransform {
2003 path: t.path.clone(),
2004 replace: t.replace.clone(),
2005 operation: op,
2006 }
2007 })
2008 .collect();
2009
2010 let updated_rule = BodyTransformRule {
2011 pattern: request.pattern.clone(),
2012 status_codes: request.status_codes.clone(),
2013 body_transforms,
2014 enabled: request.enabled,
2015 };
2016
2017 let request_count = config.request_replacements.len();
2018
2019 if rule_id < request_count {
2020 config.request_replacements[rule_id] = updated_rule;
2022 } else if rule_id < request_count + config.response_replacements.len() {
2023 let response_idx = rule_id - request_count;
2025 config.response_replacements[response_idx] = updated_rule;
2026 } else {
2027 return Ok(Json(serde_json::json!({
2028 "error": format!("Rule ID {} not found", rule_id)
2029 })));
2030 }
2031
2032 Ok(Json(serde_json::json!({
2033 "id": rule_id,
2034 "message": "Rule updated successfully"
2035 })))
2036}
2037
2038async fn delete_proxy_rule(
2040 State(state): State<ManagementState>,
2041 Path(id): Path<String>,
2042) -> Result<Json<serde_json::Value>, StatusCode> {
2043 let proxy_config = match &state.proxy_config {
2044 Some(config) => config,
2045 None => {
2046 return Ok(Json(serde_json::json!({
2047 "error": "Proxy not configured. Proxy config not available."
2048 })));
2049 }
2050 };
2051
2052 let mut config = proxy_config.write().await;
2053 let rule_id: usize = match id.parse() {
2054 Ok(id) => id,
2055 Err(_) => {
2056 return Ok(Json(serde_json::json!({
2057 "error": format!("Invalid rule ID: {}", id)
2058 })));
2059 }
2060 };
2061
2062 let request_count = config.request_replacements.len();
2063
2064 if rule_id < request_count {
2065 config.request_replacements.remove(rule_id);
2067 } else if rule_id < request_count + config.response_replacements.len() {
2068 let response_idx = rule_id - request_count;
2070 config.response_replacements.remove(response_idx);
2071 } else {
2072 return Ok(Json(serde_json::json!({
2073 "error": format!("Rule ID {} not found", rule_id)
2074 })));
2075 }
2076
2077 Ok(Json(serde_json::json!({
2078 "id": rule_id,
2079 "message": "Rule deleted successfully"
2080 })))
2081}
2082
2083async fn get_proxy_inspect(
2086 State(_state): State<ManagementState>,
2087 Query(params): Query<std::collections::HashMap<String, String>>,
2088) -> Result<Json<serde_json::Value>, StatusCode> {
2089 let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2090
2091 Ok(Json(serde_json::json!({
2094 "requests": [],
2095 "responses": [],
2096 "limit": limit,
2097 "message": "Request/response inspection not yet implemented. This endpoint will return intercepted traffic in a future version."
2098 })))
2099}
2100
2101pub fn management_router(state: ManagementState) -> Router {
2103 let router = Router::new()
2104 .route("/health", get(health_check))
2105 .route("/stats", get(get_stats))
2106 .route("/config", get(get_config))
2107 .route("/config/validate", post(validate_config))
2108 .route("/config/bulk", post(bulk_update_config))
2109 .route("/mocks", get(list_mocks))
2110 .route("/mocks", post(create_mock))
2111 .route("/mocks/{id}", get(get_mock))
2112 .route("/mocks/{id}", put(update_mock))
2113 .route("/mocks/{id}", delete(delete_mock))
2114 .route("/export", get(export_mocks))
2115 .route("/import", post(import_mocks));
2116
2117 #[cfg(feature = "smtp")]
2118 let router = router
2119 .route("/smtp/mailbox", get(list_smtp_emails))
2120 .route("/smtp/mailbox", delete(clear_smtp_mailbox))
2121 .route("/smtp/mailbox/{id}", get(get_smtp_email))
2122 .route("/smtp/mailbox/export", get(export_smtp_mailbox))
2123 .route("/smtp/mailbox/search", get(search_smtp_emails));
2124
2125 #[cfg(not(feature = "smtp"))]
2126 let router = router;
2127
2128 #[cfg(feature = "mqtt")]
2130 let router = router
2131 .route("/mqtt/stats", get(get_mqtt_stats))
2132 .route("/mqtt/clients", get(get_mqtt_clients))
2133 .route("/mqtt/topics", get(get_mqtt_topics))
2134 .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
2135 .route("/mqtt/messages/stream", get(mqtt_messages_stream))
2136 .route("/mqtt/publish", post(publish_mqtt_message_handler))
2137 .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2138
2139 #[cfg(not(feature = "mqtt"))]
2140 let router = router
2141 .route("/mqtt/publish", post(publish_mqtt_message_handler))
2142 .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2143
2144 #[cfg(feature = "kafka")]
2145 let router = router
2146 .route("/kafka/stats", get(get_kafka_stats))
2147 .route("/kafka/topics", get(get_kafka_topics))
2148 .route("/kafka/topics/{topic}", get(get_kafka_topic))
2149 .route("/kafka/groups", get(get_kafka_groups))
2150 .route("/kafka/groups/{group_id}", get(get_kafka_group))
2151 .route("/kafka/produce", post(produce_kafka_message))
2152 .route("/kafka/produce/batch", post(produce_kafka_batch))
2153 .route("/kafka/messages/stream", get(kafka_messages_stream));
2154
2155 #[cfg(not(feature = "kafka"))]
2156 let router = router;
2157
2158 let router = router
2160 .route("/migration/routes", get(get_migration_routes))
2161 .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
2162 .route("/migration/routes/{pattern}", put(set_route_migration_mode))
2163 .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
2164 .route("/migration/groups/{group}", put(set_group_migration_mode))
2165 .route("/migration/groups", get(get_migration_groups))
2166 .route("/migration/status", get(get_migration_status));
2167
2168 let router = router
2170 .route("/proxy/rules", get(list_proxy_rules))
2171 .route("/proxy/rules", post(create_proxy_rule))
2172 .route("/proxy/rules/{id}", get(get_proxy_rule))
2173 .route("/proxy/rules/{id}", put(update_proxy_rule))
2174 .route("/proxy/rules/{id}", delete(delete_proxy_rule))
2175 .route("/proxy/inspect", get(get_proxy_inspect));
2176
2177 let router = router
2179 .route("/ai/generate-spec", post(generate_ai_spec))
2180 .route("/mockai/generate-openapi", post(generate_openapi_from_traffic))
2181 .route("/mockai/learn", post(learn_from_examples))
2182 .route("/mockai/rules/explanations", get(list_rule_explanations))
2183 .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
2184 .route("/chaos/config", get(get_chaos_config))
2185 .route("/chaos/config", post(update_chaos_config))
2186 .route("/network/profiles", get(list_network_profiles))
2187 .route("/network/profile/apply", post(apply_network_profile));
2188
2189 let router =
2191 router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());
2192
2193 router.with_state(state)
2194}
2195
/// Aggregate counters returned by `get_kafka_stats`.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// Number of topics known to the broker.
    pub topics: usize,
    /// Total number of partitions summed over all topics.
    pub partitions: usize,
    /// Number of consumer groups.
    pub consumer_groups: usize,
    /// Produced-message total taken from the broker's metrics snapshot.
    pub messages_produced: u64,
    /// Consumed-message total taken from the broker's metrics snapshot.
    pub messages_consumed: u64,
}
2210
/// Summary of one Kafka topic, as listed by `get_kafka_topics`.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name.
    pub name: String,
    /// Number of partitions the topic has.
    pub partitions: usize,
    /// Replication factor read from the topic's configuration.
    pub replication_factor: i32,
}
2218
/// Summary of one Kafka consumer group, as listed by `get_kafka_groups`.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group identifier.
    pub group_id: String,
    /// Number of members in the group.
    pub members: usize,
    /// Group state label; `get_kafka_groups` currently always reports `"Stable"`.
    pub state: String,
}
2226
/// Returns aggregate Kafka broker counters as a [`KafkaBrokerStats`] JSON body.
///
/// Answers `503 Service Unavailable` when `state.kafka_broker` is `None`.
#[cfg(feature = "kafka")]
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let consumer_groups = broker.consumer_groups.read().await;
    let metrics = broker.metrics.clone();

    let mut total_partitions: usize = 0;
    for topic in topics.values() {
        total_partitions += topic.partitions.len();
    }

    let snapshot = metrics.snapshot();

    Json(KafkaBrokerStats {
        topics: topics.len(),
        partitions: total_partitions,
        consumer_groups: consumer_groups.groups().len(),
        messages_produced: snapshot.messages_produced_total,
        messages_consumed: snapshot.messages_consumed_total,
    })
    .into_response()
}
2260
/// Lists all Kafka topics as `{"topics": [...]}`, one [`KafkaTopicInfo`] per topic.
///
/// Answers `503 Service Unavailable` when `state.kafka_broker` is `None`.
#[cfg(feature = "kafka")]
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;

    let mut topic_list: Vec<KafkaTopicInfo> = Vec::new();
    for (name, topic) in topics.iter() {
        topic_list.push(KafkaTopicInfo {
            name: name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor,
        });
    }

    Json(serde_json::json!({
        "topics": topic_list
    }))
    .into_response()
}
2290
/// Returns details for the Kafka topic named by the path segment, including a
/// per-partition breakdown with message counts.
///
/// Answers `404 Not Found` when the topic does not exist and `503 Service Unavailable`
/// when `state.kafka_broker` is `None`. `leader` and `replicas` are reported as fixed
/// values (`0` and `[0]`).
#[cfg(feature = "kafka")]
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;

    match topics.get(&topic_name) {
        Some(topic) => {
            let partitions_detail: Vec<serde_json::Value> = topic
                .partitions
                .iter()
                .enumerate()
                .map(|(idx, partition)| {
                    serde_json::json!({
                        "id": idx as i32,
                        "leader": 0,
                        "replicas": vec![0],
                        "message_count": partition.messages.len()
                    })
                })
                .collect();

            Json(serde_json::json!({
                "name": topic_name,
                "partitions": topic.partitions.len(),
                "replication_factor": topic.config.replication_factor,
                "partitions_detail": partitions_detail
            }))
            .into_response()
        }
        None => (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Topic not found",
                "topic": topic_name
            })),
        )
            .into_response(),
    }
}
2332
/// Lists all Kafka consumer groups as `{"groups": [...]}`, one
/// [`KafkaConsumerGroupInfo`] per group. The `state` of every group is reported as the
/// fixed label `"Stable"`.
///
/// Answers `503 Service Unavailable` when `state.kafka_broker` is `None`.
#[cfg(feature = "kafka")]
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;

    let mut groups: Vec<KafkaConsumerGroupInfo> = Vec::new();
    for (group_id, group) in consumer_groups.groups().iter() {
        groups.push(KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            state: "Stable".to_string(),
        });
    }

    Json(serde_json::json!({
        "groups": groups
    }))
    .into_response()
}
2363
/// Returns details for the Kafka consumer group named by the path segment: member
/// count, per-member assignments and committed offsets. The `state` is reported as the
/// fixed label `"Stable"`.
///
/// Answers `404 Not Found` when the group does not exist and `503 Service Unavailable`
/// when `state.kafka_broker` is `None`.
#[cfg(feature = "kafka")]
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;

    match consumer_groups.groups().get(&group_id) {
        Some(group) => {
            let members_detail: Vec<serde_json::Value> = group
                .members
                .iter()
                .map(|(member_id, member)| {
                    let assignments: Vec<serde_json::Value> = member
                        .assignment
                        .iter()
                        .map(|a| {
                            serde_json::json!({
                                "topic": a.topic,
                                "partitions": a.partitions
                            })
                        })
                        .collect();

                    serde_json::json!({
                        "member_id": member_id,
                        "client_id": member.client_id,
                        "assignments": assignments
                    })
                })
                .collect();

            let offsets: Vec<serde_json::Value> = group
                .offsets
                .iter()
                .map(|((topic, partition), offset)| {
                    serde_json::json!({
                        "topic": topic,
                        "partition": partition,
                        "offset": offset
                    })
                })
                .collect();

            Json(serde_json::json!({
                "group_id": group_id,
                "members": group.members.len(),
                "state": "Stable",
                "members_detail": members_detail,
                "offsets": offsets
            }))
            .into_response()
        }
        None => (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Consumer group not found",
                "group_id": group_id
            })),
        )
            .into_response(),
    }
}
2412
/// Request body for producing a single Kafka message via `produce_kafka_message`.
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Target topic; `produce_kafka_message` creates it with the default topic
    /// configuration when it does not exist yet.
    pub topic: String,
    /// Optional message key; when no partition is given it is passed to the topic's
    /// `assign_partition` to choose one.
    #[serde(default)]
    pub key: Option<String>,
    /// Message value; stored as the string's bytes.
    pub value: String,
    /// Optional explicit partition; must be an existing partition of the topic.
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional message headers; header values are stored as the strings' bytes.
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2432
/// Produce a single message to the mock Kafka broker.
///
/// The target topic is created with a default configuration when it does not
/// exist. If the request names no partition, one is assigned from the key.
///
/// Responses:
/// * `200` with `topic`, `partition` and `offset` on success,
/// * `400` when the selected partition does not exist on the topic,
/// * `500` when the broker rejects the message,
/// * `503` when no Kafka broker is attached to the management state.
///
/// On success a [`MessageEvent::Kafka`] is broadcast to SSE subscribers; a
/// send error (no active subscribers) is deliberately ignored.
#[cfg(feature = "kafka")]
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    let broker = match &state.kafka_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "Kafka broker not available",
                    "message": "Kafka broker is not enabled or not available."
                })),
            )
                .into_response();
        }
    };

    let mut topics = broker.topics.write().await;

    // Get the topic, creating it on first use.
    let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
        crate::topics::Topic::new(request.topic.clone(), crate::topics::TopicConfig::default())
    });

    // An explicit partition wins; otherwise derive one from the key.
    let partition_id = match request.partition {
        Some(partition) => partition,
        None => topic_entry.assign_partition(request.key.as_deref().map(str::as_bytes)),
    };

    if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            })),
        )
            .into_response();
    }

    // Build the broker message from *borrowed* request fields: the key and
    // headers are needed again below for the SSE event, so they must not be
    // moved out of `request` here.
    let message = crate::partitions::KafkaMessage {
        // Placeholder; the real offset is the one returned by `produce`.
        offset: 0,
        timestamp: chrono::Utc::now().timestamp_millis(),
        key: request.key.as_ref().map(|k| k.as_bytes().to_vec()),
        value: request.value.as_bytes().to_vec(),
        headers: request
            .headers
            .clone()
            .unwrap_or_default()
            .into_iter()
            .map(|(k, v)| (k, v.as_bytes().to_vec()))
            .collect(),
    };

    match topic_entry.produce(partition_id, message).await {
        Ok(offset) => {
            broker.metrics.record_messages_produced(1);

            // Notify live stream subscribers. The request's key, value and
            // headers are no longer needed afterwards, so they are moved
            // into the event instead of being cloned.
            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: request.topic.clone(),
                key: request.key,
                value: request.value,
                partition: partition_id,
                offset,
                headers: request.headers,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", request.topic),
                "topic": request.topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2530
/// Request body for producing several Kafka messages in one call.
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// Messages to produce, processed in order. The batch handler rejects an
    /// empty list with `400 Bad Request`.
    pub messages: Vec<KafkaProduceRequest>,
    /// Pause in milliseconds between consecutive messages; when omitted the
    /// value comes from `default_delay`. A value of `0` disables the pause.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2540
/// Produce a batch of messages to the mock Kafka broker.
///
/// Messages are processed in request order. Each one gets its own entry in
/// the `results` array (`index`, `success`, and either `topic`/`partition`/
/// `offset` or `error`), so a failure of one message does not abort the
/// batch. Between consecutive messages the handler sleeps for `delay_ms`
/// milliseconds (skipped after the last message and after an
/// invalid-partition rejection).
///
/// Responses:
/// * `200` with per-message results and `total`/`succeeded`/`failed` counts,
/// * `400` when the batch is empty,
/// * `503` when no Kafka broker is attached to the management state.
#[cfg(feature = "kafka")]
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        if request.messages.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            )
                .into_response();
        }

        let mut results = Vec::with_capacity(request.messages.len());
        // Non-empty batch is guaranteed above, so this cannot underflow.
        let last_index = request.messages.len() - 1;

        for (index, msg_request) in request.messages.iter().enumerate() {
            // The topics write lock is confined to this inner scope so that
            // it is released *before* the inter-message delay below.
            // Holding it across the sleep would block every other topic
            // operation on the broker for the whole delay.
            {
                let mut topics = broker.topics.write().await;

                // Get the topic, creating it on first use.
                let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                    crate::topics::Topic::new(
                        msg_request.topic.clone(),
                        crate::topics::TopicConfig::default(),
                    )
                });

                // An explicit partition wins; otherwise derive one from the key.
                let partition_id = match msg_request.partition {
                    Some(partition) => partition,
                    None => topic_entry
                        .assign_partition(msg_request.key.as_deref().map(str::as_bytes)),
                };

                if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                    }));
                    continue;
                }

                let message = crate::partitions::KafkaMessage {
                    // Placeholder; the real offset is the one returned by `produce`.
                    offset: 0,
                    timestamp: chrono::Utc::now().timestamp_millis(),
                    key: msg_request.key.as_ref().map(|k| k.as_bytes().to_vec()),
                    value: msg_request.value.as_bytes().to_vec(),
                    headers: msg_request
                        .headers
                        .clone()
                        .unwrap_or_default()
                        .into_iter()
                        .map(|(k, v)| (k, v.as_bytes().to_vec()))
                        .collect(),
                };

                match topic_entry.produce(partition_id, message).await {
                    Ok(offset) => {
                        broker.metrics.record_messages_produced(1);

                        // Notify live stream subscribers; a send error (no
                        // active subscribers) is deliberately ignored.
                        let event = MessageEvent::Kafka(KafkaMessageEvent {
                            topic: msg_request.topic.clone(),
                            key: msg_request.key.clone(),
                            value: msg_request.value.clone(),
                            partition: partition_id,
                            offset,
                            headers: msg_request.headers.clone(),
                            timestamp: chrono::Utc::now().to_rfc3339(),
                        });
                        let _ = state.message_events.send(event);

                        results.push(serde_json::json!({
                            "index": index,
                            "success": true,
                            "topic": msg_request.topic,
                            "partition": partition_id,
                            "offset": offset
                        }));
                    }
                    Err(e) => {
                        results.push(serde_json::json!({
                            "index": index,
                            "success": false,
                            "error": e.to_string()
                        }));
                    }
                }
            }

            // Pace the batch, but do not wait after the final message.
            if index < last_index && request.delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        Json(serde_json::json!({
            "success": true,
            "total": request.messages.len(),
            "succeeded": success_count,
            "failed": request.messages.len() - success_count,
            "results": results
        }))
        .into_response()
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2666
/// Server-sent-event stream of MQTT messages seen by the management API.
///
/// Subscribes to the shared message broadcast channel and forwards every
/// MQTT event as an SSE event named `mqtt_message`. When the `topic` query
/// parameter is present, only events whose topic contains that string are
/// forwarded. Events of other protocols are skipped, a lagging receiver is
/// logged and resumes, and the stream ends when the channel is closed.
#[cfg(feature = "mqtt")]
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let receiver = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let events = stream::unfold(receiver, move |mut receiver| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                // Wait for the next MQTT event, skipping everything else.
                let event = match receiver.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => event,
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => continue,
                    Err(broadcast::error::RecvError::Closed) => return None,
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                };

                // Substring match against the optional topic filter.
                let wanted = topic_filter
                    .as_deref()
                    .map_or(true, |filter| event.topic.contains(filter));
                if !wanted {
                    continue;
                }

                let event_json = serde_json::json!({
                    "protocol": "mqtt",
                    "topic": event.topic,
                    "payload": event.payload,
                    "qos": event.qos,
                    "retain": event.retain,
                    "timestamp": event.timestamp,
                });

                // An event that fails to serialize is dropped and the loop
                // waits for the next one.
                match serde_json::to_string(&event_json) {
                    Ok(event_data) => {
                        let sse_event = Event::default().event("mqtt_message").data(event_data);
                        return Some((Ok(sse_event), receiver));
                    }
                    Err(_) => continue,
                }
            }
        }
    });

    let keep_alive = axum::response::sse::KeepAlive::new()
        .interval(std::time::Duration::from_secs(15))
        .text("keep-alive-text");

    Sse::new(events).keep_alive(keep_alive)
}
2729
/// Server-sent-event stream of Kafka messages seen by the management API.
///
/// Subscribes to the shared message broadcast channel and forwards every
/// Kafka event as an SSE event named `kafka_message`. When the `topic` query
/// parameter is present, only events whose topic contains that string are
/// forwarded. Events of other protocols are skipped, a lagging receiver is
/// logged and resumes, and the stream ends when the channel is closed.
#[cfg(feature = "kafka")]
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let receiver = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    let events = stream::unfold(receiver, move |mut receiver| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                // Wait for the next Kafka event, skipping everything else.
                let event = match receiver.recv().await {
                    Ok(MessageEvent::Kafka(event)) => event,
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => continue,
                    Err(broadcast::error::RecvError::Closed) => return None,
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                };

                // Substring match against the optional topic filter.
                let wanted = topic_filter
                    .as_deref()
                    .map_or(true, |filter| event.topic.contains(filter));
                if !wanted {
                    continue;
                }

                let event_json = serde_json::json!({
                    "protocol": "kafka",
                    "topic": event.topic,
                    "key": event.key,
                    "value": event.value,
                    "partition": event.partition,
                    "offset": event.offset,
                    "headers": event.headers,
                    "timestamp": event.timestamp,
                });

                // An event that fails to serialize is dropped and the loop
                // waits for the next one.
                match serde_json::to_string(&event_json) {
                    Ok(event_data) => {
                        let sse_event = Event::default().event("kafka_message").data(event_data);
                        return Some((Ok(sse_event), receiver));
                    }
                    Err(_) => continue,
                }
            }
        }
    });

    let keep_alive = axum::response::sse::KeepAlive::new()
        .interval(std::time::Duration::from_secs(15))
        .text("keep-alive-text");

    Sse::new(events).keep_alive(keep_alive)
}
2793
/// Request body for AI-assisted specification generation.
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Free-text requirements; inserted verbatim into the generation prompt.
    pub query: String,
    /// Kind of specification to generate. The handler recognizes `"openapi"`,
    /// `"graphql"` and `"asyncapi"`; any other value is treated as OpenAPI.
    pub spec_type: String,
    /// Version string quoted in the prompt; the handler falls back to
    /// `"3.0.0"` when omitted.
    pub api_version: Option<String>,
}
2806
/// Request body for generating an OpenAPI specification from recorded traffic.
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to the recorder database; when omitted the handler uses
    /// `recordings.db` in the current working directory.
    #[serde(default)]
    pub database_path: Option<String>,
    /// Lower time bound as an RFC 3339 timestamp (e.g. `2025-01-01T00:00:00Z`).
    #[serde(default)]
    pub since: Option<String>,
    /// Upper time bound as an RFC 3339 timestamp.
    #[serde(default)]
    pub until: Option<String>,
    /// Optional path pattern forwarded to the recording query filters.
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence passed to the generator; defaults to `0.7`.
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
2826
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`.
fn default_min_confidence() -> f64 {
    const DEFAULT_MIN_CONFIDENCE: f64 = 0.7;
    DEFAULT_MIN_CONFIDENCE
}
2830
/// Generate an API specification from a natural-language description using
/// the configured LLM provider.
///
/// Configuration is read from the environment:
/// * `MOCKFORGE_RAG_API_KEY` (falling back to `OPENAI_API_KEY`) — required,
///   otherwise the handler answers `503 Service Unavailable`;
/// * `MOCKFORGE_RAG_PROVIDER` — `openai` (default), `anthropic`, `ollama`,
///   `openai-compatible`/`openai_compatible`; unknown values mean OpenAI;
/// * `MOCKFORGE_RAG_API_ENDPOINT`, `MOCKFORGE_RAG_MODEL` — provider-specific
///   defaults are used when unset;
/// * `MOCKFORGE_RAG_MAX_TOKENS` (4096), `MOCKFORGE_RAG_TEMPERATURE` (0.3),
///   `MOCKFORGE_RAG_TIMEOUT` (60), `MOCKFORGE_RAG_CONTEXT_WINDOW` (4000) —
///   unset or unparsable values fall back to the default in parentheses.
///
/// On success the response contains the extracted `spec` and the requested
/// `spec_type`; engine initialization or generation failures yield `500`.
#[cfg(feature = "data-faker")]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::{DocumentStorage, InMemoryStorage},
    };

    /// Read `name` from the environment and parse it, using `fallback` when
    /// the variable is unset or does not parse.
    fn env_or<T: std::str::FromStr>(name: &str, fallback: T) -> T {
        std::env::var(name)
            .ok()
            .and_then(|raw| raw.parse().ok())
            .unwrap_or(fallback)
    }

    // The dedicated key takes precedence over the generic OpenAI key.
    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .or_else(|_| std::env::var("OPENAI_API_KEY"))
        .ok();

    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    }

    let provider_name = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_name.as_str() {
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        // "openai" and anything unrecognized.
        _ => LlmProvider::OpenAI,
    };

    // Per-provider defaults, used only when the environment does not override them.
    let (default_endpoint, default_model) = match provider {
        LlmProvider::OpenAI => ("https://api.openai.com/v1", "gpt-3.5-turbo"),
        LlmProvider::Anthropic => ("https://api.anthropic.com/v1", "claude-3-sonnet-20240229"),
        LlmProvider::Ollama => ("http://localhost:11434/api", "llama2"),
        LlmProvider::OpenAICompatible => ("http://localhost:8000/v1", "gpt-3.5-turbo"),
    };

    let api_endpoint = std::env::var("MOCKFORGE_RAG_API_ENDPOINT")
        .unwrap_or_else(|_| default_endpoint.to_string());
    let model =
        std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| default_model.to_string());

    let mut rag_config = RagConfig::default();
    rag_config.provider = provider;
    rag_config.api_endpoint = api_endpoint;
    rag_config.api_key = api_key;
    rag_config.model = model;
    rag_config.max_tokens = env_or("MOCKFORGE_RAG_MAX_TOKENS", 4096);
    rag_config.temperature = env_or("MOCKFORGE_RAG_TEMPERATURE", 0.3);
    rag_config.timeout_secs = env_or("MOCKFORGE_RAG_TIMEOUT", 60);
    rag_config.max_context_length = env_or("MOCKFORGE_RAG_CONTEXT_WINDOW", 4000);

    let wants_graphql = request.spec_type == "graphql";

    let spec_type_label = match request.spec_type.as_str() {
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        // "openapi" and anything unrecognized.
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");
    let output_format = if wants_graphql { "GraphQL SDL" } else { "YAML" };

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        output_format
    );

    // The engine is created with an empty in-memory document store.
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    let mut rag_engine = match RagEngine::new(rag_config, storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    let generated_text = match rag_engine.generate(&prompt, None).await {
        Ok(text) => text,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "AI generation failed",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    // Strip any Markdown fencing the model wrapped around the spec.
    let spec = if wants_graphql {
        extract_graphql_schema(&generated_text)
    } else {
        extract_yaml_spec(&generated_text)
    };

    Json(serde_json::json!({
        "success": true,
        "spec": spec,
        "spec_type": request.spec_type,
    }))
    .into_response()
}
3001
3002#[cfg(not(feature = "data-faker"))]
3003async fn generate_ai_spec(
3004 State(_state): State<ManagementState>,
3005 Json(_request): Json<GenerateSpecRequest>,
3006) -> impl IntoResponse {
3007 (
3008 StatusCode::NOT_IMPLEMENTED,
3009 Json(serde_json::json!({
3010 "error": "AI features not enabled",
3011 "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3012 })),
3013 )
3014 .into_response()
3015}
3016
3017async fn generate_openapi_from_traffic(
3019 State(_state): State<ManagementState>,
3020 Json(request): Json<GenerateOpenApiFromTrafficRequest>,
3021) -> impl IntoResponse {
3022 use chrono::{DateTime, Utc};
3023 use mockforge_core::intelligent_behavior::{
3024 openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
3025 IntelligentBehaviorConfig,
3026 };
3027 use mockforge_recorder::{
3028 database::RecorderDatabase,
3029 openapi_export::{QueryFilters, RecordingsToOpenApi},
3030 };
3031 use std::path::PathBuf;
3032
3033 let db_path = if let Some(ref path) = request.database_path {
3035 PathBuf::from(path)
3036 } else {
3037 std::env::current_dir()
3038 .unwrap_or_else(|_| PathBuf::from("."))
3039 .join("recordings.db")
3040 };
3041
3042 let db = match RecorderDatabase::new(&db_path).await {
3044 Ok(db) => db,
3045 Err(e) => {
3046 return (
3047 StatusCode::BAD_REQUEST,
3048 Json(serde_json::json!({
3049 "error": "Database error",
3050 "message": format!("Failed to open recorder database: {}", e)
3051 })),
3052 )
3053 .into_response();
3054 }
3055 };
3056
3057 let since_dt = if let Some(ref since_str) = request.since {
3059 match DateTime::parse_from_rfc3339(since_str) {
3060 Ok(dt) => Some(dt.with_timezone(&Utc)),
3061 Err(e) => {
3062 return (
3063 StatusCode::BAD_REQUEST,
3064 Json(serde_json::json!({
3065 "error": "Invalid date format",
3066 "message": format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3067 })),
3068 )
3069 .into_response();
3070 }
3071 }
3072 } else {
3073 None
3074 };
3075
3076 let until_dt = if let Some(ref until_str) = request.until {
3077 match DateTime::parse_from_rfc3339(until_str) {
3078 Ok(dt) => Some(dt.with_timezone(&Utc)),
3079 Err(e) => {
3080 return (
3081 StatusCode::BAD_REQUEST,
3082 Json(serde_json::json!({
3083 "error": "Invalid date format",
3084 "message": format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
3085 })),
3086 )
3087 .into_response();
3088 }
3089 }
3090 } else {
3091 None
3092 };
3093
3094 let query_filters = QueryFilters {
3096 since: since_dt,
3097 until: until_dt,
3098 path_pattern: request.path_pattern.clone(),
3099 min_status_code: None,
3100 max_requests: Some(1000),
3101 };
3102
3103 let exchanges = match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await
3105 {
3106 Ok(exchanges) => exchanges,
3107 Err(e) => {
3108 return (
3109 StatusCode::INTERNAL_SERVER_ERROR,
3110 Json(serde_json::json!({
3111 "error": "Query error",
3112 "message": format!("Failed to query HTTP exchanges: {}", e)
3113 })),
3114 )
3115 .into_response();
3116 }
3117 };
3118
3119 if exchanges.is_empty() {
3120 return (
3121 StatusCode::NOT_FOUND,
3122 Json(serde_json::json!({
3123 "error": "No exchanges found",
3124 "message": "No HTTP exchanges found matching the specified filters"
3125 })),
3126 )
3127 .into_response();
3128 }
3129
3130 let behavior_config = IntelligentBehaviorConfig::default();
3132 let gen_config = OpenApiGenerationConfig {
3133 min_confidence: request.min_confidence,
3134 behavior_model: Some(behavior_config.behavior_model),
3135 };
3136
3137 let generator = OpenApiSpecGenerator::new(gen_config);
3139 let result = match generator.generate_from_exchanges(exchanges).await {
3140 Ok(result) => result,
3141 Err(e) => {
3142 return (
3143 StatusCode::INTERNAL_SERVER_ERROR,
3144 Json(serde_json::json!({
3145 "error": "Generation error",
3146 "message": format!("Failed to generate OpenAPI spec: {}", e)
3147 })),
3148 )
3149 .into_response();
3150 }
3151 };
3152
3153 let spec_json = if let Some(ref raw) = result.spec.raw_document {
3155 raw.clone()
3156 } else {
3157 match serde_json::to_value(&result.spec.spec) {
3158 Ok(json) => json,
3159 Err(e) => {
3160 return (
3161 StatusCode::INTERNAL_SERVER_ERROR,
3162 Json(serde_json::json!({
3163 "error": "Serialization error",
3164 "message": format!("Failed to serialize OpenAPI spec: {}", e)
3165 })),
3166 )
3167 .into_response();
3168 }
3169 }
3170 };
3171
3172 let response = serde_json::json!({
3174 "spec": spec_json,
3175 "metadata": {
3176 "requests_analyzed": result.metadata.requests_analyzed,
3177 "paths_inferred": result.metadata.paths_inferred,
3178 "path_confidence": result.metadata.path_confidence,
3179 "generated_at": result.metadata.generated_at.to_rfc3339(),
3180 "duration_ms": result.metadata.duration_ms,
3181 }
3182 });
3183
3184 Json(response).into_response()
3185}
3186
3187async fn list_rule_explanations(
3189 State(state): State<ManagementState>,
3190 Query(params): Query<std::collections::HashMap<String, String>>,
3191) -> impl IntoResponse {
3192 use mockforge_core::intelligent_behavior::RuleType;
3193
3194 let explanations = state.rule_explanations.read().await;
3195 let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3196
3197 if let Some(rule_type_str) = params.get("rule_type") {
3199 if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3200 explanations_vec.retain(|e| e.rule_type == rule_type);
3201 }
3202 }
3203
3204 if let Some(min_confidence_str) = params.get("min_confidence") {
3206 if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3207 explanations_vec.retain(|e| e.confidence >= min_confidence);
3208 }
3209 }
3210
3211 explanations_vec.sort_by(|a, b| {
3213 b.confidence
3214 .partial_cmp(&a.confidence)
3215 .unwrap_or(std::cmp::Ordering::Equal)
3216 .then_with(|| b.generated_at.cmp(&a.generated_at))
3217 });
3218
3219 Json(serde_json::json!({
3220 "explanations": explanations_vec,
3221 "total": explanations_vec.len(),
3222 }))
3223 .into_response()
3224}
3225
3226async fn get_rule_explanation(
3228 State(state): State<ManagementState>,
3229 Path(rule_id): Path<String>,
3230) -> impl IntoResponse {
3231 let explanations = state.rule_explanations.read().await;
3232
3233 match explanations.get(&rule_id) {
3234 Some(explanation) => Json(serde_json::json!({
3235 "explanation": explanation,
3236 }))
3237 .into_response(),
3238 None => (
3239 StatusCode::NOT_FOUND,
3240 Json(serde_json::json!({
3241 "error": "Rule explanation not found",
3242 "message": format!("No explanation found for rule ID: {}", rule_id)
3243 })),
3244 )
3245 .into_response(),
3246 }
3247}
3248
/// Request body for learning behavior rules from example request/response pairs.
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example pairs to learn from; the handler rejects an empty list.
    pub examples: Vec<ExamplePairRequest>,
    /// Optional configuration, deserialized by the handler as an
    /// `IntelligentBehaviorConfig`; a value that fails to deserialize falls
    /// back to the default configuration.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3258
/// One example request/response pair, given as loosely structured JSON.
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request description. The handler reads the keys `method` (default
    /// `"GET"`), `path` (default `"/"`), `body`, `query_params` and `headers`.
    pub request: serde_json::Value,
    /// Response description. The handler reads `status_code` (or `status`,
    /// default `200`) and `body`.
    pub response: serde_json::Value,
}
3267
3268async fn learn_from_examples(
3273 State(state): State<ManagementState>,
3274 Json(request): Json<LearnFromExamplesRequest>,
3275) -> impl IntoResponse {
3276 use mockforge_core::intelligent_behavior::{
3277 config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3278 rule_generator::{ExamplePair, RuleGenerator},
3279 };
3280
3281 if request.examples.is_empty() {
3282 return (
3283 StatusCode::BAD_REQUEST,
3284 Json(serde_json::json!({
3285 "error": "No examples provided",
3286 "message": "At least one example pair is required"
3287 })),
3288 )
3289 .into_response();
3290 }
3291
3292 let example_pairs: Result<Vec<ExamplePair>, String> = request
3294 .examples
3295 .into_iter()
3296 .enumerate()
3297 .map(|(idx, ex)| {
3298 let method = ex
3300 .request
3301 .get("method")
3302 .and_then(|v| v.as_str())
3303 .map(|s| s.to_string())
3304 .unwrap_or_else(|| "GET".to_string());
3305 let path = ex
3306 .request
3307 .get("path")
3308 .and_then(|v| v.as_str())
3309 .map(|s| s.to_string())
3310 .unwrap_or_else(|| "/".to_string());
3311 let request_body = ex.request.get("body").cloned();
3312 let query_params = ex
3313 .request
3314 .get("query_params")
3315 .and_then(|v| v.as_object())
3316 .map(|obj| {
3317 obj.iter()
3318 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3319 .collect()
3320 })
3321 .unwrap_or_default();
3322 let headers = ex
3323 .request
3324 .get("headers")
3325 .and_then(|v| v.as_object())
3326 .map(|obj| {
3327 obj.iter()
3328 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3329 .collect()
3330 })
3331 .unwrap_or_default();
3332
3333 let status = ex
3335 .response
3336 .get("status_code")
3337 .or_else(|| ex.response.get("status"))
3338 .and_then(|v| v.as_u64())
3339 .map(|n| n as u16)
3340 .unwrap_or(200);
3341 let response_body = ex.response.get("body").cloned();
3342
3343 Ok(ExamplePair {
3344 method,
3345 path,
3346 request: request_body,
3347 status,
3348 response: response_body,
3349 query_params,
3350 headers,
3351 metadata: {
3352 let mut meta = std::collections::HashMap::new();
3353 meta.insert("source".to_string(), "api".to_string());
3354 meta.insert("example_index".to_string(), idx.to_string());
3355 meta
3356 },
3357 })
3358 })
3359 .collect();
3360
3361 let example_pairs = match example_pairs {
3362 Ok(pairs) => pairs,
3363 Err(e) => {
3364 return (
3365 StatusCode::BAD_REQUEST,
3366 Json(serde_json::json!({
3367 "error": "Invalid examples",
3368 "message": e
3369 })),
3370 )
3371 .into_response();
3372 }
3373 };
3374
3375 let behavior_config = if let Some(config_json) = request.config {
3377 serde_json::from_value(config_json)
3379 .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3380 .behavior_model
3381 } else {
3382 BehaviorModelConfig::default()
3383 };
3384
3385 let generator = RuleGenerator::new(behavior_config);
3387
3388 let (rules, explanations) =
3390 match generator.generate_rules_with_explanations(example_pairs).await {
3391 Ok(result) => result,
3392 Err(e) => {
3393 return (
3394 StatusCode::INTERNAL_SERVER_ERROR,
3395 Json(serde_json::json!({
3396 "error": "Rule generation failed",
3397 "message": format!("Failed to generate rules: {}", e)
3398 })),
3399 )
3400 .into_response();
3401 }
3402 };
3403
3404 {
3406 let mut stored_explanations = state.rule_explanations.write().await;
3407 for explanation in &explanations {
3408 stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3409 }
3410 }
3411
3412 let response = serde_json::json!({
3414 "success": true,
3415 "rules_generated": {
3416 "consistency_rules": rules.consistency_rules.len(),
3417 "schemas": rules.schemas.len(),
3418 "state_machines": rules.state_transitions.len(),
3419 "system_prompt": !rules.system_prompt.is_empty(),
3420 },
3421 "explanations": explanations.iter().map(|e| serde_json::json!({
3422 "rule_id": e.rule_id,
3423 "rule_type": e.rule_type,
3424 "confidence": e.confidence,
3425 "reasoning": e.reasoning,
3426 })).collect::<Vec<_>>(),
3427 "total_explanations": explanations.len(),
3428 });
3429
3430 Json(response).into_response()
3431}
3432
/// Extract a YAML specification from LLM output.
///
/// Resolution order:
/// 1. the contents of the first fenced code block opened with "```yaml";
/// 2. the contents of the first fenced code block of any kind, with a
///    language tag on the opening fence line (e.g. `yml`, `json`) removed;
/// 3. the whole text.
///
/// The result is always trimmed. A fence that is never closed is not treated
/// as a code block.
fn extract_yaml_spec(text: &str) -> String {
    const YAML_FENCE: &str = "```yaml";
    const FENCE: &str = "```";

    if let Some(start) = text.find(YAML_FENCE) {
        let body = text[start + YAML_FENCE.len()..].trim_start();
        if let Some(end) = body.find(FENCE) {
            return body[..end].trim().to_string();
        }
    }

    if let Some(start) = text.find(FENCE) {
        let after_fence = &text[start + FENCE.len()..];

        // A language tag is a single token that starts with an alphanumeric
        // character and sits alone on the opening fence line.
        let is_language_tag = |tag: &str| {
            tag.chars().next().map_or(false, |c| c.is_ascii_alphanumeric())
                && tag
                    .chars()
                    .all(|c| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '+' | '#' | '.'))
        };

        // Without this, "```yml\nopenapi: ..." would yield a spec whose
        // first line is the stray word "yml".
        let body = match after_fence.split_once('\n') {
            Some((info, rest)) if is_language_tag(info.trim()) => rest,
            _ => after_fence,
        };
        let body = body.trim_start();

        if let Some(end) = body.find(FENCE) {
            return body[..end].trim().to_string();
        }
    }

    // No usable code block: assume the whole response is the specification.
    text.trim().to_string()
}
3456
/// Extract a GraphQL schema from LLM output.
///
/// Resolution order:
/// 1. the contents of the first fenced code block opened with "```graphql";
/// 2. the contents of the first fenced code block of any kind, with a
///    language tag on the opening fence line (e.g. `gql`) removed;
/// 3. the whole text.
///
/// The result is always trimmed. A fence that is never closed is not treated
/// as a code block.
fn extract_graphql_schema(text: &str) -> String {
    const GRAPHQL_FENCE: &str = "```graphql";
    const FENCE: &str = "```";

    if let Some(start) = text.find(GRAPHQL_FENCE) {
        let body = text[start + GRAPHQL_FENCE.len()..].trim_start();
        if let Some(end) = body.find(FENCE) {
            return body[..end].trim().to_string();
        }
    }

    if let Some(start) = text.find(FENCE) {
        let after_fence = &text[start + FENCE.len()..];

        // A language tag is a single token that starts with an alphanumeric
        // character and sits alone on the opening fence line.
        let is_language_tag = |tag: &str| {
            tag.chars().next().map_or(false, |c| c.is_ascii_alphanumeric())
                && tag
                    .chars()
                    .all(|c| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '+' | '#' | '.'))
        };

        // Without this, "```gql\ntype Query ..." would yield a schema whose
        // first line is the stray word "gql".
        let body = match after_fence.split_once('\n') {
            Some((info, rest)) if is_language_tag(info.trim()) => rest,
            _ => after_fence,
        };
        let body = body.trim_start();

        if let Some(end) = body.find(FENCE) {
            return body[..end].trim().to_string();
        }
    }

    // No usable code block: assume the whole response is the schema.
    text.trim().to_string()
}
3479
3480async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
3484 Json(serde_json::json!({
3486 "enabled": false,
3487 "latency": null,
3488 "fault_injection": null,
3489 "rate_limit": null,
3490 "traffic_shaping": null,
3491 }))
3492 .into_response()
3493}
3494
/// Request body for updating the chaos-engineering configuration.
///
/// Every field is optional. NOTE(review): the update handler visible in this
/// file acknowledges the request without reading any of these fields —
/// confirm whether they are applied elsewhere.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Whether chaos behavior should be enabled.
    pub enabled: Option<bool>,
    /// Latency section, as free-form JSON.
    pub latency: Option<serde_json::Value>,
    /// Fault-injection section, as free-form JSON.
    pub fault_injection: Option<serde_json::Value>,
    /// Rate-limit section, as free-form JSON.
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic-shaping section, as free-form JSON.
    pub traffic_shaping: Option<serde_json::Value>,
}
3509
3510async fn update_chaos_config(
3512 State(_state): State<ManagementState>,
3513 Json(config): Json<ChaosConfigUpdate>,
3514) -> impl IntoResponse {
3515 Json(serde_json::json!({
3517 "success": true,
3518 "message": "Chaos configuration updated"
3519 }))
3520 .into_response()
3521}
3522
3523async fn list_network_profiles() -> impl IntoResponse {
3527 use mockforge_core::network_profiles::NetworkProfileCatalog;
3528
3529 let catalog = NetworkProfileCatalog::default();
3530 let profiles: Vec<serde_json::Value> = catalog
3531 .list_profiles_with_description()
3532 .iter()
3533 .map(|(name, description)| {
3534 serde_json::json!({
3535 "name": name,
3536 "description": description,
3537 })
3538 })
3539 .collect();
3540
3541 Json(serde_json::json!({
3542 "profiles": profiles
3543 }))
3544 .into_response()
3545}
3546
/// Request body for applying a network profile.
#[derive(Debug, Deserialize)]
pub struct ApplyNetworkProfileRequest {
    /// Name of the profile to look up in the network profile catalog.
    pub profile_name: String,
}
3553
3554async fn apply_network_profile(
3556 State(_state): State<ManagementState>,
3557 Json(request): Json<ApplyNetworkProfileRequest>,
3558) -> impl IntoResponse {
3559 use mockforge_core::network_profiles::NetworkProfileCatalog;
3560
3561 let catalog = NetworkProfileCatalog::default();
3562 if let Some(profile) = catalog.get(&request.profile_name) {
3563 Json(serde_json::json!({
3565 "success": true,
3566 "message": format!("Network profile '{}' applied", request.profile_name),
3567 "profile": {
3568 "name": profile.name,
3569 "description": profile.description,
3570 }
3571 }))
3572 .into_response()
3573 } else {
3574 (
3575 StatusCode::NOT_FOUND,
3576 Json(serde_json::json!({
3577 "error": "Profile not found",
3578 "message": format!("Network profile '{}' not found", request.profile_name)
3579 })),
3580 )
3581 .into_response()
3582 }
3583}
3584
3585pub fn management_router_with_ui_builder(
3587 state: ManagementState,
3588 server_config: mockforge_core::config::ServerConfig,
3589) -> Router {
3590 use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
3591
3592 let management = management_router(state);
3594
3595 let ui_builder_state = UIBuilderState::new(server_config);
3597 let ui_builder = create_ui_builder_router(ui_builder_state);
3598
3599 management.nest("/ui-builder", ui_builder)
3601}
3602
3603pub fn management_router_with_spec_import(state: ManagementState) -> Router {
3605 use crate::spec_import::{spec_import_router, SpecImportState};
3606
3607 let management = management_router(state);
3609
3610 Router::new()
3612 .merge(management)
3613 .merge(spec_import_router(SpecImportState::new()))
3614}
3615
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a mock with an explicit status code and every optional matching
    /// and scenario field left unset.
    fn mock_fixture(
        id: &str,
        name: &str,
        method: &str,
        path: &str,
        body: serde_json::Value,
        enabled: bool,
        status_code: u16,
    ) -> MockConfig {
        MockConfig {
            id: id.to_string(),
            name: name.to_string(),
            method: method.to_string(),
            path: path.to_string(),
            response: MockResponse {
                body,
                headers: None,
            },
            enabled,
            latency_ms: None,
            status_code: Some(status_code),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        }
    }

    /// A mock pushed into the shared state can be found again by its ID.
    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        let mock = mock_fixture(
            "test-1",
            "Test Mock",
            "GET",
            "/test",
            serde_json::json!({"message": "test"}),
            true,
            200,
        );

        // Release the write lock before taking the read lock below.
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(mock);
        }

        let mocks = state.mocks.read().await;
        let found = mocks.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    /// Total and enabled mock counts reflect what was stored.
    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        // Release the write lock before taking the read lock below.
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(mock_fixture(
                "1",
                "Mock 1",
                "GET",
                "/test1",
                serde_json::json!({}),
                true,
                200,
            ));
            mocks.push(mock_fixture(
                "2",
                "Mock 2",
                "POST",
                "/test2",
                serde_json::json!({}),
                false,
                201,
            ));
        }

        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }
}