1#[cfg(any(feature = "mqtt", feature = "kafka"))]
2use axum::response::sse::{Event, Sse};
3use axum::{
8 extract::{Path, Query, State},
9 http::StatusCode,
10 response::{IntoResponse, Json},
11 routing::{delete, get, post, put},
12 Router,
13};
14#[cfg(any(feature = "mqtt", feature = "kafka"))]
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18 BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23#[cfg(any(feature = "mqtt", feature = "kafka"))]
24use std::convert::Infallible;
25use std::sync::Arc;
26use tokio::sync::{broadcast, RwLock};
27use tracing::*;
28
/// Default capacity of the message-event broadcast channel.
const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;

/// Resolve the broadcast channel capacity.
///
/// Honors the `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY` environment variable
/// when it is set to a parseable `usize`; any missing or malformed value
/// falls back to [`DEFAULT_MESSAGE_BROADCAST_CAPACITY`].
fn get_message_broadcast_capacity() -> usize {
    match std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY") {
        Ok(raw) => raw.parse().unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY),
        Err(_) => DEFAULT_MESSAGE_BROADCAST_CAPACITY,
    }
}
39
/// Protocol-tagged message event carried on the management broadcast channel.
///
/// Serializes as `{"protocol": "mqtt"|"kafka", "data": {...}}` per the
/// serde tag/content attributes below.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    /// An MQTT message event (only with the `mqtt` feature).
    #[cfg(feature = "mqtt")]
    Mqtt(MqttMessageEvent),
    /// A Kafka message event (only with the `kafka` feature).
    #[cfg(feature = "kafka")]
    Kafka(KafkaMessageEvent),
}
52
/// Snapshot of a single MQTT message for the management event stream.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// Topic the message was published to.
    pub topic: String,
    /// Message payload, carried as a string.
    pub payload: String,
    /// MQTT quality-of-service level (0, 1, or 2 per the MQTT spec).
    pub qos: u8,
    /// Whether the message carried the retain flag.
    pub retain: bool,
    /// Event timestamp as a string — NOTE(review): format (RFC 3339?) is set
    /// by the producer of these events; confirm there.
    pub timestamp: String,
}
68
/// Snapshot of a single Kafka record for the management event stream.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Topic the record was produced to.
    pub topic: String,
    /// Optional record key.
    pub key: Option<String>,
    /// Record value, carried as a string.
    pub value: String,
    /// Partition the record landed in.
    pub partition: i32,
    /// Offset within the partition.
    pub offset: i64,
    /// Optional record headers.
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// Event timestamp as a string — NOTE(review): format is set by the
    /// producer of these events; confirm there.
    pub timestamp: String,
}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct MockConfig {
84 #[serde(skip_serializing_if = "String::is_empty")]
86 pub id: String,
87 pub name: String,
89 pub method: String,
91 pub path: String,
93 pub response: MockResponse,
95 #[serde(default = "default_true")]
97 pub enabled: bool,
98 #[serde(skip_serializing_if = "Option::is_none")]
100 pub latency_ms: Option<u64>,
101 #[serde(skip_serializing_if = "Option::is_none")]
103 pub status_code: Option<u16>,
104 #[serde(skip_serializing_if = "Option::is_none")]
106 pub request_match: Option<RequestMatchCriteria>,
107 #[serde(skip_serializing_if = "Option::is_none")]
109 pub priority: Option<i32>,
110 #[serde(skip_serializing_if = "Option::is_none")]
112 pub scenario: Option<String>,
113 #[serde(skip_serializing_if = "Option::is_none")]
115 pub required_scenario_state: Option<String>,
116 #[serde(skip_serializing_if = "Option::is_none")]
118 pub new_scenario_state: Option<String>,
119}
120
/// Serde default helper: mocks are enabled unless stated otherwise.
fn default_true() -> bool {
    true
}
124
/// The canned response body and optional headers served by a mock.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Arbitrary JSON body returned to the caller.
    pub body: serde_json::Value,
    /// Optional extra response headers (name → value).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
134
135#[derive(Debug, Clone, Serialize, Deserialize, Default)]
137pub struct RequestMatchCriteria {
138 #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
140 pub headers: std::collections::HashMap<String, String>,
141 #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
143 pub query_params: std::collections::HashMap<String, String>,
144 #[serde(skip_serializing_if = "Option::is_none")]
146 pub body_pattern: Option<String>,
147 #[serde(skip_serializing_if = "Option::is_none")]
149 pub json_path: Option<String>,
150 #[serde(skip_serializing_if = "Option::is_none")]
152 pub xpath: Option<String>,
153 #[serde(skip_serializing_if = "Option::is_none")]
155 pub custom_matcher: Option<String>,
156}
157
/// Decide whether `mock` should serve the request described by the given
/// method, path, headers, query parameters, and optional raw body.
///
/// Matching proceeds in stages and short-circuits on the first failure:
/// enabled flag → HTTP method (case-insensitive) → path pattern → the
/// optional `request_match` criteria (headers, query params, body pattern,
/// JSONPath, XPath, custom expression). Returns `true` only when every
/// applicable stage passes.
pub fn mock_matches_request(
    mock: &MockConfig,
    method: &str,
    path: &str,
    headers: &std::collections::HashMap<String, String>,
    query_params: &std::collections::HashMap<String, String>,
    body: Option<&[u8]>,
) -> bool {
    use regex::Regex;

    // Disabled mocks never match.
    if !mock.enabled {
        return false;
    }

    // Methods compare case-insensitively ("get" matches "GET").
    if mock.method.to_uppercase() != method.to_uppercase() {
        return false;
    }

    // Exact, `{param}`-placeholder, or `*`-wildcard path match.
    if !path_matches_pattern(&mock.path, path) {
        return false;
    }

    if let Some(criteria) = &mock.request_match {
        // Headers: names compared case-insensitively. The expected value is
        // treated as a regex when it compiles, otherwise as a literal.
        // A required header missing from the request fails the match.
        for (key, expected_value) in &criteria.headers {
            let header_key_lower = key.to_lowercase();
            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);

            if let Some((_, actual_value)) = found {
                if let Ok(re) = Regex::new(expected_value) {
                    if !re.is_match(actual_value) {
                        return false;
                    }
                } else if actual_value != expected_value {
                    return false;
                }
            } else {
                return false;
            }
        }

        // Query params: exact equality, all required params must be present.
        for (key, expected_value) in &criteria.query_params {
            if let Some(actual_value) = query_params.get(key) {
                if actual_value != expected_value {
                    return false;
                }
            } else {
                return false;
            }
        }

        // Body pattern: regex when it compiles, literal otherwise; a missing
        // body fails the match. Non-UTF8 bytes are replaced lossily first.
        if let Some(pattern) = &criteria.body_pattern {
            if let Some(body_bytes) = body {
                let body_str = String::from_utf8_lossy(body_bytes);
                if let Ok(re) = Regex::new(pattern) {
                    if !re.is_match(&body_str) {
                        return false;
                    }
                } else if body_str.as_ref() != pattern {
                    return false;
                }
            } else {
                return false;
            }
        }

        // JSONPath: only fails when the body IS valid JSON and the path is
        // absent. A missing body, non-UTF8 body, or non-JSON body is
        // deliberately lenient here (contrast with the XPath branch below).
        if let Some(json_path) = &criteria.json_path {
            if let Some(body_bytes) = body {
                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
                        if !json_path_exists(&json_value, json_path) {
                            return false;
                        }
                    }
                }
            }
        }

        // XPath: strict — a missing or non-UTF8 body fails the match.
        if let Some(xpath) = &criteria.xpath {
            if let Some(body_bytes) = body {
                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
                    if !xml_xpath_exists(body_str, xpath) {
                        return false;
                    }
                } else {
                    return false;
                }
            } else {
                return false;
            }
        }

        // Custom mini-DSL expression; see evaluate_custom_matcher.
        if let Some(custom) = &criteria.custom_matcher {
            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
                return false;
            }
        }
    }

    true
}
279
280fn path_matches_pattern(pattern: &str, path: &str) -> bool {
282 if pattern == path {
284 return true;
285 }
286
287 if pattern == "*" {
289 return true;
290 }
291
292 let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
294 let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
295
296 if pattern_parts.len() != path_parts.len() {
297 if pattern.contains('*') {
299 return matches_wildcard_pattern(pattern, path);
300 }
301 return false;
302 }
303
304 for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
305 if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
307 continue; }
309
310 if pattern_part != path_part {
311 return false;
312 }
313 }
314
315 true
316}
317
318fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
320 use regex::Regex;
321
322 let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
324
325 if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
326 return re.is_match(path);
327 }
328
329 false
330}
331
332fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
334 if let Some(path) = json_path.strip_prefix("$.") {
337 let parts: Vec<&str> = path.split('.').collect();
338
339 let mut current = json;
340 for part in parts {
341 if let Some(obj) = current.as_object() {
342 if let Some(value) = obj.get(part) {
343 current = value;
344 } else {
345 return false;
346 }
347 } else {
348 return false;
349 }
350 }
351 true
352 } else {
353 tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
355 false
356 }
357}
358
/// One parsed step of the supported XPath subset: an element name with an
/// optional `[text()='...']` equality predicate.
#[derive(Debug, Clone, PartialEq, Eq)]
struct XPathSegment {
    /// Element (tag) name this step must match.
    name: String,
    /// When set, the element's trimmed text content must equal this value.
    text_equals: Option<String>,
}
364
365fn parse_xpath_segment(segment: &str) -> Option<XPathSegment> {
366 if segment.is_empty() {
367 return None;
368 }
369
370 let trimmed = segment.trim();
371 if let Some(bracket_start) = trimmed.find('[') {
372 if !trimmed.ends_with(']') {
373 return None;
374 }
375
376 let name = trimmed[..bracket_start].trim();
377 let predicate = &trimmed[bracket_start + 1..trimmed.len() - 1];
378 let predicate = predicate.trim();
379
380 if let Some(raw) = predicate.strip_prefix("text()=") {
382 let raw = raw.trim();
383 if raw.len() >= 2
384 && ((raw.starts_with('"') && raw.ends_with('"'))
385 || (raw.starts_with('\'') && raw.ends_with('\'')))
386 {
387 let text = raw[1..raw.len() - 1].to_string();
388 if !name.is_empty() {
389 return Some(XPathSegment {
390 name: name.to_string(),
391 text_equals: Some(text),
392 });
393 }
394 }
395 }
396
397 None
398 } else {
399 Some(XPathSegment {
400 name: trimmed.to_string(),
401 text_equals: None,
402 })
403 }
404}
405
406fn segment_matches(node: roxmltree::Node<'_, '_>, segment: &XPathSegment) -> bool {
407 if !node.is_element() {
408 return false;
409 }
410 if node.tag_name().name() != segment.name {
411 return false;
412 }
413 match &segment.text_equals {
414 Some(expected) => node.text().map(str::trim).unwrap_or_default() == expected,
415 None => true,
416 }
417}
418
419fn xml_xpath_exists(xml_body: &str, xpath: &str) -> bool {
426 let doc = match roxmltree::Document::parse(xml_body) {
427 Ok(doc) => doc,
428 Err(err) => {
429 tracing::warn!("Failed to parse XML for XPath matching: {}", err);
430 return false;
431 }
432 };
433
434 let expr = xpath.trim();
435 if expr.is_empty() {
436 return false;
437 }
438
439 let (is_descendant, path_str) = if let Some(rest) = expr.strip_prefix("//") {
440 (true, rest)
441 } else if let Some(rest) = expr.strip_prefix('/') {
442 (false, rest)
443 } else {
444 tracing::warn!("Unsupported XPath expression (must start with / or //): {}", expr);
445 return false;
446 };
447
448 let segments: Vec<XPathSegment> = path_str
449 .split('/')
450 .filter(|s| !s.trim().is_empty())
451 .filter_map(parse_xpath_segment)
452 .collect();
453
454 if segments.is_empty() {
455 return false;
456 }
457
458 if is_descendant {
459 let first = &segments[0];
460 for node in doc.descendants().filter(|n| segment_matches(*n, first)) {
461 let mut frontier = vec![node];
462 for segment in &segments[1..] {
463 let mut next_frontier = Vec::new();
464 for parent in &frontier {
465 for child in parent.children().filter(|n| segment_matches(*n, segment)) {
466 next_frontier.push(child);
467 }
468 }
469 if next_frontier.is_empty() {
470 frontier.clear();
471 break;
472 }
473 frontier = next_frontier;
474 }
475 if !frontier.is_empty() {
476 return true;
477 }
478 }
479 false
480 } else {
481 let mut frontier = vec![doc.root_element()];
482 for (index, segment) in segments.iter().enumerate() {
483 let mut next_frontier = Vec::new();
484 for parent in &frontier {
485 if index == 0 {
486 if segment_matches(*parent, segment) {
487 next_frontier.push(*parent);
488 }
489 continue;
490 }
491 for child in parent.children().filter(|n| segment_matches(*n, segment)) {
492 next_frontier.push(child);
493 }
494 }
495 if next_frontier.is_empty() {
496 return false;
497 }
498 frontier = next_frontier;
499 }
500 !frontier.is_empty()
501 }
502}
503
504fn evaluate_custom_matcher(
506 expression: &str,
507 method: &str,
508 path: &str,
509 headers: &std::collections::HashMap<String, String>,
510 query_params: &std::collections::HashMap<String, String>,
511 body: Option<&[u8]>,
512) -> bool {
513 use regex::Regex;
514
515 let expr = expression.trim();
516
517 if expr.contains("==") {
519 let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
520 if parts.len() != 2 {
521 return false;
522 }
523
524 let field = parts[0];
525 let expected_value = parts[1].trim_matches('"').trim_matches('\'');
526
527 match field {
528 "method" => method == expected_value,
529 "path" => path == expected_value,
530 _ if field.starts_with("headers.") => {
531 let header_name = &field[8..];
532 headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
533 }
534 _ if field.starts_with("query.") => {
535 let param_name = &field[6..];
536 query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
537 }
538 _ => false,
539 }
540 }
541 else if expr.contains("=~") {
543 let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
544 if parts.len() != 2 {
545 return false;
546 }
547
548 let field = parts[0];
549 let pattern = parts[1].trim_matches('"').trim_matches('\'');
550
551 if let Ok(re) = Regex::new(pattern) {
552 match field {
553 "method" => re.is_match(method),
554 "path" => re.is_match(path),
555 _ if field.starts_with("headers.") => {
556 let header_name = &field[8..];
557 headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
558 }
559 _ if field.starts_with("query.") => {
560 let param_name = &field[6..];
561 query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
562 }
563 _ => false,
564 }
565 } else {
566 false
567 }
568 }
569 else if expr.contains("contains") {
571 let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
572 if parts.len() != 2 {
573 return false;
574 }
575
576 let field = parts[0];
577 let search_value = parts[1].trim_matches('"').trim_matches('\'');
578
579 match field {
580 "path" => path.contains(search_value),
581 _ if field.starts_with("headers.") => {
582 let header_name = &field[8..];
583 headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
584 }
585 _ if field.starts_with("body") => {
586 if let Some(body_bytes) = body {
587 let body_str = String::from_utf8_lossy(body_bytes);
588 body_str.contains(search_value)
589 } else {
590 false
591 }
592 }
593 _ => false,
594 }
595 } else {
596 tracing::warn!("Unknown custom matcher expression format: {}", expr);
598 false
599 }
600}
601
/// Runtime statistics reported by the management stats endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Seconds elapsed since the management state was created.
    pub uptime_seconds: u64,
    /// Total requests counted via the shared request counter.
    pub total_requests: u64,
    /// Total number of registered mocks (enabled or not).
    pub active_mocks: usize,
    /// Number of mocks currently enabled.
    pub enabled_mocks: usize,
    /// Reported route count; currently mirrors the mock count.
    pub registered_routes: usize,
}
616
/// Lightweight server configuration snapshot returned by the config endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// Crate version (from CARGO_PKG_VERSION at build time).
    pub version: String,
    /// Port the server listens on.
    pub port: u16,
    /// Whether an OpenAPI spec was loaded.
    pub has_openapi_spec: bool,
    /// Filesystem path the spec was loaded from, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
