1#[cfg(any(feature = "mqtt", feature = "kafka"))]
2use axum::response::sse::{Event, Sse};
3use axum::{
8 extract::{Path, Query, State},
9 http::StatusCode,
10 response::{IntoResponse, Json},
11 routing::{delete, get, post, put},
12 Router,
13};
14#[cfg(any(feature = "mqtt", feature = "kafka"))]
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18 BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23#[cfg(any(feature = "mqtt", feature = "kafka"))]
24use std::convert::Infallible;
25use std::sync::Arc;
26use tokio::sync::{broadcast, RwLock};
27use tracing::*;
28
/// Default capacity of the broadcast channel used to fan out MQTT/Kafka
/// message events (e.g. to SSE subscribers).
const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;

/// Resolves the message broadcast channel capacity from the
/// `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY` environment variable.
///
/// Falls back to [`DEFAULT_MESSAGE_BROADCAST_CAPACITY`] when the variable is
/// unset, unparseable, or zero. Zero must be rejected because
/// `tokio::sync::broadcast::channel` panics when given a capacity of 0, which
/// would turn a bad environment value into a startup crash.
fn get_message_broadcast_capacity() -> usize {
    std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY")
        .ok()
        .and_then(|s| s.parse().ok())
        // Guard against 0: broadcast::channel(0) panics at runtime.
        .filter(|&capacity| capacity > 0)
        .unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY)
}
39
/// A message observed on one of the mock message brokers, tagged by protocol.
///
/// Serializes as `{"protocol": "mqtt"|"kafka", "data": {...}}` via the
/// adjacently-tagged serde representation below.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    /// Message published through the MQTT mock broker.
    #[cfg(feature = "mqtt")]
    Mqtt(MqttMessageEvent),
    /// Record produced through the Kafka mock broker.
    #[cfg(feature = "kafka")]
    Kafka(KafkaMessageEvent),
}
52
/// Snapshot of a single MQTT publish, as broadcast to event subscribers.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// Topic the message was published to.
    pub topic: String,
    /// Message payload rendered as a string.
    pub payload: String,
    /// MQTT quality-of-service level (0, 1, or 2).
    pub qos: u8,
    /// Whether the broker retained this message.
    pub retain: bool,
    /// When the message was observed (string-formatted; format set by producer).
    pub timestamp: String,
}
68
/// Snapshot of a single Kafka record, as broadcast to event subscribers.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Topic the record was produced to.
    pub topic: String,
    /// Optional record key.
    pub key: Option<String>,
    /// Record value rendered as a string.
    pub value: String,
    /// Partition the record landed on.
    pub partition: i32,
    /// Offset within the partition.
    pub offset: i64,
    /// Optional record headers.
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// When the record was observed (string-formatted; format set by producer).
    pub timestamp: String,
}
80
/// A single HTTP mock definition managed through the management API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockConfig {
    /// Unique identifier. Empty on creation requests; `create_mock` assigns a
    /// UUID when empty, and empty ids are omitted from serialized output.
    #[serde(skip_serializing_if = "String::is_empty")]
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// HTTP method to match (compared case-insensitively).
    pub method: String,
    /// Path pattern; supports exact paths, `{param}` placeholders, and `*` wildcards.
    pub path: String,
    /// Response to serve when this mock matches.
    pub response: MockResponse,
    /// Whether the mock participates in matching; defaults to `true` on deserialize.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Optional artificial latency in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_ms: Option<u64>,
    /// Optional HTTP status code override.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status_code: Option<u16>,
    /// Optional fine-grained request matching criteria (headers, query, body…).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_match: Option<RequestMatchCriteria>,
    /// Optional priority for resolving multiple matching mocks.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub priority: Option<i32>,
    /// Optional scenario this mock belongs to.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scenario: Option<String>,
    /// Scenario state required for this mock to apply, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub required_scenario_state: Option<String>,
    /// Scenario state to transition to after this mock serves, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub new_scenario_state: Option<String>,
}
120
/// Serde default for `MockConfig::enabled`: mocks are enabled unless stated otherwise.
fn default_true() -> bool {
    true
}
124
/// The response payload a mock serves when matched.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Response body as arbitrary JSON.
    pub body: serde_json::Value,
    /// Optional response headers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
134
/// Fine-grained request matching criteria evaluated by `mock_matches_request`.
/// All populated criteria must pass for the mock to match.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RequestMatchCriteria {
    /// Required headers; names compared case-insensitively, values treated as
    /// a regex when valid, otherwise as an exact string.
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub headers: std::collections::HashMap<String, String>,
    /// Required query parameters (exact match on value).
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub query_params: std::collections::HashMap<String, String>,
    /// Body pattern; regex when valid, otherwise exact string comparison.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub body_pattern: Option<String>,
    /// Simple dotted JSONPath (`$.a.b`) that must exist in a JSON body.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub json_path: Option<String>,
    /// Simplified XPath expression that must match an XML body.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub xpath: Option<String>,
    /// Mini-DSL expression (`==`, `=~`, `contains`); see `evaluate_custom_matcher`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub custom_matcher: Option<String>,
}
157
158pub fn mock_matches_request(
167 mock: &MockConfig,
168 method: &str,
169 path: &str,
170 headers: &std::collections::HashMap<String, String>,
171 query_params: &std::collections::HashMap<String, String>,
172 body: Option<&[u8]>,
173) -> bool {
174 use regex::Regex;
175
176 if !mock.enabled {
178 return false;
179 }
180
181 if mock.method.to_uppercase() != method.to_uppercase() {
183 return false;
184 }
185
186 if !path_matches_pattern(&mock.path, path) {
188 return false;
189 }
190
191 if let Some(criteria) = &mock.request_match {
193 for (key, expected_value) in &criteria.headers {
195 let header_key_lower = key.to_lowercase();
196 let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
197
198 if let Some((_, actual_value)) = found {
199 if let Ok(re) = Regex::new(expected_value) {
201 if !re.is_match(actual_value) {
202 return false;
203 }
204 } else if actual_value != expected_value {
205 return false;
206 }
207 } else {
208 return false; }
210 }
211
212 for (key, expected_value) in &criteria.query_params {
214 if let Some(actual_value) = query_params.get(key) {
215 if actual_value != expected_value {
216 return false;
217 }
218 } else {
219 return false; }
221 }
222
223 if let Some(pattern) = &criteria.body_pattern {
225 if let Some(body_bytes) = body {
226 let body_str = String::from_utf8_lossy(body_bytes);
227 if let Ok(re) = Regex::new(pattern) {
229 if !re.is_match(&body_str) {
230 return false;
231 }
232 } else if body_str.as_ref() != pattern {
233 return false;
234 }
235 } else {
236 return false; }
238 }
239
240 if let Some(json_path) = &criteria.json_path {
242 if let Some(body_bytes) = body {
243 if let Ok(body_str) = std::str::from_utf8(body_bytes) {
244 if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
245 if !json_path_exists(&json_value, json_path) {
247 return false;
248 }
249 }
250 }
251 }
252 }
253
254 if let Some(xpath) = &criteria.xpath {
256 if let Some(body_bytes) = body {
257 if let Ok(body_str) = std::str::from_utf8(body_bytes) {
258 if !xml_xpath_exists(body_str, xpath) {
259 return false;
260 }
261 } else {
262 return false;
263 }
264 } else {
265 return false; }
267 }
268
269 if let Some(custom) = &criteria.custom_matcher {
271 if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
272 return false;
273 }
274 }
275 }
276
277 true
278}
279
280fn path_matches_pattern(pattern: &str, path: &str) -> bool {
282 if pattern == path {
284 return true;
285 }
286
287 if pattern == "*" {
289 return true;
290 }
291
292 let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
294 let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
295
296 if pattern_parts.len() != path_parts.len() {
297 if pattern.contains('*') {
299 return matches_wildcard_pattern(pattern, path);
300 }
301 return false;
302 }
303
304 for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
305 if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
307 continue; }
309
310 if pattern_part != path_part {
311 return false;
312 }
313 }
314
315 true
316}
317
318fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
320 use regex::Regex;
321
322 let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
324
325 if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
326 return re.is_match(path);
327 }
328
329 false
330}
331
332fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
334 if let Some(path) = json_path.strip_prefix("$.") {
337 let parts: Vec<&str> = path.split('.').collect();
338
339 let mut current = json;
340 for part in parts {
341 if let Some(obj) = current.as_object() {
342 if let Some(value) = obj.get(part) {
343 current = value;
344 } else {
345 return false;
346 }
347 } else {
348 return false;
349 }
350 }
351 true
352 } else {
353 tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
355 false
356 }
357}
358
/// One step of the simplified XPath dialect: an element name plus an optional
/// `text()='…'` equality predicate.
#[derive(Debug, Clone, PartialEq, Eq)]
struct XPathSegment {
    name: String,
    text_equals: Option<String>,
}

/// Parses a single path segment of the form `name` or `name[text()='value']`
/// (single or double quotes accepted).
///
/// Returns `None` for empty input, an unterminated bracket, an empty element
/// name in front of a predicate, or any unsupported predicate (e.g. a
/// positional `[1]`).
fn parse_xpath_segment(segment: &str) -> Option<XPathSegment> {
    if segment.is_empty() {
        return None;
    }

    let seg = segment.trim();

    // No predicate: the whole (trimmed) segment is the element name.
    let Some(open) = seg.find('[') else {
        return Some(XPathSegment {
            name: seg.to_string(),
            text_equals: None,
        });
    };

    // A predicate must be closed by a trailing ']'.
    if !seg.ends_with(']') {
        return None;
    }

    let name = seg[..open].trim();
    let predicate = seg[open + 1..seg.len() - 1].trim();

    // Only `text()=<quoted literal>` predicates are supported.
    let literal = predicate.strip_prefix("text()=")?.trim();
    let quoted = literal.len() >= 2
        && ((literal.starts_with('"') && literal.ends_with('"'))
            || (literal.starts_with('\'') && literal.ends_with('\'')));
    if !quoted || name.is_empty() {
        return None;
    }

    Some(XPathSegment {
        name: name.to_string(),
        text_equals: Some(literal[1..literal.len() - 1].to_string()),
    })
}
405
406fn segment_matches(node: roxmltree::Node<'_, '_>, segment: &XPathSegment) -> bool {
407 if !node.is_element() {
408 return false;
409 }
410 if node.tag_name().name() != segment.name {
411 return false;
412 }
413 match &segment.text_equals {
414 Some(expected) => node.text().map(str::trim).unwrap_or_default() == expected,
415 None => true,
416 }
417}
418
/// Evaluates a simplified XPath expression against an XML body, returning
/// `true` when at least one node matches.
///
/// Supported dialect:
/// - `/a/b` — rooted path; the first segment must match the document root
///   element, subsequent segments follow the child axis.
/// - `//a/b` — the first segment may match any descendant; subsequent
///   segments follow the child axis from there.
/// - Segments may carry a `text()='…'` predicate (see `parse_xpath_segment`).
///
/// Unparseable XML or an unsupported expression logs a warning and returns
/// `false`. Segments that fail to parse are silently dropped by `filter_map`,
/// which loosens the expression rather than failing it.
fn xml_xpath_exists(xml_body: &str, xpath: &str) -> bool {
    let doc = match roxmltree::Document::parse(xml_body) {
        Ok(doc) => doc,
        Err(err) => {
            tracing::warn!("Failed to parse XML for XPath matching: {}", err);
            return false;
        }
    };

    let expr = xpath.trim();
    if expr.is_empty() {
        return false;
    }

    // `//` (descendant-anywhere) must be checked before `/` (rooted).
    let (is_descendant, path_str) = if let Some(rest) = expr.strip_prefix("//") {
        (true, rest)
    } else if let Some(rest) = expr.strip_prefix('/') {
        (false, rest)
    } else {
        tracing::warn!("Unsupported XPath expression (must start with / or //): {}", expr);
        return false;
    };

    let segments: Vec<XPathSegment> = path_str
        .split('/')
        .filter(|s| !s.trim().is_empty())
        .filter_map(parse_xpath_segment)
        .collect();

    if segments.is_empty() {
        return false;
    }

    if is_descendant {
        // Try every descendant matching the first segment as an anchor, then
        // advance a breadth-first frontier through the remaining segments.
        let first = &segments[0];
        for node in doc.descendants().filter(|n| segment_matches(*n, first)) {
            let mut frontier = vec![node];
            for segment in &segments[1..] {
                let mut next_frontier = Vec::new();
                for parent in &frontier {
                    for child in parent.children().filter(|n| segment_matches(*n, segment)) {
                        next_frontier.push(child);
                    }
                }
                if next_frontier.is_empty() {
                    frontier.clear();
                    break;
                }
                frontier = next_frontier;
            }
            // Non-empty frontier after consuming all segments => full match.
            if !frontier.is_empty() {
                return true;
            }
        }
        false
    } else {
        // Rooted path: segment 0 must match the root element itself; later
        // segments expand the frontier along the child axis.
        let mut frontier = vec![doc.root_element()];
        for (index, segment) in segments.iter().enumerate() {
            let mut next_frontier = Vec::new();
            for parent in &frontier {
                if index == 0 {
                    if segment_matches(*parent, segment) {
                        next_frontier.push(*parent);
                    }
                    continue;
                }
                for child in parent.children().filter(|n| segment_matches(*n, segment)) {
                    next_frontier.push(child);
                }
            }
            if next_frontier.is_empty() {
                return false;
            }
            frontier = next_frontier;
        }
        !frontier.is_empty()
    }
}
503
/// Evaluates a custom matcher mini-DSL expression against the request.
///
/// Supported forms (operator detected by substring search, tried in this
/// order: `==`, then `=~`, then `contains`):
/// - `field == "value"` — exact comparison
/// - `field =~ "regex"` — regex match (false when the regex is invalid)
/// - `field contains "value"` — substring test
///
/// Recognized fields: `method`, `path`, `headers.<name>`, `query.<name>`,
/// and (for `contains` only) `body`. Quotes around the value are optional.
///
/// NOTE(review): because the operator is found by substring, an expression
/// whose value itself contains an operator token (e.g. `path == "a==b"`)
/// splits into more than two parts and evaluates to `false`.
fn evaluate_custom_matcher(
    expression: &str,
    method: &str,
    path: &str,
    headers: &std::collections::HashMap<String, String>,
    query_params: &std::collections::HashMap<String, String>,
    body: Option<&[u8]>,
) -> bool {
    use regex::Regex;

    let expr = expression.trim();

    // Exact equality: field == "value"
    if expr.contains("==") {
        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        // Strip surrounding double quotes first, then single quotes.
        let expected_value = parts[1].trim_matches('"').trim_matches('\'');

        match field {
            "method" => method == expected_value,
            "path" => path == expected_value,
            _ if field.starts_with("headers.") => {
                // Skip the "headers." prefix (8 chars); exact name lookup.
                let header_name = &field[8..];
                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
            }
            _ if field.starts_with("query.") => {
                // Skip the "query." prefix (6 chars).
                let param_name = &field[6..];
                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
            }
            _ => false,
        }
    }
    // Regex match: field =~ "pattern"
    else if expr.contains("=~") {
        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        let pattern = parts[1].trim_matches('"').trim_matches('\'');

        if let Ok(re) = Regex::new(pattern) {
            match field {
                "method" => re.is_match(method),
                "path" => re.is_match(path),
                _ if field.starts_with("headers.") => {
                    let header_name = &field[8..];
                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
                }
                _ if field.starts_with("query.") => {
                    let param_name = &field[6..];
                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
                }
                _ => false,
            }
        } else {
            // Invalid regex never matches.
            false
        }
    }
    // Substring test: field contains "value"
    else if expr.contains("contains") {
        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        let search_value = parts[1].trim_matches('"').trim_matches('\'');

        match field {
            "path" => path.contains(search_value),
            _ if field.starts_with("headers.") => {
                let header_name = &field[8..];
                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
            }
            _ if field.starts_with("body") => {
                // Lossy UTF-8 conversion so binary bodies can still be searched.
                if let Some(body_bytes) = body {
                    let body_str = String::from_utf8_lossy(body_bytes);
                    body_str.contains(search_value)
                } else {
                    false
                }
            }
            _ => false,
        }
    } else {
        tracing::warn!("Unknown custom matcher expression format: {}", expr);
        false
    }
}
601
/// Runtime statistics reported by the `/stats` management endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Seconds elapsed since the management state was created.
    pub uptime_seconds: u64,
    /// Total requests counted so far.
    pub total_requests: u64,
    /// Number of registered mocks (enabled or not).
    pub active_mocks: usize,
    /// Number of mocks currently enabled.
    pub enabled_mocks: usize,
    /// Number of registered routes (reported as one per mock).
    pub registered_routes: usize,
}
616
/// Server configuration summary reported by the `/config` management endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// Crate version (from CARGO_PKG_VERSION).
    pub version: String,
    /// Port the server listens on.
    pub port: u16,
    /// Whether an OpenAPI spec was loaded.
    pub has_openapi_spec: bool,
    /// Filesystem path of the loaded spec, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
