1use axum::{
6 extract::{Path, Query, State},
7 http::StatusCode,
8 response::{IntoResponse, Json},
9 routing::{delete, get, post, put},
10 Router,
11};
12#[cfg(any(feature = "mqtt", feature = "kafka"))]
13use axum::response::sse::{Event, Sse};
14#[cfg(any(feature = "mqtt", feature = "kafka"))]
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18 BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23#[cfg(any(feature = "mqtt", feature = "kafka"))]
24use std::convert::Infallible;
25use std::sync::Arc;
26use tokio::sync::{broadcast, RwLock};
27use tracing::*;
28
/// Default capacity of the message-event broadcast channel.
const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;

/// Resolve the broadcast-channel capacity.
///
/// Reads `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY`; any unset, non-UTF-8, or
/// unparsable value falls back to [`DEFAULT_MESSAGE_BROADCAST_CAPACITY`].
fn get_message_broadcast_capacity() -> usize {
    match std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY") {
        Ok(raw) => raw.parse().unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY),
        Err(_) => DEFAULT_MESSAGE_BROADCAST_CAPACITY,
    }
}
39
/// A protocol message observed by the server, fanned out to SSE subscribers.
///
/// Serialized with an adjacent tag: `{"protocol": "mqtt"|"kafka", "data": {...}}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    /// An MQTT publish event (only with the `mqtt` feature).
    #[cfg(feature = "mqtt")]
    Mqtt(MqttMessageEvent),
    /// A Kafka produce event (only with the `kafka` feature).
    #[cfg(feature = "kafka")]
    Kafka(KafkaMessageEvent),
}
52
/// Snapshot of a single MQTT publish, as fanned out to SSE subscribers.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// Topic the message was published to.
    pub topic: String,
    /// Message payload as UTF-8 text.
    pub payload: String,
    /// Quality-of-service level (0, 1, or 2).
    pub qos: u8,
    /// Whether the broker retains the message.
    pub retain: bool,
    /// Publish time, RFC 3339 (see the publish handler's `to_rfc3339()`).
    pub timestamp: String,
}
68
/// Snapshot of a single Kafka record, as fanned out to SSE subscribers.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Topic the record was produced to.
    pub topic: String,
    /// Optional record key.
    pub key: Option<String>,
    /// Record value as text.
    pub value: String,
    /// Partition the record landed in.
    pub partition: i32,
    /// Offset within the partition.
    pub offset: i64,
    /// Optional record headers.
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// Production time as a string (presumably RFC 3339 — TODO confirm at producer).
    pub timestamp: String,
}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct MockConfig {
84 #[serde(skip_serializing_if = "String::is_empty")]
86 pub id: String,
87 pub name: String,
89 pub method: String,
91 pub path: String,
93 pub response: MockResponse,
95 #[serde(default = "default_true")]
97 pub enabled: bool,
98 #[serde(skip_serializing_if = "Option::is_none")]
100 pub latency_ms: Option<u64>,
101 #[serde(skip_serializing_if = "Option::is_none")]
103 pub status_code: Option<u16>,
104 #[serde(skip_serializing_if = "Option::is_none")]
106 pub request_match: Option<RequestMatchCriteria>,
107 #[serde(skip_serializing_if = "Option::is_none")]
109 pub priority: Option<i32>,
110 #[serde(skip_serializing_if = "Option::is_none")]
112 pub scenario: Option<String>,
113 #[serde(skip_serializing_if = "Option::is_none")]
115 pub required_scenario_state: Option<String>,
116 #[serde(skip_serializing_if = "Option::is_none")]
118 pub new_scenario_state: Option<String>,
119}
120
/// Serde default for `MockConfig::enabled` — newly created mocks start enabled.
fn default_true() -> bool {
    true
}
124
/// Canned response payload for a mock.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Response body as arbitrary JSON.
    pub body: serde_json::Value,
    /// Optional extra response headers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
134
135#[derive(Debug, Clone, Serialize, Deserialize, Default)]
137pub struct RequestMatchCriteria {
138 #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
140 pub headers: std::collections::HashMap<String, String>,
141 #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
143 pub query_params: std::collections::HashMap<String, String>,
144 #[serde(skip_serializing_if = "Option::is_none")]
146 pub body_pattern: Option<String>,
147 #[serde(skip_serializing_if = "Option::is_none")]
149 pub json_path: Option<String>,
150 #[serde(skip_serializing_if = "Option::is_none")]
152 pub xpath: Option<String>,
153 #[serde(skip_serializing_if = "Option::is_none")]
155 pub custom_matcher: Option<String>,
156}
157
/// Return `true` when `mock` should answer the described request.
///
/// Checks, in order: enabled flag, HTTP method (case-insensitive), path
/// pattern, then any optional `RequestMatchCriteria` (headers, query
/// params, body regex, JSONPath presence, custom expression). All criteria
/// must pass.
///
/// Header and body expected values are first tried as regexes; a value that
/// is not a valid regex falls back to exact string comparison.
pub fn mock_matches_request(
    mock: &MockConfig,
    method: &str,
    path: &str,
    headers: &std::collections::HashMap<String, String>,
    query_params: &std::collections::HashMap<String, String>,
    body: Option<&[u8]>,
) -> bool {
    use regex::Regex;

    // Disabled mocks never match.
    if !mock.enabled {
        return false;
    }

    // Method comparison is case-insensitive.
    if mock.method.to_uppercase() != method.to_uppercase() {
        return false;
    }

    // Path may contain {param} segments and '*' wildcards.
    if !path_matches_pattern(&mock.path, path) {
        return false;
    }

    if let Some(criteria) = &mock.request_match {
        // Every expected header must be present; names are matched
        // case-insensitively, values as regex-or-literal.
        for (key, expected_value) in &criteria.headers {
            let header_key_lower = key.to_lowercase();
            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);

            if let Some((_, actual_value)) = found {
                if let Ok(re) = Regex::new(expected_value) {
                    if !re.is_match(actual_value) {
                        return false;
                    }
                } else if actual_value != expected_value {
                    return false;
                }
            } else {
                // Required header missing entirely.
                return false;
            }
        }

        // Query parameters are exact-match only (no regex support).
        for (key, expected_value) in &criteria.query_params {
            if let Some(actual_value) = query_params.get(key) {
                if actual_value != expected_value {
                    return false;
                }
            } else {
                return false;
            }
        }

        // Body pattern: regex-or-literal against the lossily-decoded body.
        // A mock with a body pattern never matches a body-less request.
        if let Some(pattern) = &criteria.body_pattern {
            if let Some(body_bytes) = body {
                let body_str = String::from_utf8_lossy(body_bytes);
                if let Ok(re) = Regex::new(pattern) {
                    if !re.is_match(&body_str) {
                        return false;
                    }
                } else if body_str.as_ref() != pattern {
                    return false;
                }
            } else {
                return false;
            }
        }

        // JSONPath: only rejects when the body parses as JSON and the path
        // is absent. NOTE(review): unlike body_pattern, a missing or
        // non-JSON body silently PASSES this criterion — confirm that
        // asymmetry is intended.
        if let Some(json_path) = &criteria.json_path {
            if let Some(body_bytes) = body {
                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
                        if !json_path_exists(&json_value, json_path) {
                            return false;
                        }
                    }
                }
            }
        }

        // XPath criteria are accepted but not enforced yet.
        if let Some(_xpath) = &criteria.xpath {
            tracing::warn!("XPath matching not yet fully implemented");
        }

        if let Some(custom) = &criteria.custom_matcher {
            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
                return false;
            }
        }
    }

    true
}
271
272fn path_matches_pattern(pattern: &str, path: &str) -> bool {
274 if pattern == path {
276 return true;
277 }
278
279 if pattern == "*" {
281 return true;
282 }
283
284 let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
286 let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
287
288 if pattern_parts.len() != path_parts.len() {
289 if pattern.contains('*') {
291 return matches_wildcard_pattern(pattern, path);
292 }
293 return false;
294 }
295
296 for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
297 if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
299 continue; }
301
302 if pattern_part != path_part {
303 return false;
304 }
305 }
306
307 true
308}
309
310fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
312 use regex::Regex;
313
314 let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
316
317 if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
318 return re.is_match(path);
319 }
320
321 false
322}
323
324fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
326 if let Some(path) = json_path.strip_prefix("$.") {
329 let parts: Vec<&str> = path.split('.').collect();
330
331 let mut current = json;
332 for part in parts {
333 if let Some(obj) = current.as_object() {
334 if let Some(value) = obj.get(part) {
335 current = value;
336 } else {
337 return false;
338 }
339 } else {
340 return false;
341 }
342 }
343 true
344 } else {
345 tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
347 false
348 }
349}
350
/// Evaluate a tiny custom-matcher expression against the request.
///
/// Supported forms, detected by substring in this priority order:
/// * `field == "value"`   — exact equality
/// * `field =~ "regex"`   — regex match
/// * `field contains "x"` — substring containment
///
/// `field` is `method`, `path`, `headers.<name>`, `query.<name>`, or (for
/// `contains`) `body`. Unknown fields or forms evaluate to `false`.
///
/// NOTE(review): operator detection is substring-based, so a quoted value
/// that itself contains `==`, `=~`, or the word `contains` splits at the
/// wrong place — confirm expressions are constrained upstream.
fn evaluate_custom_matcher(
    expression: &str,
    method: &str,
    path: &str,
    headers: &std::collections::HashMap<String, String>,
    query_params: &std::collections::HashMap<String, String>,
    body: Option<&[u8]>,
) -> bool {
    use regex::Regex;

    let expr = expression.trim();

    // Equality: field == "value"
    if expr.contains("==") {
        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        // Strip either single or double quotes around the value.
        let expected_value = parts[1].trim_matches('"').trim_matches('\'');

        match field {
            "method" => method == expected_value,
            "path" => path == expected_value,
            _ if field.starts_with("headers.") => {
                let header_name = &field[8..];
                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
            }
            _ if field.starts_with("query.") => {
                let param_name = &field[6..];
                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
            }
            _ => false,
        }
    }
    // Regex: field =~ "pattern"
    else if expr.contains("=~") {
        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        let pattern = parts[1].trim_matches('"').trim_matches('\'');

        // An invalid regex makes the whole expression false.
        if let Ok(re) = Regex::new(pattern) {
            match field {
                "method" => re.is_match(method),
                "path" => re.is_match(path),
                _ if field.starts_with("headers.") => {
                    let header_name = &field[8..];
                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
                }
                _ if field.starts_with("query.") => {
                    let param_name = &field[6..];
                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
                }
                _ => false,
            }
        } else {
            false
        }
    }
    // Containment: field contains "value" (also supports `body`).
    else if expr.contains("contains") {
        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        let search_value = parts[1].trim_matches('"').trim_matches('\'');

        match field {
            "path" => path.contains(search_value),
            _ if field.starts_with("headers.") => {
                let header_name = &field[8..];
                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
            }
            _ if field.starts_with("body") => {
                // Lossy decode: invalid UTF-8 bytes become replacement chars.
                if let Some(body_bytes) = body {
                    let body_str = String::from_utf8_lossy(body_bytes);
                    body_str.contains(search_value)
                } else {
                    false
                }
            }
            _ => false,
        }
    } else {
        tracing::warn!("Unknown custom matcher expression format: {}", expr);
        false
    }
}
448
/// Runtime statistics returned by the stats endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Seconds since the management state was created.
    pub uptime_seconds: u64,
    /// Total requests counted so far.
    pub total_requests: u64,
    /// Number of registered mocks (enabled or not).
    pub active_mocks: usize,
    /// Number of mocks currently enabled.
    pub enabled_mocks: usize,
    /// Registered route count. NOTE(review): currently mirrors `active_mocks`.
    pub registered_routes: usize,
}
463
/// Server metadata returned by the config endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// Crate version (from CARGO_PKG_VERSION at build time).
    pub version: String,
    /// Listening port.
    pub port: u16,
    /// Whether an OpenAPI spec is loaded.
    pub has_openapi_spec: bool,
    /// Path the spec was loaded from, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