630
/// Shared state handed to every management-API handler.
///
/// All mutable pieces sit behind `Arc<RwLock<...>>` so the state clones
/// cheaply into each axum handler.
#[derive(Clone)]
pub struct ManagementState {
    /// In-memory registry of user-defined mocks.
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// Loaded OpenAPI spec, if the server was started with one.
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Filesystem path the spec was loaded from, if any.
    pub spec_path: Option<String>,
    /// Port the server listens on (reported by the config endpoint).
    pub port: u16,
    /// Start instant, used for the uptime statistic.
    pub start_time: std::time::Instant,
    /// Shared request counter read by the stats endpoint.
    pub request_counter: Arc<RwLock<u64>>,
    /// Proxy configuration, when proxying is enabled.
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// SMTP registry backing the mailbox endpoints (feature `smtp`).
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// MQTT mock broker handle (feature `mqtt`).
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Kafka mock broker handle (feature `kafka`).
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel carrying MQTT/Kafka message events; capacity comes
    /// from get_message_broadcast_capacity().
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// Manager for scenario state machines.
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// WebSocket broadcast channel for mock CRUD events, if wired up.
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Lifecycle hook registry invoked on mock create/update/delete.
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Intelligent-behavior rule explanations keyed by a string identifier —
    /// NOTE(review): key semantics are defined by the callers that populate it.
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
    /// Chaos-engineering API state (feature `chaos`).
    #[cfg(feature = "chaos")]
    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
    /// Full server configuration, when provided for runtime inspection.
    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
}
682
impl ManagementState {
    /// Create a fresh management state: empty mock registry, zeroed request
    /// counter, uptime clock started now, and every optional integration
    /// unset (attach them with the `with_*` builder methods below).
    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
        Self {
            mocks: Arc::new(RwLock::new(Vec::new())),
            spec,
            spec_path,
            port,
            start_time: std::time::Instant::now(),
            request_counter: Arc::new(RwLock::new(0)),
            proxy_config: None,
            #[cfg(feature = "smtp")]
            smtp_registry: None,
            #[cfg(feature = "mqtt")]
            mqtt_broker: None,
            #[cfg(feature = "kafka")]
            kafka_broker: None,
            #[cfg(any(feature = "mqtt", feature = "kafka"))]
            message_events: {
                // Channel capacity is env-configurable; the receiver handle is
                // dropped here — subscribers call .subscribe() on the sender.
                let capacity = get_message_broadcast_capacity();
                let (tx, _) = broadcast::channel(capacity);
                Arc::new(tx)
            },
            state_machine_manager: Arc::new(RwLock::new(
                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
            )),
            ws_broadcast: None,
            lifecycle_hooks: None,
            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
            #[cfg(feature = "chaos")]
            chaos_api_state: None,
            server_config: None,
        }
    }

    /// Builder: attach lifecycle hooks fired on mock create/update/delete.
    pub fn with_lifecycle_hooks(
        mut self,
        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
    ) -> Self {
        self.lifecycle_hooks = Some(hooks);
        self
    }

    /// Builder: attach the WebSocket broadcast channel for mock CRUD events.
    pub fn with_ws_broadcast(
        mut self,
        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
    ) -> Self {
        self.ws_broadcast = Some(ws_broadcast);
        self
    }

    /// Builder: attach a proxy configuration.
    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
        self.proxy_config = Some(proxy_config);
        self
    }

    /// Builder: attach the SMTP registry backing the mailbox endpoints.
    #[cfg(feature = "smtp")]
    pub fn with_smtp_registry(
        mut self,
        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
    ) -> Self {
        self.smtp_registry = Some(smtp_registry);
        self
    }

    /// Builder: attach the MQTT mock broker handle.
    #[cfg(feature = "mqtt")]
    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
        self.mqtt_broker = Some(mqtt_broker);
        self
    }

    /// Builder: attach the Kafka mock broker handle.
    #[cfg(feature = "kafka")]
    pub fn with_kafka_broker(
        mut self,
        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
    ) -> Self {
        self.kafka_broker = Some(kafka_broker);
        self
    }

    /// Builder: attach chaos-engineering API state.
    #[cfg(feature = "chaos")]
    pub fn with_chaos_api_state(
        mut self,
        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
    ) -> Self {
        self.chaos_api_state = Some(chaos_api_state);
        self
    }

    /// Builder: attach the full server configuration.
    pub fn with_server_config(
        mut self,
        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
    ) -> Self {
        self.server_config = Some(server_config);
        self
    }
}
793
794async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
796 let mocks = state.mocks.read().await;
797 Json(serde_json::json!({
798 "mocks": *mocks,
799 "total": mocks.len(),
800 "enabled": mocks.iter().filter(|m| m.enabled).count()
801 }))
802}
803
804async fn get_mock(
806 State(state): State<ManagementState>,
807 Path(id): Path<String>,
808) -> Result<Json<MockConfig>, StatusCode> {
809 let mocks = state.mocks.read().await;
810 mocks
811 .iter()
812 .find(|m| m.id == id)
813 .cloned()
814 .map(Json)
815 .ok_or(StatusCode::NOT_FOUND)
816}
817
818async fn create_mock(
820 State(state): State<ManagementState>,
821 Json(mut mock): Json<MockConfig>,
822) -> Result<Json<MockConfig>, StatusCode> {
823 let mut mocks = state.mocks.write().await;
824
825 if mock.id.is_empty() {
827 mock.id = uuid::Uuid::new_v4().to_string();
828 }
829
830 if mocks.iter().any(|m| m.id == mock.id) {
832 return Err(StatusCode::CONFLICT);
833 }
834
835 info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
836
837 if let Some(hooks) = &state.lifecycle_hooks {
839 let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
840 id: mock.id.clone(),
841 name: mock.name.clone(),
842 config: serde_json::to_value(&mock).unwrap_or_default(),
843 };
844 hooks.invoke_mock_created(&event).await;
845 }
846
847 mocks.push(mock.clone());
848
849 if let Some(tx) = &state.ws_broadcast {
851 let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
852 }
853
854 Ok(Json(mock))
855}
856
857async fn update_mock(
859 State(state): State<ManagementState>,
860 Path(id): Path<String>,
861 Json(updated_mock): Json<MockConfig>,
862) -> Result<Json<MockConfig>, StatusCode> {
863 let mut mocks = state.mocks.write().await;
864
865 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
866
867 let old_mock = mocks[position].clone();
869
870 info!("Updating mock: {}", id);
871 mocks[position] = updated_mock.clone();
872
873 if let Some(hooks) = &state.lifecycle_hooks {
875 let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
876 id: updated_mock.id.clone(),
877 name: updated_mock.name.clone(),
878 config: serde_json::to_value(&updated_mock).unwrap_or_default(),
879 };
880 hooks.invoke_mock_updated(&event).await;
881
882 if old_mock.enabled != updated_mock.enabled {
884 let state_event = if updated_mock.enabled {
885 mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
886 id: updated_mock.id.clone(),
887 }
888 } else {
889 mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
890 id: updated_mock.id.clone(),
891 }
892 };
893 hooks.invoke_mock_state_changed(&state_event).await;
894 }
895 }
896
897 if let Some(tx) = &state.ws_broadcast {
899 let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
900 }
901
902 Ok(Json(updated_mock))
903}
904
905async fn delete_mock(
907 State(state): State<ManagementState>,
908 Path(id): Path<String>,
909) -> Result<StatusCode, StatusCode> {
910 let mut mocks = state.mocks.write().await;
911
912 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
913
914 let deleted_mock = mocks[position].clone();
916
917 info!("Deleting mock: {}", id);
918 mocks.remove(position);
919
920 if let Some(hooks) = &state.lifecycle_hooks {
922 let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
923 id: deleted_mock.id.clone(),
924 name: deleted_mock.name.clone(),
925 };
926 hooks.invoke_mock_deleted(&event).await;
927 }
928
929 if let Some(tx) = &state.ws_broadcast {
931 let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
932 }
933
934 Ok(StatusCode::NO_CONTENT)
935}
936
/// Request body for the configuration-validation endpoint.
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// Candidate configuration, already parsed from the JSON request body.
    pub config: serde_json::Value,
    /// Declared source format: "json" (default) or "yaml"/"yml".
    #[serde(default = "default_format")]
    pub format: String,
}
946
/// Serde default helper: configurations are treated as JSON unless stated.
fn default_format() -> String {
    String::from("json")
}
950
/// POST handler: check whether a submitted configuration deserializes into
/// `ServerConfig` without applying it. Responds 200 with `valid: true` on
/// success, 400 with the parse error otherwise.
async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
    use mockforge_core::config::ServerConfig;

    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
        "yaml" | "yml" => {
            // The payload arrives as a serde_json::Value, so it is
            // re-serialized to a JSON string and handed to the YAML parser
            // (valid because JSON is a subset of YAML). NOTE(review): this
            // path therefore validates JSON-shaped input against the YAML
            // deserializer, not raw YAML text — confirm that is intended.
            let yaml_str = match serde_json::to_string(&request.config) {
                Ok(s) => s,
                Err(e) => {
                    return (
                        StatusCode::BAD_REQUEST,
                        Json(serde_json::json!({
                            "valid": false,
                            "error": format!("Failed to convert to string: {}", e),
                            "message": "Configuration validation failed"
                        })),
                    )
                        .into_response();
                }
            };
            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
        }
        // Anything other than yaml/yml is treated as JSON.
        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
    };

    match config_result {
        Ok(_) => Json(serde_json::json!({
            "valid": true,
            "message": "Configuration is valid"
        }))
        .into_response(),
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "valid": false,
                "error": format!("Invalid configuration: {}", e),
                "message": "Configuration validation failed"
            })),
        )
            .into_response(),
    }
}
993
/// Request body for the bulk configuration-update endpoint; `updates` must
/// be a JSON object of top-level config keys to replace.
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    pub updates: serde_json::Value,
}
1000
/// POST handler: validate a bulk configuration update without applying it.
///
/// The updates object is shallow-merged over a default `ServerConfig`
/// (each top-level key replaces the default wholesale — nested objects are
/// not merged recursively) and the result is checked for deserializability.
/// As the success message states, runtime application is not implemented.
async fn bulk_update_config(
    State(_state): State<ManagementState>,
    Json(request): Json<BulkConfigUpdateRequest>,
) -> impl IntoResponse {
    // Only a JSON object of key → value updates is accepted.
    if !request.updates.is_object() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Updates must be a JSON object"
            })),
        )
            .into_response();
    }

    use mockforge_core::config::ServerConfig;

    // Serialize the default config so updates can be layered on top of it.
    let base_config = ServerConfig::default();
    let base_json = match serde_json::to_value(&base_config) {
        Ok(v) => v,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Internal error",
                    "message": format!("Failed to serialize base config: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Shallow merge: each provided top-level key overwrites the default.
    let mut merged = base_json.clone();
    if let (Some(merged_obj), Some(updates_obj)) =
        (merged.as_object_mut(), request.updates.as_object())
    {
        for (key, value) in updates_obj {
            merged_obj.insert(key.clone(), value.clone());
        }
    }

    // Validation-only: the merged config is parsed but never stored/applied.
    match serde_json::from_value::<ServerConfig>(merged) {
        Ok(_) => {
            Json(serde_json::json!({
                "success": true,
                "message": "Bulk configuration update validated successfully. Note: Runtime application requires ServerConfig in ManagementState and hot-reload support.",
                "updates_received": request.updates,
                "validated": true
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid configuration",
                "message": format!("Configuration validation failed: {}", e),
                "validated": false
            })),
        )
            .into_response(),
    }
}
1081
1082async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
1084 let mocks = state.mocks.read().await;
1085 let request_count = *state.request_counter.read().await;
1086
1087 Json(ServerStats {
1088 uptime_seconds: state.start_time.elapsed().as_secs(),
1089 total_requests: request_count,
1090 active_mocks: mocks.len(),
1091 enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
1092 registered_routes: mocks.len(), })
1094}
1095
1096async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
1098 Json(ServerConfig {
1099 version: env!("CARGO_PKG_VERSION").to_string(),
1100 port: state.port,
1101 has_openapi_spec: state.spec.is_some(),
1102 spec_path: state.spec_path.clone(),
1103 })
1104}
1105
1106async fn health_check() -> Json<serde_json::Value> {
1108 Json(serde_json::json!({
1109 "status": "healthy",
1110 "service": "mockforge-management",
1111 "timestamp": chrono::Utc::now().to_rfc3339()
1112 }))
1113}
1114
/// Serialization format selected for mock export.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// Pretty-printed JSON (the default).
    Json,
    /// YAML output.
    Yaml,
}
1124
1125async fn export_mocks(
1127 State(state): State<ManagementState>,
1128 Query(params): Query<std::collections::HashMap<String, String>>,
1129) -> Result<(StatusCode, String), StatusCode> {
1130 let mocks = state.mocks.read().await;
1131
1132 let format = params
1133 .get("format")
1134 .map(|f| match f.as_str() {
1135 "yaml" | "yml" => ExportFormat::Yaml,
1136 _ => ExportFormat::Json,
1137 })
1138 .unwrap_or(ExportFormat::Json);
1139
1140 match format {
1141 ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
1142 .map(|json| (StatusCode::OK, json))
1143 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1144 ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
1145 .map(|yaml| (StatusCode::OK, yaml))
1146 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1147 }
1148}
1149
1150async fn import_mocks(
1152 State(state): State<ManagementState>,
1153 Json(mocks): Json<Vec<MockConfig>>,
1154) -> impl IntoResponse {
1155 let mut current_mocks = state.mocks.write().await;
1156 current_mocks.clear();
1157 current_mocks.extend(mocks);
1158 Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1159}
1160
/// GET handler: list all captured SMTP emails (feature `smtp`); 501 when no
/// registry is attached.
#[cfg(feature = "smtp")]
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(r) => r,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    match registry.get_emails() {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to retrieve emails",
                "message": e.to_string()
            })),
        ),
    }
}
1185
/// GET handler: fetch one captured SMTP email by id (feature `smtp`).
/// 404 when the id is unknown; 501 when no registry is attached.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(r) => r,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    match registry.get_email_by_id(&id) {
        Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Email not found",
                "id": id
            })),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to retrieve email",
                "message": e.to_string()
            })),
        ),
    }
}
1220
/// DELETE-style handler: wipe the captured SMTP mailbox (feature `smtp`);
/// 501 when no registry is attached.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(r) => r,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    match registry.clear_mailbox() {
        Ok(()) => (
            StatusCode::OK,
            Json(serde_json::json!({
                "message": "Mailbox cleared successfully"
            })),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to clear mailbox",
                "message": e.to_string()
            })),
        ),
    }
}
1250
/// GET handler: mailbox export is not implemented over HTTP; always 501.
/// The requested format is echoed back for client diagnostics.
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    // Borrow the parameter instead of eagerly allocating a default String
    // and cloning it (the old `unwrap_or(&"json".to_string()).clone()`
    // allocated on every call even when the parameter was present).
    let format = params.get("format").map(String::as_str).unwrap_or("json");
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "SMTP mailbox management not available via HTTP API",
            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
            "requested_format": format
        })),
    )
}
1266
/// GET handler: search captured SMTP emails using query-string filters
/// (feature `smtp`); 501 when no registry is attached.
///
/// Recognized parameters: `sender`, `recipient`, `subject`, `body`
/// (filter strings), `since`/`until` (RFC 3339 timestamps; unparseable
/// values are silently ignored), `regex` and `case_sensitive`
/// (the literal string "true" enables them).
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        let filters = EmailSearchFilters {
            sender: params.get("sender").cloned(),
            recipient: params.get("recipient").cloned(),
            subject: params.get("subject").cloned(),
            body: params.get("body").cloned(),
            // Timestamps are parsed as RFC 3339 and normalized to UTC;
            // invalid input is dropped rather than reported.
            since: params
                .get("since")
                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
                .map(|dt| dt.with_timezone(&chrono::Utc)),
            until: params
                .get("until")
                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
                .map(|dt| dt.with_timezone(&chrono::Utc)),
            use_regex: params.get("regex").map(|s| s == "true").unwrap_or(false),
            case_sensitive: params.get("case_sensitive").map(|s| s == "true").unwrap_or(false),
        };

        match smtp_registry.search_emails(filters) {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to search emails",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1311
#[cfg(feature = "mqtt")]
/// Snapshot of MQTT broker statistics, serialized as the body of
/// `GET /mqtt/stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    // Number of currently connected MQTT clients.
    pub connected_clients: usize,
    // Number of active topics as reported by the broker.
    pub active_topics: usize,
    // Count of retained messages held by the broker.
    pub retained_messages: usize,
    // Total subscription count across all clients.
    pub total_subscriptions: usize,
}
1325
#[cfg(feature = "mqtt")]
/// GET /mqtt/stats — report broker-level statistics as `MqttBrokerStats`,
/// or 503 when no MQTT broker is configured.
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    let connected_clients = broker.get_connected_clients().await.len();
    let active_topics = broker.get_active_topics().await.len();
    let topic_stats = broker.get_topic_stats().await;

    Json(MqttBrokerStats {
        connected_clients,
        active_topics,
        retained_messages: topic_stats.retained_messages,
        total_subscriptions: topic_stats.total_subscriptions,
    })
    .into_response()
}
1346
#[cfg(feature = "mqtt")]
/// GET /mqtt/clients — list the currently connected MQTT clients.
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let clients = broker.get_connected_clients().await;
            Json(serde_json::json!({ "clients": clients })).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1359