630
/// Shared state behind the management HTTP API. Cloning is cheap: every
/// mutable or heavyweight member is behind an `Arc`.
#[derive(Clone)]
pub struct ManagementState {
    /// In-memory mock registry.
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// Parsed OpenAPI spec, when one was loaded.
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Filesystem path the spec was loaded from, if any.
    pub spec_path: Option<String>,
    /// Port reported by the `/config` endpoint.
    pub port: u16,
    /// Creation instant; used for uptime reporting.
    pub start_time: std::time::Instant,
    /// Total request count (read by `get_stats`; presumably incremented by
    /// request middleware elsewhere — not visible in this file).
    pub request_counter: Arc<RwLock<u64>>,
    /// Proxy configuration, when proxying is enabled.
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// SMTP mock registry (mailbox access), when the SMTP server is wired in.
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// MQTT mock broker handle, when enabled.
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Kafka mock broker handle, when enabled.
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel fanning out broker message events to subscribers.
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// Scenario state machines driving stateful mock behavior.
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// WebSocket broadcast channel for mock CRUD events, if wired in.
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Lifecycle hook registry invoked on mock create/update/delete, if wired in.
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Rule explanations keyed by id (intelligent-behavior diagnostics).
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
    /// Chaos-engineering API state, when the chaos feature is wired in.
    #[cfg(feature = "chaos")]
    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
    /// Full server configuration, when shared with this state.
    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
}
682
impl ManagementState {
    /// Creates management state with an empty mock registry, zeroed counters,
    /// and every optional integration (proxy, SMTP, MQTT, Kafka, chaos,
    /// hooks, WebSocket broadcast) disabled. Wire integrations in with the
    /// `with_*` builder methods below.
    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
        Self {
            mocks: Arc::new(RwLock::new(Vec::new())),
            spec,
            spec_path,
            port,
            start_time: std::time::Instant::now(),
            request_counter: Arc::new(RwLock::new(0)),
            proxy_config: None,
            #[cfg(feature = "smtp")]
            smtp_registry: None,
            #[cfg(feature = "mqtt")]
            mqtt_broker: None,
            #[cfg(feature = "kafka")]
            kafka_broker: None,
            #[cfg(any(feature = "mqtt", feature = "kafka"))]
            message_events: {
                // Capacity is env-configurable; see get_message_broadcast_capacity.
                let capacity = get_message_broadcast_capacity();
                let (tx, _) = broadcast::channel(capacity);
                Arc::new(tx)
            },
            state_machine_manager: Arc::new(RwLock::new(
                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
            )),
            ws_broadcast: None,
            lifecycle_hooks: None,
            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
            #[cfg(feature = "chaos")]
            chaos_api_state: None,
            server_config: None,
        }
    }

    /// Builder: attach a lifecycle hook registry (invoked on mock CRUD).
    pub fn with_lifecycle_hooks(
        mut self,
        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
    ) -> Self {
        self.lifecycle_hooks = Some(hooks);
        self
    }

    /// Builder: attach the WebSocket broadcast channel for mock CRUD events.
    pub fn with_ws_broadcast(
        mut self,
        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
    ) -> Self {
        self.ws_broadcast = Some(ws_broadcast);
        self
    }

    /// Builder: attach a proxy configuration.
    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
        self.proxy_config = Some(proxy_config);
        self
    }

    /// Builder: attach the SMTP mock registry (enables mailbox endpoints).
    #[cfg(feature = "smtp")]
    pub fn with_smtp_registry(
        mut self,
        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
    ) -> Self {
        self.smtp_registry = Some(smtp_registry);
        self
    }

    /// Builder: attach the MQTT mock broker (enables MQTT endpoints).
    #[cfg(feature = "mqtt")]
    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
        self.mqtt_broker = Some(mqtt_broker);
        self
    }

    /// Builder: attach the Kafka mock broker (enables Kafka endpoints).
    #[cfg(feature = "kafka")]
    pub fn with_kafka_broker(
        mut self,
        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
    ) -> Self {
        self.kafka_broker = Some(kafka_broker);
        self
    }

    /// Builder: attach the chaos-engineering API state.
    #[cfg(feature = "chaos")]
    pub fn with_chaos_api_state(
        mut self,
        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
    ) -> Self {
        self.chaos_api_state = Some(chaos_api_state);
        self
    }

    /// Builder: share the full server configuration with this state.
    pub fn with_server_config(
        mut self,
        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
    ) -> Self {
        self.server_config = Some(server_config);
        self
    }
}
793
794async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
796 let mocks = state.mocks.read().await;
797 Json(serde_json::json!({
798 "mocks": *mocks,
799 "total": mocks.len(),
800 "enabled": mocks.iter().filter(|m| m.enabled).count()
801 }))
802}
803
804async fn get_mock(
806 State(state): State<ManagementState>,
807 Path(id): Path<String>,
808) -> Result<Json<MockConfig>, StatusCode> {
809 let mocks = state.mocks.read().await;
810 mocks
811 .iter()
812 .find(|m| m.id == id)
813 .cloned()
814 .map(Json)
815 .ok_or(StatusCode::NOT_FOUND)
816}
817
818async fn create_mock(
820 State(state): State<ManagementState>,
821 Json(mut mock): Json<MockConfig>,
822) -> Result<Json<MockConfig>, StatusCode> {
823 let mut mocks = state.mocks.write().await;
824
825 if mock.id.is_empty() {
827 mock.id = uuid::Uuid::new_v4().to_string();
828 }
829
830 if mocks.iter().any(|m| m.id == mock.id) {
832 return Err(StatusCode::CONFLICT);
833 }
834
835 info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
836
837 if let Some(hooks) = &state.lifecycle_hooks {
839 let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
840 id: mock.id.clone(),
841 name: mock.name.clone(),
842 config: serde_json::to_value(&mock).unwrap_or_default(),
843 };
844 hooks.invoke_mock_created(&event).await;
845 }
846
847 mocks.push(mock.clone());
848
849 if let Some(tx) = &state.ws_broadcast {
851 let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
852 }
853
854 Ok(Json(mock))
855}
856
/// PUT handler: replaces the mock identified by the path `id` with the
/// request body, firing lifecycle hooks and a WebSocket event.
///
/// NOTE(review): the body's `id` is not forced to equal the path `id` — a
/// payload carrying a different id silently changes the mock's id. Confirm
/// whether that is intended.
async fn update_mock(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
    Json(updated_mock): Json<MockConfig>,
) -> Result<Json<MockConfig>, StatusCode> {
    let mut mocks = state.mocks.write().await;

    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;

    // Keep the previous version so an enabled/disabled transition can be detected.
    let old_mock = mocks[position].clone();

    info!("Updating mock: {}", id);
    mocks[position] = updated_mock.clone();

    if let Some(hooks) = &state.lifecycle_hooks {
        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
            id: updated_mock.id.clone(),
            name: updated_mock.name.clone(),
            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
        };
        hooks.invoke_mock_updated(&event).await;

        // Fire an additional Enabled/Disabled event only when the flag flipped.
        if old_mock.enabled != updated_mock.enabled {
            let state_event = if updated_mock.enabled {
                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
                    id: updated_mock.id.clone(),
                }
            } else {
                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
                    id: updated_mock.id.clone(),
                }
            };
            hooks.invoke_mock_state_changed(&state_event).await;
        }
    }

    // Best-effort WebSocket broadcast; send errors are ignored.
    if let Some(tx) = &state.ws_broadcast {
        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
    }

    Ok(Json(updated_mock))
}
904
905async fn delete_mock(
907 State(state): State<ManagementState>,
908 Path(id): Path<String>,
909) -> Result<StatusCode, StatusCode> {
910 let mut mocks = state.mocks.write().await;
911
912 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
913
914 let deleted_mock = mocks[position].clone();
916
917 info!("Deleting mock: {}", id);
918 mocks.remove(position);
919
920 if let Some(hooks) = &state.lifecycle_hooks {
922 let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
923 id: deleted_mock.id.clone(),
924 name: deleted_mock.name.clone(),
925 };
926 hooks.invoke_mock_deleted(&event).await;
927 }
928
929 if let Some(tx) = &state.ws_broadcast {
931 let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
932 }
933
934 Ok(StatusCode::NO_CONTENT)
935}
936
/// Request body for the config-validation endpoint.
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// Candidate configuration as arbitrary JSON.
    pub config: serde_json::Value,
    /// Declared source format: "json" (default), "yaml", or "yml".
    #[serde(default = "default_format")]
    pub format: String,
}
946
/// Serde default for `ValidateConfigRequest::format`.
fn default_format() -> String {
    String::from("json")
}
950
/// POST handler: validates an arbitrary config payload against
/// `mockforge_core::config::ServerConfig` without applying it.
///
/// NOTE(review): for the "yaml" path the body has already been parsed as
/// JSON by the `Json` extractor; it is re-serialized to a JSON string and
/// then parsed with serde_yaml (valid because JSON is a YAML subset). Both
/// branches therefore validate essentially the same parsed value — confirm
/// whether the `format` flag is still meaningful for callers.
async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
    use mockforge_core::config::ServerConfig;

    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
        "yaml" | "yml" => {
            let yaml_str = match serde_json::to_string(&request.config) {
                Ok(s) => s,
                Err(e) => {
                    return (
                        StatusCode::BAD_REQUEST,
                        Json(serde_json::json!({
                            "valid": false,
                            "error": format!("Failed to convert to string: {}", e),
                            "message": "Configuration validation failed"
                        })),
                    )
                        .into_response();
                }
            };
            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
        }
        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
    };

    // Only validity is reported; the parsed config is discarded.
    match config_result {
        Ok(_) => Json(serde_json::json!({
            "valid": true,
            "message": "Configuration is valid"
        }))
        .into_response(),
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "valid": false,
                "error": format!("Invalid configuration: {}", e),
                "message": "Configuration validation failed"
            })),
        )
            .into_response(),
    }
}
993
/// Request body for the bulk configuration update endpoint; `updates` must be
/// a JSON object of top-level ServerConfig keys.
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    /// Top-level key/value overrides to merge onto the default config.
    pub updates: serde_json::Value,
}
1000
/// POST handler: validates a bulk configuration update without applying it.
///
/// The updates are shallow-merged (top-level keys only) onto a default
/// `ServerConfig` serialized to JSON, and the merged document is then
/// deserialized back to `ServerConfig` purely as a validation step. As the
/// success message states, nothing is applied at runtime.
async fn bulk_update_config(
    State(_state): State<ManagementState>,
    Json(request): Json<BulkConfigUpdateRequest>,
) -> impl IntoResponse {
    // Only JSON objects can be merged key-by-key.
    if !request.updates.is_object() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Updates must be a JSON object"
            })),
        )
            .into_response();
    }

    use mockforge_core::config::ServerConfig;

    // Start from the default config as the merge base.
    let base_config = ServerConfig::default();
    let base_json = match serde_json::to_value(&base_config) {
        Ok(v) => v,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Internal error",
                    "message": format!("Failed to serialize base config: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Shallow merge: each top-level key in `updates` replaces the base value
    // wholesale (nested objects are not merged recursively).
    let mut merged = base_json.clone();
    if let (Some(merged_obj), Some(updates_obj)) =
        (merged.as_object_mut(), request.updates.as_object())
    {
        for (key, value) in updates_obj {
            merged_obj.insert(key.clone(), value.clone());
        }
    }

    // Round-trip through ServerConfig to validate the merged document.
    match serde_json::from_value::<ServerConfig>(merged) {
        Ok(_) => {
            Json(serde_json::json!({
                "success": true,
                "message": "Bulk configuration update validated successfully. Note: Runtime application requires ServerConfig in ManagementState and hot-reload support.",
                "updates_received": request.updates,
                "validated": true
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid configuration",
                "message": format!("Configuration validation failed: {}", e),
                "validated": false
            })),
        )
            .into_response(),
    }
}
1081
1082async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
1084 let mocks = state.mocks.read().await;
1085 let request_count = *state.request_counter.read().await;
1086
1087 Json(ServerStats {
1088 uptime_seconds: state.start_time.elapsed().as_secs(),
1089 total_requests: request_count,
1090 active_mocks: mocks.len(),
1091 enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
1092 registered_routes: mocks.len(), })
1094}
1095
1096async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
1098 Json(ServerConfig {
1099 version: env!("CARGO_PKG_VERSION").to_string(),
1100 port: state.port,
1101 has_openapi_spec: state.spec.is_some(),
1102 spec_path: state.spec_path.clone(),
1103 })
1104}
1105
1106async fn health_check() -> Json<serde_json::Value> {
1108 Json(serde_json::json!({
1109 "status": "healthy",
1110 "service": "mockforge-management",
1111 "timestamp": chrono::Utc::now().to_rfc3339()
1112 }))
1113}
1114
/// Serialization format for mock export.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// Pretty-printed JSON (the default).
    Json,
    /// YAML.
    Yaml,
}
1124
1125async fn export_mocks(
1127 State(state): State<ManagementState>,
1128 Query(params): Query<std::collections::HashMap<String, String>>,
1129) -> Result<(StatusCode, String), StatusCode> {
1130 let mocks = state.mocks.read().await;
1131
1132 let format = params
1133 .get("format")
1134 .map(|f| match f.as_str() {
1135 "yaml" | "yml" => ExportFormat::Yaml,
1136 _ => ExportFormat::Json,
1137 })
1138 .unwrap_or(ExportFormat::Json);
1139
1140 match format {
1141 ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
1142 .map(|json| (StatusCode::OK, json))
1143 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1144 ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
1145 .map(|yaml| (StatusCode::OK, yaml))
1146 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1147 }
1148}
1149
1150async fn import_mocks(
1152 State(state): State<ManagementState>,
1153 Json(mocks): Json<Vec<MockConfig>>,
1154) -> impl IntoResponse {
1155 let mut current_mocks = state.mocks.write().await;
1156 current_mocks.clear();
1157 current_mocks.extend(mocks);
1158 Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1159}
1160
/// GET handler: lists all captured SMTP emails, or 501 when no SMTP registry
/// is attached.
#[cfg(feature = "smtp")]
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(registry) = state.smtp_registry.as_ref() else {
        return (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        );
    };

    match registry.get_emails() {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to retrieve emails",
                "message": e.to_string()
            })),
        ),
    }
}
1185
/// GET handler: fetches one captured SMTP email by id (404 when missing),
/// or 501 when no SMTP registry is attached.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let Some(registry) = state.smtp_registry.as_ref() else {
        return (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        );
    };

    match registry.get_email_by_id(&id) {
        Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Email not found",
                "id": id
            })),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to retrieve email",
                "message": e.to_string()
            })),
        ),
    }
}
1220
/// DELETE handler: clears the captured SMTP mailbox, or 501 when no SMTP
/// registry is attached.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(registry) = state.smtp_registry.as_ref() else {
        return (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        );
    };

    match registry.clear_mailbox() {
        Ok(()) => (
            StatusCode::OK,
            Json(serde_json::json!({
                "message": "Mailbox cleared successfully"
            })),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to clear mailbox",
                "message": e.to_string()
            })),
        ),
    }
}
1250
/// GET handler for mailbox export. Not implemented over HTTP; echoes the
/// requested format back in the 501 payload.
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    // Borrow the parameter instead of allocating a temporary String just to
    // take a reference to it (the previous `unwrap_or(&"json".to_string())`
    // allocated and cloned on every call, even when the parameter was present).
    let format = params.get("format").map(String::as_str).unwrap_or("json");
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "SMTP mailbox management not available via HTTP API",
            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
            "requested_format": format
        })),
    )
}
1266
/// GET handler: searches captured SMTP emails using query parameters, or 501
/// when no SMTP registry is attached.
///
/// Recognized query parameters: `sender`, `recipient`, `subject`, `body`
/// (text filters), `since`/`until` (RFC 3339 timestamps; unparseable values
/// are silently ignored), `regex` and `case_sensitive` (the literal string
/// "true" enables them; anything else is false).
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        let filters = EmailSearchFilters {
            sender: params.get("sender").cloned(),
            recipient: params.get("recipient").cloned(),
            subject: params.get("subject").cloned(),
            body: params.get("body").cloned(),
            // Timestamps are parsed as RFC 3339 and normalized to UTC;
            // invalid values are dropped rather than rejected.
            since: params
                .get("since")
                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
                .map(|dt| dt.with_timezone(&chrono::Utc)),
            until: params
                .get("until")
                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
                .map(|dt| dt.with_timezone(&chrono::Utc)),
            use_regex: params.get("regex").map(|s| s == "true").unwrap_or(false),
            case_sensitive: params.get("case_sensitive").map(|s| s == "true").unwrap_or(false),
        };

        match smtp_registry.search_emails(filters) {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to search emails",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1311
/// Aggregate statistics reported by the embedded MQTT broker.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of currently connected MQTT clients.
    pub connected_clients: usize,
    /// Number of topics with at least one subscriber or retained message.
    pub active_topics: usize,
    /// Number of retained messages held by the broker.
    pub retained_messages: usize,
    /// Total subscription count across all clients.
    pub total_subscriptions: usize,
}
1325
#[cfg(feature = "mqtt")]
/// GET /mqtt/stats — broker-level statistics, or 503 when no broker is configured.
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    // All three queries are independent read-only snapshots of broker state.
    let clients = broker.get_connected_clients().await;
    let topics = broker.get_active_topics().await;
    let topic_stats = broker.get_topic_stats().await;

    Json(MqttBrokerStats {
        connected_clients: clients.len(),
        active_topics: topics.len(),
        retained_messages: topic_stats.retained_messages,
        total_subscriptions: topic_stats.total_subscriptions,
    })
    .into_response()
}
1346
#[cfg(feature = "mqtt")]
/// GET /mqtt/clients — list the IDs of currently connected MQTT clients.
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let connected = broker.get_connected_clients().await;
            Json(serde_json::json!({ "clients": connected })).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1359