477
/// Shared state behind the management API router.
///
/// Everything is wrapped in `Arc`s so axum can cheaply clone the state per
/// request.
#[derive(Clone)]
pub struct ManagementState {
    /// All registered mocks.
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// OpenAPI spec the server was started with, if any.
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Filesystem path the spec was loaded from, if any.
    pub spec_path: Option<String>,
    /// Port of the managed server (reported via the config endpoint).
    pub port: u16,
    /// Creation instant; used for uptime reporting.
    pub start_time: std::time::Instant,
    /// Total request count. NOTE(review): incremented outside this file.
    pub request_counter: Arc<RwLock<u64>>,
    /// Live proxy configuration, when proxying is enabled.
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// Captured-email registry (smtp feature only).
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// Mock MQTT broker handle (mqtt feature only).
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Mock Kafka broker handle (kafka feature only).
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel fanning published messages out to SSE subscribers.
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// Scenario state machines shared with the mock matcher.
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// Broadcast channel for mock CRUD events over the management WebSocket.
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Hooks fired on mock create/update/delete/enable/disable.
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Intelligent-behavior rule explanations, keyed by a String id
    /// (presumably the rule id — confirm against the producer).
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
    /// Chaos-engineering API state (chaos feature only).
    #[cfg(feature = "chaos")]
    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
    /// Full server configuration for runtime inspection/update.
    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
}
529
impl ManagementState {
    /// Build a fresh management state with an empty mock collection.
    ///
    /// Optional collaborators (proxy, brokers, hooks, WebSocket broadcast,
    /// chaos, server config) start as `None` and are attached via the
    /// `with_*` builder methods below.
    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
        Self {
            mocks: Arc::new(RwLock::new(Vec::new())),
            spec,
            spec_path,
            port,
            start_time: std::time::Instant::now(),
            request_counter: Arc::new(RwLock::new(0)),
            proxy_config: None,
            #[cfg(feature = "smtp")]
            smtp_registry: None,
            #[cfg(feature = "mqtt")]
            mqtt_broker: None,
            #[cfg(feature = "kafka")]
            kafka_broker: None,
            #[cfg(any(feature = "mqtt", feature = "kafka"))]
            message_events: {
                // Capacity is env-tunable; see get_message_broadcast_capacity().
                let capacity = get_message_broadcast_capacity();
                let (tx, _) = broadcast::channel(capacity);
                Arc::new(tx)
            },
            state_machine_manager: Arc::new(RwLock::new(
                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
            )),
            ws_broadcast: None,
            lifecycle_hooks: None,
            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
            #[cfg(feature = "chaos")]
            chaos_api_state: None,
            server_config: None,
        }
    }

    /// Attach lifecycle hooks fired on mock CRUD operations.
    pub fn with_lifecycle_hooks(
        mut self,
        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
    ) -> Self {
        self.lifecycle_hooks = Some(hooks);
        self
    }

    /// Attach the WebSocket broadcast channel for mock change events.
    pub fn with_ws_broadcast(
        mut self,
        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
    ) -> Self {
        self.ws_broadcast = Some(ws_broadcast);
        self
    }

    /// Attach a shared, mutable proxy configuration.
    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
        self.proxy_config = Some(proxy_config);
        self
    }

    /// Attach the SMTP registry backing the mailbox endpoints.
    #[cfg(feature = "smtp")]
    pub fn with_smtp_registry(
        mut self,
        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
    ) -> Self {
        self.smtp_registry = Some(smtp_registry);
        self
    }

    /// Attach the mock MQTT broker backing the MQTT endpoints.
    #[cfg(feature = "mqtt")]
    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
        self.mqtt_broker = Some(mqtt_broker);
        self
    }

    /// Attach the mock Kafka broker backing the Kafka endpoints.
    #[cfg(feature = "kafka")]
    pub fn with_kafka_broker(
        mut self,
        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
    ) -> Self {
        self.kafka_broker = Some(kafka_broker);
        self
    }

    /// Attach the chaos-engineering API state.
    #[cfg(feature = "chaos")]
    pub fn with_chaos_api_state(
        mut self,
        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
    ) -> Self {
        self.chaos_api_state = Some(chaos_api_state);
        self
    }

    /// Attach the full server configuration for runtime inspection.
    pub fn with_server_config(
        mut self,
        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
    ) -> Self {
        self.server_config = Some(server_config);
        self
    }
}
640
641async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
643 let mocks = state.mocks.read().await;
644 Json(serde_json::json!({
645 "mocks": *mocks,
646 "total": mocks.len(),
647 "enabled": mocks.iter().filter(|m| m.enabled).count()
648 }))
649}
650
651async fn get_mock(
653 State(state): State<ManagementState>,
654 Path(id): Path<String>,
655) -> Result<Json<MockConfig>, StatusCode> {
656 let mocks = state.mocks.read().await;
657 mocks
658 .iter()
659 .find(|m| m.id == id)
660 .cloned()
661 .map(Json)
662 .ok_or(StatusCode::NOT_FOUND)
663}
664
/// `POST` handler: register a new mock.
///
/// An empty `id` means "server-assigned" and gets a fresh UUID; an explicit
/// duplicate id is rejected with 409. On success the mock is stored, the
/// `Created` lifecycle hook fires, and the change is broadcast over the
/// management WebSocket.
///
/// NOTE(review): the hook fires BEFORE the mock is pushed into the
/// collection, and the write lock is held across the hook await — confirm
/// hooks neither read the collection back nor block for long.
async fn create_mock(
    State(state): State<ManagementState>,
    Json(mut mock): Json<MockConfig>,
) -> Result<Json<MockConfig>, StatusCode> {
    let mut mocks = state.mocks.write().await;

    // Empty id => mint a fresh UUID.
    if mock.id.is_empty() {
        mock.id = uuid::Uuid::new_v4().to_string();
    }

    // Explicit ids must be unique.
    if mocks.iter().any(|m| m.id == mock.id) {
        return Err(StatusCode::CONFLICT);
    }

    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);

    if let Some(hooks) = &state.lifecycle_hooks {
        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
            id: mock.id.clone(),
            name: mock.name.clone(),
            config: serde_json::to_value(&mock).unwrap_or_default(),
        };
        hooks.invoke_mock_created(&event).await;
    }

    mocks.push(mock.clone());

    // Best-effort broadcast; send errors (no subscribers) are ignored.
    if let Some(tx) = &state.ws_broadcast {
        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
    }

    Ok(Json(mock))
}
703
/// `PUT` handler: replace an existing mock wholesale.
///
/// Fires the `Updated` lifecycle hook (plus `Enabled`/`Disabled` when the
/// enabled flag flipped), then broadcasts the change over the management
/// WebSocket. Returns 404 when no mock has the given id.
///
/// NOTE(review): the body's `id` field is stored as-is; a payload whose id
/// differs from the path `:id` silently changes the mock's id — confirm
/// upstream validation.
async fn update_mock(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
    Json(updated_mock): Json<MockConfig>,
) -> Result<Json<MockConfig>, StatusCode> {
    let mut mocks = state.mocks.write().await;

    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;

    // Keep the previous version to detect enabled-flag transitions.
    let old_mock = mocks[position].clone();

    info!("Updating mock: {}", id);
    mocks[position] = updated_mock.clone();

    if let Some(hooks) = &state.lifecycle_hooks {
        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
            id: updated_mock.id.clone(),
            name: updated_mock.name.clone(),
            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
        };
        hooks.invoke_mock_updated(&event).await;

        // Emit a dedicated state-change event when the enabled flag flipped.
        if old_mock.enabled != updated_mock.enabled {
            let state_event = if updated_mock.enabled {
                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
                    id: updated_mock.id.clone(),
                }
            } else {
                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
                    id: updated_mock.id.clone(),
                }
            };
            hooks.invoke_mock_state_changed(&state_event).await;
        }
    }

    // Best-effort broadcast; send errors (no subscribers) are ignored.
    if let Some(tx) = &state.ws_broadcast {
        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
    }

    Ok(Json(updated_mock))
}
751
/// `DELETE` handler: remove a mock by id.
///
/// Fires the `Deleted` lifecycle hook and broadcasts the removal over the
/// management WebSocket. Returns 204 on success, 404 when absent.
async fn delete_mock(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> Result<StatusCode, StatusCode> {
    let mut mocks = state.mocks.write().await;

    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;

    // Clone before removal so the hook payload can carry name/id.
    let deleted_mock = mocks[position].clone();

    info!("Deleting mock: {}", id);
    mocks.remove(position);

    if let Some(hooks) = &state.lifecycle_hooks {
        let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
            id: deleted_mock.id.clone(),
            name: deleted_mock.name.clone(),
        };
        hooks.invoke_mock_deleted(&event).await;
    }

    // Best-effort broadcast; send errors (no subscribers) are ignored.
    if let Some(tx) = &state.ws_broadcast {
        let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
    }

    Ok(StatusCode::NO_CONTENT)
}
783
/// Body of the configuration-validation endpoint.
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// Candidate configuration, already parsed as JSON.
    pub config: serde_json::Value,
    /// Parser selector: "yaml"/"yml", anything else (default "json") is JSON.
    #[serde(default = "default_format")]
    pub format: String,
}
793
/// Serde default for `ValidateConfigRequest::format`.
fn default_format() -> String {
    String::from("json")
}
797
/// Parse-check a candidate `ServerConfig` without applying it.
///
/// `format` chooses the parser. The YAML branch re-serializes the
/// already-parsed JSON body to a string and parses it with serde_yaml —
/// sound because JSON is a subset of YAML, though by this point a truly
/// YAML-authored payload has already been through a JSON decode.
async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
    use mockforge_core::config::ServerConfig;

    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
        "yaml" | "yml" => {
            let yaml_str = match serde_json::to_string(&request.config) {
                Ok(s) => s,
                Err(e) => {
                    // Serializing a Value should not fail; report 400 if it does.
                    return (
                        StatusCode::BAD_REQUEST,
                        Json(serde_json::json!({
                            "valid": false,
                            "error": format!("Failed to convert to string: {}", e),
                            "message": "Configuration validation failed"
                        })),
                    )
                        .into_response();
                }
            };
            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
        }
        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
    };

    match config_result {
        Ok(_) => Json(serde_json::json!({
            "valid": true,
            "message": "Configuration is valid"
        }))
        .into_response(),
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "valid": false,
                "error": format!("Invalid configuration: {}", e),
                "message": "Configuration validation failed"
            })),
        )
            .into_response(),
    }
}
840
/// Body of the bulk configuration-update endpoint.
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    /// JSON object of top-level `ServerConfig` keys to override.
    pub updates: serde_json::Value,
}
847
/// Validate a set of top-level configuration overrides.
///
/// Performs a SHALLOW merge: each top-level key in `updates` wholly
/// replaces that key in a default `ServerConfig`; nested objects are not
/// deep-merged. The merged config is only validated — nothing is applied
/// at runtime (see the success message).
async fn bulk_update_config(
    State(_state): State<ManagementState>,
    Json(request): Json<BulkConfigUpdateRequest>,
) -> impl IntoResponse {
    // Reject non-object payloads up front.
    if !request.updates.is_object() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Updates must be a JSON object"
            })),
        )
            .into_response();
    }

    use mockforge_core::config::ServerConfig;

    // Start from the default config as the merge base.
    let base_config = ServerConfig::default();
    let base_json = match serde_json::to_value(&base_config) {
        Ok(v) => v,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Internal error",
                    "message": format!("Failed to serialize base config: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Shallow merge: top-level keys only.
    // NOTE(review): base_json is never used after this clone — binding it
    // directly would avoid a copy.
    let mut merged = base_json.clone();
    if let (Some(merged_obj), Some(updates_obj)) =
        (merged.as_object_mut(), request.updates.as_object())
    {
        for (key, value) in updates_obj {
            merged_obj.insert(key.clone(), value.clone());
        }
    }

    // Round-trip through ServerConfig to validate the merged result.
    match serde_json::from_value::<ServerConfig>(merged) {
        Ok(_) => {
            Json(serde_json::json!({
                "success": true,
                "message": "Bulk configuration update validated successfully. Note: Runtime application requires ServerConfig in ManagementState and hot-reload support.",
                "updates_received": request.updates,
                "validated": true
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid configuration",
                "message": format!("Configuration validation failed: {}", e),
                "validated": false
            })),
        )
            .into_response(),
    }
}
928
929async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
931 let mocks = state.mocks.read().await;
932 let request_count = *state.request_counter.read().await;
933
934 Json(ServerStats {
935 uptime_seconds: state.start_time.elapsed().as_secs(),
936 total_requests: request_count,
937 active_mocks: mocks.len(),
938 enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
939 registered_routes: mocks.len(), })
941}
942
943async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
945 Json(ServerConfig {
946 version: env!("CARGO_PKG_VERSION").to_string(),
947 port: state.port,
948 has_openapi_spec: state.spec.is_some(),
949 spec_path: state.spec_path.clone(),
950 })
951}
952
953async fn health_check() -> Json<serde_json::Value> {
955 Json(serde_json::json!({
956 "status": "healthy",
957 "service": "mockforge-management",
958 "timestamp": chrono::Utc::now().to_rfc3339()
959 }))
960}
961
/// Output format for mock export (selected via the `format` query parameter).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// Pretty-printed JSON (the default).
    Json,
    /// YAML.
    Yaml,
}
971
972async fn export_mocks(
974 State(state): State<ManagementState>,
975 Query(params): Query<std::collections::HashMap<String, String>>,
976) -> Result<(StatusCode, String), StatusCode> {
977 let mocks = state.mocks.read().await;
978
979 let format = params
980 .get("format")
981 .map(|f| match f.as_str() {
982 "yaml" | "yml" => ExportFormat::Yaml,
983 _ => ExportFormat::Json,
984 })
985 .unwrap_or(ExportFormat::Json);
986
987 match format {
988 ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
989 .map(|json| (StatusCode::OK, json))
990 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
991 ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
992 .map(|yaml| (StatusCode::OK, yaml))
993 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
994 }
995}
996
997async fn import_mocks(
999 State(state): State<ManagementState>,
1000 Json(mocks): Json<Vec<MockConfig>>,
1001) -> impl IntoResponse {
1002 let mut current_mocks = state.mocks.write().await;
1003 current_mocks.clear();
1004 current_mocks.extend(mocks);
1005 Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1006}
1007
/// List every captured email, or 501 when SMTP is not wired into this
/// server instance.
#[cfg(feature = "smtp")]
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        match smtp_registry.get_emails() {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve emails",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1032
/// Fetch a single captured email by id: 404 when unknown, 501 when SMTP is
/// not wired into this server instance.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        match smtp_registry.get_email_by_id(&id) {
            Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
            Ok(None) => (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Email not found",
                    "id": id
                })),
            ),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve email",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1067