#[cfg(feature = "mqtt")]
/// GET /mqtt/topics — list the broker's active topics.
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let topics = broker.get_active_topics().await;
            Json(serde_json::json!({ "topics": topics })).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1372
#[cfg(feature = "mqtt")]
/// DELETE /mqtt/clients/{client_id} — forcibly disconnect an MQTT client.
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response(),
        Err(e) => {
            (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
                .into_response()
        }
    }
}
1392
#[cfg(feature = "mqtt")]
/// Body of a single MQTT publish request.
///
/// NOTE(review): the publish handlers below currently parse raw
/// `serde_json::Value` bodies instead of deserializing into this type —
/// confirm whether it is still consumed anywhere.
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    // Destination MQTT topic (required).
    pub topic: String,
    // Message payload (required).
    pub payload: String,
    // Quality-of-service level; defaults to 0 via `default_qos`.
    #[serde(default = "default_qos")]
    pub qos: u8,
    // Whether the broker should retain the message; defaults to false.
    #[serde(default)]
    pub retain: bool,
}
1410
#[cfg(feature = "mqtt")]
/// Serde default for `MqttPublishRequest::qos`: QoS level 0.
fn default_qos() -> u8 {
    0
}
1415
#[cfg(feature = "mqtt")]
/// POST /mqtt/publish — publish a single MQTT message through the mock broker.
///
/// Accepts a JSON body with required string fields `topic` and `payload`,
/// plus optional `qos` (default 0, must be 0–2) and `retain` (default false).
/// On success the message is also broadcast to message-stream subscribers.
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    // Both topic and payload are required; reject before touching the broker.
    let (topic, payload) = match (
        request.get("topic").and_then(|v| v.as_str()),
        request.get("payload").and_then(|v| v.as_str()),
    ) {
        (Some(t), Some(p)) => (t.to_string(), p.to_string()),
        _ => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid request",
                    "message": "Missing required fields: topic and payload"
                })),
            );
        }
    };

    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        );
    };

    if qos > 2 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid QoS",
                "message": "QoS must be 0, 1, or 2"
            })),
        );
    }

    let client_id = "mockforge-management-api".to_string();
    let publish_result = broker
        .handle_publish(&client_id, &topic, payload.as_bytes().to_vec(), qos, retain)
        .await
        .map_err(|e| format!("{}", e));

    match publish_result {
        Ok(_) => {
            // Mirror the publish onto the broadcast channel for stream
            // subscribers; a send error just means nobody is listening.
            let _ = state.message_events.send(MessageEvent::Mqtt(MqttMessageEvent {
                topic: topic.clone(),
                payload: payload.clone(),
                qos,
                retain,
                timestamp: chrono::Utc::now().to_rfc3339(),
            }));

            (
                StatusCode::OK,
                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message published to topic '{}'", topic),
                    "topic": topic,
                    "qos": qos,
                    "retain": retain
                })),
            )
        }
        Err(error_msg) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to publish message",
                "message": error_msg
            })),
        ),
    }
}
1503
#[cfg(not(feature = "mqtt"))]
/// Fallback for POST /mqtt/publish when MQTT support is compiled out:
/// always responds 503 with an explanatory JSON error.
async fn publish_mqtt_message_handler(
    State(_state): State<ManagementState>,
    Json(_request): Json<serde_json::Value>,
) -> impl IntoResponse {
    (
        StatusCode::SERVICE_UNAVAILABLE,
        Json(serde_json::json!({
            "error": "MQTT feature not enabled",
            "message": "MQTT support is not compiled into this build"
        })),
    )
}
1518
#[cfg(feature = "mqtt")]
/// Body of a batch MQTT publish request: an ordered list of messages plus
/// an optional inter-message delay.
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    // Messages to publish, in order.
    pub messages: Vec<MqttPublishRequest>,
    // Delay between consecutive publishes, in milliseconds; defaults to
    // 100 via `default_delay`.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1529