#[cfg(feature = "mqtt")]
/// GET /mqtt/topics — list topics that are currently active on the broker.
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match &state.mqtt_broker {
        Some(broker) => {
            let active = broker.get_active_topics().await;
            Json(serde_json::json!({ "topics": active })).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1372
#[cfg(feature = "mqtt")]
/// DELETE /mqtt/clients/{client_id} — force-disconnect one client from the broker.
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response(),
        Err(e) => {
            (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
                .into_response()
        }
    }
}
1392
/// Request body for publishing a single MQTT message.
///
/// NOTE(review): the publish handlers currently deserialize into
/// `serde_json::Value` and extract fields manually rather than using this
/// struct directly — verify whether it is used by other callers.
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Destination topic.
    pub topic: String,
    /// Message payload (UTF-8 text).
    pub payload: String,
    /// Quality of service level (0, 1, or 2); defaults to 0.
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Whether the broker should retain the message; defaults to false.
    #[serde(default)]
    pub retain: bool,
}
1410
/// Serde default for `MqttPublishRequest::qos`: QoS 0 (at most once).
#[cfg(feature = "mqtt")]
fn default_qos() -> u8 {
    0
}
1415
#[cfg(feature = "mqtt")]
/// POST /mqtt/publish — publish one message through the embedded MQTT broker.
///
/// Accepts a free-form JSON body with `topic` (string, required), `payload`
/// (string, required), `qos` (number, default 0) and `retain` (bool, default
/// false). Responds 400 on missing fields or invalid QoS, 503 when no broker
/// is configured, 500 on broker publish failure, and 200 with a summary on
/// success. A successful publish is also broadcast on the SSE message-event
/// channel.
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Manual field extraction keeps malformed bodies as a 400 JSON error
    // rather than axum's default 422 for typed extractors.
    let topic = request.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
    let payload = request.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    if topic.is_none() || payload.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required fields: topic and payload"
            })),
        );
    }

    // Safe: both checked non-None just above.
    let topic = topic.unwrap();
    let payload = payload.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        // NOTE: QoS is only validated when a broker exists; with no broker an
        // invalid QoS still yields 503 below.
        if qos > 2 {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid QoS",
                    "message": "QoS must be 0, 1, or 2"
                })),
            );
        }

        let payload_bytes = payload.as_bytes().to_vec();
        // Fixed client id identifying messages injected via the management API.
        let client_id = "mockforge-management-api".to_string();

        let publish_result = broker
            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
            .await
            .map_err(|e| format!("{}", e));

        match publish_result {
            Ok(_) => {
                // Mirror the publish onto the SSE stream; send errors (no
                // subscribers) are deliberately ignored.
                let event = MessageEvent::Mqtt(MqttMessageEvent {
                    topic: topic.clone(),
                    payload: payload.clone(),
                    qos,
                    retain,
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                (
                    StatusCode::OK,
                    Json(serde_json::json!({
                        "success": true,
                        "message": format!("Message published to topic '{}'", topic),
                        "topic": topic,
                        "qos": qos,
                        "retain": retain
                    })),
                )
            }
            Err(error_msg) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to publish message",
                    "message": error_msg
                })),
            ),
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1503
1504#[cfg(not(feature = "mqtt"))]
1505async fn publish_mqtt_message_handler(
1507 State(_state): State<ManagementState>,
1508 Json(_request): Json<serde_json::Value>,
1509) -> impl IntoResponse {
1510 (
1511 StatusCode::SERVICE_UNAVAILABLE,
1512 Json(serde_json::json!({
1513 "error": "MQTT feature not enabled",
1514 "message": "MQTT support is not compiled into this build"
1515 })),
1516 )
1517}
1518
/// Request body for publishing a batch of MQTT messages.
///
/// NOTE(review): the batch handler currently parses `serde_json::Value`
/// manually instead of deserializing into this struct — verify whether it is
/// used by other callers.
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// Messages to publish, in order.
    pub messages: Vec<MqttPublishRequest>,
    /// Delay between consecutive publishes, in milliseconds; defaults to 100.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1529