/// Delete every captured email, or 501 when SMTP is not wired into this
/// server instance.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        match smtp_registry.clear_mailbox() {
            Ok(()) => (
                StatusCode::OK,
                Json(serde_json::json!({
                    "message": "Mailbox cleared successfully"
                })),
            ),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to clear mailbox",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1097
/// Mailbox export stub: always answers 501, echoing the requested format.
///
/// Fix: the previous `unwrap_or(&"json".to_string())` allocated a default
/// `String` on every call even when the parameter was present (clippy
/// `or_fun_call`); the borrow-then-own form below allocates once at most.
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let format = params.get("format").map(String::as_str).unwrap_or("json").to_string();
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "SMTP mailbox management not available via HTTP API",
            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
            "requested_format": format
        })),
    )
}
1113
/// Filter captured emails by query parameters.
///
/// Supported params: `sender`, `recipient`, `subject`, `body`, `since`,
/// `until` (RFC 3339), `regex`, `case_sensitive` ("true" enables).
/// Unparsable timestamps are silently ignored rather than rejected.
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        let filters = EmailSearchFilters {
            sender: params.get("sender").cloned(),
            recipient: params.get("recipient").cloned(),
            subject: params.get("subject").cloned(),
            body: params.get("body").cloned(),
            // Timestamps must be RFC 3339; anything else becomes None.
            since: params
                .get("since")
                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
                .map(|dt| dt.with_timezone(&chrono::Utc)),
            until: params
                .get("until")
                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
                .map(|dt| dt.with_timezone(&chrono::Utc)),
            use_regex: params.get("regex").map(|s| s == "true").unwrap_or(false),
            case_sensitive: params.get("case_sensitive").map(|s| s == "true").unwrap_or(false),
        };

        match smtp_registry.search_emails(filters) {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to search emails",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1158
/// Aggregated MQTT broker metrics returned by the MQTT stats endpoint.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of currently connected MQTT clients.
    pub connected_clients: usize,
    /// Number of active topics.
    pub active_topics: usize,
    /// Count of retained messages held by the broker.
    pub retained_messages: usize,
    /// Total subscriptions across all clients.
    pub total_subscriptions: usize,
}
1172
/// Snapshot of broker-wide MQTT statistics, or 503 when no broker is
/// attached to this state.
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    if let Some(broker) = &state.mqtt_broker {
        let connected_clients = broker.get_connected_clients().await.len();
        let active_topics = broker.get_active_topics().await.len();
        let stats = broker.get_topic_stats().await;

        let broker_stats = MqttBrokerStats {
            connected_clients,
            active_topics,
            retained_messages: stats.retained_messages,
            total_subscriptions: stats.total_subscriptions,
        };

        Json(broker_stats).into_response()
    } else {
        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
    }
}
1193
/// List connected MQTT client ids, or 503 when no broker is attached.
#[cfg(feature = "mqtt")]
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let clients = broker.get_connected_clients().await;
            Json(serde_json::json!({
                "clients": clients
            }))
            .into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1206
/// List active MQTT topics, or 503 when no broker is attached.
#[cfg(feature = "mqtt")]
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let topics = broker.get_active_topics().await;
            Json(serde_json::json!({
                "topics": topics
            }))
            .into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1219
/// Force-disconnect an MQTT client by id; 503 when no broker is attached.
#[cfg(feature = "mqtt")]
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    if let Some(broker) = &state.mqtt_broker {
        match broker.disconnect_client(&client_id).await {
            Ok(_) => {
                (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response()
            }
            Err(e) => {
                (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
                    .into_response()
            }
        }
    } else {
        (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
    }
}
1239
/// Body schema for the MQTT publish endpoint.
///
/// NOTE(review): `publish_mqtt_message_handler` currently extracts fields
/// from a raw `serde_json::Value` by hand instead of deserializing this
/// struct — keep the two in sync (or unify them).
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Destination topic.
    pub topic: String,
    /// Message payload as UTF-8 text.
    pub payload: String,
    /// QoS level 0-2; defaults to 0.
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Whether the broker should retain the message; defaults to false.
    #[serde(default)]
    pub retain: bool,
}
1257
/// Serde default for `MqttPublishRequest::qos`.
#[cfg(feature = "mqtt")]
fn default_qos() -> u8 {
    // QoS 0 = "at most once" delivery.
    0
}
1262
/// Publish a message through the mock MQTT broker.
///
/// Accepts a raw JSON body with required `topic`/`payload` and optional
/// `qos` (default 0) / `retain` (default false). On success the publish is
/// also fanned out as a `MessageEvent` to SSE subscribers.
///
/// NOTE(review): QoS range validation only runs when a broker is attached;
/// a build without a broker answers 503 before validating the payload —
/// confirm that ordering is acceptable.
#[cfg(feature = "mqtt")]
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Pull the fields out of the untyped body (see MqttPublishRequest note).
    let topic = request.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
    let payload = request.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    if topic.is_none() || payload.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required fields: topic and payload"
            })),
        );
    }

    // Safe: both verified Some above.
    let topic = topic.unwrap();
    let payload = payload.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        // MQTT only defines QoS levels 0-2.
        if qos > 2 {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid QoS",
                    "message": "QoS must be 0, 1, or 2"
                })),
            );
        }

        let payload_bytes = payload.as_bytes().to_vec();
        // Synthetic client id identifying publishes made via this API.
        let client_id = "mockforge-management-api".to_string();

        let publish_result = broker
            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
            .await
            .map_err(|e| format!("{}", e));

        match publish_result {
            Ok(_) => {
                // Mirror the publish to SSE subscribers; send errors
                // (no active subscribers) are deliberately ignored.
                let event = MessageEvent::Mqtt(MqttMessageEvent {
                    topic: topic.clone(),
                    payload: payload.clone(),
                    qos,
                    retain,
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                (
                    StatusCode::OK,
                    Json(serde_json::json!({
                        "success": true,
                        "message": format!("Message published to topic '{}'", topic),
                        "topic": topic,
                        "qos": qos,
                        "retain": retain
                    })),
                )
            }
            Err(error_msg) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to publish message",
                    "message": error_msg
                })),
            ),
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1350
1351#[cfg(not(feature = "mqtt"))]
1352async fn publish_mqtt_message_handler(
1354 State(_state): State<ManagementState>,
1355 Json(_request): Json<serde_json::Value>,
1356) -> impl IntoResponse {
1357 (
1358 StatusCode::SERVICE_UNAVAILABLE,
1359 Json(serde_json::json!({
1360 "error": "MQTT feature not enabled",
1361 "message": "MQTT support is not compiled into this build"
1362 })),
1363 )
1364}
1365
/// Request body for a batch MQTT publish.
///
/// NOTE(review): the batch handler below parses raw JSON rather than
/// deserializing into this type — confirm intended usage.
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// Messages to publish, in order.
    pub messages: Vec<MqttPublishRequest>,
    /// Delay between consecutive publishes, in milliseconds; defaults to 100.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1376