#[cfg(feature = "mqtt")]
/// Serde default for `MqttBatchPublishRequest::delay_ms`: 100 ms.
fn default_delay() -> u64 {
    100
}
1534
#[cfg(feature = "mqtt")]
/// POST /mqtt/publish/batch — publish a batch of MQTT messages sequentially.
///
/// Body: `{ "messages": [ { topic, payload, qos?, retain? }, ... ], "delay_ms"? }`.
/// Messages are published one by one with an optional pause between them;
/// per-message failures are recorded in the `results` array instead of
/// aborting the batch. Returns 400 when `messages` is missing or empty and
/// 503 when no MQTT broker is configured.
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Parsed manually from a raw JSON value (rather than via
    // MqttBatchPublishRequest) so malformed entries can be reported
    // per-index instead of rejecting the whole request.
    let messages_json = request.get("messages").and_then(|v| v.as_array());
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    if messages_json.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required field: messages"
            })),
        );
    }

    let messages_json = messages_json.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        if messages_json.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            );
        }

        // One result entry per input message, indexed to match the request.
        let mut results = Vec::new();
        let client_id = "mockforge-management-api".to_string();

        for (index, msg_json) in messages_json.iter().enumerate() {
            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

            // Skip (but record) entries missing required fields.
            if topic.is_none() || payload.is_none() {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }

            let topic = topic.unwrap();
            let payload = payload.unwrap();

            if qos > 2 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Invalid QoS (must be 0, 1, or 2)"
                }));
                continue;
            }

            let payload_bytes = payload.as_bytes().to_vec();

            let publish_result = broker
                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
                .await
                .map_err(|e| format!("{}", e));

            match publish_result {
                Ok(_) => {
                    // Mirror the publish onto the broadcast channel for
                    // stream subscribers; a send error just means there are
                    // currently no subscribers.
                    let event = MessageEvent::Mqtt(MqttMessageEvent {
                        topic: topic.clone(),
                        payload: payload.clone(),
                        qos,
                        retain,
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": topic,
                        "qos": qos
                    }));
                }
                Err(error_msg) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": error_msg
                    }));
                }
            }

            // Pace the batch: sleep between messages, but not after the last
            // one. (`continue`d entries above skip the delay as well.)
            if index < messages_json.len() - 1 && delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        (
            StatusCode::OK,
            Json(serde_json::json!({
                "success": true,
                "total": messages_json.len(),
                "succeeded": success_count,
                "failed": messages_json.len() - success_count,
                "results": results
            })),
        )
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1664
#[cfg(not(feature = "mqtt"))]
/// Fallback for POST /mqtt/publish/batch when MQTT support is compiled out:
/// always responds 503 with an explanatory JSON error.
async fn publish_mqtt_batch_handler(
    State(_state): State<ManagementState>,
    Json(_request): Json<serde_json::Value>,
) -> impl IntoResponse {
    (
        StatusCode::SERVICE_UNAVAILABLE,
        Json(serde_json::json!({
            "error": "MQTT feature not enabled",
            "message": "MQTT support is not compiled into this build"
        })),
    )
}
1679
/// Request body for the set-migration-mode endpoints.
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    // One of "mock", "shadow", "real" or "auto" (matched case-insensitively).
    mode: String,
}
1687
1688async fn get_migration_routes(
1690 State(state): State<ManagementState>,
1691) -> Result<Json<serde_json::Value>, StatusCode> {
1692 let proxy_config = match &state.proxy_config {
1693 Some(config) => config,
1694 None => {
1695 return Ok(Json(serde_json::json!({
1696 "error": "Migration not configured. Proxy config not available."
1697 })));
1698 }
1699 };
1700
1701 let config = proxy_config.read().await;
1702 let routes = config.get_migration_routes();
1703
1704 Ok(Json(serde_json::json!({
1705 "routes": routes
1706 })))
1707}
1708
1709async fn toggle_route_migration(
1711 State(state): State<ManagementState>,
1712 Path(pattern): Path<String>,
1713) -> Result<Json<serde_json::Value>, StatusCode> {
1714 let proxy_config = match &state.proxy_config {
1715 Some(config) => config,
1716 None => {
1717 return Ok(Json(serde_json::json!({
1718 "error": "Migration not configured. Proxy config not available."
1719 })));
1720 }
1721 };
1722
1723 let mut config = proxy_config.write().await;
1724 let new_mode = match config.toggle_route_migration(&pattern) {
1725 Some(mode) => mode,
1726 None => {
1727 return Ok(Json(serde_json::json!({
1728 "error": format!("Route pattern not found: {}", pattern)
1729 })));
1730 }
1731 };
1732
1733 Ok(Json(serde_json::json!({
1734 "pattern": pattern,
1735 "mode": format!("{:?}", new_mode).to_lowercase()
1736 })))
1737}
1738
1739async fn set_route_migration_mode(
1741 State(state): State<ManagementState>,
1742 Path(pattern): Path<String>,
1743 Json(request): Json<SetMigrationModeRequest>,
1744) -> Result<Json<serde_json::Value>, StatusCode> {
1745 let proxy_config = match &state.proxy_config {
1746 Some(config) => config,
1747 None => {
1748 return Ok(Json(serde_json::json!({
1749 "error": "Migration not configured. Proxy config not available."
1750 })));
1751 }
1752 };
1753
1754 use mockforge_core::proxy::config::MigrationMode;
1755 let mode = match request.mode.to_lowercase().as_str() {
1756 "mock" => MigrationMode::Mock,
1757 "shadow" => MigrationMode::Shadow,
1758 "real" => MigrationMode::Real,
1759 "auto" => MigrationMode::Auto,
1760 _ => {
1761 return Ok(Json(serde_json::json!({
1762 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1763 })));
1764 }
1765 };
1766
1767 let mut config = proxy_config.write().await;
1768 let updated = config.update_rule_migration_mode(&pattern, mode);
1769
1770 if !updated {
1771 return Ok(Json(serde_json::json!({
1772 "error": format!("Route pattern not found: {}", pattern)
1773 })));
1774 }
1775
1776 Ok(Json(serde_json::json!({
1777 "pattern": pattern,
1778 "mode": format!("{:?}", mode).to_lowercase()
1779 })))
1780}
1781
1782async fn toggle_group_migration(
1784 State(state): State<ManagementState>,
1785 Path(group): Path<String>,
1786) -> Result<Json<serde_json::Value>, StatusCode> {
1787 let proxy_config = match &state.proxy_config {
1788 Some(config) => config,
1789 None => {
1790 return Ok(Json(serde_json::json!({
1791 "error": "Migration not configured. Proxy config not available."
1792 })));
1793 }
1794 };
1795
1796 let mut config = proxy_config.write().await;
1797 let new_mode = config.toggle_group_migration(&group);
1798
1799 Ok(Json(serde_json::json!({
1800 "group": group,
1801 "mode": format!("{:?}", new_mode).to_lowercase()
1802 })))
1803}
1804
1805async fn set_group_migration_mode(
1807 State(state): State<ManagementState>,
1808 Path(group): Path<String>,
1809 Json(request): Json<SetMigrationModeRequest>,
1810) -> Result<Json<serde_json::Value>, StatusCode> {
1811 let proxy_config = match &state.proxy_config {
1812 Some(config) => config,
1813 None => {
1814 return Ok(Json(serde_json::json!({
1815 "error": "Migration not configured. Proxy config not available."
1816 })));
1817 }
1818 };
1819
1820 use mockforge_core::proxy::config::MigrationMode;
1821 let mode = match request.mode.to_lowercase().as_str() {
1822 "mock" => MigrationMode::Mock,
1823 "shadow" => MigrationMode::Shadow,
1824 "real" => MigrationMode::Real,
1825 "auto" => MigrationMode::Auto,
1826 _ => {
1827 return Ok(Json(serde_json::json!({
1828 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1829 })));
1830 }
1831 };
1832
1833 let mut config = proxy_config.write().await;
1834 config.update_group_migration_mode(&group, mode);
1835
1836 Ok(Json(serde_json::json!({
1837 "group": group,
1838 "mode": format!("{:?}", mode).to_lowercase()
1839 })))
1840}
1841
1842async fn get_migration_groups(
1844 State(state): State<ManagementState>,
1845) -> Result<Json<serde_json::Value>, StatusCode> {
1846 let proxy_config = match &state.proxy_config {
1847 Some(config) => config,
1848 None => {
1849 return Ok(Json(serde_json::json!({
1850 "error": "Migration not configured. Proxy config not available."
1851 })));
1852 }
1853 };
1854
1855 let config = proxy_config.read().await;
1856 let groups = config.get_migration_groups();
1857
1858 let groups_json: serde_json::Map<String, serde_json::Value> = groups
1860 .into_iter()
1861 .map(|(name, info)| {
1862 (
1863 name,
1864 serde_json::json!({
1865 "name": info.name,
1866 "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1867 "route_count": info.route_count
1868 }),
1869 )
1870 })
1871 .collect();
1872
1873 Ok(Json(serde_json::json!(groups_json)))
1874}
1875
1876async fn get_migration_status(
1878 State(state): State<ManagementState>,
1879) -> Result<Json<serde_json::Value>, StatusCode> {
1880 let proxy_config = match &state.proxy_config {
1881 Some(config) => config,
1882 None => {
1883 return Ok(Json(serde_json::json!({
1884 "error": "Migration not configured. Proxy config not available."
1885 })));
1886 }
1887 };
1888
1889 let config = proxy_config.read().await;
1890 let routes = config.get_migration_routes();
1891 let groups = config.get_migration_groups();
1892
1893 let mut mock_count = 0;
1894 let mut shadow_count = 0;
1895 let mut real_count = 0;
1896 let mut auto_count = 0;
1897
1898 for route in &routes {
1899 match route.migration_mode {
1900 mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1901 mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1902 mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1903 mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1904 }
1905 }
1906
1907 Ok(Json(serde_json::json!({
1908 "total_routes": routes.len(),
1909 "mock_routes": mock_count,
1910 "shadow_routes": shadow_count,
1911 "real_routes": real_count,
1912 "auto_routes": auto_count,
1913 "total_groups": groups.len(),
1914 "migration_enabled": config.migration_enabled
1915 })))
1916}
1917
/// Request body for creating or updating a proxy body-transform rule.
#[derive(Debug, Deserialize, Serialize)]
pub struct ProxyRuleRequest {
    // Route pattern the rule applies to.
    pub pattern: String,
    // Rule kind: "request" or "response" (serialized as "type").
    #[serde(rename = "type")]
    pub rule_type: String,
    // Response status codes the rule applies to; defaults to empty.
    // Only consulted for response rules by the handlers below.
    #[serde(default)]
    pub status_codes: Vec<u16>,
    // Transforms to apply. No #[serde(default)], so the field must be
    // present; create_proxy_rule additionally rejects an empty list.
    pub body_transforms: Vec<BodyTransformRequest>,
    // Whether the rule is active; defaults to true via `default_true`.
    #[serde(default = "default_true")]
    pub enabled: bool,
}
1937
/// API representation of a single body transform within a proxy rule.
#[derive(Debug, Deserialize, Serialize)]
pub struct BodyTransformRequest {
    // Path within the body that the transform targets.
    pub path: String,
    // Replacement value used by the transform.
    pub replace: String,
    // Operation name: "replace", "add" or "remove". Defaults to "";
    // the handlers map unrecognized values to Replace.
    #[serde(default)]
    pub operation: String,
}
1949
/// Serialized form of a proxy rule returned by GET /proxy/rules.
#[derive(Debug, Serialize)]
pub struct ProxyRuleResponse {
    // Positional rule ID: request rules first, then response rules.
    pub id: usize,
    // Route pattern the rule applies to.
    pub pattern: String,
    // Rule kind, serialized as "type": "request" or "response".
    #[serde(rename = "type")]
    pub rule_type: String,
    // Status codes the rule applies to (empty for request rules).
    pub status_codes: Vec<u16>,
    // Transforms applied by the rule.
    pub body_transforms: Vec<BodyTransformRequest>,
    // Whether the rule is active.
    pub enabled: bool,
}
1967
1968async fn list_proxy_rules(
1970 State(state): State<ManagementState>,
1971) -> Result<Json<serde_json::Value>, StatusCode> {
1972 let proxy_config = match &state.proxy_config {
1973 Some(config) => config,
1974 None => {
1975 return Ok(Json(serde_json::json!({
1976 "error": "Proxy not configured. Proxy config not available."
1977 })));
1978 }
1979 };
1980
1981 let config = proxy_config.read().await;
1982
1983 let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1984
1985 for (idx, rule) in config.request_replacements.iter().enumerate() {
1987 rules.push(ProxyRuleResponse {
1988 id: idx,
1989 pattern: rule.pattern.clone(),
1990 rule_type: "request".to_string(),
1991 status_codes: Vec::new(),
1992 body_transforms: rule
1993 .body_transforms
1994 .iter()
1995 .map(|t| BodyTransformRequest {
1996 path: t.path.clone(),
1997 replace: t.replace.clone(),
1998 operation: format!("{:?}", t.operation).to_lowercase(),
1999 })
2000 .collect(),
2001 enabled: rule.enabled,
2002 });
2003 }
2004
2005 let request_count = config.request_replacements.len();
2007 for (idx, rule) in config.response_replacements.iter().enumerate() {
2008 rules.push(ProxyRuleResponse {
2009 id: request_count + idx,
2010 pattern: rule.pattern.clone(),
2011 rule_type: "response".to_string(),
2012 status_codes: rule.status_codes.clone(),
2013 body_transforms: rule
2014 .body_transforms
2015 .iter()
2016 .map(|t| BodyTransformRequest {
2017 path: t.path.clone(),
2018 replace: t.replace.clone(),
2019 operation: format!("{:?}", t.operation).to_lowercase(),
2020 })
2021 .collect(),
2022 enabled: rule.enabled,
2023 });
2024 }
2025
2026 Ok(Json(serde_json::json!({
2027 "rules": rules
2028 })))
2029}
2030
2031async fn create_proxy_rule(
2033 State(state): State<ManagementState>,
2034 Json(request): Json<ProxyRuleRequest>,
2035) -> Result<Json<serde_json::Value>, StatusCode> {
2036 let proxy_config = match &state.proxy_config {
2037 Some(config) => config,
2038 None => {
2039 return Ok(Json(serde_json::json!({
2040 "error": "Proxy not configured. Proxy config not available."
2041 })));
2042 }
2043 };
2044
2045 if request.body_transforms.is_empty() {
2047 return Ok(Json(serde_json::json!({
2048 "error": "At least one body transform is required"
2049 })));
2050 }
2051
2052 let body_transforms: Vec<BodyTransform> = request
2053 .body_transforms
2054 .iter()
2055 .map(|t| {
2056 let op = match t.operation.as_str() {
2057 "replace" => TransformOperation::Replace,
2058 "add" => TransformOperation::Add,
2059 "remove" => TransformOperation::Remove,
2060 _ => TransformOperation::Replace,
2061 };
2062 BodyTransform {
2063 path: t.path.clone(),
2064 replace: t.replace.clone(),
2065 operation: op,
2066 }
2067 })
2068 .collect();
2069
2070 let new_rule = BodyTransformRule {
2071 pattern: request.pattern.clone(),
2072 status_codes: request.status_codes.clone(),
2073 body_transforms,
2074 enabled: request.enabled,
2075 };
2076
2077 let mut config = proxy_config.write().await;
2078
2079 let rule_id = if request.rule_type == "request" {
2080 config.request_replacements.push(new_rule);
2081 config.request_replacements.len() - 1
2082 } else if request.rule_type == "response" {
2083 config.response_replacements.push(new_rule);
2084 config.request_replacements.len() + config.response_replacements.len() - 1
2085 } else {
2086 return Ok(Json(serde_json::json!({
2087 "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
2088 })));
2089 };
2090
2091 Ok(Json(serde_json::json!({
2092 "id": rule_id,
2093 "message": "Rule created successfully"
2094 })))
2095}
2096
2097async fn get_proxy_rule(
2099 State(state): State<ManagementState>,
2100 Path(id): Path<String>,
2101) -> Result<Json<serde_json::Value>, StatusCode> {
2102 let proxy_config = match &state.proxy_config {
2103 Some(config) => config,
2104 None => {
2105 return Ok(Json(serde_json::json!({
2106 "error": "Proxy not configured. Proxy config not available."
2107 })));
2108 }
2109 };
2110
2111 let config = proxy_config.read().await;
2112 let rule_id: usize = match id.parse() {
2113 Ok(id) => id,
2114 Err(_) => {
2115 return Ok(Json(serde_json::json!({
2116 "error": format!("Invalid rule ID: {}", id)
2117 })));
2118 }
2119 };
2120
2121 let request_count = config.request_replacements.len();
2122
2123 if rule_id < request_count {
2124 let rule = &config.request_replacements[rule_id];
2126 Ok(Json(serde_json::json!({
2127 "id": rule_id,
2128 "pattern": rule.pattern,
2129 "type": "request",
2130 "status_codes": [],
2131 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2132 "path": t.path,
2133 "replace": t.replace,
2134 "operation": format!("{:?}", t.operation).to_lowercase()
2135 })).collect::<Vec<_>>(),
2136 "enabled": rule.enabled
2137 })))
2138 } else if rule_id < request_count + config.response_replacements.len() {
2139 let response_idx = rule_id - request_count;
2141 let rule = &config.response_replacements[response_idx];
2142 Ok(Json(serde_json::json!({
2143 "id": rule_id,
2144 "pattern": rule.pattern,
2145 "type": "response",
2146 "status_codes": rule.status_codes,
2147 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2148 "path": t.path,
2149 "replace": t.replace,
2150 "operation": format!("{:?}", t.operation).to_lowercase()
2151 })).collect::<Vec<_>>(),
2152 "enabled": rule.enabled
2153 })))
2154 } else {
2155 Ok(Json(serde_json::json!({
2156 "error": format!("Rule ID {} not found", rule_id)
2157 })))
2158 }
2159}
2160
2161async fn update_proxy_rule(
2163 State(state): State<ManagementState>,
2164 Path(id): Path<String>,
2165 Json(request): Json<ProxyRuleRequest>,
2166) -> Result<Json<serde_json::Value>, StatusCode> {
2167 let proxy_config = match &state.proxy_config {
2168 Some(config) => config,
2169 None => {
2170 return Ok(Json(serde_json::json!({
2171 "error": "Proxy not configured. Proxy config not available."
2172 })));
2173 }
2174 };
2175
2176 let mut config = proxy_config.write().await;
2177 let rule_id: usize = match id.parse() {
2178 Ok(id) => id,
2179 Err(_) => {
2180 return Ok(Json(serde_json::json!({
2181 "error": format!("Invalid rule ID: {}", id)
2182 })));
2183 }
2184 };
2185
2186 let body_transforms: Vec<BodyTransform> = request
2187 .body_transforms
2188 .iter()
2189 .map(|t| {
2190 let op = match t.operation.as_str() {
2191 "replace" => TransformOperation::Replace,
2192 "add" => TransformOperation::Add,
2193 "remove" => TransformOperation::Remove,
2194 _ => TransformOperation::Replace,
2195 };
2196 BodyTransform {
2197 path: t.path.clone(),
2198 replace: t.replace.clone(),
2199 operation: op,
2200 }
2201 })
2202 .collect();
2203
2204 let updated_rule = BodyTransformRule {
2205 pattern: request.pattern.clone(),
2206 status_codes: request.status_codes.clone(),
2207 body_transforms,
2208 enabled: request.enabled,
2209 };
2210
2211 let request_count = config.request_replacements.len();
2212
2213 if rule_id < request_count {
2214 config.request_replacements[rule_id] = updated_rule;
2216 } else if rule_id < request_count + config.response_replacements.len() {
2217 let response_idx = rule_id - request_count;
2219 config.response_replacements[response_idx] = updated_rule;
2220 } else {
2221 return Ok(Json(serde_json::json!({
2222 "error": format!("Rule ID {} not found", rule_id)
2223 })));
2224 }
2225
2226 Ok(Json(serde_json::json!({
2227 "id": rule_id,
2228 "message": "Rule updated successfully"
2229 })))
2230}
2231
2232async fn delete_proxy_rule(
2234 State(state): State<ManagementState>,
2235 Path(id): Path<String>,
2236) -> Result<Json<serde_json::Value>, StatusCode> {
2237 let proxy_config = match &state.proxy_config {
2238 Some(config) => config,
2239 None => {
2240 return Ok(Json(serde_json::json!({
2241 "error": "Proxy not configured. Proxy config not available."
2242 })));
2243 }
2244 };
2245
2246 let mut config = proxy_config.write().await;
2247 let rule_id: usize = match id.parse() {
2248 Ok(id) => id,
2249 Err(_) => {
2250 return Ok(Json(serde_json::json!({
2251 "error": format!("Invalid rule ID: {}", id)
2252 })));
2253 }
2254 };
2255
2256 let request_count = config.request_replacements.len();
2257
2258 if rule_id < request_count {
2259 config.request_replacements.remove(rule_id);
2261 } else if rule_id < request_count + config.response_replacements.len() {
2262 let response_idx = rule_id - request_count;
2264 config.response_replacements.remove(response_idx);
2265 } else {
2266 return Ok(Json(serde_json::json!({
2267 "error": format!("Rule ID {} not found", rule_id)
2268 })));
2269 }
2270
2271 Ok(Json(serde_json::json!({
2272 "id": rule_id,
2273 "message": "Rule deleted successfully"
2274 })))
2275}
2276
/// GET /proxy/inspect — dump the proxy configuration plus a paged view of
/// all transform rules.
///
/// Query parameters `limit` (default 50) and `offset` (default 0) page the
/// combined rule list (request rules first, then response rules).
async fn get_proxy_inspect(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Result<Json<serde_json::Value>, StatusCode> {
    let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
    let offset: usize = params.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);

    let proxy_config = match &state.proxy_config {
        Some(config) => config.read().await,
        None => {
            // Errors are reported in-band with 200 OK, matching the other
            // proxy handlers in this file.
            return Ok(Json(serde_json::json!({
                "error": "Proxy not configured. Proxy config not available."
            })));
        }
    };

    let mut rules = Vec::new();
    for (idx, rule) in proxy_config.request_replacements.iter().enumerate() {
        rules.push(serde_json::json!({
            "id": idx,
            "kind": "request",
            "pattern": rule.pattern,
            "enabled": rule.enabled,
            "status_codes": rule.status_codes,
            "transform_count": rule.body_transforms.len(),
            // NOTE(review): "operation" is serialized via the enum's serde
            // representation here, while /proxy/rules lowercases the Debug
            // form — confirm the two endpoints are meant to differ.
            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
                "path": t.path,
                "operation": t.operation,
                "replace": t.replace
            })).collect::<Vec<_>>()
        }));
    }
    // Response-rule IDs continue after the request rules.
    let request_rule_count = rules.len();
    for (idx, rule) in proxy_config.response_replacements.iter().enumerate() {
        rules.push(serde_json::json!({
            "id": request_rule_count + idx,
            "kind": "response",
            "pattern": rule.pattern,
            "enabled": rule.enabled,
            "status_codes": rule.status_codes,
            "transform_count": rule.body_transforms.len(),
            "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
                "path": t.path,
                "operation": t.operation,
                "replace": t.replace
            })).collect::<Vec<_>>()
        }));
    }

    // Page the combined list; `total` reports the unpaged count.
    let total = rules.len();
    let paged_rules: Vec<_> = rules.into_iter().skip(offset).take(limit).collect();

    Ok(Json(serde_json::json!({
        "enabled": proxy_config.enabled,
        "target_url": proxy_config.target_url,
        "prefix": proxy_config.prefix,
        "timeout_seconds": proxy_config.timeout_seconds,
        "follow_redirects": proxy_config.follow_redirects,
        "passthrough_by_default": proxy_config.passthrough_by_default,
        "rules": paged_rules,
        "request_rule_count": request_rule_count,
        "response_rule_count": total.saturating_sub(request_rule_count),
        "limit": limit,
        "offset": offset,
        "total": total
    })))
}
2345
/// Build the management API router, wiring every handler group under a
/// shared `ManagementState`.
///
/// Feature-gated groups (SMTP, MQTT, Kafka) are mounted only when the
/// corresponding cargo feature is enabled; the MQTT publish endpoints keep
/// stub handlers in non-MQTT builds so clients get an explanatory 503
/// instead of a 404.
pub fn management_router(state: ManagementState) -> Router {
    // Core endpoints: health, stats, config and mock CRUD.
    let router = Router::new()
        .route("/health", get(health_check))
        .route("/stats", get(get_stats))
        .route("/config", get(get_config))
        .route("/config/validate", post(validate_config))
        .route("/config/bulk", post(bulk_update_config))
        .route("/mocks", get(list_mocks))
        .route("/mocks", post(create_mock))
        .route("/mocks/{id}", get(get_mock))
        .route("/mocks/{id}", put(update_mock))
        .route("/mocks/{id}", delete(delete_mock))
        .route("/export", get(export_mocks))
        .route("/import", post(import_mocks));

    // SMTP mailbox inspection (feature-gated).
    #[cfg(feature = "smtp")]
    let router = router
        .route("/smtp/mailbox", get(list_smtp_emails))
        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
        .route("/smtp/mailbox/{id}", get(get_smtp_email))
        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
        .route("/smtp/mailbox/search", get(search_smtp_emails));

    #[cfg(not(feature = "smtp"))]
    let router = router;

    // MQTT broker management, message streaming and publishing.
    #[cfg(feature = "mqtt")]
    let router = router
        .route("/mqtt/stats", get(get_mqtt_stats))
        .route("/mqtt/clients", get(get_mqtt_clients))
        .route("/mqtt/topics", get(get_mqtt_topics))
        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Without the MQTT feature, the publish routes resolve to the stub
    // handlers that always return 503.
    #[cfg(not(feature = "mqtt"))]
    let router = router
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Kafka broker inspection and producing (feature-gated).
    #[cfg(feature = "kafka")]
    let router = router
        .route("/kafka/stats", get(get_kafka_stats))
        .route("/kafka/topics", get(get_kafka_topics))
        .route("/kafka/topics/{topic}", get(get_kafka_topic))
        .route("/kafka/groups", get(get_kafka_groups))
        .route("/kafka/groups/{group_id}", get(get_kafka_group))
        .route("/kafka/produce", post(produce_kafka_message))
        .route("/kafka/produce/batch", post(produce_kafka_batch))
        .route("/kafka/messages/stream", get(kafka_messages_stream));

    #[cfg(not(feature = "kafka"))]
    let router = router;

    // Incremental-migration controls (per-route and per-group modes).
    let router = router
        .route("/migration/routes", get(get_migration_routes))
        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
        .route("/migration/groups/{group}", put(set_group_migration_mode))
        .route("/migration/groups", get(get_migration_groups))
        .route("/migration/status", get(get_migration_status));

    // Proxy body-transform rule CRUD and inspection.
    let router = router
        .route("/proxy/rules", get(list_proxy_rules))
        .route("/proxy/rules", post(create_proxy_rule))
        .route("/proxy/rules/{id}", get(get_proxy_rule))
        .route("/proxy/rules/{id}", put(update_proxy_rule))
        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
        .route("/proxy/inspect", get(get_proxy_inspect));

    // AI-assisted spec generation.
    let router = router.route("/ai/generate-spec", post(generate_ai_spec));

    // Snapshot-diff sub-router; shares this router's state.
    let router = router.nest(
        "/snapshot-diff",
        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
    );

    #[cfg(feature = "behavioral-cloning")]
    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));

    // MockAI learning/explanations, chaos config and network profiles.
    let router = router
        .route("/mockai/learn", post(learn_from_examples))
        .route("/mockai/rules/explanations", get(list_rule_explanations))
        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
        .route("/chaos/config", get(get_chaos_config))
        .route("/chaos/config", post(update_chaos_config))
        .route("/network/profiles", get(list_network_profiles))
        .route("/network/profile/apply", post(apply_network_profile));

    // State-machine management sub-router.
    let router =
        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());

    router.with_state(state)
}
2449
/// Snapshot of broker-wide statistics returned by `GET /kafka/stats`.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// Number of topics currently registered on the broker.
    pub topics: usize,
    /// Total partition count summed across all topics.
    pub partitions: usize,
    /// Number of known consumer groups.
    pub consumer_groups: usize,
    /// Total messages produced, taken from the broker's metrics snapshot.
    pub messages_produced: u64,
    /// Total messages consumed, taken from the broker's metrics snapshot.
    pub messages_consumed: u64,
}
2464
/// Per-topic summary returned by `GET /kafka/topics`.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name.
    pub name: String,
    /// Number of partitions in the topic.
    pub partitions: usize,
    /// Configured replication factor (cast from the topic config).
    pub replication_factor: i32,
}
2472
/// Per-group summary returned by `GET /kafka/groups`.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group identifier.
    pub group_id: String,
    /// Number of members currently in the group.
    pub members: usize,
    /// Group state; the handlers report a fixed "Stable" value.
    pub state: String,
}
2480
/// GET /kafka/stats — report aggregate counters for the embedded Kafka broker.
///
/// Responds 503 when no broker is attached to the management state.
#[cfg(feature = "kafka")]
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    // Without a broker there is nothing to report.
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let groups = broker.consumer_groups.read().await;

    // Sum partition counts across every topic.
    let partition_total = topics.values().fold(0usize, |acc, t| acc + t.partitions.len());
    let snapshot = broker.metrics().snapshot();

    Json(KafkaBrokerStats {
        topics: topics.len(),
        partitions: partition_total,
        consumer_groups: groups.groups().len(),
        messages_produced: snapshot.messages_produced_total,
        messages_consumed: snapshot.messages_consumed_total,
    })
    .into_response()
}
2513
/// GET /kafka/topics — list every topic known to the mock broker.
///
/// Responds 503 when no broker is attached to the management state.
#[cfg(feature = "kafka")]
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;

    // Build one summary entry per registered topic.
    let mut listing = Vec::with_capacity(topics.len());
    for (name, topic) in topics.iter() {
        listing.push(KafkaTopicInfo {
            name: name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor as i32,
        });
    }

    Json(serde_json::json!({
        "topics": listing
    }))
    .into_response()
}
2543
/// GET /kafka/topics/{topic} — detailed view of a single topic, including
/// per-partition message counts.
///
/// Responds 404 for an unknown topic and 503 when no broker is configured.
#[cfg(feature = "kafka")]
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let Some(topic) = topics.get(&topic_name) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Topic not found",
                "topic": topic_name
            })),
        )
            .into_response();
    };

    // One entry per partition; the mock broker always reports leader 0 with
    // a single replica.
    let partition_details: Vec<_> = topic
        .partitions
        .iter()
        .enumerate()
        .map(|(idx, partition)| {
            serde_json::json!({
                "id": idx as i32,
                "leader": 0,
                "replicas": vec![0],
                "message_count": partition.messages.len()
            })
        })
        .collect();

    Json(serde_json::json!({
        "name": topic_name,
        "partitions": topic.partitions.len(),
        "replication_factor": topic.config.replication_factor,
        "partitions_detail": partition_details
    }))
    .into_response()
}
2585
/// GET /kafka/groups — list all consumer groups known to the mock broker.
///
/// Responds 503 when no broker is attached to the management state.
#[cfg(feature = "kafka")]
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;

    let mut groups = Vec::new();
    for (group_id, group) in consumer_groups.groups().iter() {
        groups.push(KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            // Group state is reported as a fixed "Stable" value.
            state: "Stable".to_string(),
        });
    }

    Json(serde_json::json!({
        "groups": groups
    }))
    .into_response()
}
2616
/// GET /kafka/groups/{group_id} — detailed view of a single consumer group:
/// member list with partition assignments, plus committed offsets.
///
/// Responds 404 for an unknown group and 503 when no broker is configured.
#[cfg(feature = "kafka")]
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;
    let Some(group) = consumer_groups.groups().get(&group_id) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Consumer group not found",
                "group_id": group_id
            })),
        )
            .into_response();
    };

    // Member details, each with its topic/partition assignments.
    let members_detail: Vec<_> = group
        .members
        .iter()
        .map(|(member_id, member)| {
            let assignments: Vec<_> = member
                .assignment
                .iter()
                .map(|a| {
                    serde_json::json!({
                        "topic": a.topic,
                        "partitions": a.partitions
                    })
                })
                .collect();
            serde_json::json!({
                "member_id": member_id,
                "client_id": member.client_id,
                "assignments": assignments
            })
        })
        .collect();

    // Committed offsets keyed by (topic, partition).
    let offsets: Vec<_> = group
        .offsets
        .iter()
        .map(|((topic, partition), offset)| {
            serde_json::json!({
                "topic": topic,
                "partition": partition,
                "offset": offset
            })
        })
        .collect();

    Json(serde_json::json!({
        "group_id": group_id,
        "members": group.members.len(),
        "state": "Stable",
        "members_detail": members_detail,
        "offsets": offsets
    }))
    .into_response()
}
2665
/// Request body for `POST /kafka/produce` (also used per message in batches).
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Target topic; the produce handlers create it on demand.
    pub topic: String,
    /// Optional message key, used for partition assignment when `partition`
    /// is not given.
    #[serde(default)]
    pub key: Option<String>,
    /// Message payload, stored as UTF-8 bytes.
    pub value: String,
    /// Explicit target partition; when omitted one is derived from the key.
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional string headers attached to the message.
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2685
/// POST /kafka/produce — append a single message to a topic on the mock broker.
///
/// The topic is created on demand with default configuration. When no
/// partition is supplied, one is chosen via the topic's key-based assignment.
/// Responds 400 for an out-of-range partition and 503 when no broker is
/// configured.
#[cfg(feature = "kafka")]
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let mut topics = broker.topics.write().await;

    // Auto-create the topic with default settings on first use.
    let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
        mockforge_kafka::topics::Topic::new(
            request.topic.clone(),
            mockforge_kafka::topics::TopicConfig::default(),
        )
    });

    // An explicit partition wins; otherwise derive one from the message key.
    let partition_id = match request.partition {
        Some(partition) => partition,
        None => topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes())),
    };

    if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            })),
        )
            .into_response();
    }

    let message = mockforge_kafka::partitions::KafkaMessage {
        // The broker assigns the real offset during produce.
        offset: 0,
        timestamp: chrono::Utc::now().timestamp_millis(),
        // Borrow instead of cloning the owned strings before conversion.
        key: request.key.as_ref().map(|k| k.as_bytes().to_vec()),
        value: request.value.as_bytes().to_vec(),
        headers: request
            .headers
            .as_ref()
            .map(|h| h.iter().map(|(k, v)| (k.clone(), v.as_bytes().to_vec())).collect())
            .unwrap_or_default(),
    };

    match topic_entry.produce(partition_id, message).await {
        Ok(offset) => {
            // `broker` is already in scope; no need to re-check the state.
            broker.metrics().record_messages_produced(1);

            // Notify SSE subscribers about the new message. A send error only
            // means nobody is listening, so it is ignored.
            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: request.topic.clone(),
                key: request.key.clone(),
                value: request.value.clone(),
                partition: partition_id,
                offset,
                headers: request.headers.clone(),
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", request.topic),
                "topic": request.topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2790
/// Request body for `POST /kafka/produce/batch`.
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// Messages to produce, in order.
    pub messages: Vec<KafkaProduceRequest>,
    /// Pause between consecutive messages, in milliseconds; defaults via
    /// `default_delay` (defined elsewhere in this file).
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2800
/// POST /kafka/produce/batch — append several messages, optionally pacing
/// them with a fixed delay between sends.
///
/// Each message is handled independently and reported in a per-index result
/// entry (success or error). Responds 400 for an empty batch and 503 when no
/// broker is configured.
#[cfg(feature = "kafka")]
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    if request.messages.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        )
            .into_response();
    }

    let mut results = Vec::with_capacity(request.messages.len());

    for (index, msg_request) in request.messages.iter().enumerate() {
        // The topics lock is re-acquired per message and explicitly released
        // before the optional pacing sleep below, so other handlers are not
        // blocked while the batch is being throttled.
        let mut topics = broker.topics.write().await;

        // Auto-create the topic with default settings on first use.
        let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
            mockforge_kafka::topics::Topic::new(
                msg_request.topic.clone(),
                mockforge_kafka::topics::TopicConfig::default(),
            )
        });

        // An explicit partition wins; otherwise derive one from the key.
        let partition_id = match msg_request.partition {
            Some(partition) => partition,
            None => topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes())),
        };

        if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
            results.push(serde_json::json!({
                "index": index,
                "success": false,
                "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            }));
            continue;
        }

        let message = mockforge_kafka::partitions::KafkaMessage {
            // The broker assigns the real offset during produce.
            offset: 0,
            timestamp: chrono::Utc::now().timestamp_millis(),
            key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
            value: msg_request.value.as_bytes().to_vec(),
            headers: msg_request
                .headers
                .clone()
                .unwrap_or_default()
                .into_iter()
                .map(|(k, v)| (k, v.as_bytes().to_vec()))
                .collect(),
        };

        match topic_entry.produce(partition_id, message).await {
            Ok(offset) => {
                // `broker` is already in scope; no need to re-check the state.
                broker.metrics().record_messages_produced(1);

                // Notify SSE subscribers; a send error only means nobody is
                // listening, so it is ignored.
                let event = MessageEvent::Kafka(KafkaMessageEvent {
                    topic: msg_request.topic.clone(),
                    key: msg_request.key.clone(),
                    value: msg_request.value.clone(),
                    partition: partition_id,
                    offset,
                    headers: msg_request.headers.clone(),
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                results.push(serde_json::json!({
                    "index": index,
                    "success": true,
                    "topic": msg_request.topic,
                    "partition": partition_id,
                    "offset": offset
                }));
            }
            Err(e) => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": e.to_string()
                }));
            }
        }

        // Release the lock before sleeping (the original held it across the
        // delay, stalling every other topics access for the whole pause).
        drop(topics);

        if index < request.messages.len() - 1 && request.delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
        }
    }

    let success_count =
        results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

    Json(serde_json::json!({
        "success": true,
        "total": request.messages.len(),
        "succeeded": success_count,
        "failed": request.messages.len() - success_count,
        "results": results
    }))
    .into_response()
}
2929
/// GET /mqtt/messages/stream — SSE stream of MQTT messages published through
/// the management API, optionally filtered with a `?topic=` substring match.
#[cfg(feature = "mqtt")]
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // Each client gets its own broadcast receiver.
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    // Unfold drives the receiver: each step loops until an MQTT event passes
    // the filter, then yields one SSE event and carries the receiver forward.
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => {
                        // Substring match against the optional topic filter.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "mqtt",
                            "topic": event.topic,
                            "payload": event.payload,
                            "qos": event.qos,
                            "retain": event.retain,
                            "timestamp": event.timestamp,
                        });

                        // Serialization failures are skipped silently; the
                        // loop simply waits for the next event.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event = Event::default().event("mqtt_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    // Kafka events are not relevant to this stream.
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => {
                        continue;
                    }
                    // All senders dropped: end the stream.
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    // Slow consumer: log the gap and keep reading.
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Periodic keep-alive comments stop idle proxies from closing the connection.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
2992
/// GET /kafka/messages/stream — SSE stream of Kafka messages produced through
/// the management API, optionally filtered with a `?topic=` substring match.
#[cfg(feature = "kafka")]
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // No `mut` needed here: the receiver is moved into the unfold state and
    // only rebound mutably there (the original binding triggered unused_mut).
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    // Unfold drives the receiver: each step loops until a Kafka event passes
    // the filter, then yields one SSE event and carries the receiver forward.
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    // MQTT events are not relevant to this stream.
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => {
                        continue;
                    }
                    Ok(MessageEvent::Kafka(event)) => {
                        // Substring match against the optional topic filter.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "kafka",
                            "topic": event.topic,
                            "key": event.key,
                            "value": event.value,
                            "partition": event.partition,
                            "offset": event.offset,
                            "headers": event.headers,
                            "timestamp": event.timestamp,
                        });

                        // Serialization failures are skipped silently; the
                        // loop simply waits for the next event.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event =
                                Event::default().event("kafka_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    // All senders dropped: end the stream.
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    // Slow consumer: log the gap and keep reading.
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Periodic keep-alive comments stop idle proxies from closing the connection.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
3056
/// Request body for `POST /ai/generate-spec`.
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Natural-language description of the desired API.
    pub query: String,
    /// One of "openapi", "graphql", or "asyncapi"; any other value is treated
    /// as OpenAPI by the handler.
    pub spec_type: String,
    /// OpenAPI version to target; the handler defaults to "3.0.0" when omitted.
    pub api_version: Option<String>,
}
3069
/// Request body for `POST /mockai/generate-openapi`.
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to the recorder database; defaults to `./recordings.db` in the
    /// current working directory.
    #[serde(default)]
    pub database_path: Option<String>,
    /// Lower time bound (RFC 3339) on exchanges to include.
    #[serde(default)]
    pub since: Option<String>,
    /// Upper time bound (RFC 3339) on exchanges to include.
    #[serde(default)]
    pub until: Option<String>,
    /// Optional path pattern used to filter recorded exchanges.
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence for inferred paths; defaults to 0.7.
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
3089
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`.
fn default_min_confidence() -> f64 {
    0.7_f64
}
3093
/// POST /ai/generate-spec — generate an API specification (OpenAPI, GraphQL,
/// or AsyncAPI) from a natural-language description via the configured LLM.
///
/// Provider, endpoint, model, and tuning parameters are read from
/// `MOCKFORGE_RAG_*` environment variables; an API key is required via
/// `MOCKFORGE_RAG_API_KEY` or `OPENAI_API_KEY` (503 when absent).
#[cfg(feature = "data-faker")]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::DocumentStorage,
    };
    use std::sync::Arc;

    /// Read an env var and parse it, falling back to `default` when the
    /// variable is unset or unparsable.
    fn env_or<T: std::str::FromStr>(var: &str, default: T) -> T {
        std::env::var(var).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
    }

    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    }

    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    // Unknown provider names fall back to OpenAI.
    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    // Endpoint and model default per provider, each overridable via env.
    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    let mut rag_config = RagConfig::default();
    rag_config.provider = provider;
    rag_config.api_endpoint = api_endpoint;
    rag_config.api_key = api_key;
    rag_config.model = model;
    rag_config.max_tokens = env_or("MOCKFORGE_RAG_MAX_TOKENS", 4096);
    rag_config.temperature = env_or("MOCKFORGE_RAG_TEMPERATURE", 0.3);
    rag_config.timeout_secs = env_or("MOCKFORGE_RAG_TIMEOUT", 60);
    rag_config.max_context_length = env_or("MOCKFORGE_RAG_CONTEXT_WINDOW", 4000);

    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        if request.spec_type == "graphql" {
            "GraphQL SDL"
        } else {
            "YAML"
        }
    );

    // The RAG engine needs a document store even though spec generation does
    // not retrieve documents; an empty in-memory store satisfies that.
    use mockforge_data::rag::storage::InMemoryStorage;
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    // The config is not used again, so it is moved into the engine (the
    // original cloned it redundantly).
    let mut rag_engine = match RagEngine::new(rag_config, storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // The extract_* helpers (defined elsewhere in this file) pull the
            // spec text out of the raw model output.
            let spec = if request.spec_type == "graphql" {
                extract_graphql_schema(&generated_text)
            } else {
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
3264
3265#[cfg(not(feature = "data-faker"))]
3266async fn generate_ai_spec(
3267 State(_state): State<ManagementState>,
3268 Json(_request): Json<GenerateSpecRequest>,
3269) -> impl IntoResponse {
3270 (
3271 StatusCode::NOT_IMPLEMENTED,
3272 Json(serde_json::json!({
3273 "error": "AI features not enabled",
3274 "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3275 })),
3276 )
3277 .into_response()
3278}
3279
/// POST /mockai/generate-openapi — infer an OpenAPI specification from HTTP
/// traffic previously captured by the recorder.
///
/// Reads exchanges from the recorder database (path from the request or
/// `./recordings.db`), optionally filtered by time window and path pattern,
/// and feeds them to the spec generator. Responds 400 for bad input, 404 when
/// no exchanges match, and 500 on generation/serialization failures.
#[cfg(feature = "behavioral-cloning")]
async fn generate_openapi_from_traffic(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
) -> impl IntoResponse {
    use chrono::{DateTime, Utc};
    use mockforge_core::intelligent_behavior::{
        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
        IntelligentBehaviorConfig,
    };
    use mockforge_recorder::{
        database::RecorderDatabase,
        openapi_export::{QueryFilters, RecordingsToOpenApi},
    };
    use std::path::PathBuf;

    // Shared RFC 3339 parsing for the `since`/`until` fields; a parse failure
    // becomes a ready-made 400 response. (The original messages referred to
    // CLI flags "--since"/"--until", which do not exist on this HTTP API.)
    let parse_ts = |field: &str, value: &str| -> Result<DateTime<Utc>, axum::response::Response> {
        DateTime::parse_from_rfc3339(value).map(|dt| dt.with_timezone(&Utc)).map_err(|e| {
            (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid date format",
                    "message": format!("Invalid '{}' format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", field, e)
                })),
            )
                .into_response()
        })
    };

    // Default to ./recordings.db relative to the working directory.
    let db_path = if let Some(ref path) = request.database_path {
        PathBuf::from(path)
    } else {
        std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")).join("recordings.db")
    };

    let db = match RecorderDatabase::new(&db_path).await {
        Ok(db) => db,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Database error",
                    "message": format!("Failed to open recorder database: {}", e)
                })),
            )
                .into_response();
        }
    };

    let since_dt = match request.since.as_deref().map(|s| parse_ts("since", s)).transpose() {
        Ok(dt) => dt,
        Err(resp) => return resp,
    };

    let until_dt = match request.until.as_deref().map(|s| parse_ts("until", s)).transpose() {
        Ok(dt) => dt,
        Err(resp) => return resp,
    };

    let query_filters = QueryFilters {
        since: since_dt,
        until: until_dt,
        path_pattern: request.path_pattern.clone(),
        min_status_code: None,
        // Cap analysis at a bounded number of recorded requests.
        max_requests: Some(1000),
    };

    let exchanges_from_recorder =
        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
            Ok(exchanges) => exchanges,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Query error",
                        "message": format!("Failed to query HTTP exchanges: {}", e)
                    })),
                )
                    .into_response();
            }
        };

    if exchanges_from_recorder.is_empty() {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "No exchanges found",
                "message": "No HTTP exchanges found matching the specified filters"
            })),
        )
            .into_response();
    }

    // Bridge the recorder's exchange type into the generator's local type.
    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
        .into_iter()
        .map(|e| LocalHttpExchange {
            method: e.method,
            path: e.path,
            query_params: e.query_params,
            headers: e.headers,
            body: e.body,
            body_encoding: e.body_encoding,
            status_code: e.status_code,
            response_headers: e.response_headers,
            response_body: e.response_body,
            response_body_encoding: e.response_body_encoding,
            timestamp: e.timestamp,
        })
        .collect();

    let behavior_config = IntelligentBehaviorConfig::default();
    let gen_config = OpenApiGenerationConfig {
        min_confidence: request.min_confidence,
        behavior_model: Some(behavior_config.behavior_model),
    };

    let generator = OpenApiSpecGenerator::new(gen_config);
    let result = match generator.generate_from_exchanges(exchanges).await {
        Ok(result) => result,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Generation error",
                    "message": format!("Failed to generate OpenAPI spec: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Prefer the generator's raw document when present; otherwise serialize
    // the typed spec.
    let spec_json = if let Some(ref raw) = result.spec.raw_document {
        raw.clone()
    } else {
        match serde_json::to_value(&result.spec.spec) {
            Ok(json) => json,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Serialization error",
                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
                    })),
                )
                    .into_response();
            }
        }
    };

    let response = serde_json::json!({
        "spec": spec_json,
        "metadata": {
            "requests_analyzed": result.metadata.requests_analyzed,
            "paths_inferred": result.metadata.paths_inferred,
            "path_confidence": result.metadata.path_confidence,
            "generated_at": result.metadata.generated_at.to_rfc3339(),
            "duration_ms": result.metadata.duration_ms,
        }
    });

    Json(response).into_response()
}
3472
3473async fn list_rule_explanations(
3475 State(state): State<ManagementState>,
3476 Query(params): Query<std::collections::HashMap<String, String>>,
3477) -> impl IntoResponse {
3478 use mockforge_core::intelligent_behavior::RuleType;
3479
3480 let explanations = state.rule_explanations.read().await;
3481 let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3482
3483 if let Some(rule_type_str) = params.get("rule_type") {
3485 if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3486 explanations_vec.retain(|e| e.rule_type == rule_type);
3487 }
3488 }
3489
3490 if let Some(min_confidence_str) = params.get("min_confidence") {
3492 if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3493 explanations_vec.retain(|e| e.confidence >= min_confidence);
3494 }
3495 }
3496
3497 explanations_vec.sort_by(|a, b| {
3499 b.confidence
3500 .partial_cmp(&a.confidence)
3501 .unwrap_or(std::cmp::Ordering::Equal)
3502 .then_with(|| b.generated_at.cmp(&a.generated_at))
3503 });
3504
3505 Json(serde_json::json!({
3506 "explanations": explanations_vec,
3507 "total": explanations_vec.len(),
3508 }))
3509 .into_response()
3510}
3511
3512async fn get_rule_explanation(
3514 State(state): State<ManagementState>,
3515 Path(rule_id): Path<String>,
3516) -> impl IntoResponse {
3517 let explanations = state.rule_explanations.read().await;
3518
3519 match explanations.get(&rule_id) {
3520 Some(explanation) => Json(serde_json::json!({
3521 "explanation": explanation,
3522 }))
3523 .into_response(),
3524 None => (
3525 StatusCode::NOT_FOUND,
3526 Json(serde_json::json!({
3527 "error": "Rule explanation not found",
3528 "message": format!("No explanation found for rule ID: {}", rule_id)
3529 })),
3530 )
3531 .into_response(),
3532 }
3533}
3534
/// Request body for `POST /mockai/learn`.
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example request/response pairs to learn rules from; must be non-empty.
    pub examples: Vec<ExamplePairRequest>,
    /// Optional behavior configuration as raw JSON; the handler falls back to
    /// defaults when missing or unparsable.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3544
/// One request/response example used for rule learning.
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request description; the handler reads the optional `method`, `path`,
    /// `body`, `query_params`, and `headers` keys.
    pub request: serde_json::Value,
    /// Response description; the handler reads `status_code` (or `status`)
    /// and `body`.
    pub response: serde_json::Value,
}
3553
3554async fn learn_from_examples(
3559 State(state): State<ManagementState>,
3560 Json(request): Json<LearnFromExamplesRequest>,
3561) -> impl IntoResponse {
3562 use mockforge_core::intelligent_behavior::{
3563 config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3564 rule_generator::{ExamplePair, RuleGenerator},
3565 };
3566
3567 if request.examples.is_empty() {
3568 return (
3569 StatusCode::BAD_REQUEST,
3570 Json(serde_json::json!({
3571 "error": "No examples provided",
3572 "message": "At least one example pair is required"
3573 })),
3574 )
3575 .into_response();
3576 }
3577
3578 let example_pairs: Result<Vec<ExamplePair>, String> = request
3580 .examples
3581 .into_iter()
3582 .enumerate()
3583 .map(|(idx, ex)| {
3584 let method = ex
3586 .request
3587 .get("method")
3588 .and_then(|v| v.as_str())
3589 .map(|s| s.to_string())
3590 .unwrap_or_else(|| "GET".to_string());
3591 let path = ex
3592 .request
3593 .get("path")
3594 .and_then(|v| v.as_str())
3595 .map(|s| s.to_string())
3596 .unwrap_or_else(|| "/".to_string());
3597 let request_body = ex.request.get("body").cloned();
3598 let query_params = ex
3599 .request
3600 .get("query_params")
3601 .and_then(|v| v.as_object())
3602 .map(|obj| {
3603 obj.iter()
3604 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3605 .collect()
3606 })
3607 .unwrap_or_default();
3608 let headers = ex
3609 .request
3610 .get("headers")
3611 .and_then(|v| v.as_object())
3612 .map(|obj| {
3613 obj.iter()
3614 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3615 .collect()
3616 })
3617 .unwrap_or_default();
3618
3619 let status = ex
3621 .response
3622 .get("status_code")
3623 .or_else(|| ex.response.get("status"))
3624 .and_then(|v| v.as_u64())
3625 .map(|n| n as u16)
3626 .unwrap_or(200);
3627 let response_body = ex.response.get("body").cloned();
3628
3629 Ok(ExamplePair {
3630 method,
3631 path,
3632 request: request_body,
3633 status,
3634 response: response_body,
3635 query_params,
3636 headers,
3637 metadata: {
3638 let mut meta = std::collections::HashMap::new();
3639 meta.insert("source".to_string(), "api".to_string());
3640 meta.insert("example_index".to_string(), idx.to_string());
3641 meta
3642 },
3643 })
3644 })
3645 .collect();
3646
3647 let example_pairs = match example_pairs {
3648 Ok(pairs) => pairs,
3649 Err(e) => {
3650 return (
3651 StatusCode::BAD_REQUEST,
3652 Json(serde_json::json!({
3653 "error": "Invalid examples",
3654 "message": e
3655 })),
3656 )
3657 .into_response();
3658 }
3659 };
3660
3661 let behavior_config = if let Some(config_json) = request.config {
3663 serde_json::from_value(config_json)
3665 .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3666 .behavior_model
3667 } else {
3668 BehaviorModelConfig::default()
3669 };
3670
3671 let generator = RuleGenerator::new(behavior_config);
3673
3674 let (rules, explanations) =
3676 match generator.generate_rules_with_explanations(example_pairs).await {
3677 Ok(result) => result,
3678 Err(e) => {
3679 return (
3680 StatusCode::INTERNAL_SERVER_ERROR,
3681 Json(serde_json::json!({
3682 "error": "Rule generation failed",
3683 "message": format!("Failed to generate rules: {}", e)
3684 })),
3685 )
3686 .into_response();
3687 }
3688 };
3689
3690 {
3692 let mut stored_explanations = state.rule_explanations.write().await;
3693 for explanation in &explanations {
3694 stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3695 }
3696 }
3697
3698 let response = serde_json::json!({
3700 "success": true,
3701 "rules_generated": {
3702 "consistency_rules": rules.consistency_rules.len(),
3703 "schemas": rules.schemas.len(),
3704 "state_machines": rules.state_transitions.len(),
3705 "system_prompt": !rules.system_prompt.is_empty(),
3706 },
3707 "explanations": explanations.iter().map(|e| serde_json::json!({
3708 "rule_id": e.rule_id,
3709 "rule_type": e.rule_type,
3710 "confidence": e.confidence,
3711 "reasoning": e.reasoning,
3712 })).collect::<Vec<_>>(),
3713 "total_explanations": explanations.len(),
3714 });
3715
3716 Json(response).into_response()
3717}
3718
3719#[cfg(feature = "data-faker")]
3720fn extract_yaml_spec(text: &str) -> String {
3721 if let Some(start) = text.find("```yaml") {
3723 let yaml_start = text[start + 7..].trim_start();
3724 if let Some(end) = yaml_start.find("```") {
3725 return yaml_start[..end].trim().to_string();
3726 }
3727 }
3728 if let Some(start) = text.find("```") {
3729 let content_start = text[start + 3..].trim_start();
3730 if let Some(end) = content_start.find("```") {
3731 return content_start[..end].trim().to_string();
3732 }
3733 }
3734
3735 if text.trim_start().starts_with("openapi:") || text.trim_start().starts_with("asyncapi:") {
3737 return text.trim().to_string();
3738 }
3739
3740 text.trim().to_string()
3742}
3743
3744#[cfg(feature = "data-faker")]
3746fn extract_graphql_schema(text: &str) -> String {
3747 if let Some(start) = text.find("```graphql") {
3749 let schema_start = text[start + 10..].trim_start();
3750 if let Some(end) = schema_start.find("```") {
3751 return schema_start[..end].trim().to_string();
3752 }
3753 }
3754 if let Some(start) = text.find("```") {
3755 let content_start = text[start + 3..].trim_start();
3756 if let Some(end) = content_start.find("```") {
3757 return content_start[..end].trim().to_string();
3758 }
3759 }
3760
3761 if text.trim_start().starts_with("type ") || text.trim_start().starts_with("schema ") {
3763 return text.trim().to_string();
3764 }
3765
3766 text.trim().to_string()
3767}
3768
3769async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
3773 #[cfg(feature = "chaos")]
3774 {
3775 if let Some(chaos_state) = &_state.chaos_api_state {
3776 let config = chaos_state.config.read().await;
3777 Json(serde_json::json!({
3779 "enabled": config.enabled,
3780 "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
3781 "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
3782 "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
3783 "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
3784 }))
3785 .into_response()
3786 } else {
3787 Json(serde_json::json!({
3789 "enabled": false,
3790 "latency": null,
3791 "fault_injection": null,
3792 "rate_limit": null,
3793 "traffic_shaping": null,
3794 }))
3795 .into_response()
3796 }
3797 }
3798 #[cfg(not(feature = "chaos"))]
3799 {
3800 Json(serde_json::json!({
3802 "enabled": false,
3803 "latency": null,
3804 "fault_injection": null,
3805 "rate_limit": null,
3806 "traffic_shaping": null,
3807 }))
3808 .into_response()
3809 }
3810}
3811
/// Partial update payload for the chaos-engineering configuration.
/// Every field is optional; absent fields leave the current value untouched.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Master enable/disable switch for chaos injection.
    pub enabled: Option<bool>,
    /// Raw JSON for the latency sub-config (deserialized by the handler).
    pub latency: Option<serde_json::Value>,
    /// Raw JSON for the fault-injection sub-config.
    pub fault_injection: Option<serde_json::Value>,
    /// Raw JSON for the rate-limit sub-config.
    pub rate_limit: Option<serde_json::Value>,
    /// Raw JSON for the traffic-shaping sub-config.
    pub traffic_shaping: Option<serde_json::Value>,
}
3826
3827async fn update_chaos_config(
3829 State(_state): State<ManagementState>,
3830 Json(_config_update): Json<ChaosConfigUpdate>,
3831) -> impl IntoResponse {
3832 #[cfg(feature = "chaos")]
3833 {
3834 if let Some(chaos_state) = &_state.chaos_api_state {
3835 use mockforge_chaos::config::{
3836 ChaosConfig, FaultInjectionConfig, LatencyConfig, RateLimitConfig,
3837 TrafficShapingConfig,
3838 };
3839
3840 let mut config = chaos_state.config.write().await;
3841
3842 if let Some(enabled) = _config_update.enabled {
3844 config.enabled = enabled;
3845 }
3846
3847 if let Some(latency_json) = _config_update.latency {
3849 if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3850 config.latency = Some(latency);
3851 }
3852 }
3853
3854 if let Some(fault_json) = _config_update.fault_injection {
3856 if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3857 config.fault_injection = Some(fault);
3858 }
3859 }
3860
3861 if let Some(rate_json) = _config_update.rate_limit {
3863 if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3864 config.rate_limit = Some(rate);
3865 }
3866 }
3867
3868 if let Some(traffic_json) = _config_update.traffic_shaping {
3870 if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3871 config.traffic_shaping = Some(traffic);
3872 }
3873 }
3874
3875 drop(config);
3878
3879 info!("Chaos configuration updated successfully");
3880 Json(serde_json::json!({
3881 "success": true,
3882 "message": "Chaos configuration updated and applied"
3883 }))
3884 .into_response()
3885 } else {
3886 (
3887 StatusCode::SERVICE_UNAVAILABLE,
3888 Json(serde_json::json!({
3889 "success": false,
3890 "error": "Chaos API not available",
3891 "message": "Chaos engineering is not enabled or configured"
3892 })),
3893 )
3894 .into_response()
3895 }
3896 }
3897 #[cfg(not(feature = "chaos"))]
3898 {
3899 (
3900 StatusCode::NOT_IMPLEMENTED,
3901 Json(serde_json::json!({
3902 "success": false,
3903 "error": "Chaos feature not enabled",
3904 "message": "Chaos engineering feature is not compiled into this build"
3905 })),
3906 )
3907 .into_response()
3908 }
3909}
3910
3911async fn list_network_profiles() -> impl IntoResponse {
3915 use mockforge_core::network_profiles::NetworkProfileCatalog;
3916
3917 let catalog = NetworkProfileCatalog::default();
3918 let profiles: Vec<serde_json::Value> = catalog
3919 .list_profiles_with_description()
3920 .iter()
3921 .map(|(name, description)| {
3922 serde_json::json!({
3923 "name": name,
3924 "description": description,
3925 })
3926 })
3927 .collect();
3928
3929 Json(serde_json::json!({
3930 "profiles": profiles
3931 }))
3932 .into_response()
3933}
3934
/// Request body for applying a network profile by name.
#[derive(Debug, Deserialize)]
pub struct ApplyNetworkProfileRequest {
    /// Name of a profile from the built-in catalog (see the list endpoint).
    pub profile_name: String,
}
3941
/// Apply a named network profile from the built-in catalog.
///
/// On success the profile's traffic-shaping settings are written into the
/// shared server configuration (when present) and, if the `chaos` feature is
/// compiled in, mirrored into the live chaos API state. Returns 404 when the
/// profile name is unknown.
async fn apply_network_profile(
    State(state): State<ManagementState>,
    Json(request): Json<ApplyNetworkProfileRequest>,
) -> impl IntoResponse {
    use mockforge_core::network_profiles::NetworkProfileCatalog;

    let catalog = NetworkProfileCatalog::default();
    if let Some(profile) = catalog.get(&request.profile_name) {
        // Persist the shaping settings into the shared server configuration
        // so the profile survives beyond the live chaos state.
        if let Some(server_config) = &state.server_config {
            let mut config = server_config.write().await;

            use mockforge_core::config::NetworkShapingConfig;

            let network_shaping = NetworkShapingConfig {
                // Enabled when either bandwidth limiting or burst loss is active.
                enabled: profile.traffic_shaping.bandwidth.enabled
                    || profile.traffic_shaping.burst_loss.enabled,
                // `* 8` converts bytes/sec to bits/sec for the bps field.
                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                // NOTE(review): hard-coded connection cap — confirm 1000 is intended.
                max_connections: 1000, };

            // Attach to an existing chaos config, or create a minimal one.
            if let Some(ref mut chaos) = config.observability.chaos {
                chaos.traffic_shaping = Some(network_shaping);
            } else {
                use mockforge_core::config::ChaosEngConfig;
                config.observability.chaos = Some(ChaosEngConfig {
                    enabled: true,
                    latency: None,
                    fault_injection: None,
                    rate_limit: None,
                    traffic_shaping: Some(network_shaping),
                    scenario: None,
                });
            }

            info!("Network profile '{}' applied to server configuration", request.profile_name);
        } else {
            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
        }

        // Mirror the settings into the live chaos engine when available.
        #[cfg(feature = "chaos")]
        {
            if let Some(chaos_state) = &state.chaos_api_state {
                use mockforge_chaos::config::TrafficShapingConfig;

                let mut chaos_config = chaos_state.config.write().await;
                let chaos_traffic_shaping = TrafficShapingConfig {
                    enabled: profile.traffic_shaping.bandwidth.enabled
                        || profile.traffic_shaping.burst_loss.enabled,
                    // `* 8` converts bytes/sec to bits/sec for the bps field.
                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                    // NOTE(review): presumably 0 means "no connection limit" here — confirm.
                    max_connections: 0,
                    connection_timeout_ms: 30000,
                };
                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
                // Applying a profile force-enables chaos; drop the write lock
                // before logging/responding.
                chaos_config.enabled = true; drop(chaos_config);
                info!("Network profile '{}' applied to chaos API state", request.profile_name);
            }
        }

        Json(serde_json::json!({
            "success": true,
            "message": format!("Network profile '{}' applied", request.profile_name),
            "profile": {
                "name": profile.name,
                "description": profile.description,
            }
        }))
        .into_response()
    } else {
        (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Profile not found",
                "message": format!("Network profile '{}' not found", request.profile_name)
            })),
        )
            .into_response()
    }
}
4035
4036pub fn management_router_with_ui_builder(
4038 state: ManagementState,
4039 server_config: mockforge_core::config::ServerConfig,
4040) -> Router {
4041 use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
4042
4043 let management = management_router(state);
4045
4046 let ui_builder_state = UIBuilderState::new(server_config);
4048 let ui_builder = create_ui_builder_router(ui_builder_state);
4049
4050 management.nest("/ui-builder", ui_builder)
4052}
4053
4054pub fn management_router_with_spec_import(state: ManagementState) -> Router {
4056 use crate::spec_import::{spec_import_router, SpecImportState};
4057
4058 let management = management_router(state);
4060
4061 Router::new()
4063 .merge(management)
4064 .merge(spec_import_router(SpecImportState::new()))
4065}
4066
#[cfg(test)]
mod tests {
    // Unit tests for the mock store (shared ManagementState) and for
    // XPath-based request matching in `mock_matches_request`.
    use super::*;