/// Serde default for `MqttBatchPublishRequest::delay_ms` (100 ms between messages).
#[cfg(feature = "mqtt")]
fn default_delay() -> u64 {
    100
}
1534
#[cfg(feature = "mqtt")]
/// POST /mqtt/publish/batch — publish a sequence of MQTT messages.
///
/// Body: `{"messages": [{"topic", "payload", "qos"?, "retain"?}, ...],
/// "delay_ms"?: number}` (delay defaults to 100 ms). Each message is
/// validated and published independently; per-message failures are reported
/// in the `results` array rather than aborting the batch. Responds 400 for a
/// missing or empty `messages` array, 503 when no broker is configured, and
/// 200 with per-message results plus success/failure counts otherwise.
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let messages_json = request.get("messages").and_then(|v| v.as_array());
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    if messages_json.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required field: messages"
            })),
        );
    }

    // Safe: checked non-None just above.
    let messages_json = messages_json.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        if messages_json.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            );
        }

        // One result entry per input message, indexed by position.
        let mut results = Vec::new();
        // Fixed client id identifying messages injected via the management API.
        let client_id = "mockforge-management-api".to_string();

        for (index, msg_json) in messages_json.iter().enumerate() {
            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

            // Validation failures record an error entry and move on to the
            // next message instead of failing the whole batch.
            if topic.is_none() || payload.is_none() {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }

            let topic = topic.unwrap();
            let payload = payload.unwrap();

            if qos > 2 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Invalid QoS (must be 0, 1, or 2)"
                }));
                continue;
            }

            let payload_bytes = payload.as_bytes().to_vec();

            let publish_result = broker
                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
                .await
                .map_err(|e| format!("{}", e));

            match publish_result {
                Ok(_) => {
                    // Mirror each successful publish onto the SSE stream;
                    // send errors (no subscribers) are deliberately ignored.
                    let event = MessageEvent::Mqtt(MqttMessageEvent {
                        topic: topic.clone(),
                        payload: payload.clone(),
                        qos,
                        retain,
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": topic,
                        "qos": qos
                    }));
                }
                Err(error_msg) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": error_msg
                    }));
                }
            }

            // Pace the batch: sleep between messages, but not after the last
            // one. `len() - 1` cannot underflow — the batch is non-empty.
            if index < messages_json.len() - 1 && delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        (
            StatusCode::OK,
            Json(serde_json::json!({
                "success": true,
                "total": messages_json.len(),
                "succeeded": success_count,
                "failed": messages_json.len() - success_count,
                "results": results
            })),
        )
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1664
1665#[cfg(not(feature = "mqtt"))]
1666async fn publish_mqtt_batch_handler(
1668 State(_state): State<ManagementState>,
1669 Json(_request): Json<serde_json::Value>,
1670) -> impl IntoResponse {
1671 (
1672 StatusCode::SERVICE_UNAVAILABLE,
1673 Json(serde_json::json!({
1674 "error": "MQTT feature not enabled",
1675 "message": "MQTT support is not compiled into this build"
1676 })),
1677 )
1678}
1679
/// Request body for explicitly setting a migration mode.
///
/// `mode` must be one of (case-insensitive): "mock", "shadow", "real", "auto".
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    // Parsed case-insensitively by the route/group mode handlers.
    mode: String,
}
1687
1688async fn get_migration_routes(
1690 State(state): State<ManagementState>,
1691) -> Result<Json<serde_json::Value>, StatusCode> {
1692 let proxy_config = match &state.proxy_config {
1693 Some(config) => config,
1694 None => {
1695 return Ok(Json(serde_json::json!({
1696 "error": "Migration not configured. Proxy config not available."
1697 })));
1698 }
1699 };
1700
1701 let config = proxy_config.read().await;
1702 let routes = config.get_migration_routes();
1703
1704 Ok(Json(serde_json::json!({
1705 "routes": routes
1706 })))
1707}
1708
1709async fn toggle_route_migration(
1711 State(state): State<ManagementState>,
1712 Path(pattern): Path<String>,
1713) -> Result<Json<serde_json::Value>, StatusCode> {
1714 let proxy_config = match &state.proxy_config {
1715 Some(config) => config,
1716 None => {
1717 return Ok(Json(serde_json::json!({
1718 "error": "Migration not configured. Proxy config not available."
1719 })));
1720 }
1721 };
1722
1723 let mut config = proxy_config.write().await;
1724 let new_mode = match config.toggle_route_migration(&pattern) {
1725 Some(mode) => mode,
1726 None => {
1727 return Ok(Json(serde_json::json!({
1728 "error": format!("Route pattern not found: {}", pattern)
1729 })));
1730 }
1731 };
1732
1733 Ok(Json(serde_json::json!({
1734 "pattern": pattern,
1735 "mode": format!("{:?}", new_mode).to_lowercase()
1736 })))
1737}
1738
1739async fn set_route_migration_mode(
1741 State(state): State<ManagementState>,
1742 Path(pattern): Path<String>,
1743 Json(request): Json<SetMigrationModeRequest>,
1744) -> Result<Json<serde_json::Value>, StatusCode> {
1745 let proxy_config = match &state.proxy_config {
1746 Some(config) => config,
1747 None => {
1748 return Ok(Json(serde_json::json!({
1749 "error": "Migration not configured. Proxy config not available."
1750 })));
1751 }
1752 };
1753
1754 use mockforge_core::proxy::config::MigrationMode;
1755 let mode = match request.mode.to_lowercase().as_str() {
1756 "mock" => MigrationMode::Mock,
1757 "shadow" => MigrationMode::Shadow,
1758 "real" => MigrationMode::Real,
1759 "auto" => MigrationMode::Auto,
1760 _ => {
1761 return Ok(Json(serde_json::json!({
1762 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1763 })));
1764 }
1765 };
1766
1767 let mut config = proxy_config.write().await;
1768 let updated = config.update_rule_migration_mode(&pattern, mode);
1769
1770 if !updated {
1771 return Ok(Json(serde_json::json!({
1772 "error": format!("Route pattern not found: {}", pattern)
1773 })));
1774 }
1775
1776 Ok(Json(serde_json::json!({
1777 "pattern": pattern,
1778 "mode": format!("{:?}", mode).to_lowercase()
1779 })))
1780}
1781
1782async fn toggle_group_migration(
1784 State(state): State<ManagementState>,
1785 Path(group): Path<String>,
1786) -> Result<Json<serde_json::Value>, StatusCode> {
1787 let proxy_config = match &state.proxy_config {
1788 Some(config) => config,
1789 None => {
1790 return Ok(Json(serde_json::json!({
1791 "error": "Migration not configured. Proxy config not available."
1792 })));
1793 }
1794 };
1795
1796 let mut config = proxy_config.write().await;
1797 let new_mode = config.toggle_group_migration(&group);
1798
1799 Ok(Json(serde_json::json!({
1800 "group": group,
1801 "mode": format!("{:?}", new_mode).to_lowercase()
1802 })))
1803}
1804
1805async fn set_group_migration_mode(
1807 State(state): State<ManagementState>,
1808 Path(group): Path<String>,
1809 Json(request): Json<SetMigrationModeRequest>,
1810) -> Result<Json<serde_json::Value>, StatusCode> {
1811 let proxy_config = match &state.proxy_config {
1812 Some(config) => config,
1813 None => {
1814 return Ok(Json(serde_json::json!({
1815 "error": "Migration not configured. Proxy config not available."
1816 })));
1817 }
1818 };
1819
1820 use mockforge_core::proxy::config::MigrationMode;
1821 let mode = match request.mode.to_lowercase().as_str() {
1822 "mock" => MigrationMode::Mock,
1823 "shadow" => MigrationMode::Shadow,
1824 "real" => MigrationMode::Real,
1825 "auto" => MigrationMode::Auto,
1826 _ => {
1827 return Ok(Json(serde_json::json!({
1828 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1829 })));
1830 }
1831 };
1832
1833 let mut config = proxy_config.write().await;
1834 config.update_group_migration_mode(&group, mode);
1835
1836 Ok(Json(serde_json::json!({
1837 "group": group,
1838 "mode": format!("{:?}", mode).to_lowercase()
1839 })))
1840}
1841
1842async fn get_migration_groups(
1844 State(state): State<ManagementState>,
1845) -> Result<Json<serde_json::Value>, StatusCode> {
1846 let proxy_config = match &state.proxy_config {
1847 Some(config) => config,
1848 None => {
1849 return Ok(Json(serde_json::json!({
1850 "error": "Migration not configured. Proxy config not available."
1851 })));
1852 }
1853 };
1854
1855 let config = proxy_config.read().await;
1856 let groups = config.get_migration_groups();
1857
1858 let groups_json: serde_json::Map<String, serde_json::Value> = groups
1860 .into_iter()
1861 .map(|(name, info)| {
1862 (
1863 name,
1864 serde_json::json!({
1865 "name": info.name,
1866 "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1867 "route_count": info.route_count
1868 }),
1869 )
1870 })
1871 .collect();
1872
1873 Ok(Json(serde_json::json!(groups_json)))
1874}
1875
1876async fn get_migration_status(
1878 State(state): State<ManagementState>,
1879) -> Result<Json<serde_json::Value>, StatusCode> {
1880 let proxy_config = match &state.proxy_config {
1881 Some(config) => config,
1882 None => {
1883 return Ok(Json(serde_json::json!({
1884 "error": "Migration not configured. Proxy config not available."
1885 })));
1886 }
1887 };
1888
1889 let config = proxy_config.read().await;
1890 let routes = config.get_migration_routes();
1891 let groups = config.get_migration_groups();
1892
1893 let mut mock_count = 0;
1894 let mut shadow_count = 0;
1895 let mut real_count = 0;
1896 let mut auto_count = 0;
1897
1898 for route in &routes {
1899 match route.migration_mode {
1900 mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1901 mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1902 mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1903 mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1904 }
1905 }
1906
1907 Ok(Json(serde_json::json!({
1908 "total_routes": routes.len(),
1909 "mock_routes": mock_count,
1910 "shadow_routes": shadow_count,
1911 "real_routes": real_count,
1912 "auto_routes": auto_count,
1913 "total_groups": groups.len(),
1914 "migration_enabled": config.migration_enabled
1915 })))
1916}
1917
/// Request body for creating or updating a proxy body-transform rule.
#[derive(Debug, Deserialize, Serialize)]
pub struct ProxyRuleRequest {
    /// URL pattern the rule applies to.
    pub pattern: String,
    /// Rule kind: "request" or "response".
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status codes the rule matches (response rules only); defaults to empty.
    #[serde(default)]
    pub status_codes: Vec<u16>,
    /// Transforms to apply to the body (required; create rejects an empty list).
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active; defaults to true via `default_true`.
    #[serde(default = "default_true")]
    pub enabled: bool,
}
1937
/// Wire representation of a single body transform.
#[derive(Debug, Deserialize, Serialize)]
pub struct BodyTransformRequest {
    /// JSON path of the field to transform.
    pub path: String,
    /// Replacement value.
    pub replace: String,
    /// Operation name: "replace", "add", or "remove"; anything else
    /// (including the empty default) is treated as "replace" by the handlers.
    #[serde(default)]
    pub operation: String,
}
1949
/// Response item for listing proxy rules.
#[derive(Debug, Serialize)]
pub struct ProxyRuleResponse {
    /// Positional rule ID: request rules first, then response rules.
    pub id: usize,
    /// URL pattern the rule applies to.
    pub pattern: String,
    /// Rule kind: "request" or "response".
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Matched status codes (empty for request rules).
    pub status_codes: Vec<u16>,
    /// Transforms applied by the rule.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active.
    pub enabled: bool,
}
1967
1968async fn list_proxy_rules(
1970 State(state): State<ManagementState>,
1971) -> Result<Json<serde_json::Value>, StatusCode> {
1972 let proxy_config = match &state.proxy_config {
1973 Some(config) => config,
1974 None => {
1975 return Ok(Json(serde_json::json!({
1976 "error": "Proxy not configured. Proxy config not available."
1977 })));
1978 }
1979 };
1980
1981 let config = proxy_config.read().await;
1982
1983 let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1984
1985 for (idx, rule) in config.request_replacements.iter().enumerate() {
1987 rules.push(ProxyRuleResponse {
1988 id: idx,
1989 pattern: rule.pattern.clone(),
1990 rule_type: "request".to_string(),
1991 status_codes: Vec::new(),
1992 body_transforms: rule
1993 .body_transforms
1994 .iter()
1995 .map(|t| BodyTransformRequest {
1996 path: t.path.clone(),
1997 replace: t.replace.clone(),
1998 operation: format!("{:?}", t.operation).to_lowercase(),
1999 })
2000 .collect(),
2001 enabled: rule.enabled,
2002 });
2003 }
2004
2005 let request_count = config.request_replacements.len();
2007 for (idx, rule) in config.response_replacements.iter().enumerate() {
2008 rules.push(ProxyRuleResponse {
2009 id: request_count + idx,
2010 pattern: rule.pattern.clone(),
2011 rule_type: "response".to_string(),
2012 status_codes: rule.status_codes.clone(),
2013 body_transforms: rule
2014 .body_transforms
2015 .iter()
2016 .map(|t| BodyTransformRequest {
2017 path: t.path.clone(),
2018 replace: t.replace.clone(),
2019 operation: format!("{:?}", t.operation).to_lowercase(),
2020 })
2021 .collect(),
2022 enabled: rule.enabled,
2023 });
2024 }
2025
2026 Ok(Json(serde_json::json!({
2027 "rules": rules
2028 })))
2029}
2030
2031async fn create_proxy_rule(
2033 State(state): State<ManagementState>,
2034 Json(request): Json<ProxyRuleRequest>,
2035) -> Result<Json<serde_json::Value>, StatusCode> {
2036 let proxy_config = match &state.proxy_config {
2037 Some(config) => config,
2038 None => {
2039 return Ok(Json(serde_json::json!({
2040 "error": "Proxy not configured. Proxy config not available."
2041 })));
2042 }
2043 };
2044
2045 if request.body_transforms.is_empty() {
2047 return Ok(Json(serde_json::json!({
2048 "error": "At least one body transform is required"
2049 })));
2050 }
2051
2052 let body_transforms: Vec<BodyTransform> = request
2053 .body_transforms
2054 .iter()
2055 .map(|t| {
2056 let op = match t.operation.as_str() {
2057 "replace" => TransformOperation::Replace,
2058 "add" => TransformOperation::Add,
2059 "remove" => TransformOperation::Remove,
2060 _ => TransformOperation::Replace,
2061 };
2062 BodyTransform {
2063 path: t.path.clone(),
2064 replace: t.replace.clone(),
2065 operation: op,
2066 }
2067 })
2068 .collect();
2069
2070 let new_rule = BodyTransformRule {
2071 pattern: request.pattern.clone(),
2072 status_codes: request.status_codes.clone(),
2073 body_transforms,
2074 enabled: request.enabled,
2075 };
2076
2077 let mut config = proxy_config.write().await;
2078
2079 let rule_id = if request.rule_type == "request" {
2080 config.request_replacements.push(new_rule);
2081 config.request_replacements.len() - 1
2082 } else if request.rule_type == "response" {
2083 config.response_replacements.push(new_rule);
2084 config.request_replacements.len() + config.response_replacements.len() - 1
2085 } else {
2086 return Ok(Json(serde_json::json!({
2087 "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
2088 })));
2089 };
2090
2091 Ok(Json(serde_json::json!({
2092 "id": rule_id,
2093 "message": "Rule created successfully"
2094 })))
2095}
2096
2097async fn get_proxy_rule(
2099 State(state): State<ManagementState>,
2100 Path(id): Path<String>,
2101) -> Result<Json<serde_json::Value>, StatusCode> {
2102 let proxy_config = match &state.proxy_config {
2103 Some(config) => config,
2104 None => {
2105 return Ok(Json(serde_json::json!({
2106 "error": "Proxy not configured. Proxy config not available."
2107 })));
2108 }
2109 };
2110
2111 let config = proxy_config.read().await;
2112 let rule_id: usize = match id.parse() {
2113 Ok(id) => id,
2114 Err(_) => {
2115 return Ok(Json(serde_json::json!({
2116 "error": format!("Invalid rule ID: {}", id)
2117 })));
2118 }
2119 };
2120
2121 let request_count = config.request_replacements.len();
2122
2123 if rule_id < request_count {
2124 let rule = &config.request_replacements[rule_id];
2126 Ok(Json(serde_json::json!({
2127 "id": rule_id,
2128 "pattern": rule.pattern,
2129 "type": "request",
2130 "status_codes": [],
2131 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2132 "path": t.path,
2133 "replace": t.replace,
2134 "operation": format!("{:?}", t.operation).to_lowercase()
2135 })).collect::<Vec<_>>(),
2136 "enabled": rule.enabled
2137 })))
2138 } else if rule_id < request_count + config.response_replacements.len() {
2139 let response_idx = rule_id - request_count;
2141 let rule = &config.response_replacements[response_idx];
2142 Ok(Json(serde_json::json!({
2143 "id": rule_id,
2144 "pattern": rule.pattern,
2145 "type": "response",
2146 "status_codes": rule.status_codes,
2147 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2148 "path": t.path,
2149 "replace": t.replace,
2150 "operation": format!("{:?}", t.operation).to_lowercase()
2151 })).collect::<Vec<_>>(),
2152 "enabled": rule.enabled
2153 })))
2154 } else {
2155 Ok(Json(serde_json::json!({
2156 "error": format!("Rule ID {} not found", rule_id)
2157 })))
2158 }
2159}
2160
2161async fn update_proxy_rule(
2163 State(state): State<ManagementState>,
2164 Path(id): Path<String>,
2165 Json(request): Json<ProxyRuleRequest>,
2166) -> Result<Json<serde_json::Value>, StatusCode> {
2167 let proxy_config = match &state.proxy_config {
2168 Some(config) => config,
2169 None => {
2170 return Ok(Json(serde_json::json!({
2171 "error": "Proxy not configured. Proxy config not available."
2172 })));
2173 }
2174 };
2175
2176 let mut config = proxy_config.write().await;
2177 let rule_id: usize = match id.parse() {
2178 Ok(id) => id,
2179 Err(_) => {
2180 return Ok(Json(serde_json::json!({
2181 "error": format!("Invalid rule ID: {}", id)
2182 })));
2183 }
2184 };
2185
2186 let body_transforms: Vec<BodyTransform> = request
2187 .body_transforms
2188 .iter()
2189 .map(|t| {
2190 let op = match t.operation.as_str() {
2191 "replace" => TransformOperation::Replace,
2192 "add" => TransformOperation::Add,
2193 "remove" => TransformOperation::Remove,
2194 _ => TransformOperation::Replace,
2195 };
2196 BodyTransform {
2197 path: t.path.clone(),
2198 replace: t.replace.clone(),
2199 operation: op,
2200 }
2201 })
2202 .collect();
2203
2204 let updated_rule = BodyTransformRule {
2205 pattern: request.pattern.clone(),
2206 status_codes: request.status_codes.clone(),
2207 body_transforms,
2208 enabled: request.enabled,
2209 };
2210
2211 let request_count = config.request_replacements.len();
2212
2213 if rule_id < request_count {
2214 config.request_replacements[rule_id] = updated_rule;
2216 } else if rule_id < request_count + config.response_replacements.len() {
2217 let response_idx = rule_id - request_count;
2219 config.response_replacements[response_idx] = updated_rule;
2220 } else {
2221 return Ok(Json(serde_json::json!({
2222 "error": format!("Rule ID {} not found", rule_id)
2223 })));
2224 }
2225
2226 Ok(Json(serde_json::json!({
2227 "id": rule_id,
2228 "message": "Rule updated successfully"
2229 })))
2230}
2231
2232async fn delete_proxy_rule(
2234 State(state): State<ManagementState>,
2235 Path(id): Path<String>,
2236) -> Result<Json<serde_json::Value>, StatusCode> {
2237 let proxy_config = match &state.proxy_config {
2238 Some(config) => config,
2239 None => {
2240 return Ok(Json(serde_json::json!({
2241 "error": "Proxy not configured. Proxy config not available."
2242 })));
2243 }
2244 };
2245
2246 let mut config = proxy_config.write().await;
2247 let rule_id: usize = match id.parse() {
2248 Ok(id) => id,
2249 Err(_) => {
2250 return Ok(Json(serde_json::json!({
2251 "error": format!("Invalid rule ID: {}", id)
2252 })));
2253 }
2254 };
2255
2256 let request_count = config.request_replacements.len();
2257
2258 if rule_id < request_count {
2259 config.request_replacements.remove(rule_id);
2261 } else if rule_id < request_count + config.response_replacements.len() {
2262 let response_idx = rule_id - request_count;
2264 config.response_replacements.remove(response_idx);
2265 } else {
2266 return Ok(Json(serde_json::json!({
2267 "error": format!("Rule ID {} not found", rule_id)
2268 })));
2269 }
2270
2271 Ok(Json(serde_json::json!({
2272 "id": rule_id,
2273 "message": "Rule deleted successfully"
2274 })))
2275}
2276
2277async fn get_proxy_inspect(
2280 State(state): State<ManagementState>,
2281 Query(params): Query<std::collections::HashMap<String, String>>,
2282) -> Result<Json<serde_json::Value>, StatusCode> {
2283 let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2284 let offset: usize = params.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);
2285
2286 let proxy_config = match &state.proxy_config {
2287 Some(config) => config.read().await,
2288 None => {
2289 return Ok(Json(serde_json::json!({
2290 "error": "Proxy not configured. Proxy config not available."
2291 })));
2292 }
2293 };
2294
2295 let mut rules = Vec::new();
2296 for (idx, rule) in proxy_config.request_replacements.iter().enumerate() {
2297 rules.push(serde_json::json!({
2298 "id": idx,
2299 "kind": "request",
2300 "pattern": rule.pattern,
2301 "enabled": rule.enabled,
2302 "status_codes": rule.status_codes,
2303 "transform_count": rule.body_transforms.len(),
2304 "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2305 "path": t.path,
2306 "operation": t.operation,
2307 "replace": t.replace
2308 })).collect::<Vec<_>>()
2309 }));
2310 }
2311 let request_rule_count = rules.len();
2312 for (idx, rule) in proxy_config.response_replacements.iter().enumerate() {
2313 rules.push(serde_json::json!({
2314 "id": request_rule_count + idx,
2315 "kind": "response",
2316 "pattern": rule.pattern,
2317 "enabled": rule.enabled,
2318 "status_codes": rule.status_codes,
2319 "transform_count": rule.body_transforms.len(),
2320 "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2321 "path": t.path,
2322 "operation": t.operation,
2323 "replace": t.replace
2324 })).collect::<Vec<_>>()
2325 }));
2326 }
2327
2328 let total = rules.len();
2329 let paged_rules: Vec<_> = rules.into_iter().skip(offset).take(limit).collect();
2330
2331 Ok(Json(serde_json::json!({
2332 "enabled": proxy_config.enabled,
2333 "target_url": proxy_config.target_url,
2334 "prefix": proxy_config.prefix,
2335 "timeout_seconds": proxy_config.timeout_seconds,
2336 "follow_redirects": proxy_config.follow_redirects,
2337 "passthrough_by_default": proxy_config.passthrough_by_default,
2338 "rules": paged_rules,
2339 "request_rule_count": request_rule_count,
2340 "response_rule_count": total.saturating_sub(request_rule_count),
2341 "limit": limit,
2342 "offset": offset,
2343 "total": total
2344 })))
2345}
2346
/// Build the management API router, wiring every handler to its route.
///
/// Feature-gated route groups (SMTP, MQTT, Kafka) are added by shadowing
/// `router` under `#[cfg]`; the `#[cfg(not(...))]` rebindings keep the
/// variable defined on every feature combination. The MQTT publish routes are
/// registered even without the feature so clients get a JSON 503 instead of a
/// 404.
pub fn management_router(state: ManagementState) -> Router {
    // Core endpoints: health, stats, config, mock CRUD, import/export.
    let router = Router::new()
        .route("/health", get(health_check))
        .route("/stats", get(get_stats))
        .route("/config", get(get_config))
        .route("/config/validate", post(validate_config))
        .route("/config/bulk", post(bulk_update_config))
        .route("/mocks", get(list_mocks))
        .route("/mocks", post(create_mock))
        .route("/mocks/{id}", get(get_mock))
        .route("/mocks/{id}", put(update_mock))
        .route("/mocks/{id}", delete(delete_mock))
        .route("/export", get(export_mocks))
        .route("/import", post(import_mocks));

    // SMTP mailbox endpoints (feature-gated).
    #[cfg(feature = "smtp")]
    let router = router
        .route("/smtp/mailbox", get(list_smtp_emails))
        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
        .route("/smtp/mailbox/{id}", get(get_smtp_email))
        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
        .route("/smtp/mailbox/search", get(search_smtp_emails));

    // No-op rebinding so `router` stays defined without the feature.
    #[cfg(not(feature = "smtp"))]
    let router = router;

    // MQTT broker endpoints (feature-gated).
    #[cfg(feature = "mqtt")]
    let router = router
        .route("/mqtt/stats", get(get_mqtt_stats))
        .route("/mqtt/clients", get(get_mqtt_clients))
        .route("/mqtt/topics", get(get_mqtt_topics))
        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Without the feature, the publish routes resolve to the stub handlers,
    // which answer 503 with a JSON body.
    #[cfg(not(feature = "mqtt"))]
    let router = router
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Kafka broker endpoints (feature-gated).
    #[cfg(feature = "kafka")]
    let router = router
        .route("/kafka/stats", get(get_kafka_stats))
        .route("/kafka/topics", get(get_kafka_topics))
        .route("/kafka/topics/{topic}", get(get_kafka_topic))
        .route("/kafka/groups", get(get_kafka_groups))
        .route("/kafka/groups/{group_id}", get(get_kafka_group))
        .route("/kafka/produce", post(produce_kafka_message))
        .route("/kafka/produce/batch", post(produce_kafka_batch))
        .route("/kafka/messages/stream", get(kafka_messages_stream));

    #[cfg(not(feature = "kafka"))]
    let router = router;

    // Migration (mock-to-real traffic shifting) endpoints.
    let router = router
        .route("/migration/routes", get(get_migration_routes))
        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
        .route("/migration/groups/{group}", put(set_group_migration_mode))
        .route("/migration/groups", get(get_migration_groups))
        .route("/migration/status", get(get_migration_status));

    // Proxy body-transform rule management.
    let router = router
        .route("/proxy/rules", get(list_proxy_rules))
        .route("/proxy/rules", post(create_proxy_rule))
        .route("/proxy/rules/{id}", get(get_proxy_rule))
        .route("/proxy/rules/{id}", put(update_proxy_rule))
        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
        .route("/proxy/inspect", get(get_proxy_inspect));

    let router = router.route("/ai/generate-spec", post(generate_ai_spec));

    // Snapshot-diff endpoints live in their own sub-router with a clone of
    // the shared state.
    let router = router.nest(
        "/snapshot-diff",
        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
    );

    #[cfg(feature = "behavioral-cloning")]
    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));

    // MockAI learning, chaos, and network-profile endpoints.
    let router = router
        .route("/mockai/learn", post(learn_from_examples))
        .route("/mockai/rules/explanations", get(list_rule_explanations))
        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
        .route("/chaos/config", get(get_chaos_config))
        .route("/chaos/config", post(update_chaos_config))
        .route("/network/profiles", get(list_network_profiles))
        .route("/network/profile/apply", post(apply_network_profile));

    let router =
        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());

    router.with_state(state)
}
2450
/// Aggregate statistics for the embedded mock Kafka broker,
/// returned by `GET /kafka/stats`.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// Number of topics currently known to the broker.
    pub topics: usize,
    /// Total partition count summed across all topics.
    pub partitions: usize,
    /// Number of registered consumer groups.
    pub consumer_groups: usize,
    /// Total messages produced (taken from the broker metrics snapshot).
    pub messages_produced: u64,
    /// Total messages consumed (taken from the broker metrics snapshot).
    pub messages_consumed: u64,
}
2465
/// Summary of a single Kafka topic, used by `GET /kafka/topics`.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name.
    pub name: String,
    /// Number of partitions in the topic.
    pub partitions: usize,
    /// Configured replication factor (mock value; no real replication here).
    pub replication_factor: i32,
}
2473
/// Summary of a consumer group, used by `GET /kafka/groups`.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group identifier.
    pub group_id: String,
    /// Number of members currently in the group.
    pub members: usize,
    /// Group state label (the mock broker always reports "Stable").
    pub state: String,
}
2481
/// `GET /kafka/stats` — summary statistics for the mock Kafka broker.
///
/// Responds 503 with an error body when no broker is configured.
#[cfg(feature = "kafka")]
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    // Guard clause: bail out early when the broker is not running.
    let Some(broker) = state.kafka_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let groups = broker.consumer_groups.read().await;
    let snapshot = broker.metrics().snapshot();

    // Count partitions across every topic.
    let partition_total = topics.values().fold(0usize, |acc, t| acc + t.partitions.len());

    Json(KafkaBrokerStats {
        topics: topics.len(),
        partitions: partition_total,
        consumer_groups: groups.groups().len(),
        messages_produced: snapshot.messages_produced_total,
        messages_consumed: snapshot.messages_consumed_total,
    })
    .into_response()
}
2514
/// `GET /kafka/topics` — list all topics with partition counts.
///
/// Responds 503 when no broker is configured.
#[cfg(feature = "kafka")]
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.kafka_broker.as_ref() {
        None => (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response(),
        Some(broker) => {
            let topics = broker.topics.read().await;
            let mut listing = Vec::with_capacity(topics.len());
            for (name, topic) in topics.iter() {
                listing.push(KafkaTopicInfo {
                    name: name.clone(),
                    partitions: topic.partitions.len(),
                    replication_factor: topic.config.replication_factor as i32,
                });
            }
            Json(serde_json::json!({ "topics": listing })).into_response()
        }
    }
}
2544
/// `GET /kafka/topics/{topic}` — detailed view of one topic, including a
/// per-partition breakdown with message counts.
///
/// Responds 404 for an unknown topic and 503 when no broker is configured.
#[cfg(feature = "kafka")]
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = state.kafka_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let Some(topic) = topics.get(&topic_name) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Topic not found",
                "topic": topic_name
            })),
        )
            .into_response();
    };

    // Mock broker: single replica, leader is always broker 0.
    let partition_details: Vec<_> = topic
        .partitions
        .iter()
        .enumerate()
        .map(|(id, partition)| {
            serde_json::json!({
                "id": id as i32,
                "leader": 0,
                "replicas": vec![0],
                "message_count": partition.messages.len()
            })
        })
        .collect();

    Json(serde_json::json!({
        "name": topic_name,
        "partitions": topic.partitions.len(),
        "replication_factor": topic.config.replication_factor,
        "partitions_detail": partition_details
    }))
    .into_response()
}
2586
/// `GET /kafka/groups` — list all consumer groups with member counts.
///
/// Responds 503 when no broker is configured.
#[cfg(feature = "kafka")]
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = state.kafka_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let registry = broker.consumer_groups.read().await;
    let summaries: Vec<KafkaConsumerGroupInfo> = registry
        .groups()
        .iter()
        .map(|(id, group)| KafkaConsumerGroupInfo {
            group_id: id.clone(),
            members: group.members.len(),
            // The mock broker does not track rebalance state.
            state: String::from("Stable"),
        })
        .collect();

    Json(serde_json::json!({ "groups": summaries })).into_response()
}
2617
/// `GET /kafka/groups/{group_id}` — detailed view of one consumer group:
/// member roster with partition assignments, plus committed offsets.
///
/// Responds 404 for an unknown group and 503 when no broker is configured.
#[cfg(feature = "kafka")]
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let consumer_groups = broker.consumer_groups.read().await;
        if let Some(group) = consumer_groups.groups().get(&group_id) {
            // NOTE(review): "state" is hard-coded to "Stable"; the mock broker
            // does not appear to model rebalancing states — confirm if needed.
            Json(serde_json::json!({
                "group_id": group_id,
                "members": group.members.len(),
                "state": "Stable",
                // One entry per member: id, client id, and its topic/partition assignments.
                "members_detail": group.members.iter().map(|(member_id, member)| serde_json::json!({
                    "member_id": member_id,
                    "client_id": member.client_id,
                    "assignments": member.assignment.iter().map(|a| serde_json::json!({
                        "topic": a.topic,
                        "partitions": a.partitions
                    })).collect::<Vec<_>>()
                })).collect::<Vec<_>>(),
                // Committed offsets, keyed by (topic, partition) in the group state.
                "offsets": group.offsets.iter().map(|((topic, partition), offset)| serde_json::json!({
                    "topic": topic,
                    "partition": partition,
                    "offset": offset
                })).collect::<Vec<_>>()
            })).into_response()
        } else {
            (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Consumer group not found",
                    "group_id": group_id
                })),
            )
                .into_response()
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2666
/// Request body for `POST /kafka/produce` (also reused as the element type
/// of a batch produce request).
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Target topic; the handler auto-creates it with defaults if missing.
    pub topic: String,
    /// Optional message key; used for partition assignment when `partition`
    /// is not given.
    #[serde(default)]
    pub key: Option<String>,
    /// Message payload (stored as raw bytes).
    pub value: String,
    /// Explicit target partition; validated against the topic's partition count.
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional string headers; converted to byte values on produce.
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2686
/// `POST /kafka/produce` — produce a single message to a topic on the mock broker.
///
/// The topic is created with a default configuration on first use. When
/// `partition` is omitted, one is assigned from the message key. Responds
/// 400 for an out-of-range partition and 503 when no broker is configured.
///
/// Fixes vs. previous version: drops the redundant re-check of
/// `state.kafka_broker` inside the success arm (the broker binding is already
/// in scope), removes the no-op inner `#[cfg(feature = "kafka")]` block (the
/// whole function is already feature-gated), and avoids double-cloning the
/// key/header values.
#[cfg(feature = "kafka")]
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = state.kafka_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let mut topics = broker.topics.write().await;

    // Auto-create the topic with default configuration on first use.
    let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
        mockforge_kafka::topics::Topic::new(
            request.topic.clone(),
            mockforge_kafka::topics::TopicConfig::default(),
        )
    });

    // Use the caller-supplied partition, or derive one from the key.
    let partition_id = match request.partition {
        Some(partition) => partition,
        None => topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes())),
    };

    if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            })),
        )
            .into_response();
    }

    let message = mockforge_kafka::partitions::KafkaMessage {
        offset: 0, // real offset is assigned by the partition on produce
        timestamp: chrono::Utc::now().timestamp_millis(),
        key: request.key.as_ref().map(|k| k.as_bytes().to_vec()),
        value: request.value.as_bytes().to_vec(),
        headers: request
            .headers
            .clone()
            .unwrap_or_default()
            .into_iter()
            .map(|(k, v)| (k, v.into_bytes()))
            .collect(),
    };

    match topic_entry.produce(partition_id, message).await {
        Ok(offset) => {
            broker.metrics().record_messages_produced(1);

            // Notify SSE subscribers (/kafka/messages/stream); a send error
            // just means nobody is listening, so it is ignored.
            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: request.topic.clone(),
                key: request.key.clone(),
                value: request.value.clone(),
                partition: partition_id,
                offset,
                headers: request.headers.clone(),
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", request.topic),
                "topic": request.topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2791
/// Request body for `POST /kafka/produce/batch`.
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// Messages to produce, in order; each is handled independently so one
    /// failure does not abort the batch.
    pub messages: Vec<KafkaProduceRequest>,
    /// Delay between consecutive messages, in milliseconds.
    /// Default comes from `default_delay` (defined elsewhere in this module).
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2801
/// `POST /kafka/produce/batch` — produce a sequence of messages, optionally
/// pacing them with `delay_ms` between sends.
///
/// Each message is produced independently; per-message success/failure is
/// reported in the `results` array. Responds 400 on an empty batch and 503
/// when no broker is configured.
///
/// Fixes vs. previous version: the `topics` write lock was held across the
/// inter-message `tokio::time::sleep`, blocking every other producer for the
/// whole delay — the lock is now scoped to a single message and released
/// before sleeping. Also drops the redundant re-check of `state.kafka_broker`
/// inside the loop, and applies the pacing delay uniformly (previously a
/// message rejected for an invalid partition skipped the delay).
#[cfg(feature = "kafka")]
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = state.kafka_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    if request.messages.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        )
            .into_response();
    }

    let total = request.messages.len();
    let mut results = Vec::with_capacity(total);

    for (index, msg_request) in request.messages.iter().enumerate() {
        // Scope the write lock to this one message so it is released before
        // the optional inter-message delay below.
        let outcome: Result<(i32, i64), String> = {
            let mut topics = broker.topics.write().await;

            // Auto-create the topic with default configuration on first use.
            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                mockforge_kafka::topics::Topic::new(
                    msg_request.topic.clone(),
                    mockforge_kafka::topics::TopicConfig::default(),
                )
            });

            // Caller-supplied partition, or one derived from the key.
            let partition_id = match msg_request.partition {
                Some(partition) => partition,
                None => {
                    topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
                }
            };

            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
                Err(format!(
                    "Invalid partition {} (topic has {} partitions)",
                    partition_id,
                    topic_entry.partitions.len()
                ))
            } else {
                let message = mockforge_kafka::partitions::KafkaMessage {
                    offset: 0, // real offset is assigned by the partition on produce
                    timestamp: chrono::Utc::now().timestamp_millis(),
                    key: msg_request.key.as_ref().map(|k| k.as_bytes().to_vec()),
                    value: msg_request.value.as_bytes().to_vec(),
                    headers: msg_request
                        .headers
                        .clone()
                        .unwrap_or_default()
                        .into_iter()
                        .map(|(k, v)| (k, v.into_bytes()))
                        .collect(),
                };

                topic_entry
                    .produce(partition_id, message)
                    .await
                    .map(|offset| (partition_id, offset))
                    .map_err(|e| e.to_string())
            }
        };

        match outcome {
            Ok((partition_id, offset)) => {
                broker.metrics().record_messages_produced(1);

                // Notify SSE subscribers; ignore the error when nobody listens.
                let event = MessageEvent::Kafka(KafkaMessageEvent {
                    topic: msg_request.topic.clone(),
                    key: msg_request.key.clone(),
                    value: msg_request.value.clone(),
                    partition: partition_id,
                    offset,
                    headers: msg_request.headers.clone(),
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                results.push(serde_json::json!({
                    "index": index,
                    "success": true,
                    "topic": msg_request.topic,
                    "partition": partition_id,
                    "offset": offset
                }));
            }
            Err(error) => {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": error
                }));
            }
        }

        // Optional pacing between messages; the topics lock is already released.
        if request.delay_ms > 0 && index + 1 < total {
            tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
        }
    }

    let success_count =
        results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

    Json(serde_json::json!({
        "success": true,
        "total": total,
        "succeeded": success_count,
        "failed": total - success_count,
        "results": results
    }))
    .into_response()
}
2930
/// `GET /mqtt/messages/stream` — Server-Sent Events stream of published MQTT
/// messages, fed from the shared `message_events` broadcast channel.
///
/// Optional `?topic=` query parameter filters to topics *containing* the
/// given substring (not MQTT wildcard matching). Kafka events on the shared
/// channel are skipped. The stream ends when the broadcast channel closes;
/// lagged receivers log a warning and continue.
#[cfg(feature = "mqtt")]
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    // unfold: each step pulls broadcast events until one is worth emitting,
    // then yields it along with the receiver for the next step.
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => {
                        // Substring topic filter, when provided.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "mqtt",
                            "topic": event.topic,
                            "payload": event.payload,
                            "qos": event.qos,
                            "retain": event.retain,
                            "timestamp": event.timestamp,
                        });

                        // Serialization failure silently drops the event and
                        // keeps the stream alive.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event = Event::default().event("mqtt_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    // Kafka traffic on the shared channel is not ours.
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => {
                        continue;
                    }
                    // Sender dropped: terminate the SSE stream.
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    // Receiver fell behind the broadcast buffer: warn and resume.
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Periodic keep-alive comments prevent idle proxies from closing the connection.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
2993
/// `GET /kafka/messages/stream` — Server-Sent Events stream of produced Kafka
/// messages, fed from the shared `message_events` broadcast channel.
///
/// Optional `?topic=` query parameter filters to topics *containing* the given
/// substring. MQTT events on the shared channel are skipped. The stream ends
/// when the broadcast channel closes; lagged receivers log a warning and
/// continue.
///
/// Fix vs. previous version: the receiver binding was `let mut rx` although it
/// is immediately moved into `stream::unfold` (the closure rebinds it `mut`),
/// which triggers an unused-mut warning and diverged from the MQTT sibling.
#[cfg(feature = "kafka")]
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    // unfold: each step pulls broadcast events until one is worth emitting,
    // then yields it along with the receiver for the next step.
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    // MQTT traffic on the shared channel is not ours.
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => {
                        continue;
                    }
                    Ok(MessageEvent::Kafka(event)) => {
                        // Substring topic filter, when provided.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "kafka",
                            "topic": event.topic,
                            "key": event.key,
                            "value": event.value,
                            "partition": event.partition,
                            "offset": event.offset,
                            "headers": event.headers,
                            "timestamp": event.timestamp,
                        });

                        // Serialization failure silently drops the event and
                        // keeps the stream alive.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event =
                                Event::default().event("kafka_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    // Sender dropped: terminate the SSE stream.
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    // Receiver fell behind the broadcast buffer: warn and resume.
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Periodic keep-alive comments prevent idle proxies from closing the connection.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
3057
/// Request body for `POST /ai/generate-spec`.
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Natural-language description of the desired API.
    pub query: String,
    /// Specification flavor: "openapi", "graphql", or "asyncapi"
    /// (anything else falls back to OpenAPI in the handler).
    pub spec_type: String,
    /// OpenAPI version to target; the handler defaults to "3.0.0".
    pub api_version: Option<String>,
}
3070
/// Request body for `POST /mockai/generate-openapi` — generate an OpenAPI
/// spec from recorded HTTP traffic.
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to the recorder database; defaults to `./recordings.db` in the
    /// current working directory.
    #[serde(default)]
    pub database_path: Option<String>,
    /// Only consider exchanges at/after this RFC 3339 timestamp.
    #[serde(default)]
    pub since: Option<String>,
    /// Only consider exchanges at/before this RFC 3339 timestamp.
    #[serde(default)]
    pub until: Option<String>,
    /// Optional path pattern to restrict which recorded routes are analyzed.
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence for inferred paths to be included (default 0.7).
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
3090
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`:
/// accept inferred paths at or above 70% confidence.
fn default_min_confidence() -> f64 {
    const DEFAULT_MIN_CONFIDENCE: f64 = 0.7;
    DEFAULT_MIN_CONFIDENCE
}
3094
/// `POST /ai/generate-spec` — generate an API specification (OpenAPI, GraphQL,
/// or AsyncAPI) from a natural-language description via the configured LLM.
///
/// Provider, endpoint, model, and generation parameters are taken from
/// `MOCKFORGE_RAG_*` environment variables; an API key is required via
/// `MOCKFORGE_RAG_API_KEY` or `OPENAI_API_KEY` (503 when absent).
///
/// Fixes vs. previous version: drops the redundant `rag_config.clone()` (the
/// config is not used after engine construction) and hoists four copy-pasted
/// env-var parse chains into a local helper.
#[cfg(feature = "data-faker")]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::DocumentStorage,
    };
    use std::sync::Arc;

    /// Parse an environment variable, falling back to `default` when the
    /// variable is unset or unparsable.
    fn env_parse<T: std::str::FromStr>(key: &str, default: T) -> T {
        std::env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
    }

    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    }

    // Unknown provider names silently fall back to OpenAI.
    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    // Endpoint and model default per provider unless explicitly overridden.
    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    let mut rag_config = RagConfig::default();
    rag_config.provider = provider;
    rag_config.api_endpoint = api_endpoint;
    rag_config.api_key = api_key;
    rag_config.model = model;
    rag_config.max_tokens = env_parse("MOCKFORGE_RAG_MAX_TOKENS", 4096);
    rag_config.temperature = env_parse("MOCKFORGE_RAG_TEMPERATURE", 0.3);
    rag_config.timeout_secs = env_parse("MOCKFORGE_RAG_TIMEOUT", 60);
    rag_config.max_context_length = env_parse("MOCKFORGE_RAG_CONTEXT_WINDOW", 4000);

    // Unrecognized spec types are treated as OpenAPI.
    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        if request.spec_type == "graphql" {
            "GraphQL SDL"
        } else {
            "YAML"
        }
    );

    // This endpoint is one-shot generation; no documents need to persist,
    // so an in-memory store suffices.
    use mockforge_data::rag::storage::InMemoryStorage;
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    // `rag_config` is not needed after construction — pass it by value.
    let mut rag_engine = match RagEngine::new(rag_config, storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Post-process the raw LLM output via this module's extraction
            // helpers (defined elsewhere in this file).
            let spec = if request.spec_type == "graphql" {
                extract_graphql_schema(&generated_text)
            } else {
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
3265
3266#[cfg(not(feature = "data-faker"))]
3267async fn generate_ai_spec(
3268 State(_state): State<ManagementState>,
3269 Json(_request): Json<GenerateSpecRequest>,
3270) -> impl IntoResponse {
3271 (
3272 StatusCode::NOT_IMPLEMENTED,
3273 Json(serde_json::json!({
3274 "error": "AI features not enabled",
3275 "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3276 })),
3277 )
3278 .into_response()
3279}
3280
/// `POST /mockai/generate-openapi` — infer an OpenAPI specification from HTTP
/// exchanges previously captured by the recorder.
///
/// Pipeline: open the recorder database → parse optional time-window filters →
/// query matching exchanges (capped at 1000) → convert to the generator's
/// exchange type → run the spec generator → return the spec plus generation
/// metadata. Error mapping: 400 for a bad database path or bad date formats,
/// 404 when no exchanges match, 500 for query/generation/serialization errors.
#[cfg(feature = "behavioral-cloning")]
async fn generate_openapi_from_traffic(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
) -> impl IntoResponse {
    use chrono::{DateTime, Utc};
    use mockforge_core::intelligent_behavior::{
        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
        IntelligentBehaviorConfig,
    };
    use mockforge_recorder::{
        database::RecorderDatabase,
        openapi_export::{QueryFilters, RecordingsToOpenApi},
    };
    use std::path::PathBuf;

    // Default database location: ./recordings.db under the current directory.
    let db_path = if let Some(ref path) = request.database_path {
        PathBuf::from(path)
    } else {
        std::env::current_dir()
            .unwrap_or_else(|_| PathBuf::from("."))
            .join("recordings.db")
    };

    let db = match RecorderDatabase::new(&db_path).await {
        Ok(db) => db,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Database error",
                    "message": format!("Failed to open recorder database: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Parse the optional lower bound of the time window (RFC 3339).
    let since_dt = if let Some(ref since_str) = request.since {
        match DateTime::parse_from_rfc3339(since_str) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
                    })),
                )
                    .into_response();
            }
        }
    } else {
        None
    };

    // Parse the optional upper bound of the time window (RFC 3339).
    let until_dt = if let Some(ref until_str) = request.until {
        match DateTime::parse_from_rfc3339(until_str) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
                    })),
                )
                    .into_response();
            }
        }
    } else {
        None
    };

    // Cap the analysis at 1000 exchanges to bound memory and latency.
    let query_filters = QueryFilters {
        since: since_dt,
        until: until_dt,
        path_pattern: request.path_pattern.clone(),
        min_status_code: None,
        max_requests: Some(1000),
    };

    let exchanges_from_recorder =
        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
            Ok(exchanges) => exchanges,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Query error",
                        "message": format!("Failed to query HTTP exchanges: {}", e)
                    })),
                )
                    .into_response();
            }
        };

    if exchanges_from_recorder.is_empty() {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "No exchanges found",
                "message": "No HTTP exchanges found matching the specified filters"
            })),
        )
            .into_response();
    }

    // Bridge the recorder's exchange type to the generator's (field-for-field copy).
    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
        .into_iter()
        .map(|e| LocalHttpExchange {
            method: e.method,
            path: e.path,
            query_params: e.query_params,
            headers: e.headers,
            body: e.body,
            body_encoding: e.body_encoding,
            status_code: e.status_code,
            response_headers: e.response_headers,
            response_body: e.response_body,
            response_body_encoding: e.response_body_encoding,
            timestamp: e.timestamp,
        })
        .collect();

    let behavior_config = IntelligentBehaviorConfig::default();
    let gen_config = OpenApiGenerationConfig {
        min_confidence: request.min_confidence,
        behavior_model: Some(behavior_config.behavior_model),
    };

    let generator = OpenApiSpecGenerator::new(gen_config);
    let result = match generator.generate_from_exchanges(exchanges).await {
        Ok(result) => result,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Generation error",
                    "message": format!("Failed to generate OpenAPI spec: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Prefer the generator's raw document when available; otherwise serialize
    // the typed spec.
    let spec_json = if let Some(ref raw) = result.spec.raw_document {
        raw.clone()
    } else {
        match serde_json::to_value(&result.spec.spec) {
            Ok(json) => json,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Serialization error",
                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
                    })),
                )
                    .into_response();
            }
        }
    };

    let response = serde_json::json!({
        "spec": spec_json,
        "metadata": {
            "requests_analyzed": result.metadata.requests_analyzed,
            "paths_inferred": result.metadata.paths_inferred,
            "path_confidence": result.metadata.path_confidence,
            "generated_at": result.metadata.generated_at.to_rfc3339(),
            "duration_ms": result.metadata.duration_ms,
        }
    });

    Json(response).into_response()
}
3473
3474async fn list_rule_explanations(
3476 State(state): State<ManagementState>,
3477 Query(params): Query<std::collections::HashMap<String, String>>,
3478) -> impl IntoResponse {
3479 use mockforge_core::intelligent_behavior::RuleType;
3480
3481 let explanations = state.rule_explanations.read().await;
3482 let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3483
3484 if let Some(rule_type_str) = params.get("rule_type") {
3486 if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3487 explanations_vec.retain(|e| e.rule_type == rule_type);
3488 }
3489 }
3490
3491 if let Some(min_confidence_str) = params.get("min_confidence") {
3493 if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3494 explanations_vec.retain(|e| e.confidence >= min_confidence);
3495 }
3496 }
3497
3498 explanations_vec.sort_by(|a, b| {
3500 b.confidence
3501 .partial_cmp(&a.confidence)
3502 .unwrap_or(std::cmp::Ordering::Equal)
3503 .then_with(|| b.generated_at.cmp(&a.generated_at))
3504 });
3505
3506 Json(serde_json::json!({
3507 "explanations": explanations_vec,
3508 "total": explanations_vec.len(),
3509 }))
3510 .into_response()
3511}
3512
3513async fn get_rule_explanation(
3515 State(state): State<ManagementState>,
3516 Path(rule_id): Path<String>,
3517) -> impl IntoResponse {
3518 let explanations = state.rule_explanations.read().await;
3519
3520 match explanations.get(&rule_id) {
3521 Some(explanation) => Json(serde_json::json!({
3522 "explanation": explanation,
3523 }))
3524 .into_response(),
3525 None => (
3526 StatusCode::NOT_FOUND,
3527 Json(serde_json::json!({
3528 "error": "Rule explanation not found",
3529 "message": format!("No explanation found for rule ID: {}", rule_id)
3530 })),
3531 )
3532 .into_response(),
3533 }
3534}
3535
/// Request body for `POST /mockai/learn` — generate behavior rules from
/// request/response example pairs.
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example request/response pairs to learn from; must be non-empty.
    pub examples: Vec<ExamplePairRequest>,
    /// Optional `IntelligentBehaviorConfig` as raw JSON; invalid or missing
    /// config falls back to defaults in the handler.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3545
/// One request/response example pair, kept as free-form JSON. The handler
/// reads `method`, `path`, `body`, `query_params`, and `headers` from
/// `request`, and `status_code` (or `status`) and `body` from `response`,
/// applying defaults for anything missing.
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request side of the example (free-form JSON object).
    pub request: serde_json::Value,
    /// Response side of the example (free-form JSON object).
    pub response: serde_json::Value,
}
3554
3555async fn learn_from_examples(
3560 State(state): State<ManagementState>,
3561 Json(request): Json<LearnFromExamplesRequest>,
3562) -> impl IntoResponse {
3563 use mockforge_core::intelligent_behavior::{
3564 config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3565 rule_generator::{ExamplePair, RuleGenerator},
3566 };
3567
3568 if request.examples.is_empty() {
3569 return (
3570 StatusCode::BAD_REQUEST,
3571 Json(serde_json::json!({
3572 "error": "No examples provided",
3573 "message": "At least one example pair is required"
3574 })),
3575 )
3576 .into_response();
3577 }
3578
3579 let example_pairs: Result<Vec<ExamplePair>, String> = request
3581 .examples
3582 .into_iter()
3583 .enumerate()
3584 .map(|(idx, ex)| {
3585 let method = ex
3587 .request
3588 .get("method")
3589 .and_then(|v| v.as_str())
3590 .map(|s| s.to_string())
3591 .unwrap_or_else(|| "GET".to_string());
3592 let path = ex
3593 .request
3594 .get("path")
3595 .and_then(|v| v.as_str())
3596 .map(|s| s.to_string())
3597 .unwrap_or_else(|| "/".to_string());
3598 let request_body = ex.request.get("body").cloned();
3599 let query_params = ex
3600 .request
3601 .get("query_params")
3602 .and_then(|v| v.as_object())
3603 .map(|obj| {
3604 obj.iter()
3605 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3606 .collect()
3607 })
3608 .unwrap_or_default();
3609 let headers = ex
3610 .request
3611 .get("headers")
3612 .and_then(|v| v.as_object())
3613 .map(|obj| {
3614 obj.iter()
3615 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3616 .collect()
3617 })
3618 .unwrap_or_default();
3619
3620 let status = ex
3622 .response
3623 .get("status_code")
3624 .or_else(|| ex.response.get("status"))
3625 .and_then(|v| v.as_u64())
3626 .map(|n| n as u16)
3627 .unwrap_or(200);
3628 let response_body = ex.response.get("body").cloned();
3629
3630 Ok(ExamplePair {
3631 method,
3632 path,
3633 request: request_body,
3634 status,
3635 response: response_body,
3636 query_params,
3637 headers,
3638 metadata: {
3639 let mut meta = std::collections::HashMap::new();
3640 meta.insert("source".to_string(), "api".to_string());
3641 meta.insert("example_index".to_string(), idx.to_string());
3642 meta
3643 },
3644 })
3645 })
3646 .collect();
3647
3648 let example_pairs = match example_pairs {
3649 Ok(pairs) => pairs,
3650 Err(e) => {
3651 return (
3652 StatusCode::BAD_REQUEST,
3653 Json(serde_json::json!({
3654 "error": "Invalid examples",
3655 "message": e
3656 })),
3657 )
3658 .into_response();
3659 }
3660 };
3661
3662 let behavior_config = if let Some(config_json) = request.config {
3664 serde_json::from_value(config_json)
3666 .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3667 .behavior_model
3668 } else {
3669 BehaviorModelConfig::default()
3670 };
3671
3672 let generator = RuleGenerator::new(behavior_config);
3674
3675 let (rules, explanations) =
3677 match generator.generate_rules_with_explanations(example_pairs).await {
3678 Ok(result) => result,
3679 Err(e) => {
3680 return (
3681 StatusCode::INTERNAL_SERVER_ERROR,
3682 Json(serde_json::json!({
3683 "error": "Rule generation failed",
3684 "message": format!("Failed to generate rules: {}", e)
3685 })),
3686 )
3687 .into_response();
3688 }
3689 };
3690
3691 {
3693 let mut stored_explanations = state.rule_explanations.write().await;
3694 for explanation in &explanations {
3695 stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3696 }
3697 }
3698
3699 let response = serde_json::json!({
3701 "success": true,
3702 "rules_generated": {
3703 "consistency_rules": rules.consistency_rules.len(),
3704 "schemas": rules.schemas.len(),
3705 "state_machines": rules.state_transitions.len(),
3706 "system_prompt": !rules.system_prompt.is_empty(),
3707 },
3708 "explanations": explanations.iter().map(|e| serde_json::json!({
3709 "rule_id": e.rule_id,
3710 "rule_type": e.rule_type,
3711 "confidence": e.confidence,
3712 "reasoning": e.reasoning,
3713 })).collect::<Vec<_>>(),
3714 "total_explanations": explanations.len(),
3715 });
3716
3717 Json(response).into_response()
3718}
3719
3720#[cfg(feature = "data-faker")]
/// Extracts a YAML spec from LLM output, stripping Markdown code fences.
///
/// Tries a tagged ```yaml fence first, then any generic ``` fence; a fence is
/// only honored when a closing ``` follows it. If no complete fenced block is
/// found, the whole text (trimmed) is returned as-is.
///
/// Note: the previous explicit `openapi:`/`asyncapi:` prefix check was
/// removed — it returned exactly the same value as the fallthrough.
fn extract_yaml_spec(text: &str) -> String {
    for fence in ["```yaml", "```"] {
        if let Some(start) = text.find(fence) {
            let inner = text[start + fence.len()..].trim_start();
            if let Some(end) = inner.find("```") {
                return inner[..end].trim().to_string();
            }
        }
    }

    text.trim().to_string()
}
3744
3745#[cfg(feature = "data-faker")]
/// Extracts a GraphQL schema from LLM output, stripping Markdown code fences.
///
/// Tries a tagged ```graphql fence first, then any generic ``` fence; a fence
/// is only honored when a closing ``` follows it. If no complete fenced block
/// is found, the whole text (trimmed) is returned as-is.
///
/// Note: the previous explicit `type `/`schema ` prefix check was removed —
/// it returned exactly the same value as the fallthrough.
fn extract_graphql_schema(text: &str) -> String {
    for fence in ["```graphql", "```"] {
        if let Some(start) = text.find(fence) {
            let inner = text[start + fence.len()..].trim_start();
            if let Some(end) = inner.find("```") {
                return inner[..end].trim().to_string();
            }
        }
    }

    text.trim().to_string()
}
3769
/// Returns the current chaos engineering configuration as JSON.
///
/// With the `chaos` feature compiled in and a chaos API state present, the
/// live config is read under a read lock and each sub-config is serialized
/// (falling back to `null` if serialization fails). In every other case a
/// disabled, all-null configuration is reported.
async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
    #[cfg(feature = "chaos")]
    {
        if let Some(chaos_state) = &_state.chaos_api_state {
            let config = chaos_state.config.read().await;
            // Serialize each optional sub-config; null on failure rather
            // than erroring the whole response.
            Json(serde_json::json!({
                "enabled": config.enabled,
                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
            }))
            .into_response()
        } else {
            // Chaos feature compiled in but no chaos API state configured.
            Json(serde_json::json!({
                "enabled": false,
                "latency": null,
                "fault_injection": null,
                "rate_limit": null,
                "traffic_shaping": null,
            }))
            .into_response()
        }
    }
    #[cfg(not(feature = "chaos"))]
    {
        // Chaos feature not compiled into this build.
        Json(serde_json::json!({
            "enabled": false,
            "latency": null,
            "fault_injection": null,
            "rate_limit": null,
            "traffic_shaping": null,
        }))
        .into_response()
    }
}
3812
/// Partial update for the chaos engineering configuration.
///
/// `None` fields are left unchanged. Sub-configs are raw JSON and are
/// silently ignored when they fail to deserialize (see `update_chaos_config`).
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Master on/off switch for chaos behavior.
    pub enabled: Option<bool>,
    /// JSON-encoded `LatencyConfig`.
    pub latency: Option<serde_json::Value>,
    /// JSON-encoded `FaultInjectionConfig`.
    pub fault_injection: Option<serde_json::Value>,
    /// JSON-encoded `RateLimitConfig`.
    pub rate_limit: Option<serde_json::Value>,
    /// JSON-encoded `TrafficShapingConfig`.
    pub traffic_shaping: Option<serde_json::Value>,
}
3827
/// Applies a partial chaos configuration update to the live chaos API state.
///
/// Each provided field overwrites the corresponding part of the config under
/// a write lock. Sub-configs that fail to deserialize are skipped silently
/// (best-effort update; no per-field errors are surfaced to the caller).
/// Responds 503 when no chaos API state is configured and 501 when the
/// `chaos` feature is not compiled in.
async fn update_chaos_config(
    State(_state): State<ManagementState>,
    Json(_config_update): Json<ChaosConfigUpdate>,
) -> impl IntoResponse {
    #[cfg(feature = "chaos")]
    {
        if let Some(chaos_state) = &_state.chaos_api_state {
            // NOTE(review): `ChaosConfig` appears unused in this function —
            // confirm and drop it from the import list if so.
            use mockforge_chaos::config::{
                ChaosConfig, FaultInjectionConfig, LatencyConfig, RateLimitConfig,
                TrafficShapingConfig,
            };

            let mut config = chaos_state.config.write().await;

            if let Some(enabled) = _config_update.enabled {
                config.enabled = enabled;
            }

            // Each sub-config below is deserialized from raw JSON; on
            // failure the existing value is left unchanged.
            if let Some(latency_json) = _config_update.latency {
                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
                    config.latency = Some(latency);
                }
            }

            if let Some(fault_json) = _config_update.fault_injection {
                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
                    config.fault_injection = Some(fault);
                }
            }

            if let Some(rate_json) = _config_update.rate_limit {
                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
                    config.rate_limit = Some(rate);
                }
            }

            if let Some(traffic_json) = _config_update.traffic_shaping {
                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
                    config.traffic_shaping = Some(traffic);
                }
            }

            // Release the write lock before building the response.
            drop(config);

            info!("Chaos configuration updated successfully");
            Json(serde_json::json!({
                "success": true,
                "message": "Chaos configuration updated and applied"
            }))
            .into_response()
        } else {
            (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "success": false,
                    "error": "Chaos API not available",
                    "message": "Chaos engineering is not enabled or configured"
                })),
            )
                .into_response()
        }
    }
    #[cfg(not(feature = "chaos"))]
    {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "success": false,
                "error": "Chaos feature not enabled",
                "message": "Chaos engineering feature is not compiled into this build"
            })),
        )
            .into_response()
    }
}
3911
3912async fn list_network_profiles() -> impl IntoResponse {
3916 use mockforge_core::network_profiles::NetworkProfileCatalog;
3917
3918 let catalog = NetworkProfileCatalog::default();
3919 let profiles: Vec<serde_json::Value> = catalog
3920 .list_profiles_with_description()
3921 .iter()
3922 .map(|(name, description)| {
3923 serde_json::json!({
3924 "name": name,
3925 "description": description,
3926 })
3927 })
3928 .collect();
3929
3930 Json(serde_json::json!({
3931 "profiles": profiles
3932 }))
3933 .into_response()
3934}
3935
/// Request body selecting a named profile from the network-profile catalog.
#[derive(Debug, Deserialize)]
pub struct ApplyNetworkProfileRequest {
    /// Name of the catalog profile to apply (as returned by the
    /// profile-listing endpoint).
    pub profile_name: String,
}
3942
3943async fn apply_network_profile(
3945 State(state): State<ManagementState>,
3946 Json(request): Json<ApplyNetworkProfileRequest>,
3947) -> impl IntoResponse {
3948 use mockforge_core::network_profiles::NetworkProfileCatalog;
3949
3950 let catalog = NetworkProfileCatalog::default();
3951 if let Some(profile) = catalog.get(&request.profile_name) {
3952 if let Some(server_config) = &state.server_config {
3955 let mut config = server_config.write().await;
3956
3957 use mockforge_core::config::NetworkShapingConfig;
3959
3960 let network_shaping = NetworkShapingConfig {
3964 enabled: profile.traffic_shaping.bandwidth.enabled
3965 || profile.traffic_shaping.burst_loss.enabled,
3966 bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
3968 max_connections: 1000, };
3970
3971 if let Some(ref mut chaos) = config.observability.chaos {
3974 chaos.traffic_shaping = Some(network_shaping);
3975 } else {
3976 use mockforge_core::config::ChaosEngConfig;
3978 config.observability.chaos = Some(ChaosEngConfig {
3979 enabled: true,
3980 latency: None,
3981 fault_injection: None,
3982 rate_limit: None,
3983 traffic_shaping: Some(network_shaping),
3984 scenario: None,
3985 });
3986 }
3987
3988 info!("Network profile '{}' applied to server configuration", request.profile_name);
3989 } else {
3990 warn!("Server configuration not available in ManagementState - profile applied but not persisted");
3991 }
3992
3993 #[cfg(feature = "chaos")]
3995 {
3996 if let Some(chaos_state) = &state.chaos_api_state {
3997 use mockforge_chaos::config::TrafficShapingConfig;
3998
3999 let mut chaos_config = chaos_state.config.write().await;
4000 let chaos_traffic_shaping = TrafficShapingConfig {
4002 enabled: profile.traffic_shaping.bandwidth.enabled
4003 || profile.traffic_shaping.burst_loss.enabled,
4004 bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8, packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
4006 max_connections: 0,
4007 connection_timeout_ms: 30000,
4008 };
4009 chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
4010 chaos_config.enabled = true; drop(chaos_config);
4012 info!("Network profile '{}' applied to chaos API state", request.profile_name);
4013 }
4014 }
4015
4016 Json(serde_json::json!({
4017 "success": true,
4018 "message": format!("Network profile '{}' applied", request.profile_name),
4019 "profile": {
4020 "name": profile.name,
4021 "description": profile.description,
4022 }
4023 }))
4024 .into_response()
4025 } else {
4026 (
4027 StatusCode::NOT_FOUND,
4028 Json(serde_json::json!({
4029 "error": "Profile not found",
4030 "message": format!("Network profile '{}' not found", request.profile_name)
4031 })),
4032 )
4033 .into_response()
4034 }
4035}
4036
4037pub fn management_router_with_ui_builder(
4039 state: ManagementState,
4040 server_config: mockforge_core::config::ServerConfig,
4041) -> Router {
4042 use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
4043
4044 let management = management_router(state);
4046
4047 let ui_builder_state = UIBuilderState::new(server_config);
4049 let ui_builder = create_ui_builder_router(ui_builder_state);
4050
4051 management.nest("/ui-builder", ui_builder)
4053}
4054
4055pub fn management_router_with_spec_import(state: ManagementState) -> Router {
4057 use crate::spec_import::{spec_import_router, SpecImportState};
4058
4059 let management = management_router(state);
4061
4062 Router::new()
4064 .merge(management)
4065 .merge(spec_import_router(SpecImportState::new()))
4066}
4067
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal, enabled mock with an empty JSON body and status 200.
    /// Tests override individual fields via struct-update syntax, removing
    /// the previous copy-pasted 13-field `MockConfig` literals.
    fn base_mock(id: &str, name: &str, method: &str, path: &str) -> MockConfig {
        MockConfig {
            id: id.to_string(),
            name: name.to_string(),
            method: method.to_string(),
            path: path.to_string(),
            response: MockResponse {
                body: serde_json::json!({}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        }
    }

    /// Builds a POST /xml mock that matches on the given XPath expression.
    fn xpath_mock(id: &str, name: &str, xpath: &str) -> MockConfig {
        MockConfig {
            request_match: Some(RequestMatchCriteria {
                xpath: Some(xpath.to_string()),
                ..Default::default()
            }),
            ..base_mock(id, name, "POST", "/xml")
        }
    }

    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        let mock = MockConfig {
            response: MockResponse {
                body: serde_json::json!({"message": "test"}),
                headers: None,
            },
            ..base_mock("test-1", "Test Mock", "GET", "/test")
        };

        {
            let mut mocks = state.mocks.write().await;
            mocks.push(mock.clone());
        }

        // Stored mock should be retrievable by id.
        let mocks = state.mocks.read().await;
        let found = mocks.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        {
            let mut mocks = state.mocks.write().await;
            mocks.push(base_mock("1", "Mock 1", "GET", "/test1"));
            mocks.push(MockConfig {
                enabled: false,
                status_code: Some(201),
                ..base_mock("2", "Mock 2", "POST", "/test2")
            });
        }

        // One of the two registered mocks is enabled.
        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }

    #[test]
    fn test_mock_matches_request_with_xpath_absolute_path() {
        let mock = xpath_mock("xpath-1", "XPath Match", "/root/order/id");

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }

    #[test]
    fn test_mock_matches_request_with_xpath_text_predicate() {
        let mock = xpath_mock("xpath-2", "XPath Predicate Match", "//order/id[text()='123']");

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }

    #[test]
    fn test_mock_matches_request_with_xpath_no_match() {
        let mock = xpath_mock("xpath-3", "XPath No Match", "//order/id[text()='456']");

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        // Predicate value 456 does not match the document's id of 123.
        assert!(!mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }
}