/// Serde default for [`MqttBatchPublishRequest::delay_ms`]: 100 ms.
#[cfg(feature = "mqtt")]
fn default_delay() -> u64 {
    100
}
1381
/// Publish a batch of MQTT messages with an optional inter-message delay.
///
/// Expects `{"messages": [{topic, payload, qos?, retain?}, ...], "delay_ms"?: u64}`.
/// Each message is validated and published independently; per-message failures
/// are reported in the `results` array instead of aborting the batch. Every
/// successful publish is also broadcast to SSE stream subscribers.
#[cfg(feature = "mqtt")]
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let messages_json = request.get("messages").and_then(|v| v.as_array());
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    if messages_json.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required field: messages"
            })),
        );
    }

    let messages_json = messages_json.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        if messages_json.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            );
        }

        let mut results = Vec::new();
        // Fixed client id attributing batch publishes to the management API.
        let client_id = "mockforge-management-api".to_string();

        for (index, msg_json) in messages_json.iter().enumerate() {
            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
            // Validate against the raw u64: an early `as u8` cast would
            // silently truncate values like 256 to 0 and bypass the QoS check.
            let qos_raw = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0);
            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

            if topic.is_none() || payload.is_none() {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }

            let topic = topic.unwrap();
            let payload = payload.unwrap();

            if qos_raw > 2 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Invalid QoS (must be 0, 1, or 2)"
                }));
                continue;
            }
            // Safe narrowing: qos_raw is in 0..=2 here.
            let qos = qos_raw as u8;

            let payload_bytes = payload.as_bytes().to_vec();

            let publish_result = broker
                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
                .await
                .map_err(|e| format!("{}", e));

            match publish_result {
                Ok(_) => {
                    // Best-effort broadcast to SSE subscribers.
                    let event = MessageEvent::Mqtt(MqttMessageEvent {
                        topic: topic.clone(),
                        payload: payload.clone(),
                        qos,
                        retain,
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": topic,
                        "qos": qos
                    }));
                }
                Err(error_msg) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": error_msg
                    }));
                }
            }

            // Pace the batch, skipping the delay after the final message.
            // `index + 1` avoids the underflow-prone `len() - 1` form.
            if index + 1 < messages_json.len() && delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        (
            StatusCode::OK,
            Json(serde_json::json!({
                "success": true,
                "total": messages_json.len(),
                "succeeded": success_count,
                "failed": messages_json.len() - success_count,
                "results": results
            })),
        )
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1511
1512#[cfg(not(feature = "mqtt"))]
1513async fn publish_mqtt_batch_handler(
1515 State(_state): State<ManagementState>,
1516 Json(_request): Json<serde_json::Value>,
1517) -> impl IntoResponse {
1518 (
1519 StatusCode::SERVICE_UNAVAILABLE,
1520 Json(serde_json::json!({
1521 "error": "MQTT feature not enabled",
1522 "message": "MQTT support is not compiled into this build"
1523 })),
1524 )
1525}
1526
/// Body for the migration-mode PUT endpoints.
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    // Desired mode: one of "mock", "shadow", "real", "auto" (case-insensitive).
    mode: String,
}
1534
1535async fn get_migration_routes(
1537 State(state): State<ManagementState>,
1538) -> Result<Json<serde_json::Value>, StatusCode> {
1539 let proxy_config = match &state.proxy_config {
1540 Some(config) => config,
1541 None => {
1542 return Ok(Json(serde_json::json!({
1543 "error": "Migration not configured. Proxy config not available."
1544 })));
1545 }
1546 };
1547
1548 let config = proxy_config.read().await;
1549 let routes = config.get_migration_routes();
1550
1551 Ok(Json(serde_json::json!({
1552 "routes": routes
1553 })))
1554}
1555
1556async fn toggle_route_migration(
1558 State(state): State<ManagementState>,
1559 Path(pattern): Path<String>,
1560) -> Result<Json<serde_json::Value>, StatusCode> {
1561 let proxy_config = match &state.proxy_config {
1562 Some(config) => config,
1563 None => {
1564 return Ok(Json(serde_json::json!({
1565 "error": "Migration not configured. Proxy config not available."
1566 })));
1567 }
1568 };
1569
1570 let mut config = proxy_config.write().await;
1571 let new_mode = match config.toggle_route_migration(&pattern) {
1572 Some(mode) => mode,
1573 None => {
1574 return Ok(Json(serde_json::json!({
1575 "error": format!("Route pattern not found: {}", pattern)
1576 })));
1577 }
1578 };
1579
1580 Ok(Json(serde_json::json!({
1581 "pattern": pattern,
1582 "mode": format!("{:?}", new_mode).to_lowercase()
1583 })))
1584}
1585
1586async fn set_route_migration_mode(
1588 State(state): State<ManagementState>,
1589 Path(pattern): Path<String>,
1590 Json(request): Json<SetMigrationModeRequest>,
1591) -> Result<Json<serde_json::Value>, StatusCode> {
1592 let proxy_config = match &state.proxy_config {
1593 Some(config) => config,
1594 None => {
1595 return Ok(Json(serde_json::json!({
1596 "error": "Migration not configured. Proxy config not available."
1597 })));
1598 }
1599 };
1600
1601 use mockforge_core::proxy::config::MigrationMode;
1602 let mode = match request.mode.to_lowercase().as_str() {
1603 "mock" => MigrationMode::Mock,
1604 "shadow" => MigrationMode::Shadow,
1605 "real" => MigrationMode::Real,
1606 "auto" => MigrationMode::Auto,
1607 _ => {
1608 return Ok(Json(serde_json::json!({
1609 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1610 })));
1611 }
1612 };
1613
1614 let mut config = proxy_config.write().await;
1615 let updated = config.update_rule_migration_mode(&pattern, mode);
1616
1617 if !updated {
1618 return Ok(Json(serde_json::json!({
1619 "error": format!("Route pattern not found: {}", pattern)
1620 })));
1621 }
1622
1623 Ok(Json(serde_json::json!({
1624 "pattern": pattern,
1625 "mode": format!("{:?}", mode).to_lowercase()
1626 })))
1627}
1628
1629async fn toggle_group_migration(
1631 State(state): State<ManagementState>,
1632 Path(group): Path<String>,
1633) -> Result<Json<serde_json::Value>, StatusCode> {
1634 let proxy_config = match &state.proxy_config {
1635 Some(config) => config,
1636 None => {
1637 return Ok(Json(serde_json::json!({
1638 "error": "Migration not configured. Proxy config not available."
1639 })));
1640 }
1641 };
1642
1643 let mut config = proxy_config.write().await;
1644 let new_mode = config.toggle_group_migration(&group);
1645
1646 Ok(Json(serde_json::json!({
1647 "group": group,
1648 "mode": format!("{:?}", new_mode).to_lowercase()
1649 })))
1650}
1651
1652async fn set_group_migration_mode(
1654 State(state): State<ManagementState>,
1655 Path(group): Path<String>,
1656 Json(request): Json<SetMigrationModeRequest>,
1657) -> Result<Json<serde_json::Value>, StatusCode> {
1658 let proxy_config = match &state.proxy_config {
1659 Some(config) => config,
1660 None => {
1661 return Ok(Json(serde_json::json!({
1662 "error": "Migration not configured. Proxy config not available."
1663 })));
1664 }
1665 };
1666
1667 use mockforge_core::proxy::config::MigrationMode;
1668 let mode = match request.mode.to_lowercase().as_str() {
1669 "mock" => MigrationMode::Mock,
1670 "shadow" => MigrationMode::Shadow,
1671 "real" => MigrationMode::Real,
1672 "auto" => MigrationMode::Auto,
1673 _ => {
1674 return Ok(Json(serde_json::json!({
1675 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1676 })));
1677 }
1678 };
1679
1680 let mut config = proxy_config.write().await;
1681 config.update_group_migration_mode(&group, mode);
1682
1683 Ok(Json(serde_json::json!({
1684 "group": group,
1685 "mode": format!("{:?}", mode).to_lowercase()
1686 })))
1687}
1688
1689async fn get_migration_groups(
1691 State(state): State<ManagementState>,
1692) -> Result<Json<serde_json::Value>, StatusCode> {
1693 let proxy_config = match &state.proxy_config {
1694 Some(config) => config,
1695 None => {
1696 return Ok(Json(serde_json::json!({
1697 "error": "Migration not configured. Proxy config not available."
1698 })));
1699 }
1700 };
1701
1702 let config = proxy_config.read().await;
1703 let groups = config.get_migration_groups();
1704
1705 let groups_json: serde_json::Map<String, serde_json::Value> = groups
1707 .into_iter()
1708 .map(|(name, info)| {
1709 (
1710 name,
1711 serde_json::json!({
1712 "name": info.name,
1713 "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1714 "route_count": info.route_count
1715 }),
1716 )
1717 })
1718 .collect();
1719
1720 Ok(Json(serde_json::json!(groups_json)))
1721}
1722
1723async fn get_migration_status(
1725 State(state): State<ManagementState>,
1726) -> Result<Json<serde_json::Value>, StatusCode> {
1727 let proxy_config = match &state.proxy_config {
1728 Some(config) => config,
1729 None => {
1730 return Ok(Json(serde_json::json!({
1731 "error": "Migration not configured. Proxy config not available."
1732 })));
1733 }
1734 };
1735
1736 let config = proxy_config.read().await;
1737 let routes = config.get_migration_routes();
1738 let groups = config.get_migration_groups();
1739
1740 let mut mock_count = 0;
1741 let mut shadow_count = 0;
1742 let mut real_count = 0;
1743 let mut auto_count = 0;
1744
1745 for route in &routes {
1746 match route.migration_mode {
1747 mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1748 mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1749 mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1750 mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1751 }
1752 }
1753
1754 Ok(Json(serde_json::json!({
1755 "total_routes": routes.len(),
1756 "mock_routes": mock_count,
1757 "shadow_routes": shadow_count,
1758 "real_routes": real_count,
1759 "auto_routes": auto_count,
1760 "total_groups": groups.len(),
1761 "migration_enabled": config.migration_enabled
1762 })))
1763}
1764
/// Request body for creating or updating a proxy body-transform rule.
#[derive(Debug, Deserialize, Serialize)]
pub struct ProxyRuleRequest {
    /// URL pattern the rule applies to.
    pub pattern: String,
    /// Rule kind: "request" or "response".
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status codes the rule matches; only meaningful for response rules
    /// (request rules are listed with an empty set).
    #[serde(default)]
    pub status_codes: Vec<u16>,
    /// Transforms applied to the body; at least one is required on create.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active; defaults to true via `default_true`
    /// (defined elsewhere in this file).
    #[serde(default = "default_true")]
    pub enabled: bool,
}
1784
/// A single body transform in API (stringly-typed) form.
#[derive(Debug, Deserialize, Serialize)]
pub struct BodyTransformRequest {
    /// Path within the body to transform.
    pub path: String,
    /// Replacement value.
    pub replace: String,
    /// Operation name: "replace", "add", or "remove"; the handlers treat any
    /// other value (including the empty default) as "replace".
    #[serde(default)]
    pub operation: String,
}
1796
/// Serialized view of a proxy rule, identified by a combined positional id
/// (request rules occupy the low indices, response rules follow after them).
#[derive(Debug, Serialize)]
pub struct ProxyRuleResponse {
    /// Combined positional id.
    pub id: usize,
    /// URL pattern the rule applies to.
    pub pattern: String,
    /// "request" or "response".
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Matched status codes; always empty for request rules.
    pub status_codes: Vec<u16>,
    /// Transforms applied by this rule.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active.
    pub enabled: bool,
}
1814
1815async fn list_proxy_rules(
1817 State(state): State<ManagementState>,
1818) -> Result<Json<serde_json::Value>, StatusCode> {
1819 let proxy_config = match &state.proxy_config {
1820 Some(config) => config,
1821 None => {
1822 return Ok(Json(serde_json::json!({
1823 "error": "Proxy not configured. Proxy config not available."
1824 })));
1825 }
1826 };
1827
1828 let config = proxy_config.read().await;
1829
1830 let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1831
1832 for (idx, rule) in config.request_replacements.iter().enumerate() {
1834 rules.push(ProxyRuleResponse {
1835 id: idx,
1836 pattern: rule.pattern.clone(),
1837 rule_type: "request".to_string(),
1838 status_codes: Vec::new(),
1839 body_transforms: rule
1840 .body_transforms
1841 .iter()
1842 .map(|t| BodyTransformRequest {
1843 path: t.path.clone(),
1844 replace: t.replace.clone(),
1845 operation: format!("{:?}", t.operation).to_lowercase(),
1846 })
1847 .collect(),
1848 enabled: rule.enabled,
1849 });
1850 }
1851
1852 let request_count = config.request_replacements.len();
1854 for (idx, rule) in config.response_replacements.iter().enumerate() {
1855 rules.push(ProxyRuleResponse {
1856 id: request_count + idx,
1857 pattern: rule.pattern.clone(),
1858 rule_type: "response".to_string(),
1859 status_codes: rule.status_codes.clone(),
1860 body_transforms: rule
1861 .body_transforms
1862 .iter()
1863 .map(|t| BodyTransformRequest {
1864 path: t.path.clone(),
1865 replace: t.replace.clone(),
1866 operation: format!("{:?}", t.operation).to_lowercase(),
1867 })
1868 .collect(),
1869 enabled: rule.enabled,
1870 });
1871 }
1872
1873 Ok(Json(serde_json::json!({
1874 "rules": rules
1875 })))
1876}
1877
1878async fn create_proxy_rule(
1880 State(state): State<ManagementState>,
1881 Json(request): Json<ProxyRuleRequest>,
1882) -> Result<Json<serde_json::Value>, StatusCode> {
1883 let proxy_config = match &state.proxy_config {
1884 Some(config) => config,
1885 None => {
1886 return Ok(Json(serde_json::json!({
1887 "error": "Proxy not configured. Proxy config not available."
1888 })));
1889 }
1890 };
1891
1892 if request.body_transforms.is_empty() {
1894 return Ok(Json(serde_json::json!({
1895 "error": "At least one body transform is required"
1896 })));
1897 }
1898
1899 let body_transforms: Vec<BodyTransform> = request
1900 .body_transforms
1901 .iter()
1902 .map(|t| {
1903 let op = match t.operation.as_str() {
1904 "replace" => TransformOperation::Replace,
1905 "add" => TransformOperation::Add,
1906 "remove" => TransformOperation::Remove,
1907 _ => TransformOperation::Replace,
1908 };
1909 BodyTransform {
1910 path: t.path.clone(),
1911 replace: t.replace.clone(),
1912 operation: op,
1913 }
1914 })
1915 .collect();
1916
1917 let new_rule = BodyTransformRule {
1918 pattern: request.pattern.clone(),
1919 status_codes: request.status_codes.clone(),
1920 body_transforms,
1921 enabled: request.enabled,
1922 };
1923
1924 let mut config = proxy_config.write().await;
1925
1926 let rule_id = if request.rule_type == "request" {
1927 config.request_replacements.push(new_rule);
1928 config.request_replacements.len() - 1
1929 } else if request.rule_type == "response" {
1930 config.response_replacements.push(new_rule);
1931 config.request_replacements.len() + config.response_replacements.len() - 1
1932 } else {
1933 return Ok(Json(serde_json::json!({
1934 "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
1935 })));
1936 };
1937
1938 Ok(Json(serde_json::json!({
1939 "id": rule_id,
1940 "message": "Rule created successfully"
1941 })))
1942}
1943
1944async fn get_proxy_rule(
1946 State(state): State<ManagementState>,
1947 Path(id): Path<String>,
1948) -> Result<Json<serde_json::Value>, StatusCode> {
1949 let proxy_config = match &state.proxy_config {
1950 Some(config) => config,
1951 None => {
1952 return Ok(Json(serde_json::json!({
1953 "error": "Proxy not configured. Proxy config not available."
1954 })));
1955 }
1956 };
1957
1958 let config = proxy_config.read().await;
1959 let rule_id: usize = match id.parse() {
1960 Ok(id) => id,
1961 Err(_) => {
1962 return Ok(Json(serde_json::json!({
1963 "error": format!("Invalid rule ID: {}", id)
1964 })));
1965 }
1966 };
1967
1968 let request_count = config.request_replacements.len();
1969
1970 if rule_id < request_count {
1971 let rule = &config.request_replacements[rule_id];
1973 Ok(Json(serde_json::json!({
1974 "id": rule_id,
1975 "pattern": rule.pattern,
1976 "type": "request",
1977 "status_codes": [],
1978 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1979 "path": t.path,
1980 "replace": t.replace,
1981 "operation": format!("{:?}", t.operation).to_lowercase()
1982 })).collect::<Vec<_>>(),
1983 "enabled": rule.enabled
1984 })))
1985 } else if rule_id < request_count + config.response_replacements.len() {
1986 let response_idx = rule_id - request_count;
1988 let rule = &config.response_replacements[response_idx];
1989 Ok(Json(serde_json::json!({
1990 "id": rule_id,
1991 "pattern": rule.pattern,
1992 "type": "response",
1993 "status_codes": rule.status_codes,
1994 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
1995 "path": t.path,
1996 "replace": t.replace,
1997 "operation": format!("{:?}", t.operation).to_lowercase()
1998 })).collect::<Vec<_>>(),
1999 "enabled": rule.enabled
2000 })))
2001 } else {
2002 Ok(Json(serde_json::json!({
2003 "error": format!("Rule ID {} not found", rule_id)
2004 })))
2005 }
2006}
2007
2008async fn update_proxy_rule(
2010 State(state): State<ManagementState>,
2011 Path(id): Path<String>,
2012 Json(request): Json<ProxyRuleRequest>,
2013) -> Result<Json<serde_json::Value>, StatusCode> {
2014 let proxy_config = match &state.proxy_config {
2015 Some(config) => config,
2016 None => {
2017 return Ok(Json(serde_json::json!({
2018 "error": "Proxy not configured. Proxy config not available."
2019 })));
2020 }
2021 };
2022
2023 let mut config = proxy_config.write().await;
2024 let rule_id: usize = match id.parse() {
2025 Ok(id) => id,
2026 Err(_) => {
2027 return Ok(Json(serde_json::json!({
2028 "error": format!("Invalid rule ID: {}", id)
2029 })));
2030 }
2031 };
2032
2033 let body_transforms: Vec<BodyTransform> = request
2034 .body_transforms
2035 .iter()
2036 .map(|t| {
2037 let op = match t.operation.as_str() {
2038 "replace" => TransformOperation::Replace,
2039 "add" => TransformOperation::Add,
2040 "remove" => TransformOperation::Remove,
2041 _ => TransformOperation::Replace,
2042 };
2043 BodyTransform {
2044 path: t.path.clone(),
2045 replace: t.replace.clone(),
2046 operation: op,
2047 }
2048 })
2049 .collect();
2050
2051 let updated_rule = BodyTransformRule {
2052 pattern: request.pattern.clone(),
2053 status_codes: request.status_codes.clone(),
2054 body_transforms,
2055 enabled: request.enabled,
2056 };
2057
2058 let request_count = config.request_replacements.len();
2059
2060 if rule_id < request_count {
2061 config.request_replacements[rule_id] = updated_rule;
2063 } else if rule_id < request_count + config.response_replacements.len() {
2064 let response_idx = rule_id - request_count;
2066 config.response_replacements[response_idx] = updated_rule;
2067 } else {
2068 return Ok(Json(serde_json::json!({
2069 "error": format!("Rule ID {} not found", rule_id)
2070 })));
2071 }
2072
2073 Ok(Json(serde_json::json!({
2074 "id": rule_id,
2075 "message": "Rule updated successfully"
2076 })))
2077}
2078
2079async fn delete_proxy_rule(
2081 State(state): State<ManagementState>,
2082 Path(id): Path<String>,
2083) -> Result<Json<serde_json::Value>, StatusCode> {
2084 let proxy_config = match &state.proxy_config {
2085 Some(config) => config,
2086 None => {
2087 return Ok(Json(serde_json::json!({
2088 "error": "Proxy not configured. Proxy config not available."
2089 })));
2090 }
2091 };
2092
2093 let mut config = proxy_config.write().await;
2094 let rule_id: usize = match id.parse() {
2095 Ok(id) => id,
2096 Err(_) => {
2097 return Ok(Json(serde_json::json!({
2098 "error": format!("Invalid rule ID: {}", id)
2099 })));
2100 }
2101 };
2102
2103 let request_count = config.request_replacements.len();
2104
2105 if rule_id < request_count {
2106 config.request_replacements.remove(rule_id);
2108 } else if rule_id < request_count + config.response_replacements.len() {
2109 let response_idx = rule_id - request_count;
2111 config.response_replacements.remove(response_idx);
2112 } else {
2113 return Ok(Json(serde_json::json!({
2114 "error": format!("Rule ID {} not found", rule_id)
2115 })));
2116 }
2117
2118 Ok(Json(serde_json::json!({
2119 "id": rule_id,
2120 "message": "Rule deleted successfully"
2121 })))
2122}
2123
2124async fn get_proxy_inspect(
2127 State(_state): State<ManagementState>,
2128 Query(params): Query<std::collections::HashMap<String, String>>,
2129) -> Result<Json<serde_json::Value>, StatusCode> {
2130 let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2131
2132 Ok(Json(serde_json::json!({
2138 "requests": [],
2139 "responses": [],
2140 "limit": limit,
2141 "total": 0,
2142 "message": "Request/response inspection not yet implemented. This endpoint will return intercepted traffic when proxy inspection is fully integrated."
2143 })))
2144}
2145
/// Assemble the management API [`Router`], wiring in feature-gated endpoint
/// groups and attaching the shared [`ManagementState`].
///
/// Each `#[cfg]` / `#[cfg(not(...))]` pair rebinds `router` so the chain stays
/// valid whichever features are compiled in.
pub fn management_router(state: ManagementState) -> Router {
    // Core endpoints available in every build.
    let router = Router::new()
        .route("/health", get(health_check))
        .route("/stats", get(get_stats))
        .route("/config", get(get_config))
        .route("/config/validate", post(validate_config))
        .route("/config/bulk", post(bulk_update_config))
        .route("/mocks", get(list_mocks))
        .route("/mocks", post(create_mock))
        .route("/mocks/{id}", get(get_mock))
        .route("/mocks/{id}", put(update_mock))
        .route("/mocks/{id}", delete(delete_mock))
        .route("/export", get(export_mocks))
        .route("/import", post(import_mocks));

    // SMTP mailbox endpoints (feature-gated; absent entirely without "smtp").
    #[cfg(feature = "smtp")]
    let router = router
        .route("/smtp/mailbox", get(list_smtp_emails))
        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
        .route("/smtp/mailbox/{id}", get(get_smtp_email))
        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
        .route("/smtp/mailbox/search", get(search_smtp_emails));

    // Pass-through rebind keeps the `let router` chain valid without "smtp".
    #[cfg(not(feature = "smtp"))]
    let router = router;

    // Full MQTT endpoint set when the broker is compiled in.
    #[cfg(feature = "mqtt")]
    let router = router
        .route("/mqtt/stats", get(get_mqtt_stats))
        .route("/mqtt/clients", get(get_mqtt_clients))
        .route("/mqtt/topics", get(get_mqtt_topics))
        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Without "mqtt", the publish routes still exist but resolve to the
    // 503 stub handlers defined above.
    #[cfg(not(feature = "mqtt"))]
    let router = router
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Kafka endpoints (feature-gated; absent entirely without "kafka").
    #[cfg(feature = "kafka")]
    let router = router
        .route("/kafka/stats", get(get_kafka_stats))
        .route("/kafka/topics", get(get_kafka_topics))
        .route("/kafka/topics/{topic}", get(get_kafka_topic))
        .route("/kafka/groups", get(get_kafka_groups))
        .route("/kafka/groups/{group_id}", get(get_kafka_group))
        .route("/kafka/produce", post(produce_kafka_message))
        .route("/kafka/produce/batch", post(produce_kafka_batch))
        .route("/kafka/messages/stream", get(kafka_messages_stream));

    // Pass-through rebind keeps the chain valid without "kafka".
    #[cfg(not(feature = "kafka"))]
    let router = router;

    // Proxy migration-mode endpoints.
    let router = router
        .route("/migration/routes", get(get_migration_routes))
        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
        .route("/migration/groups/{group}", put(set_group_migration_mode))
        .route("/migration/groups", get(get_migration_groups))
        .route("/migration/status", get(get_migration_status));

    // Proxy body-transform rule CRUD and traffic inspection.
    let router = router
        .route("/proxy/rules", get(list_proxy_rules))
        .route("/proxy/rules", post(create_proxy_rule))
        .route("/proxy/rules/{id}", get(get_proxy_rule))
        .route("/proxy/rules/{id}", put(update_proxy_rule))
        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
        .route("/proxy/inspect", get(get_proxy_inspect));

    // AI-assisted spec generation.
    let router = router.route("/ai/generate-spec", post(generate_ai_spec));

    // Snapshot-diff sub-router shares the same state.
    let router = router.nest(
        "/snapshot-diff",
        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
    );

    #[cfg(feature = "behavioral-cloning")]
    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));

    // MockAI learning/explanation plus chaos and network-profile endpoints.
    let router = router
        .route("/mockai/learn", post(learn_from_examples))
        .route("/mockai/rules/explanations", get(list_rule_explanations))
        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
        .route("/chaos/config", get(get_chaos_config))
        .route("/chaos/config", post(update_chaos_config))
        .route("/network/profiles", get(list_network_profiles))
        .route("/network/profile/apply", post(apply_network_profile));

    // State-machine sub-router.
    let router =
        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());

    router.with_state(state)
}
2249
/// Broker-wide Kafka statistics returned by `GET /kafka/stats`.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// Number of topics known to the broker.
    pub topics: usize,
    /// Total partition count across all topics.
    pub partitions: usize,
    /// Number of consumer groups.
    pub consumer_groups: usize,
    /// Total messages produced (from the broker's metrics snapshot).
    pub messages_produced: u64,
    /// Total messages consumed (from the broker's metrics snapshot).
    pub messages_consumed: u64,
}
2264
/// Summary of a single Kafka topic for `GET /kafka/topics`.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name.
    pub name: String,
    /// Number of partitions in the topic.
    pub partitions: usize,
    /// Configured replication factor.
    pub replication_factor: i32,
}
2272
/// Summary of a Kafka consumer group for `GET /kafka/groups`.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group id.
    pub group_id: String,
    /// Number of members currently in the group.
    pub members: usize,
    /// Group state; the handlers hard-code "Stable" — presumably the mock
    /// broker does not track rebalances (TODO confirm).
    pub state: String,
}
2280
/// Aggregate broker-wide Kafka statistics for the management API.
#[cfg(feature = "kafka")]
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let broker = match &state.kafka_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "Kafka broker not available",
                    "message": "Kafka broker is not enabled or not available."
                })),
            )
                .into_response()
        }
    };

    let topics = broker.topics.read().await;
    let consumer_groups = broker.consumer_groups.read().await;

    // Sum partition counts across every topic.
    let partition_total: usize = topics.values().map(|t| t.partitions.len()).sum();
    let metrics_snapshot = broker.metrics().snapshot();

    Json(KafkaBrokerStats {
        topics: topics.len(),
        partitions: partition_total,
        consumer_groups: consumer_groups.groups().len(),
        messages_produced: metrics_snapshot.messages_produced_total,
        messages_consumed: metrics_snapshot.messages_consumed_total,
    })
    .into_response()
}
2313
/// List Kafka topics with partition counts and replication factors.
#[cfg(feature = "kafka")]
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let broker = match &state.kafka_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "Kafka broker not available",
                    "message": "Kafka broker is not enabled or not available."
                })),
            )
                .into_response()
        }
    };

    let topics = broker.topics.read().await;
    let mut topic_list = Vec::with_capacity(topics.len());
    for (name, topic) in topics.iter() {
        topic_list.push(KafkaTopicInfo {
            name: name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor as i32,
        });
    }

    Json(serde_json::json!({ "topics": topic_list })).into_response()
}
2343
/// Detailed view of one Kafka topic, including per-partition message counts.
#[cfg(feature = "kafka")]
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let broker = match &state.kafka_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "Kafka broker not available",
                    "message": "Kafka broker is not enabled or not available."
                })),
            )
                .into_response()
        }
    };

    let topics = broker.topics.read().await;
    let topic = match topics.get(&topic_name) {
        Some(topic) => topic,
        None => {
            return (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Topic not found",
                    "topic": topic_name
                })),
            )
                .into_response()
        }
    };

    // Leader/replica values are hard-coded single-broker placeholders.
    let partitions_detail: Vec<serde_json::Value> = topic
        .partitions
        .iter()
        .enumerate()
        .map(|(idx, partition)| {
            serde_json::json!({
                "id": idx as i32,
                "leader": 0,
                "replicas": vec![0],
                "message_count": partition.messages.len()
            })
        })
        .collect();

    Json(serde_json::json!({
        "name": topic_name,
        "partitions": topic.partitions.len(),
        "replication_factor": topic.config.replication_factor,
        "partitions_detail": partitions_detail
    }))
    .into_response()
}
2385
/// List Kafka consumer groups with their member counts.
#[cfg(feature = "kafka")]
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let broker = match &state.kafka_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "Kafka broker not available",
                    "message": "Kafka broker is not enabled or not available."
                })),
            )
                .into_response()
        }
    };

    let consumer_groups = broker.consumer_groups.read().await;
    let mut groups: Vec<KafkaConsumerGroupInfo> = Vec::new();
    for (group_id, group) in consumer_groups.groups().iter() {
        groups.push(KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            // Hard-coded state — presumably the mock broker does not track
            // rebalances (TODO confirm).
            state: "Stable".to_string(),
        });
    }

    Json(serde_json::json!({ "groups": groups })).into_response()
}
2416
/// Detailed view of one Kafka consumer group: members, assignments, offsets.
#[cfg(feature = "kafka")]
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    let broker = match &state.kafka_broker {
        Some(broker) => broker,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "Kafka broker not available",
                    "message": "Kafka broker is not enabled or not available."
                })),
            )
                .into_response()
        }
    };

    let consumer_groups = broker.consumer_groups.read().await;
    let group = match consumer_groups.groups().get(&group_id) {
        Some(group) => group,
        None => {
            return (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Consumer group not found",
                    "group_id": group_id
                })),
            )
                .into_response()
        }
    };

    let members_detail: Vec<serde_json::Value> = group
        .members
        .iter()
        .map(|(member_id, member)| {
            let assignments: Vec<serde_json::Value> = member
                .assignment
                .iter()
                .map(|a| {
                    serde_json::json!({
                        "topic": a.topic,
                        "partitions": a.partitions
                    })
                })
                .collect();
            serde_json::json!({
                "member_id": member_id,
                "client_id": member.client_id,
                "assignments": assignments
            })
        })
        .collect();

    let offsets: Vec<serde_json::Value> = group
        .offsets
        .iter()
        .map(|((topic, partition), offset)| {
            serde_json::json!({
                "topic": topic,
                "partition": partition,
                "offset": offset
            })
        })
        .collect();

    Json(serde_json::json!({
        "group_id": group_id,
        "members": group.members.len(),
        "state": "Stable",
        "members_detail": members_detail,
        "offsets": offsets
    }))
    .into_response()
}
2465
#[cfg(feature = "kafka")]
/// Request body for producing a single Kafka message via the management API.
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Destination topic (auto-created with default config when missing).
    pub topic: String,
    /// Optional message key; used for partition assignment when no
    /// explicit `partition` is given.
    #[serde(default)]
    pub key: Option<String>,
    /// Message payload as a UTF-8 string.
    pub value: String,
    /// Explicit target partition; when omitted, one is derived from the key.
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional string headers attached to the message.
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2485
#[cfg(feature = "kafka")]
/// Produce a single message onto a Kafka topic via the management API.
///
/// The topic is auto-created with default configuration on first use.
/// When no partition is specified, one is assigned from the message key.
/// On success, an SSE `MessageEvent::Kafka` is broadcast to subscribers.
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let mut topics = broker.topics.write().await;

        // Auto-create the topic with default configuration on first use.
        let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
            mockforge_kafka::topics::Topic::new(
                request.topic.clone(),
                mockforge_kafka::topics::TopicConfig::default(),
            )
        });

        // Explicit partition wins; otherwise derive one from the key bytes.
        let partition_id = if let Some(partition) = request.partition {
            partition
        } else {
            topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes()))
        };

        // Reject out-of-range partitions before building the message.
        if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid partition",
                    "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                })),
            )
                .into_response();
        }

        // Keep copies for the SSE event emitted after a successful produce.
        let key_clone = request.key.clone();
        let headers_clone = request.headers.clone();
        let message = mockforge_kafka::partitions::KafkaMessage {
            // The real offset is assigned by the partition on produce.
            offset: 0,
            timestamp: chrono::Utc::now().timestamp_millis(),
            key: key_clone.clone().map(|k| k.as_bytes().to_vec()),
            value: request.value.as_bytes().to_vec(),
            headers: headers_clone
                .clone()
                .unwrap_or_default()
                .into_iter()
                .map(|(k, v)| (k, v.as_bytes().to_vec()))
                .collect(),
        };

        match topic_entry.produce(partition_id, message).await {
            Ok(offset) => {
                // `broker` is already bound above — no need to re-check
                // `state.kafka_broker` as the previous version did.
                broker.metrics().record_messages_produced(1);

                // Broadcast to SSE subscribers (ignore "no receivers").
                // (The former inner `#[cfg(feature = "kafka")]` block was
                // redundant: this whole function is already kafka-gated.)
                let event = MessageEvent::Kafka(KafkaMessageEvent {
                    topic: request.topic.clone(),
                    key: key_clone,
                    value: request.value.clone(),
                    partition: partition_id,
                    offset,
                    headers: headers_clone,
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message produced to topic '{}'", request.topic),
                    "topic": request.topic,
                    "partition": partition_id,
                    "offset": offset
                }))
                .into_response()
            }
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to produce message",
                    "message": e.to_string()
                })),
            )
                .into_response(),
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2590
#[cfg(feature = "kafka")]
/// Request body for producing a batch of Kafka messages.
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// Messages to produce, in order (must be non-empty).
    pub messages: Vec<KafkaProduceRequest>,
    /// Pacing delay between consecutive messages, in milliseconds
    /// (default supplied by `default_delay`).
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2600
#[cfg(feature = "kafka")]
/// Produce a batch of Kafka messages, optionally pacing them with a delay.
///
/// Each message is produced independently; per-message failures are
/// reported in the `results` array instead of aborting the whole batch.
/// Messages with an invalid partition are skipped (and skip the pacing
/// delay, matching the original behavior).
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        if request.messages.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            )
                .into_response();
        }

        let mut results = Vec::with_capacity(request.messages.len());

        for (index, msg_request) in request.messages.iter().enumerate() {
            // Scope the write lock so it is released before the pacing
            // delay below; the previous version slept while still holding
            // the topics write lock, blocking other producers.
            {
                let mut topics = broker.topics.write().await;

                // Auto-create the topic with default configuration.
                let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                    mockforge_kafka::topics::Topic::new(
                        msg_request.topic.clone(),
                        mockforge_kafka::topics::TopicConfig::default(),
                    )
                });

                // Explicit partition wins; otherwise derive one from the key.
                let partition_id = if let Some(partition) = msg_request.partition {
                    partition
                } else {
                    topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
                };

                if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                    }));
                    continue;
                }

                let message = mockforge_kafka::partitions::KafkaMessage {
                    // The real offset is assigned by the partition on produce.
                    offset: 0,
                    timestamp: chrono::Utc::now().timestamp_millis(),
                    key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
                    value: msg_request.value.as_bytes().to_vec(),
                    headers: msg_request
                        .headers
                        .clone()
                        .unwrap_or_default()
                        .into_iter()
                        .map(|(k, v)| (k, v.as_bytes().to_vec()))
                        .collect(),
                };

                match topic_entry.produce(partition_id, message).await {
                    Ok(offset) => {
                        // `broker` is already bound above — the previous
                        // inner re-check of `state.kafka_broker` was redundant.
                        broker.metrics().record_messages_produced(1);

                        // Broadcast to SSE subscribers (ignore "no receivers").
                        let event = MessageEvent::Kafka(KafkaMessageEvent {
                            topic: msg_request.topic.clone(),
                            key: msg_request.key.clone(),
                            value: msg_request.value.clone(),
                            partition: partition_id,
                            offset,
                            headers: msg_request.headers.clone(),
                            timestamp: chrono::Utc::now().to_rfc3339(),
                        });
                        let _ = state.message_events.send(event);

                        results.push(serde_json::json!({
                            "index": index,
                            "success": true,
                            "topic": msg_request.topic,
                            "partition": partition_id,
                            "offset": offset
                        }));
                    }
                    Err(e) => {
                        results.push(serde_json::json!({
                            "index": index,
                            "success": false,
                            "error": e.to_string()
                        }));
                    }
                }
            }

            // Optional pacing between messages (not after the last one).
            if index < request.messages.len() - 1 && request.delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        Json(serde_json::json!({
            "success": true,
            "total": request.messages.len(),
            "succeeded": success_count,
            "failed": request.messages.len() - success_count,
            "results": results
        }))
        .into_response()
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2729
#[cfg(feature = "mqtt")]
/// SSE endpoint streaming MQTT messages as they are published.
///
/// Accepts an optional `?topic=` query parameter; when present, only
/// messages whose topic contains that substring are forwarded.
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // Subscribe to the broadcast channel before building the stream.
    let rx = state.message_events.subscribe();
    // Optional substring filter on the topic name.
    let topic_filter = params.get("topic").cloned();

    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            // Loop until an event worth emitting arrives (or the channel closes).
            loop {
                match rx.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => {
                        // Apply the optional substring topic filter.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "mqtt",
                            "topic": event.topic,
                            "payload": event.payload,
                            "qos": event.qos,
                            "retain": event.retain,
                            "timestamp": event.timestamp,
                        });

                        // Skip (rather than fail) events that cannot serialize.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event = Event::default().event("mqtt_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    // Kafka events are ignored on the MQTT stream.
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => {
                        continue;
                    }
                    // Channel closed: end the SSE stream.
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    // Slow consumer: log how many events were dropped, keep going.
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Emit a keep-alive comment every 15 seconds.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
2792
#[cfg(feature = "kafka")]
/// SSE endpoint streaming Kafka messages as they are produced.
///
/// Accepts an optional `?topic=` query parameter; when present, only
/// messages whose topic contains that substring are forwarded.
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // Subscribe before building the stream. (The binding was previously
    // declared `mut` but never mutated here, triggering an unused_mut
    // warning — the receiver is only made mutable inside the closure.)
    let rx = state.message_events.subscribe();
    // Optional substring filter on the topic name.
    let topic_filter = params.get("topic").cloned();

    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            // Loop until an event worth emitting arrives (or the channel closes).
            loop {
                match rx.recv().await {
                    // MQTT events are ignored on the Kafka stream.
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => {
                        continue;
                    }
                    Ok(MessageEvent::Kafka(event)) => {
                        // Apply the optional substring topic filter.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "kafka",
                            "topic": event.topic,
                            "key": event.key,
                            "value": event.value,
                            "partition": event.partition,
                            "offset": event.offset,
                            "headers": event.headers,
                            "timestamp": event.timestamp,
                        });

                        // Skip (rather than fail) events that cannot serialize.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event =
                                Event::default().event("kafka_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    // Channel closed: end the SSE stream.
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    // Slow consumer: log how many events were dropped, keep going.
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Emit a keep-alive comment every 15 seconds.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
2856
/// Request body for AI-assisted API specification generation.
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Natural-language description of the desired API.
    pub query: String,
    /// Target specification type: "openapi", "graphql", or "asyncapi"
    /// (unrecognized values are treated as OpenAPI).
    pub spec_type: String,
    /// OpenAPI version to target; defaults to "3.0.0" when omitted.
    pub api_version: Option<String>,
}
2869
/// Request body for generating an OpenAPI spec from recorded HTTP traffic.
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to the recorder database; defaults to `<cwd>/recordings.db`.
    #[serde(default)]
    pub database_path: Option<String>,
    /// Lower time bound for recorded exchanges (RFC 3339 string).
    #[serde(default)]
    pub since: Option<String>,
    /// Upper time bound for recorded exchanges (RFC 3339 string).
    #[serde(default)]
    pub until: Option<String>,
    /// Optional path pattern used to filter exchanges.
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence threshold for inferred paths (default 0.7).
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
2889
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`.
fn default_min_confidence() -> f64 {
    0.7
}
2893
#[cfg(feature = "data-faker")]
/// Generate an API specification (OpenAPI, GraphQL, or AsyncAPI) from a
/// natural-language description using the configured LLM provider.
///
/// Provider, endpoint, model, and generation knobs are taken from
/// `MOCKFORGE_RAG_*` environment variables. An API key is required via
/// `MOCKFORGE_RAG_API_KEY` or `OPENAI_API_KEY`; without one a 503 is
/// returned.
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::DocumentStorage,
    };
    use std::sync::Arc;

    // Read an env var and parse it, falling back to `default` when the
    // variable is unset or unparsable. Replaces the repeated
    // var().unwrap_or_else(..).parse().unwrap_or(..) pattern (which had
    // identical fallback semantics) used four times below.
    fn env_parse<T: std::str::FromStr>(key: &str, default: T) -> T {
        std::env::var(key).ok().and_then(|s| s.parse().ok()).unwrap_or(default)
    }

    // API key: dedicated variable first, then the conventional OpenAI one.
    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    }

    // Provider selection; unknown values fall back to OpenAI.
    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    // Per-provider default endpoint, overridable via env.
    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    // Per-provider default model, overridable via env.
    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    let mut rag_config = RagConfig::default();
    rag_config.provider = provider;
    rag_config.api_endpoint = api_endpoint;
    rag_config.api_key = api_key;
    rag_config.model = model;
    rag_config.max_tokens = env_parse("MOCKFORGE_RAG_MAX_TOKENS", 4096);
    rag_config.temperature = env_parse("MOCKFORGE_RAG_TEMPERATURE", 0.3);
    rag_config.timeout_secs = env_parse("MOCKFORGE_RAG_TIMEOUT", 60);
    rag_config.max_context_length = env_parse("MOCKFORGE_RAG_CONTEXT_WINDOW", 4000);

    // Human-readable label used inside the prompt.
    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        if request.spec_type == "graphql" {
            "GraphQL SDL"
        } else {
            "YAML"
        }
    );

    use mockforge_data::rag::storage::InMemoryStorage;
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    // The config is moved into the engine; it is not used afterwards, so
    // the previous `rag_config.clone()` was a redundant allocation.
    let mut rag_engine = match RagEngine::new(rag_config, storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Strip Markdown code fences from the model output.
            let spec = if request.spec_type == "graphql" {
                extract_graphql_schema(&generated_text)
            } else {
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
3064
3065#[cfg(not(feature = "data-faker"))]
3066async fn generate_ai_spec(
3067 State(_state): State<ManagementState>,
3068 Json(_request): Json<GenerateSpecRequest>,
3069) -> impl IntoResponse {
3070 (
3071 StatusCode::NOT_IMPLEMENTED,
3072 Json(serde_json::json!({
3073 "error": "AI features not enabled",
3074 "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3075 })),
3076 )
3077 .into_response()
3078}
3079
#[cfg(feature = "behavioral-cloning")]
/// Generate an OpenAPI specification from recorded HTTP traffic.
///
/// Opens the recorder database (from `database_path` or `./recordings.db`),
/// queries exchanges matching the optional time/path filters, runs the
/// spec generator over them, and returns the spec plus generation metadata.
async fn generate_openapi_from_traffic(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
) -> impl IntoResponse {
    use chrono::{DateTime, Utc};
    use mockforge_core::intelligent_behavior::{
        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
        IntelligentBehaviorConfig,
    };
    use mockforge_recorder::{
        database::RecorderDatabase,
        openapi_export::{QueryFilters, RecordingsToOpenApi},
    };
    use std::path::PathBuf;

    // Resolve the database path: explicit request value, or
    // `<cwd>/recordings.db` as a fallback.
    let db_path = if let Some(ref path) = request.database_path {
        PathBuf::from(path)
    } else {
        std::env::current_dir()
            .unwrap_or_else(|_| PathBuf::from("."))
            .join("recordings.db")
    };

    let db = match RecorderDatabase::new(&db_path).await {
        Ok(db) => db,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Database error",
                    "message": format!("Failed to open recorder database: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Parse the optional lower time bound (RFC 3339, normalized to UTC).
    let since_dt = if let Some(ref since_str) = request.since {
        match DateTime::parse_from_rfc3339(since_str) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
                    })),
                )
                    .into_response();
            }
        }
    } else {
        None
    };

    // Parse the optional upper time bound (RFC 3339, normalized to UTC).
    let until_dt = if let Some(ref until_str) = request.until {
        match DateTime::parse_from_rfc3339(until_str) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
                    })),
                )
                    .into_response();
            }
        }
    } else {
        None
    };

    // The query is capped at 1000 requests via `max_requests`.
    let query_filters = QueryFilters {
        since: since_dt,
        until: until_dt,
        path_pattern: request.path_pattern.clone(),
        min_status_code: None,
        max_requests: Some(1000),
    };

    let exchanges_from_recorder =
        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
            Ok(exchanges) => exchanges,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Query error",
                        "message": format!("Failed to query HTTP exchanges: {}", e)
                    })),
                )
                    .into_response();
            }
        };

    // Nothing matched the filters: report 404 rather than an empty spec.
    if exchanges_from_recorder.is_empty() {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "No exchanges found",
                "message": "No HTTP exchanges found matching the specified filters"
            })),
        )
            .into_response();
    }

    // Bridge the recorder's exchange type into the generator's local type.
    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
        .into_iter()
        .map(|e| LocalHttpExchange {
            method: e.method,
            path: e.path,
            query_params: e.query_params,
            headers: e.headers,
            body: e.body,
            body_encoding: e.body_encoding,
            status_code: e.status_code,
            response_headers: e.response_headers,
            response_body: e.response_body,
            response_body_encoding: e.response_body_encoding,
            timestamp: e.timestamp,
        })
        .collect();

    let behavior_config = IntelligentBehaviorConfig::default();
    let gen_config = OpenApiGenerationConfig {
        min_confidence: request.min_confidence,
        behavior_model: Some(behavior_config.behavior_model),
    };

    let generator = OpenApiSpecGenerator::new(gen_config);
    let result = match generator.generate_from_exchanges(exchanges).await {
        Ok(result) => result,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Generation error",
                    "message": format!("Failed to generate OpenAPI spec: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Prefer the raw generated document when available; otherwise
    // serialize the typed spec.
    let spec_json = if let Some(ref raw) = result.spec.raw_document {
        raw.clone()
    } else {
        match serde_json::to_value(&result.spec.spec) {
            Ok(json) => json,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Serialization error",
                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
                    })),
                )
                    .into_response();
            }
        }
    };

    let response = serde_json::json!({
        "spec": spec_json,
        "metadata": {
            "requests_analyzed": result.metadata.requests_analyzed,
            "paths_inferred": result.metadata.paths_inferred,
            "path_confidence": result.metadata.path_confidence,
            "generated_at": result.metadata.generated_at.to_rfc3339(),
            "duration_ms": result.metadata.duration_ms,
        }
    });

    Json(response).into_response()
}
3272
3273async fn list_rule_explanations(
3275 State(state): State<ManagementState>,
3276 Query(params): Query<std::collections::HashMap<String, String>>,
3277) -> impl IntoResponse {
3278 use mockforge_core::intelligent_behavior::RuleType;
3279
3280 let explanations = state.rule_explanations.read().await;
3281 let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3282
3283 if let Some(rule_type_str) = params.get("rule_type") {
3285 if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3286 explanations_vec.retain(|e| e.rule_type == rule_type);
3287 }
3288 }
3289
3290 if let Some(min_confidence_str) = params.get("min_confidence") {
3292 if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3293 explanations_vec.retain(|e| e.confidence >= min_confidence);
3294 }
3295 }
3296
3297 explanations_vec.sort_by(|a, b| {
3299 b.confidence
3300 .partial_cmp(&a.confidence)
3301 .unwrap_or(std::cmp::Ordering::Equal)
3302 .then_with(|| b.generated_at.cmp(&a.generated_at))
3303 });
3304
3305 Json(serde_json::json!({
3306 "explanations": explanations_vec,
3307 "total": explanations_vec.len(),
3308 }))
3309 .into_response()
3310}
3311
3312async fn get_rule_explanation(
3314 State(state): State<ManagementState>,
3315 Path(rule_id): Path<String>,
3316) -> impl IntoResponse {
3317 let explanations = state.rule_explanations.read().await;
3318
3319 match explanations.get(&rule_id) {
3320 Some(explanation) => Json(serde_json::json!({
3321 "explanation": explanation,
3322 }))
3323 .into_response(),
3324 None => (
3325 StatusCode::NOT_FOUND,
3326 Json(serde_json::json!({
3327 "error": "Rule explanation not found",
3328 "message": format!("No explanation found for rule ID: {}", rule_id)
3329 })),
3330 )
3331 .into_response(),
3332 }
3333}
3334
/// Request body for learning behavior rules from example pairs.
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example request/response pairs to learn from (must be non-empty).
    pub examples: Vec<ExamplePairRequest>,
    /// Optional `IntelligentBehaviorConfig` as JSON; defaults are used
    /// when omitted or when deserialization fails.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3344
/// One example pair: a loosely-typed request and its expected response.
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request object; recognized keys: `method`, `path`, `body`,
    /// `query_params`, `headers`.
    pub request: serde_json::Value,
    /// Response object; recognized keys: `status_code` (or `status`)
    /// and `body`.
    pub response: serde_json::Value,
}
3353
/// Learn intelligent-behavior rules from example request/response pairs.
///
/// Converts the loosely-typed JSON examples into `ExamplePair`s, runs the
/// `RuleGenerator`, stores the produced explanations in shared state
/// (keyed by rule ID), and returns a summary of the generated rules.
async fn learn_from_examples(
    State(state): State<ManagementState>,
    Json(request): Json<LearnFromExamplesRequest>,
) -> impl IntoResponse {
    use mockforge_core::intelligent_behavior::{
        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
        rule_generator::{ExamplePair, RuleGenerator},
    };

    if request.examples.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "No examples provided",
                "message": "At least one example pair is required"
            })),
        )
            .into_response();
    }

    // Convert each raw JSON example into a typed ExamplePair. Missing or
    // mistyped fields fall back to defaults (GET, "/", status 200, empty
    // params/headers) rather than erroring.
    // NOTE(review): the mapping closure never actually returns Err, so the
    // Err arm in the match below is currently unreachable.
    let example_pairs: Result<Vec<ExamplePair>, String> = request
        .examples
        .into_iter()
        .enumerate()
        .map(|(idx, ex)| {
            // HTTP method (defaults to GET).
            let method = ex
                .request
                .get("method")
                .and_then(|v| v.as_str())
                .map(|s| s.to_string())
                .unwrap_or_else(|| "GET".to_string());
            // Request path (defaults to "/").
            let path = ex
                .request
                .get("path")
                .and_then(|v| v.as_str())
                .map(|s| s.to_string())
                .unwrap_or_else(|| "/".to_string());
            // Optional request body, passed through as raw JSON.
            let request_body = ex.request.get("body").cloned();
            // Query params: only string-valued entries are kept.
            let query_params = ex
                .request
                .get("query_params")
                .and_then(|v| v.as_object())
                .map(|obj| {
                    obj.iter()
                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
                        .collect()
                })
                .unwrap_or_default();
            // Headers: only string-valued entries are kept.
            let headers = ex
                .request
                .get("headers")
                .and_then(|v| v.as_object())
                .map(|obj| {
                    obj.iter()
                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
                        .collect()
                })
                .unwrap_or_default();

            // Status code: accepts `status_code` or `status`, default 200.
            let status = ex
                .response
                .get("status_code")
                .or_else(|| ex.response.get("status"))
                .and_then(|v| v.as_u64())
                .map(|n| n as u16)
                .unwrap_or(200);
            // Optional response body, passed through as raw JSON.
            let response_body = ex.response.get("body").cloned();

            Ok(ExamplePair {
                method,
                path,
                request: request_body,
                status,
                response: response_body,
                query_params,
                headers,
                // Tag each pair with its origin and index for traceability.
                metadata: {
                    let mut meta = std::collections::HashMap::new();
                    meta.insert("source".to_string(), "api".to_string());
                    meta.insert("example_index".to_string(), idx.to_string());
                    meta
                },
            })
        })
        .collect();

    let example_pairs = match example_pairs {
        Ok(pairs) => pairs,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid examples",
                    "message": e
                })),
            )
                .into_response();
        }
    };

    // Use the caller-provided behavior config when it deserializes;
    // fall back to defaults otherwise.
    let behavior_config = if let Some(config_json) = request.config {
        serde_json::from_value(config_json)
            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
            .behavior_model
    } else {
        BehaviorModelConfig::default()
    };

    let generator = RuleGenerator::new(behavior_config);

    let (rules, explanations) =
        match generator.generate_rules_with_explanations(example_pairs).await {
            Ok(result) => result,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Rule generation failed",
                        "message": format!("Failed to generate rules: {}", e)
                    })),
                )
                    .into_response();
            }
        };

    // Persist explanations for later retrieval via the explanation
    // endpoints; the scoped block keeps the write lock short.
    {
        let mut stored_explanations = state.rule_explanations.write().await;
        for explanation in &explanations {
            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
        }
    }

    let response = serde_json::json!({
        "success": true,
        "rules_generated": {
            "consistency_rules": rules.consistency_rules.len(),
            "schemas": rules.schemas.len(),
            "state_machines": rules.state_transitions.len(),
            "system_prompt": !rules.system_prompt.is_empty(),
        },
        "explanations": explanations.iter().map(|e| serde_json::json!({
            "rule_id": e.rule_id,
            "rule_type": e.rule_type,
            "confidence": e.confidence,
            "reasoning": e.reasoning,
        })).collect::<Vec<_>>(),
        "total_explanations": explanations.len(),
    });

    Json(response).into_response()
}
3518
3519#[cfg(feature = "data-faker")]
/// Extract a YAML specification from LLM output, stripping Markdown
/// code fences when present; otherwise returns the trimmed input.
fn extract_yaml_spec(text: &str) -> String {
    // Given the text immediately after an opening fence marker, return
    // the fenced body (trimmed), or None when no closing fence exists.
    fn fenced_body(after_fence: &str) -> Option<String> {
        let body = after_fence.trim_start();
        body.find("```").map(|end| body[..end].trim().to_string())
    }

    // A tagged ```yaml fence takes priority.
    if let Some(start) = text.find("```yaml") {
        if let Some(spec) = fenced_body(&text[start + 7..]) {
            return spec;
        }
    }
    // Otherwise take the first generic ``` fence.
    if let Some(start) = text.find("```") {
        if let Some(spec) = fenced_body(&text[start + 3..]) {
            return spec;
        }
    }

    // No complete fence found: return the input trimmed.
    text.trim().to_string()
}
3543
3544#[cfg(feature = "data-faker")]
/// Extract a GraphQL SDL schema from LLM output, stripping Markdown
/// code fences when present; otherwise returns the trimmed input.
fn extract_graphql_schema(text: &str) -> String {
    // Given the text immediately after an opening fence marker, return
    // the fenced body (trimmed), or None when no closing fence exists.
    fn fenced_body(after_fence: &str) -> Option<String> {
        let body = after_fence.trim_start();
        body.find("```").map(|end| body[..end].trim().to_string())
    }

    // A tagged ```graphql fence takes priority.
    if let Some(start) = text.find("```graphql") {
        if let Some(schema) = fenced_body(&text[start + 10..]) {
            return schema;
        }
    }
    // Otherwise take the first generic ``` fence.
    if let Some(start) = text.find("```") {
        if let Some(schema) = fenced_body(&text[start + 3..]) {
            return schema;
        }
    }

    // No complete fence found: return the input trimmed.
    text.trim().to_string()
}
3568
/// Return the current chaos-engineering configuration.
///
/// When the `chaos` feature is compiled out, or no chaos API state is
/// configured, a disabled configuration with null sections is returned
/// instead of an error.
async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
    #[cfg(feature = "chaos")]
    {
        if let Some(chaos_state) = &_state.chaos_api_state {
            let config = chaos_state.config.read().await;
            // Serialize each optional section; sections that fail to
            // serialize are reported as null.
            Json(serde_json::json!({
                "enabled": config.enabled,
                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
            }))
            .into_response()
        } else {
            // Chaos feature compiled in but no API state configured.
            Json(serde_json::json!({
                "enabled": false,
                "latency": null,
                "fault_injection": null,
                "rate_limit": null,
                "traffic_shaping": null,
            }))
            .into_response()
        }
    }
    #[cfg(not(feature = "chaos"))]
    {
        // Chaos feature not compiled in: report a disabled configuration.
        Json(serde_json::json!({
            "enabled": false,
            "latency": null,
            "fault_injection": null,
            "rate_limit": null,
            "traffic_shaping": null,
        }))
        .into_response()
    }
}
3611
/// Partial update for the chaos configuration; only the provided sections
/// are applied (sections that fail to deserialize are silently ignored by
/// the update handler).
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Master on/off switch for chaos injection.
    pub enabled: Option<bool>,
    /// Latency-injection settings (deserialized as `LatencyConfig`).
    pub latency: Option<serde_json::Value>,
    /// Fault-injection settings (deserialized as `FaultInjectionConfig`).
    pub fault_injection: Option<serde_json::Value>,
    /// Rate-limit settings (deserialized as `RateLimitConfig`).
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic-shaping settings (deserialized as `TrafficShapingConfig`).
    pub traffic_shaping: Option<serde_json::Value>,
}
3626
3627async fn update_chaos_config(
3629 State(_state): State<ManagementState>,
3630 Json(_config_update): Json<ChaosConfigUpdate>,
3631) -> impl IntoResponse {
3632 #[cfg(feature = "chaos")]
3633 {
3634 if let Some(chaos_state) = &_state.chaos_api_state {
3635 use mockforge_chaos::config::{
3636 ChaosConfig, FaultInjectionConfig, LatencyConfig, RateLimitConfig,
3637 TrafficShapingConfig,
3638 };
3639
3640 let mut config = chaos_state.config.write().await;
3641
3642 if let Some(enabled) = _config_update.enabled {
3644 config.enabled = enabled;
3645 }
3646
3647 if let Some(latency_json) = _config_update.latency {
3649 if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3650 config.latency = Some(latency);
3651 }
3652 }
3653
3654 if let Some(fault_json) = _config_update.fault_injection {
3656 if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3657 config.fault_injection = Some(fault);
3658 }
3659 }
3660
3661 if let Some(rate_json) = _config_update.rate_limit {
3663 if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3664 config.rate_limit = Some(rate);
3665 }
3666 }
3667
3668 if let Some(traffic_json) = _config_update.traffic_shaping {
3670 if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3671 config.traffic_shaping = Some(traffic);
3672 }
3673 }
3674
3675 drop(config);
3678
3679 info!("Chaos configuration updated successfully");
3680 Json(serde_json::json!({
3681 "success": true,
3682 "message": "Chaos configuration updated and applied"
3683 }))
3684 .into_response()
3685 } else {
3686 (
3687 StatusCode::SERVICE_UNAVAILABLE,
3688 Json(serde_json::json!({
3689 "success": false,
3690 "error": "Chaos API not available",
3691 "message": "Chaos engineering is not enabled or configured"
3692 })),
3693 )
3694 .into_response()
3695 }
3696 }
3697 #[cfg(not(feature = "chaos"))]
3698 {
3699 (
3700 StatusCode::NOT_IMPLEMENTED,
3701 Json(serde_json::json!({
3702 "success": false,
3703 "error": "Chaos feature not enabled",
3704 "message": "Chaos engineering feature is not compiled into this build"
3705 })),
3706 )
3707 .into_response()
3708 }
3709}
3710
3711async fn list_network_profiles() -> impl IntoResponse {
3715 use mockforge_core::network_profiles::NetworkProfileCatalog;
3716
3717 let catalog = NetworkProfileCatalog::default();
3718 let profiles: Vec<serde_json::Value> = catalog
3719 .list_profiles_with_description()
3720 .iter()
3721 .map(|(name, description)| {
3722 serde_json::json!({
3723 "name": name,
3724 "description": description,
3725 })
3726 })
3727 .collect();
3728
3729 Json(serde_json::json!({
3730 "profiles": profiles
3731 }))
3732 .into_response()
3733}
3734
/// Request body for applying a named network profile.
#[derive(Debug, Deserialize)]
pub struct ApplyNetworkProfileRequest {
    // Name of a profile from the built-in `NetworkProfileCatalog`.
    pub profile_name: String,
}
3741
/// Apply a named network profile to the running server.
///
/// Looks up `profile_name` in the built-in `NetworkProfileCatalog`. On a hit,
/// the profile's traffic-shaping parameters are written into (a) the shared
/// server configuration, when one is attached to the management state, and
/// (b) the chaos API state, when the `chaos` feature is compiled in. Returns
/// 404 when the profile name is unknown.
async fn apply_network_profile(
    State(state): State<ManagementState>,
    Json(request): Json<ApplyNetworkProfileRequest>,
) -> impl IntoResponse {
    use mockforge_core::network_profiles::NetworkProfileCatalog;

    let catalog = NetworkProfileCatalog::default();
    if let Some(profile) = catalog.get(&request.profile_name) {
        if let Some(server_config) = &state.server_config {
            // Write lock on the shared server config; held until the end of
            // this branch (dropped before the chaos-state update below).
            let mut config = server_config.write().await;

            use mockforge_core::config::NetworkShapingConfig;

            // Shaping counts as enabled when either bandwidth limiting or
            // burst-loss simulation is enabled in the profile.
            let network_shaping = NetworkShapingConfig {
                enabled: profile.traffic_shaping.bandwidth.enabled
                    || profile.traffic_shaping.burst_loss.enabled,
                // `max_bytes_per_sec * 8` converts bytes/sec into bits/sec —
                // NOTE(review): confirm NetworkShapingConfig expects bps, and
                // that extreme byte rates cannot overflow the multiplication.
                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8,
                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                // Hard-coded connection cap; profiles do not carry this value.
                max_connections: 1000,
            };

            // Attach to the existing chaos config when present, otherwise
            // create a fresh one that only carries traffic shaping.
            if let Some(ref mut chaos) = config.observability.chaos {
                chaos.traffic_shaping = Some(network_shaping);
            } else {
                use mockforge_core::config::ChaosEngConfig;
                config.observability.chaos = Some(ChaosEngConfig {
                    enabled: true,
                    latency: None,
                    fault_injection: None,
                    rate_limit: None,
                    traffic_shaping: Some(network_shaping),
                    scenario: None,
                });
            }

            info!("Network profile '{}' applied to server configuration", request.profile_name);
        } else {
            // Best-effort: without a server config handle the profile can
            // still reach the chaos API state below, but is not persisted.
            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
        }

        #[cfg(feature = "chaos")]
        {
            if let Some(chaos_state) = &state.chaos_api_state {
                use mockforge_chaos::config::TrafficShapingConfig;

                let mut chaos_config = chaos_state.config.write().await;
                // Mirror of the shaping settings above, in the chaos crate's
                // own config type (which adds connection-timeout handling).
                let chaos_traffic_shaping = TrafficShapingConfig {
                    enabled: profile.traffic_shaping.bandwidth.enabled
                        || profile.traffic_shaping.burst_loss.enabled,
                    // Same bytes/sec -> bits/sec conversion as above —
                    // NOTE(review): confirm expected units.
                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8,
                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                    // 0 here vs. 1000 in the server config — presumably 0
                    // means "unlimited" in the chaos crate; verify.
                    max_connections: 0,
                    connection_timeout_ms: 30000,
                };
                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
                // Applying a profile force-enables chaos so shaping takes
                // effect; release the lock explicitly before logging.
                chaos_config.enabled = true; drop(chaos_config);
                info!("Network profile '{}' applied to chaos API state", request.profile_name);
            }
        }

        Json(serde_json::json!({
            "success": true,
            "message": format!("Network profile '{}' applied", request.profile_name),
            "profile": {
                "name": profile.name,
                "description": profile.description,
            }
        }))
        .into_response()
    } else {
        (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Profile not found",
                "message": format!("Network profile '{}' not found", request.profile_name)
            })),
        )
            .into_response()
    }
}
3835
3836pub fn management_router_with_ui_builder(
3838 state: ManagementState,
3839 server_config: mockforge_core::config::ServerConfig,
3840) -> Router {
3841 use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
3842
3843 let management = management_router(state);
3845
3846 let ui_builder_state = UIBuilderState::new(server_config);
3848 let ui_builder = create_ui_builder_router(ui_builder_state);
3849
3850 management.nest("/ui-builder", ui_builder)
3852}
3853
3854pub fn management_router_with_spec_import(state: ManagementState) -> Router {
3856 use crate::spec_import::{spec_import_router, SpecImportState};
3857
3858 let management = management_router(state);
3860
3861 Router::new()
3863 .merge(management)
3864 .merge(spec_import_router(SpecImportState::new()))
3865}
3866
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a minimal `MockConfig` for tests. Only the fields the tests vary
    /// are parameters; everything else defaults to `None`/empty. This removes
    /// the three near-identical 15-line struct literals the tests carried.
    fn make_mock(
        id: &str,
        name: &str,
        method: &str,
        path: &str,
        enabled: bool,
        status: u16,
    ) -> MockConfig {
        MockConfig {
            id: id.to_string(),
            name: name.to_string(),
            method: method.to_string(),
            path: path.to_string(),
            response: MockResponse {
                body: serde_json::json!({}),
                headers: None,
            },
            enabled,
            latency_ms: None,
            status_code: Some(status),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        }
    }

    /// Inserting a mock makes it retrievable by id with its fields intact.
    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        // Same fixture as before: default mock but with a non-empty body.
        let mock = MockConfig {
            response: MockResponse {
                body: serde_json::json!({"message": "test"}),
                headers: None,
            },
            ..make_mock("test-1", "Test Mock", "GET", "/test", true, 200)
        };

        {
            let mut mocks = state.mocks.write().await;
            mocks.push(mock.clone());
        }

        let mocks = state.mocks.read().await;
        let found = mocks.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    /// Enabled/disabled mocks are counted correctly.
    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        {
            let mut mocks = state.mocks.write().await;
            mocks.push(make_mock("1", "Mock 1", "GET", "/test1", true, 200));
            mocks.push(make_mock("2", "Mock 2", "POST", "/test2", false, 201));
        }

        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }
}