    // A mock pushed into shared state is retrievable by id with its fields intact.
    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        let mock = MockConfig {
            id: "test-1".to_string(),
            name: "Test Mock".to_string(),
            method: "GET".to_string(),
            path: "/test".to_string(),
            response: MockResponse {
                body: serde_json::json!({"message": "test"}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        };

        // Scope the write lock so the read below doesn't deadlock.
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(mock.clone());
        }

        let mocks = state.mocks.read().await;
        let found = mocks.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    // The enabled/disabled split is observable from the shared mock list.
    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        // Insert one enabled and one disabled mock.
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(MockConfig {
                id: "1".to_string(),
                name: "Mock 1".to_string(),
                method: "GET".to_string(),
                path: "/test1".to_string(),
                response: MockResponse {
                    body: serde_json::json!({}),
                    headers: None,
                },
                enabled: true,
                latency_ms: None,
                status_code: Some(200),
                request_match: None,
                priority: None,
                scenario: None,
                required_scenario_state: None,
                new_scenario_state: None,
            });
            mocks.push(MockConfig {
                id: "2".to_string(),
                name: "Mock 2".to_string(),
                method: "POST".to_string(),
                path: "/test2".to_string(),
                response: MockResponse {
                    body: serde_json::json!({}),
                    headers: None,
                },
                enabled: false,
                latency_ms: None,
                status_code: Some(201),
                request_match: None,
                priority: None,
                scenario: None,
                required_scenario_state: None,
                new_scenario_state: None,
            });
        }

        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }

    // An absolute XPath that selects an existing element matches the XML body.
    #[test]
    fn test_mock_matches_request_with_xpath_absolute_path() {
        let mock = MockConfig {
            id: "xpath-1".to_string(),
            name: "XPath Match".to_string(),
            method: "POST".to_string(),
            path: "/xml".to_string(),
            response: MockResponse {
                body: serde_json::json!({"ok": true}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: Some(RequestMatchCriteria {
                xpath: Some("/root/order/id".to_string()),
                ..Default::default()
            }),
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        };

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }

    // An XPath with a text() predicate matches when the element text agrees.
    #[test]
    fn test_mock_matches_request_with_xpath_text_predicate() {
        let mock = MockConfig {
            id: "xpath-2".to_string(),
            name: "XPath Predicate Match".to_string(),
            method: "POST".to_string(),
            path: "/xml".to_string(),
            response: MockResponse {
                body: serde_json::json!({"ok": true}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: Some(RequestMatchCriteria {
                xpath: Some("//order/id[text()='123']".to_string()),
                ..Default::default()
            }),
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        };

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }

    // An XPath predicate that doesn't match the body text must reject the request.
    #[test]
    fn test_mock_matches_request_with_xpath_no_match() {
        let mock = MockConfig {
            id: "xpath-3".to_string(),
            name: "XPath No Match".to_string(),
            method: "POST".to_string(),
            path: "/xml".to_string(),
            response: MockResponse {
                body: serde_json::json!({"ok": true}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: Some(RequestMatchCriteria {
                xpath: Some("//order/id[text()='456']".to_string()),
                ..Default::default()
            }),
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        };

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(!mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }
}