1#[cfg(any(feature = "mqtt", feature = "kafka"))]
2use axum::response::sse::{Event, Sse};
3use axum::{
8 extract::{Path, Query, State},
9 http::StatusCode,
10 response::{IntoResponse, Json},
11 routing::{delete, get, post, put},
12 Router,
13};
14#[cfg(any(feature = "mqtt", feature = "kafka"))]
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18 BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23#[cfg(any(feature = "mqtt", feature = "kafka"))]
24use std::convert::Infallible;
25use std::sync::Arc;
26use tokio::sync::{broadcast, RwLock};
27use tracing::*;
28
/// Default capacity of the broadcast channel used to fan broker message
/// events out to SSE/WebSocket subscribers.
const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;

/// Resolve the message-broadcast channel capacity.
///
/// Honors the `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY` environment variable;
/// an unset or unparsable value falls back to
/// [`DEFAULT_MESSAGE_BROADCAST_CAPACITY`].
fn get_message_broadcast_capacity() -> usize {
    match std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY") {
        Ok(raw) => raw.parse().unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY),
        Err(_) => DEFAULT_MESSAGE_BROADCAST_CAPACITY,
    }
}
39
/// Protocol-tagged message event fanned out to management consumers.
///
/// Serialized as `{"protocol": "mqtt"|"kafka", "data": {...}}` due to the
/// adjacently-tagged serde representation below.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    /// A message observed on the MQTT broker.
    #[cfg(feature = "mqtt")]
    Mqtt(MqttMessageEvent),
    /// A message observed on the Kafka mock broker.
    #[cfg(feature = "kafka")]
    Kafka(KafkaMessageEvent),
}
52
/// Snapshot of a single message published through the MQTT broker.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// Topic the message was published to.
    pub topic: String,
    /// Message payload, carried as text.
    pub payload: String,
    /// MQTT quality-of-service level.
    pub qos: u8,
    /// Whether the message was published with the retain flag.
    pub retain: bool,
    /// Observation time, string-encoded (format set by the producer).
    pub timestamp: String,
}
68
/// Snapshot of a single record produced to the Kafka mock broker.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Topic the record was produced to.
    pub topic: String,
    /// Optional record key.
    pub key: Option<String>,
    /// Record value, carried as text.
    pub value: String,
    /// Partition the record landed on.
    pub partition: i32,
    /// Offset within the partition.
    pub offset: i64,
    /// Optional record headers.
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// Observation time, string-encoded (format set by the producer).
    pub timestamp: String,
}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct MockConfig {
84 #[serde(skip_serializing_if = "String::is_empty")]
86 pub id: String,
87 pub name: String,
89 pub method: String,
91 pub path: String,
93 pub response: MockResponse,
95 #[serde(default = "default_true")]
97 pub enabled: bool,
98 #[serde(skip_serializing_if = "Option::is_none")]
100 pub latency_ms: Option<u64>,
101 #[serde(skip_serializing_if = "Option::is_none")]
103 pub status_code: Option<u16>,
104 #[serde(skip_serializing_if = "Option::is_none")]
106 pub request_match: Option<RequestMatchCriteria>,
107 #[serde(skip_serializing_if = "Option::is_none")]
109 pub priority: Option<i32>,
110 #[serde(skip_serializing_if = "Option::is_none")]
112 pub scenario: Option<String>,
113 #[serde(skip_serializing_if = "Option::is_none")]
115 pub required_scenario_state: Option<String>,
116 #[serde(skip_serializing_if = "Option::is_none")]
118 pub new_scenario_state: Option<String>,
119}
120
/// Serde default for `MockConfig::enabled`: mocks are enabled unless
/// explicitly disabled.
fn default_true() -> bool {
    true
}
124
/// Response payload returned by a mock.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// JSON body to return.
    pub body: serde_json::Value,
    /// Optional extra response headers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
134
135#[derive(Debug, Clone, Serialize, Deserialize, Default)]
137pub struct RequestMatchCriteria {
138 #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
140 pub headers: std::collections::HashMap<String, String>,
141 #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
143 pub query_params: std::collections::HashMap<String, String>,
144 #[serde(skip_serializing_if = "Option::is_none")]
146 pub body_pattern: Option<String>,
147 #[serde(skip_serializing_if = "Option::is_none")]
149 pub json_path: Option<String>,
150 #[serde(skip_serializing_if = "Option::is_none")]
152 pub xpath: Option<String>,
153 #[serde(skip_serializing_if = "Option::is_none")]
155 pub custom_matcher: Option<String>,
156}
157
/// Decide whether `mock` matches an incoming request.
///
/// Checks are applied in order — enabled flag, method, path, then each
/// optional criterion in `request_match` — and short-circuit on the first
/// failure. Returns `true` only when every applicable check passes.
///
/// * `method` — HTTP method (compared case-insensitively).
/// * `path` — request path (supports `{param}` and `*` patterns in the mock).
/// * `headers` / `query_params` — request metadata to match against.
/// * `body` — raw request body, when one was sent.
pub fn mock_matches_request(
    mock: &MockConfig,
    method: &str,
    path: &str,
    headers: &std::collections::HashMap<String, String>,
    query_params: &std::collections::HashMap<String, String>,
    body: Option<&[u8]>,
) -> bool {
    use regex::Regex;

    // Disabled mocks never match.
    if !mock.enabled {
        return false;
    }

    // Method comparison is case-insensitive.
    if mock.method.to_uppercase() != method.to_uppercase() {
        return false;
    }

    // Path supports {param} placeholder segments and * wildcards.
    if !path_matches_pattern(&mock.path, path) {
        return false;
    }

    if let Some(criteria) = &mock.request_match {
        // Headers: looked up case-insensitively. The expected value is tried
        // as a regex first (unanchored substring match); literal equality is
        // only used when the pattern fails to compile.
        for (key, expected_value) in &criteria.headers {
            let header_key_lower = key.to_lowercase();
            let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);

            if let Some((_, actual_value)) = found {
                if let Ok(re) = Regex::new(expected_value) {
                    if !re.is_match(actual_value) {
                        return false;
                    }
                } else if actual_value != expected_value {
                    return false;
                }
            } else {
                // Required header absent.
                return false;
            }
        }

        // Query parameters: exact, case-sensitive equality.
        for (key, expected_value) in &criteria.query_params {
            if let Some(actual_value) = query_params.get(key) {
                if actual_value != expected_value {
                    return false;
                }
            } else {
                return false;
            }
        }

        // Body pattern: regex when it compiles, otherwise literal comparison
        // of the lossily-UTF-8-decoded body. A missing body never matches.
        if let Some(pattern) = &criteria.body_pattern {
            if let Some(body_bytes) = body {
                let body_str = String::from_utf8_lossy(body_bytes);
                if let Ok(re) = Regex::new(pattern) {
                    if !re.is_match(&body_str) {
                        return false;
                    }
                } else if body_str.as_ref() != pattern {
                    return false;
                }
            } else {
                return false;
            }
        }

        // JSONPath: only rejects when the body IS valid JSON and the path is
        // absent. NOTE(review): a missing or non-JSON body falls through and
        // counts as a match here, while the xpath branch below rejects in
        // the analogous cases — confirm this asymmetry is intended.
        if let Some(json_path) = &criteria.json_path {
            if let Some(body_bytes) = body {
                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
                    if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
                        if !json_path_exists(&json_value, json_path) {
                            return false;
                        }
                    }
                }
            }
        }

        // XPath: requires a UTF-8 body that matches the simplified XPath.
        if let Some(xpath) = &criteria.xpath {
            if let Some(body_bytes) = body {
                if let Ok(body_str) = std::str::from_utf8(body_bytes) {
                    if !xml_xpath_exists(body_str, xpath) {
                        return false;
                    }
                } else {
                    return false;
                }
            } else {
                return false;
            }
        }

        // Custom mini-expression matcher (==, =~, contains).
        if let Some(custom) = &criteria.custom_matcher {
            if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
                return false;
            }
        }
    }

    true
}
279
280fn path_matches_pattern(pattern: &str, path: &str) -> bool {
282 if pattern == path {
284 return true;
285 }
286
287 if pattern == "*" {
289 return true;
290 }
291
292 let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
294 let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
295
296 if pattern_parts.len() != path_parts.len() {
297 if pattern.contains('*') {
299 return matches_wildcard_pattern(pattern, path);
300 }
301 return false;
302 }
303
304 for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
305 if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
307 continue; }
309
310 if pattern_part != path_part {
311 return false;
312 }
313 }
314
315 true
316}
317
318fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
320 use regex::Regex;
321
322 let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
324
325 if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
326 return re.is_match(path);
327 }
328
329 false
330}
331
332fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
334 if let Some(path) = json_path.strip_prefix("$.") {
337 let parts: Vec<&str> = path.split('.').collect();
338
339 let mut current = json;
340 for part in parts {
341 if let Some(obj) = current.as_object() {
342 if let Some(value) = obj.get(part) {
343 current = value;
344 } else {
345 return false;
346 }
347 } else {
348 return false;
349 }
350 }
351 true
352 } else {
353 tracing::warn!("Complex JSONPath expressions not yet fully supported: {}", json_path);
355 false
356 }
357}
358
/// One parsed segment of the simplified XPath grammar used by
/// `xml_xpath_exists`: an element name plus an optional `text()='...'`
/// equality predicate.
#[derive(Debug, Clone, PartialEq, Eq)]
struct XPathSegment {
    // Element (tag) name this segment must match.
    name: String,
    // When set, the element's trimmed text content must equal this value.
    text_equals: Option<String>,
}
364
365fn parse_xpath_segment(segment: &str) -> Option<XPathSegment> {
366 if segment.is_empty() {
367 return None;
368 }
369
370 let trimmed = segment.trim();
371 if let Some(bracket_start) = trimmed.find('[') {
372 if !trimmed.ends_with(']') {
373 return None;
374 }
375
376 let name = trimmed[..bracket_start].trim();
377 let predicate = &trimmed[bracket_start + 1..trimmed.len() - 1];
378 let predicate = predicate.trim();
379
380 if let Some(raw) = predicate.strip_prefix("text()=") {
382 let raw = raw.trim();
383 if raw.len() >= 2
384 && ((raw.starts_with('"') && raw.ends_with('"'))
385 || (raw.starts_with('\'') && raw.ends_with('\'')))
386 {
387 let text = raw[1..raw.len() - 1].to_string();
388 if !name.is_empty() {
389 return Some(XPathSegment {
390 name: name.to_string(),
391 text_equals: Some(text),
392 });
393 }
394 }
395 }
396
397 None
398 } else {
399 Some(XPathSegment {
400 name: trimmed.to_string(),
401 text_equals: None,
402 })
403 }
404}
405
406fn segment_matches(node: roxmltree::Node<'_, '_>, segment: &XPathSegment) -> bool {
407 if !node.is_element() {
408 return false;
409 }
410 if node.tag_name().name() != segment.name {
411 return false;
412 }
413 match &segment.text_equals {
414 Some(expected) => node.text().map(str::trim).unwrap_or_default() == expected,
415 None => true,
416 }
417}
418
/// Evaluate a simplified XPath expression against an XML body.
///
/// Supported grammar: absolute paths (`/root/child`) matched from the
/// document root, and descendant paths (`//elem/child`) anchored at any
/// matching descendant. Each segment may carry a `[text()='...']`
/// predicate (see `parse_xpath_segment`). Returns `false` for unparsable
/// XML or unsupported expressions.
fn xml_xpath_exists(xml_body: &str, xpath: &str) -> bool {
    let doc = match roxmltree::Document::parse(xml_body) {
        Ok(doc) => doc,
        Err(err) => {
            tracing::warn!("Failed to parse XML for XPath matching: {}", err);
            return false;
        }
    };

    let expr = xpath.trim();
    if expr.is_empty() {
        return false;
    }

    // `//` selects descendants anywhere; a single `/` anchors at the root.
    let (is_descendant, path_str) = if let Some(rest) = expr.strip_prefix("//") {
        (true, rest)
    } else if let Some(rest) = expr.strip_prefix('/') {
        (false, rest)
    } else {
        tracing::warn!("Unsupported XPath expression (must start with / or //): {}", expr);
        return false;
    };

    // NOTE: segments that fail to parse are silently dropped by filter_map,
    // which loosens the match rather than rejecting the expression.
    let segments: Vec<XPathSegment> = path_str
        .split('/')
        .filter(|s| !s.trim().is_empty())
        .filter_map(parse_xpath_segment)
        .collect();

    if segments.is_empty() {
        return false;
    }

    if is_descendant {
        // Try every descendant matching the first segment as an anchor,
        // then walk the remaining segments through direct children using a
        // breadth-first frontier.
        let first = &segments[0];
        for node in doc.descendants().filter(|n| segment_matches(*n, first)) {
            let mut frontier = vec![node];
            for segment in &segments[1..] {
                let mut next_frontier = Vec::new();
                for parent in &frontier {
                    for child in parent.children().filter(|n| segment_matches(*n, segment)) {
                        next_frontier.push(child);
                    }
                }
                if next_frontier.is_empty() {
                    // Dead end under this anchor; try the next one.
                    frontier.clear();
                    break;
                }
                frontier = next_frontier;
            }
            if !frontier.is_empty() {
                return true;
            }
        }
        false
    } else {
        // Absolute path: the first segment must match the root element
        // itself; subsequent segments descend through direct children.
        let mut frontier = vec![doc.root_element()];
        for (index, segment) in segments.iter().enumerate() {
            let mut next_frontier = Vec::new();
            for parent in &frontier {
                if index == 0 {
                    if segment_matches(*parent, segment) {
                        next_frontier.push(*parent);
                    }
                    continue;
                }
                for child in parent.children().filter(|n| segment_matches(*n, segment)) {
                    next_frontier.push(child);
                }
            }
            if next_frontier.is_empty() {
                return false;
            }
            frontier = next_frontier;
        }
        !frontier.is_empty()
    }
}
503
504fn evaluate_custom_matcher(
506 expression: &str,
507 method: &str,
508 path: &str,
509 headers: &std::collections::HashMap<String, String>,
510 query_params: &std::collections::HashMap<String, String>,
511 body: Option<&[u8]>,
512) -> bool {
513 use regex::Regex;
514
515 let expr = expression.trim();
516
517 if expr.contains("==") {
519 let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
520 if parts.len() != 2 {
521 return false;
522 }
523
524 let field = parts[0];
525 let expected_value = parts[1].trim_matches('"').trim_matches('\'');
526
527 match field {
528 "method" => method == expected_value,
529 "path" => path == expected_value,
530 _ if field.starts_with("headers.") => {
531 let header_name = &field[8..];
532 headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
533 }
534 _ if field.starts_with("query.") => {
535 let param_name = &field[6..];
536 query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
537 }
538 _ => false,
539 }
540 }
541 else if expr.contains("=~") {
543 let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
544 if parts.len() != 2 {
545 return false;
546 }
547
548 let field = parts[0];
549 let pattern = parts[1].trim_matches('"').trim_matches('\'');
550
551 if let Ok(re) = Regex::new(pattern) {
552 match field {
553 "method" => re.is_match(method),
554 "path" => re.is_match(path),
555 _ if field.starts_with("headers.") => {
556 let header_name = &field[8..];
557 headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
558 }
559 _ if field.starts_with("query.") => {
560 let param_name = &field[6..];
561 query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
562 }
563 _ => false,
564 }
565 } else {
566 false
567 }
568 }
569 else if expr.contains("contains") {
571 let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
572 if parts.len() != 2 {
573 return false;
574 }
575
576 let field = parts[0];
577 let search_value = parts[1].trim_matches('"').trim_matches('\'');
578
579 match field {
580 "path" => path.contains(search_value),
581 _ if field.starts_with("headers.") => {
582 let header_name = &field[8..];
583 headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
584 }
585 _ if field.starts_with("body") => {
586 if let Some(body_bytes) = body {
587 let body_str = String::from_utf8_lossy(body_bytes);
588 body_str.contains(search_value)
589 } else {
590 false
591 }
592 }
593 _ => false,
594 }
595 } else {
596 tracing::warn!("Unknown custom matcher expression format: {}", expr);
598 false
599 }
600}
601
/// Runtime statistics reported by the `get_stats` endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Seconds elapsed since the management state was created.
    pub uptime_seconds: u64,
    /// Total requests counted via `request_counter`.
    pub total_requests: u64,
    /// Number of configured mocks.
    pub active_mocks: usize,
    /// Number of mocks currently enabled.
    pub enabled_mocks: usize,
    /// Number of registered routes (currently equal to the mock count).
    pub registered_routes: usize,
}
616
/// Server configuration summary reported by the `get_config` endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// Crate version (from `CARGO_PKG_VERSION`).
    pub version: String,
    /// Port the managed server listens on.
    pub port: u16,
    /// Whether an OpenAPI spec was loaded.
    pub has_openapi_spec: bool,
    /// Filesystem path of the loaded spec, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
630
/// Shared state for the management API handlers.
///
/// Cheap to clone: fields are `Arc`s or small copies, so axum can clone it
/// per-request. Optional integrations are wired in via the `with_*`
/// builders on the `impl` block.
#[derive(Clone)]
pub struct ManagementState {
    /// In-memory store of all configured mocks.
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// Loaded OpenAPI spec, when the server was started with one.
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Filesystem path the spec was loaded from.
    pub spec_path: Option<String>,
    /// Port the managed HTTP server listens on.
    pub port: u16,
    /// Creation instant, used for uptime reporting.
    pub start_time: std::time::Instant,
    /// Total request counter reported in stats (incremented elsewhere).
    pub request_counter: Arc<RwLock<u64>>,
    /// Optional proxy configuration shared with the proxy layer.
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// SMTP mailbox registry (feature `smtp`).
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// MQTT broker handle (feature `mqtt`).
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Kafka mock broker handle (feature `kafka`).
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel fanning broker messages out to SSE subscribers.
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// Scenario state machines driving stateful mocks.
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// WebSocket broadcast channel for mock lifecycle events.
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Hooks invoked on mock create/update/delete/state changes.
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Intelligent-behavior rule explanations, keyed by id.
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
    /// Chaos-engineering API state (feature `chaos`).
    #[cfg(feature = "chaos")]
    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
    /// Live server configuration; when present, bulk config updates apply.
    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
}
682
impl ManagementState {
    /// Create a new management state with empty mock storage.
    ///
    /// Optional integrations (proxy, SMTP, MQTT, Kafka, chaos, ...) start
    /// unset; attach them with the `with_*` builder methods below.
    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
        Self {
            mocks: Arc::new(RwLock::new(Vec::new())),
            spec,
            spec_path,
            port,
            start_time: std::time::Instant::now(),
            request_counter: Arc::new(RwLock::new(0)),
            proxy_config: None,
            #[cfg(feature = "smtp")]
            smtp_registry: None,
            #[cfg(feature = "mqtt")]
            mqtt_broker: None,
            #[cfg(feature = "kafka")]
            kafka_broker: None,
            #[cfg(any(feature = "mqtt", feature = "kafka"))]
            message_events: {
                // Capacity is env-tunable; the receiver handle is dropped
                // here because subscribers attach later via the sender.
                let capacity = get_message_broadcast_capacity();
                let (tx, _) = broadcast::channel(capacity);
                Arc::new(tx)
            },
            state_machine_manager: Arc::new(RwLock::new(
                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
            )),
            ws_broadcast: None,
            lifecycle_hooks: None,
            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
            #[cfg(feature = "chaos")]
            chaos_api_state: None,
            server_config: None,
        }
    }

    /// Attach lifecycle hooks invoked on mock create/update/delete.
    pub fn with_lifecycle_hooks(
        mut self,
        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
    ) -> Self {
        self.lifecycle_hooks = Some(hooks);
        self
    }

    /// Attach the WebSocket broadcast channel for mock lifecycle events.
    pub fn with_ws_broadcast(
        mut self,
        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
    ) -> Self {
        self.ws_broadcast = Some(ws_broadcast);
        self
    }

    /// Attach the shared proxy configuration.
    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
        self.proxy_config = Some(proxy_config);
        self
    }

    /// Attach the SMTP mailbox registry (feature `smtp`).
    #[cfg(feature = "smtp")]
    pub fn with_smtp_registry(
        mut self,
        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
    ) -> Self {
        self.smtp_registry = Some(smtp_registry);
        self
    }

    /// Attach the MQTT broker handle (feature `mqtt`).
    #[cfg(feature = "mqtt")]
    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
        self.mqtt_broker = Some(mqtt_broker);
        self
    }

    /// Attach the Kafka mock broker handle (feature `kafka`).
    #[cfg(feature = "kafka")]
    pub fn with_kafka_broker(
        mut self,
        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
    ) -> Self {
        self.kafka_broker = Some(kafka_broker);
        self
    }

    /// Attach the chaos-engineering API state (feature `chaos`).
    #[cfg(feature = "chaos")]
    pub fn with_chaos_api_state(
        mut self,
        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
    ) -> Self {
        self.chaos_api_state = Some(chaos_api_state);
        self
    }

    /// Attach the live server configuration so bulk config updates apply.
    pub fn with_server_config(
        mut self,
        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
    ) -> Self {
        self.server_config = Some(server_config);
        self
    }
}
793
794async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
796 let mocks = state.mocks.read().await;
797 Json(serde_json::json!({
798 "mocks": *mocks,
799 "total": mocks.len(),
800 "enabled": mocks.iter().filter(|m| m.enabled).count()
801 }))
802}
803
804async fn get_mock(
806 State(state): State<ManagementState>,
807 Path(id): Path<String>,
808) -> Result<Json<MockConfig>, StatusCode> {
809 let mocks = state.mocks.read().await;
810 mocks
811 .iter()
812 .find(|m| m.id == id)
813 .cloned()
814 .map(Json)
815 .ok_or(StatusCode::NOT_FOUND)
816}
817
/// `POST` handler: create a new mock.
///
/// An empty `id` is replaced with a fresh UUID; a duplicate `id` yields
/// `409 Conflict`. Fires the Created lifecycle hook and broadcasts a
/// WebSocket event on success.
async fn create_mock(
    State(state): State<ManagementState>,
    Json(mut mock): Json<MockConfig>,
) -> Result<Json<MockConfig>, StatusCode> {
    // The write lock is held for the whole handler, including the hook
    // await below, which also serializes concurrent creates.
    let mut mocks = state.mocks.write().await;

    if mock.id.is_empty() {
        mock.id = uuid::Uuid::new_v4().to_string();
    }

    if mocks.iter().any(|m| m.id == mock.id) {
        return Err(StatusCode::CONFLICT);
    }

    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);

    // NOTE(review): the Created hook fires BEFORE the mock is pushed into
    // storage, whereas update/delete fire their hooks after mutating —
    // confirm whether hook observers may rely on the mock being visible.
    if let Some(hooks) = &state.lifecycle_hooks {
        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
            id: mock.id.clone(),
            name: mock.name.clone(),
            config: serde_json::to_value(&mock).unwrap_or_default(),
        };
        hooks.invoke_mock_created(&event).await;
    }

    mocks.push(mock.clone());

    // Best-effort broadcast; send errors (no subscribers) are ignored.
    if let Some(tx) = &state.ws_broadcast {
        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
    }

    Ok(Json(mock))
}
856
/// `PUT` handler: replace an existing mock wholesale.
///
/// Fires the Updated lifecycle hook, plus an Enabled/Disabled hook when
/// the `enabled` flag flipped, then broadcasts a WebSocket event. `404`
/// when the id is unknown.
async fn update_mock(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
    Json(updated_mock): Json<MockConfig>,
) -> Result<Json<MockConfig>, StatusCode> {
    // Write lock held across the hook awaits below, serializing updates.
    let mut mocks = state.mocks.write().await;

    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;

    // Kept to detect an enabled-state transition after replacement.
    let old_mock = mocks[position].clone();

    info!("Updating mock: {}", id);
    // NOTE(review): the stored mock takes `updated_mock.id` verbatim — a
    // body whose id differs from the path id effectively renames the mock;
    // confirm this is intended.
    mocks[position] = updated_mock.clone();

    if let Some(hooks) = &state.lifecycle_hooks {
        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
            id: updated_mock.id.clone(),
            name: updated_mock.name.clone(),
            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
        };
        hooks.invoke_mock_updated(&event).await;

        // Emit a dedicated state-change event when `enabled` was toggled.
        if old_mock.enabled != updated_mock.enabled {
            let state_event = if updated_mock.enabled {
                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
                    id: updated_mock.id.clone(),
                }
            } else {
                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
                    id: updated_mock.id.clone(),
                }
            };
            hooks.invoke_mock_state_changed(&state_event).await;
        }
    }

    // Best-effort broadcast; send errors (no subscribers) are ignored.
    if let Some(tx) = &state.ws_broadcast {
        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
    }

    Ok(Json(updated_mock))
}
904
905async fn delete_mock(
907 State(state): State<ManagementState>,
908 Path(id): Path<String>,
909) -> Result<StatusCode, StatusCode> {
910 let mut mocks = state.mocks.write().await;
911
912 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
913
914 let deleted_mock = mocks[position].clone();
916
917 info!("Deleting mock: {}", id);
918 mocks.remove(position);
919
920 if let Some(hooks) = &state.lifecycle_hooks {
922 let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
923 id: deleted_mock.id.clone(),
924 name: deleted_mock.name.clone(),
925 };
926 hooks.invoke_mock_deleted(&event).await;
927 }
928
929 if let Some(tx) = &state.ws_broadcast {
931 let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
932 }
933
934 Ok(StatusCode::NO_CONTENT)
935}
936
/// Request body for the configuration validation endpoint.
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// Candidate configuration as a raw JSON value.
    pub config: serde_json::Value,
    /// Declared source format: "json" (default), "yaml", or "yml".
    #[serde(default = "default_format")]
    pub format: String,
}
946
/// Serde default for `ValidateConfigRequest::format`: JSON.
fn default_format() -> String {
    String::from("json")
}
950
/// `POST` handler: check a candidate configuration against the
/// `ServerConfig` schema without applying it.
async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
    use mockforge_core::config::ServerConfig;

    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
        "yaml" | "yml" => {
            // NOTE(review): `request.config` has already been parsed as
            // JSON, so it is re-serialized to a JSON string and parsed as
            // YAML (JSON being a YAML subset). This branch therefore never
            // sees original YAML syntax — confirm whether that is intended.
            let yaml_str = match serde_json::to_string(&request.config) {
                Ok(s) => s,
                Err(e) => {
                    return (
                        StatusCode::BAD_REQUEST,
                        Json(serde_json::json!({
                            "valid": false,
                            "error": format!("Failed to convert to string: {}", e),
                            "message": "Configuration validation failed"
                        })),
                    )
                        .into_response();
                }
            };
            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
        }
        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
    };

    // Only the validity outcome is reported; the parsed config is discarded.
    match config_result {
        Ok(_) => Json(serde_json::json!({
            "valid": true,
            "message": "Configuration is valid"
        }))
        .into_response(),
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "valid": false,
                "error": format!("Invalid configuration: {}", e),
                "message": "Configuration validation failed"
            })),
        )
            .into_response(),
    }
}
993
/// Request body for the bulk configuration update endpoint.
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    /// Top-level key/value overrides to merge over the default config;
    /// must be a JSON object.
    pub updates: serde_json::Value,
}
1000
1001async fn bulk_update_config(
1009 State(state): State<ManagementState>,
1010 Json(request): Json<BulkConfigUpdateRequest>,
1011) -> impl IntoResponse {
1012 if !request.updates.is_object() {
1014 return (
1015 StatusCode::BAD_REQUEST,
1016 Json(serde_json::json!({
1017 "error": "Invalid request",
1018 "message": "Updates must be a JSON object"
1019 })),
1020 )
1021 .into_response();
1022 }
1023
1024 use mockforge_core::config::ServerConfig;
1026
1027 let base_config = ServerConfig::default();
1029 let base_json = match serde_json::to_value(&base_config) {
1030 Ok(v) => v,
1031 Err(e) => {
1032 return (
1033 StatusCode::INTERNAL_SERVER_ERROR,
1034 Json(serde_json::json!({
1035 "error": "Internal error",
1036 "message": format!("Failed to serialize base config: {}", e)
1037 })),
1038 )
1039 .into_response();
1040 }
1041 };
1042
1043 let mut merged = base_json.clone();
1045 if let (Some(merged_obj), Some(updates_obj)) =
1046 (merged.as_object_mut(), request.updates.as_object())
1047 {
1048 for (key, value) in updates_obj {
1049 merged_obj.insert(key.clone(), value.clone());
1050 }
1051 }
1052
1053 match serde_json::from_value::<ServerConfig>(merged) {
1055 Ok(validated_config) => {
1056 if let Some(ref config_lock) = state.server_config {
1058 let mut config = config_lock.write().await;
1059 *config = validated_config;
1060 Json(serde_json::json!({
1061 "success": true,
1062 "message": "Bulk configuration update applied successfully",
1063 "updates_received": request.updates,
1064 "validated": true,
1065 "applied": true
1066 }))
1067 .into_response()
1068 } else {
1069 Json(serde_json::json!({
1070 "success": true,
1071 "message": "Bulk configuration update validated but not applied (no server config in state). Use .with_server_config() when building ManagementState.",
1072 "updates_received": request.updates,
1073 "validated": true,
1074 "applied": false
1075 }))
1076 .into_response()
1077 }
1078 }
1079 Err(e) => (
1080 StatusCode::BAD_REQUEST,
1081 Json(serde_json::json!({
1082 "error": "Invalid configuration",
1083 "message": format!("Configuration validation failed: {}", e),
1084 "validated": false
1085 })),
1086 )
1087 .into_response(),
1088 }
1089}
1090
1091async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
1093 let mocks = state.mocks.read().await;
1094 let request_count = *state.request_counter.read().await;
1095
1096 Json(ServerStats {
1097 uptime_seconds: state.start_time.elapsed().as_secs(),
1098 total_requests: request_count,
1099 active_mocks: mocks.len(),
1100 enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
1101 registered_routes: mocks.len(), })
1103}
1104
1105async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
1107 Json(ServerConfig {
1108 version: env!("CARGO_PKG_VERSION").to_string(),
1109 port: state.port,
1110 has_openapi_spec: state.spec.is_some(),
1111 spec_path: state.spec_path.clone(),
1112 })
1113}
1114
1115async fn health_check() -> Json<serde_json::Value> {
1117 Json(serde_json::json!({
1118 "status": "healthy",
1119 "service": "mockforge-management",
1120 "timestamp": chrono::Utc::now().to_rfc3339()
1121 }))
1122}
1123
/// Serialization format for mock export.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// Pretty-printed JSON (default).
    Json,
    /// YAML.
    Yaml,
}
1133
1134async fn export_mocks(
1136 State(state): State<ManagementState>,
1137 Query(params): Query<std::collections::HashMap<String, String>>,
1138) -> Result<(StatusCode, String), StatusCode> {
1139 let mocks = state.mocks.read().await;
1140
1141 let format = params
1142 .get("format")
1143 .map(|f| match f.as_str() {
1144 "yaml" | "yml" => ExportFormat::Yaml,
1145 _ => ExportFormat::Json,
1146 })
1147 .unwrap_or(ExportFormat::Json);
1148
1149 match format {
1150 ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
1151 .map(|json| (StatusCode::OK, json))
1152 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1153 ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
1154 .map(|yaml| (StatusCode::OK, yaml))
1155 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1156 }
1157}
1158
1159async fn import_mocks(
1161 State(state): State<ManagementState>,
1162 Json(mocks): Json<Vec<MockConfig>>,
1163) -> impl IntoResponse {
1164 let mut current_mocks = state.mocks.write().await;
1165 current_mocks.clear();
1166 current_mocks.extend(mocks);
1167 Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1168}
1169
/// `GET` handler: list all emails captured by the SMTP registry.
/// Returns `501 Not Implemented` when no registry is attached.
#[cfg(feature = "smtp")]
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match state.smtp_registry {
        Some(ref registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    match registry.get_emails() {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to retrieve emails",
                "message": e.to_string()
            })),
        ),
    }
}
1194
/// `GET` handler: fetch a single captured email by id.
///
/// Returns `404` for an unknown id and `501 Not Implemented` when no SMTP
/// registry is attached.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        match smtp_registry.get_email_by_id(&id) {
            Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
            Ok(None) => (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Email not found",
                    "id": id
                })),
            ),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve email",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1229
/// `DELETE` handler: remove all emails captured by the SMTP registry.
/// Returns `501 Not Implemented` when no registry is attached.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    let registry = match state.smtp_registry {
        Some(ref registry) => registry,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            );
        }
    };

    match registry.clear_mailbox() {
        Ok(()) => (
            StatusCode::OK,
            Json(serde_json::json!({
                "message": "Mailbox cleared successfully"
            })),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to clear mailbox",
                "message": e.to_string()
            })),
        ),
    }
}
1259
/// Mailbox-export handler: not implemented over HTTP; echoes the requested
/// format back so callers can see what was asked for.
#[cfg(feature = "smtp")]
async fn export_smtp_mailbox(
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    // Fix: avoid eagerly allocating a default String just to clone it
    // (previously `unwrap_or(&"json".to_string()).clone()` allocated twice
    // on every request).
    let format = params.get("format").map(String::as_str).unwrap_or("json");
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "SMTP mailbox management not available via HTTP API",
            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
            "requested_format": format
        })),
    )
}
1275
/// `GET` handler: search captured emails with optional filters taken from
/// query parameters (`sender`, `recipient`, `subject`, `body`, `since`,
/// `until`, `regex`, `case_sensitive`).
///
/// `since`/`until` must be RFC 3339 timestamps; unparsable values are
/// silently dropped rather than rejected. Returns `501 Not Implemented`
/// when no SMTP registry is attached.
#[cfg(feature = "smtp")]
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        let filters = EmailSearchFilters {
            sender: params.get("sender").cloned(),
            recipient: params.get("recipient").cloned(),
            subject: params.get("subject").cloned(),
            body: params.get("body").cloned(),
            // Timestamps are normalized to UTC; parse failures become None.
            since: params
                .get("since")
                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
                .map(|dt| dt.with_timezone(&chrono::Utc)),
            until: params
                .get("until")
                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
                .map(|dt| dt.with_timezone(&chrono::Utc)),
            // Boolean flags only accept the literal string "true".
            use_regex: params.get("regex").map(|s| s == "true").unwrap_or(false),
            case_sensitive: params.get("case_sensitive").map(|s| s == "true").unwrap_or(false),
        };

        match smtp_registry.search_emails(filters) {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to search emails",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1320
/// Aggregate statistics reported by the in-process MQTT broker
/// (serialized as the body of `GET /mqtt/stats`).
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of clients currently connected to the broker.
    pub connected_clients: usize,
    /// Number of topics with recent activity.
    pub active_topics: usize,
    /// Number of retained messages held by the broker.
    pub retained_messages: usize,
    /// Total number of active subscriptions across all clients.
    pub total_subscriptions: usize,
}
1334
/// GET /mqtt/stats — snapshot of broker-wide counters, or 503 when no broker
/// is attached to this management state.
#[cfg(feature = "mqtt")]
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.mqtt_broker else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    // Query counts in the same order the original did.
    let client_count = broker.get_connected_clients().await.len();
    let topic_count = broker.get_active_topics().await.len();
    let topic_stats = broker.get_topic_stats().await;

    Json(MqttBrokerStats {
        connected_clients: client_count,
        active_topics: topic_count,
        retained_messages: topic_stats.retained_messages,
        total_subscriptions: topic_stats.total_subscriptions,
    })
    .into_response()
}
1355
/// GET /mqtt/clients — list the identifiers of currently connected clients,
/// or 503 when no broker is attached.
#[cfg(feature = "mqtt")]
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.mqtt_broker else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };
    let clients = broker.get_connected_clients().await;
    Json(serde_json::json!({ "clients": clients })).into_response()
}
1368
/// GET /mqtt/topics — list topics with recent activity, or 503 when no broker
/// is attached.
#[cfg(feature = "mqtt")]
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.mqtt_broker else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };
    let topics = broker.get_active_topics().await;
    Json(serde_json::json!({ "topics": topics })).into_response()
}
1381
/// DELETE /mqtt/clients/{client_id} — forcibly disconnect one MQTT client.
///
/// Responds 200 on success, 500 if the broker rejects the request, and 503
/// when no broker is attached.
#[cfg(feature = "mqtt")]
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.mqtt_broker else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };
    match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response(),
        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
            .into_response(),
    }
}
1401
/// Body of a single MQTT publish request (also used as a batch element in
/// [`MqttBatchPublishRequest`]).
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Topic to publish to.
    pub topic: String,
    /// Message payload, as a UTF-8 string.
    pub payload: String,
    /// MQTT quality-of-service level; defaults to 0 when omitted.
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Whether the broker should retain the message; defaults to false.
    #[serde(default)]
    pub retain: bool,
}
1419
/// Serde default for [`MqttPublishRequest::qos`]: QoS 0 (at most once).
#[cfg(feature = "mqtt")]
fn default_qos() -> u8 {
    0
}
1424
/// POST /mqtt/publish — publish one message through the embedded MQTT broker.
///
/// The body is accepted as loose JSON (`topic` and `payload` are required
/// strings; `qos` defaults to 0, `retain` to false). On success the message is
/// also fanned out to SSE subscribers via the shared broadcast channel.
/// Responds 400 on validation errors, 503 when no broker is attached, and 500
/// when the broker rejects the publish.
#[cfg(feature = "mqtt")]
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Optional fields with defaults.
    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    // Both required fields must be present as strings.
    let (Some(topic), Some(payload)) = (
        request.get("topic").and_then(|v| v.as_str()).map(str::to_string),
        request.get("payload").and_then(|v| v.as_str()).map(str::to_string),
    ) else {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required fields: topic and payload"
            })),
        );
    };

    let Some(broker) = &state.mqtt_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        );
    };

    // MQTT defines only QoS levels 0-2.
    if qos > 2 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid QoS",
                "message": "QoS must be 0, 1, or 2"
            })),
        );
    }

    // The management API publishes under a fixed synthetic client id.
    let client_id = "mockforge-management-api".to_string();
    let publish_result = broker
        .handle_publish(&client_id, &topic, payload.as_bytes().to_vec(), qos, retain)
        .await
        .map_err(|e| format!("{}", e));

    match publish_result {
        Ok(_) => {
            // Notify SSE stream subscribers; a send error only means there are
            // no active receivers, which is fine.
            let event = MessageEvent::Mqtt(MqttMessageEvent {
                topic: topic.clone(),
                payload: payload.clone(),
                qos,
                retain,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            (
                StatusCode::OK,
                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message published to topic '{}'", topic),
                    "topic": topic,
                    "qos": qos,
                    "retain": retain
                })),
            )
        }
        Err(error_msg) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to publish message",
                "message": error_msg
            })),
        ),
    }
}
1512
1513#[cfg(not(feature = "mqtt"))]
1514async fn publish_mqtt_message_handler(
1516 State(_state): State<ManagementState>,
1517 Json(_request): Json<serde_json::Value>,
1518) -> impl IntoResponse {
1519 (
1520 StatusCode::SERVICE_UNAVAILABLE,
1521 Json(serde_json::json!({
1522 "error": "MQTT feature not enabled",
1523 "message": "MQTT support is not compiled into this build"
1524 })),
1525 )
1526}
1527
/// Body of a batch MQTT publish request: a list of messages published in
/// order, with an optional pause between consecutive messages.
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// Messages to publish, in order.
    pub messages: Vec<MqttPublishRequest>,
    /// Delay between consecutive messages in milliseconds; defaults to 100.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1538
/// Serde default for [`MqttBatchPublishRequest::delay_ms`]: 100 ms between
/// consecutive messages.
#[cfg(feature = "mqtt")]
fn default_delay() -> u64 {
    100
}
1543
/// POST /mqtt/publish/batch — publish a sequence of MQTT messages in order.
///
/// The body is accepted as loose JSON: `messages` (required, non-empty array)
/// and `delay_ms` (optional pause between consecutive messages, default 100).
/// Each message is validated independently; a bad entry is recorded in the
/// per-item `results` array and does not abort the batch. Responds 400 for a
/// missing/empty `messages` field and 503 when no broker is attached;
/// otherwise 200 with per-item success/failure counts.
#[cfg(feature = "mqtt")]
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let messages_json = request.get("messages").and_then(|v| v.as_array());
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    if messages_json.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required field: messages"
            })),
        );
    }

    // Safe: the None case returned above.
    let messages_json = messages_json.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        if messages_json.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            );
        }

        // Per-item outcome records, returned to the caller at the end.
        let mut results = Vec::new();
        // The management API publishes under a fixed synthetic client id.
        let client_id = "mockforge-management-api".to_string();

        for (index, msg_json) in messages_json.iter().enumerate() {
            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

            // Invalid entries are recorded and skipped, not fatal to the batch.
            if topic.is_none() || payload.is_none() {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }

            // Safe: the None cases continued above.
            let topic = topic.unwrap();
            let payload = payload.unwrap();

            // MQTT defines only QoS levels 0-2.
            if qos > 2 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Invalid QoS (must be 0, 1, or 2)"
                }));
                continue;
            }

            let payload_bytes = payload.as_bytes().to_vec();

            let publish_result = broker
                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
                .await
                .map_err(|e| format!("{}", e));

            match publish_result {
                Ok(_) => {
                    // Notify SSE stream subscribers; a send error only means
                    // there are no active receivers, which is fine.
                    let event = MessageEvent::Mqtt(MqttMessageEvent {
                        topic: topic.clone(),
                        payload: payload.clone(),
                        qos,
                        retain,
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": topic,
                        "qos": qos
                    }));
                }
                Err(error_msg) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": error_msg
                    }));
                }
            }

            // Pace the batch: sleep between messages, but not after the last.
            if index < messages_json.len() - 1 && delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        (
            StatusCode::OK,
            Json(serde_json::json!({
                "success": true,
                "total": messages_json.len(),
                "succeeded": success_count,
                "failed": messages_json.len() - success_count,
                "results": results
            })),
        )
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1673
1674#[cfg(not(feature = "mqtt"))]
1675async fn publish_mqtt_batch_handler(
1677 State(_state): State<ManagementState>,
1678 Json(_request): Json<serde_json::Value>,
1679) -> impl IntoResponse {
1680 (
1681 StatusCode::SERVICE_UNAVAILABLE,
1682 Json(serde_json::json!({
1683 "error": "MQTT feature not enabled",
1684 "message": "MQTT support is not compiled into this build"
1685 })),
1686 )
1687}
1688
/// Body for the "set migration mode" endpoints; `mode` is parsed
/// case-insensitively as one of: mock, shadow, real, auto.
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    // Requested migration mode as a string; validated by the handlers.
    mode: String,
}
1696
1697async fn get_migration_routes(
1699 State(state): State<ManagementState>,
1700) -> Result<Json<serde_json::Value>, StatusCode> {
1701 let proxy_config = match &state.proxy_config {
1702 Some(config) => config,
1703 None => {
1704 return Ok(Json(serde_json::json!({
1705 "error": "Migration not configured. Proxy config not available."
1706 })));
1707 }
1708 };
1709
1710 let config = proxy_config.read().await;
1711 let routes = config.get_migration_routes();
1712
1713 Ok(Json(serde_json::json!({
1714 "routes": routes
1715 })))
1716}
1717
1718async fn toggle_route_migration(
1720 State(state): State<ManagementState>,
1721 Path(pattern): Path<String>,
1722) -> Result<Json<serde_json::Value>, StatusCode> {
1723 let proxy_config = match &state.proxy_config {
1724 Some(config) => config,
1725 None => {
1726 return Ok(Json(serde_json::json!({
1727 "error": "Migration not configured. Proxy config not available."
1728 })));
1729 }
1730 };
1731
1732 let mut config = proxy_config.write().await;
1733 let new_mode = match config.toggle_route_migration(&pattern) {
1734 Some(mode) => mode,
1735 None => {
1736 return Ok(Json(serde_json::json!({
1737 "error": format!("Route pattern not found: {}", pattern)
1738 })));
1739 }
1740 };
1741
1742 Ok(Json(serde_json::json!({
1743 "pattern": pattern,
1744 "mode": format!("{:?}", new_mode).to_lowercase()
1745 })))
1746}
1747
1748async fn set_route_migration_mode(
1750 State(state): State<ManagementState>,
1751 Path(pattern): Path<String>,
1752 Json(request): Json<SetMigrationModeRequest>,
1753) -> Result<Json<serde_json::Value>, StatusCode> {
1754 let proxy_config = match &state.proxy_config {
1755 Some(config) => config,
1756 None => {
1757 return Ok(Json(serde_json::json!({
1758 "error": "Migration not configured. Proxy config not available."
1759 })));
1760 }
1761 };
1762
1763 use mockforge_core::proxy::config::MigrationMode;
1764 let mode = match request.mode.to_lowercase().as_str() {
1765 "mock" => MigrationMode::Mock,
1766 "shadow" => MigrationMode::Shadow,
1767 "real" => MigrationMode::Real,
1768 "auto" => MigrationMode::Auto,
1769 _ => {
1770 return Ok(Json(serde_json::json!({
1771 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1772 })));
1773 }
1774 };
1775
1776 let mut config = proxy_config.write().await;
1777 let updated = config.update_rule_migration_mode(&pattern, mode);
1778
1779 if !updated {
1780 return Ok(Json(serde_json::json!({
1781 "error": format!("Route pattern not found: {}", pattern)
1782 })));
1783 }
1784
1785 Ok(Json(serde_json::json!({
1786 "pattern": pattern,
1787 "mode": format!("{:?}", mode).to_lowercase()
1788 })))
1789}
1790
1791async fn toggle_group_migration(
1793 State(state): State<ManagementState>,
1794 Path(group): Path<String>,
1795) -> Result<Json<serde_json::Value>, StatusCode> {
1796 let proxy_config = match &state.proxy_config {
1797 Some(config) => config,
1798 None => {
1799 return Ok(Json(serde_json::json!({
1800 "error": "Migration not configured. Proxy config not available."
1801 })));
1802 }
1803 };
1804
1805 let mut config = proxy_config.write().await;
1806 let new_mode = config.toggle_group_migration(&group);
1807
1808 Ok(Json(serde_json::json!({
1809 "group": group,
1810 "mode": format!("{:?}", new_mode).to_lowercase()
1811 })))
1812}
1813
1814async fn set_group_migration_mode(
1816 State(state): State<ManagementState>,
1817 Path(group): Path<String>,
1818 Json(request): Json<SetMigrationModeRequest>,
1819) -> Result<Json<serde_json::Value>, StatusCode> {
1820 let proxy_config = match &state.proxy_config {
1821 Some(config) => config,
1822 None => {
1823 return Ok(Json(serde_json::json!({
1824 "error": "Migration not configured. Proxy config not available."
1825 })));
1826 }
1827 };
1828
1829 use mockforge_core::proxy::config::MigrationMode;
1830 let mode = match request.mode.to_lowercase().as_str() {
1831 "mock" => MigrationMode::Mock,
1832 "shadow" => MigrationMode::Shadow,
1833 "real" => MigrationMode::Real,
1834 "auto" => MigrationMode::Auto,
1835 _ => {
1836 return Ok(Json(serde_json::json!({
1837 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1838 })));
1839 }
1840 };
1841
1842 let mut config = proxy_config.write().await;
1843 config.update_group_migration_mode(&group, mode);
1844
1845 Ok(Json(serde_json::json!({
1846 "group": group,
1847 "mode": format!("{:?}", mode).to_lowercase()
1848 })))
1849}
1850
1851async fn get_migration_groups(
1853 State(state): State<ManagementState>,
1854) -> Result<Json<serde_json::Value>, StatusCode> {
1855 let proxy_config = match &state.proxy_config {
1856 Some(config) => config,
1857 None => {
1858 return Ok(Json(serde_json::json!({
1859 "error": "Migration not configured. Proxy config not available."
1860 })));
1861 }
1862 };
1863
1864 let config = proxy_config.read().await;
1865 let groups = config.get_migration_groups();
1866
1867 let groups_json: serde_json::Map<String, serde_json::Value> = groups
1869 .into_iter()
1870 .map(|(name, info)| {
1871 (
1872 name,
1873 serde_json::json!({
1874 "name": info.name,
1875 "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1876 "route_count": info.route_count
1877 }),
1878 )
1879 })
1880 .collect();
1881
1882 Ok(Json(serde_json::json!(groups_json)))
1883}
1884
1885async fn get_migration_status(
1887 State(state): State<ManagementState>,
1888) -> Result<Json<serde_json::Value>, StatusCode> {
1889 let proxy_config = match &state.proxy_config {
1890 Some(config) => config,
1891 None => {
1892 return Ok(Json(serde_json::json!({
1893 "error": "Migration not configured. Proxy config not available."
1894 })));
1895 }
1896 };
1897
1898 let config = proxy_config.read().await;
1899 let routes = config.get_migration_routes();
1900 let groups = config.get_migration_groups();
1901
1902 let mut mock_count = 0;
1903 let mut shadow_count = 0;
1904 let mut real_count = 0;
1905 let mut auto_count = 0;
1906
1907 for route in &routes {
1908 match route.migration_mode {
1909 mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1910 mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1911 mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1912 mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1913 }
1914 }
1915
1916 Ok(Json(serde_json::json!({
1917 "total_routes": routes.len(),
1918 "mock_routes": mock_count,
1919 "shadow_routes": shadow_count,
1920 "real_routes": real_count,
1921 "auto_routes": auto_count,
1922 "total_groups": groups.len(),
1923 "migration_enabled": config.migration_enabled
1924 })))
1925}
1926
/// Body for creating or updating a proxy body-transform rule.
#[derive(Debug, Deserialize, Serialize)]
pub struct ProxyRuleRequest {
    /// URL pattern the rule matches against.
    pub pattern: String,
    /// Rule kind: "request" or "response" (validated by the handlers).
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Response status codes the rule applies to (response rules only);
    /// empty by default.
    #[serde(default)]
    pub status_codes: Vec<u16>,
    /// Transforms applied to the matched body; required, must be non-empty.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active; defaults to true.
    #[serde(default = "default_true")]
    pub enabled: bool,
}
1946
/// One body transform within a proxy rule, as exchanged over the API.
#[derive(Debug, Deserialize, Serialize)]
pub struct BodyTransformRequest {
    /// JSON path targeted by the transform.
    pub path: String,
    /// Replacement value.
    pub replace: String,
    /// Operation name: "replace", "add", or "remove"; anything else falls
    /// back to replace. Defaults to the empty string (→ replace).
    #[serde(default)]
    pub operation: String,
}
1958
/// A proxy rule as returned by the list endpoint. The `id` is positional:
/// request rules come first, then response rules offset by the request count.
#[derive(Debug, Serialize)]
pub struct ProxyRuleResponse {
    /// Positional rule identifier (index into the combined rule list).
    pub id: usize,
    /// URL pattern the rule matches against.
    pub pattern: String,
    /// Rule kind: "request" or "response".
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status codes the rule applies to (empty for request rules).
    pub status_codes: Vec<u16>,
    /// Transforms applied by the rule.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active.
    pub enabled: bool,
}
1976
1977async fn list_proxy_rules(
1979 State(state): State<ManagementState>,
1980) -> Result<Json<serde_json::Value>, StatusCode> {
1981 let proxy_config = match &state.proxy_config {
1982 Some(config) => config,
1983 None => {
1984 return Ok(Json(serde_json::json!({
1985 "error": "Proxy not configured. Proxy config not available."
1986 })));
1987 }
1988 };
1989
1990 let config = proxy_config.read().await;
1991
1992 let mut rules: Vec<ProxyRuleResponse> = Vec::new();
1993
1994 for (idx, rule) in config.request_replacements.iter().enumerate() {
1996 rules.push(ProxyRuleResponse {
1997 id: idx,
1998 pattern: rule.pattern.clone(),
1999 rule_type: "request".to_string(),
2000 status_codes: Vec::new(),
2001 body_transforms: rule
2002 .body_transforms
2003 .iter()
2004 .map(|t| BodyTransformRequest {
2005 path: t.path.clone(),
2006 replace: t.replace.clone(),
2007 operation: format!("{:?}", t.operation).to_lowercase(),
2008 })
2009 .collect(),
2010 enabled: rule.enabled,
2011 });
2012 }
2013
2014 let request_count = config.request_replacements.len();
2016 for (idx, rule) in config.response_replacements.iter().enumerate() {
2017 rules.push(ProxyRuleResponse {
2018 id: request_count + idx,
2019 pattern: rule.pattern.clone(),
2020 rule_type: "response".to_string(),
2021 status_codes: rule.status_codes.clone(),
2022 body_transforms: rule
2023 .body_transforms
2024 .iter()
2025 .map(|t| BodyTransformRequest {
2026 path: t.path.clone(),
2027 replace: t.replace.clone(),
2028 operation: format!("{:?}", t.operation).to_lowercase(),
2029 })
2030 .collect(),
2031 enabled: rule.enabled,
2032 });
2033 }
2034
2035 Ok(Json(serde_json::json!({
2036 "rules": rules
2037 })))
2038}
2039
2040async fn create_proxy_rule(
2042 State(state): State<ManagementState>,
2043 Json(request): Json<ProxyRuleRequest>,
2044) -> Result<Json<serde_json::Value>, StatusCode> {
2045 let proxy_config = match &state.proxy_config {
2046 Some(config) => config,
2047 None => {
2048 return Ok(Json(serde_json::json!({
2049 "error": "Proxy not configured. Proxy config not available."
2050 })));
2051 }
2052 };
2053
2054 if request.body_transforms.is_empty() {
2056 return Ok(Json(serde_json::json!({
2057 "error": "At least one body transform is required"
2058 })));
2059 }
2060
2061 let body_transforms: Vec<BodyTransform> = request
2062 .body_transforms
2063 .iter()
2064 .map(|t| {
2065 let op = match t.operation.as_str() {
2066 "replace" => TransformOperation::Replace,
2067 "add" => TransformOperation::Add,
2068 "remove" => TransformOperation::Remove,
2069 _ => TransformOperation::Replace,
2070 };
2071 BodyTransform {
2072 path: t.path.clone(),
2073 replace: t.replace.clone(),
2074 operation: op,
2075 }
2076 })
2077 .collect();
2078
2079 let new_rule = BodyTransformRule {
2080 pattern: request.pattern.clone(),
2081 status_codes: request.status_codes.clone(),
2082 body_transforms,
2083 enabled: request.enabled,
2084 };
2085
2086 let mut config = proxy_config.write().await;
2087
2088 let rule_id = if request.rule_type == "request" {
2089 config.request_replacements.push(new_rule);
2090 config.request_replacements.len() - 1
2091 } else if request.rule_type == "response" {
2092 config.response_replacements.push(new_rule);
2093 config.request_replacements.len() + config.response_replacements.len() - 1
2094 } else {
2095 return Ok(Json(serde_json::json!({
2096 "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
2097 })));
2098 };
2099
2100 Ok(Json(serde_json::json!({
2101 "id": rule_id,
2102 "message": "Rule created successfully"
2103 })))
2104}
2105
2106async fn get_proxy_rule(
2108 State(state): State<ManagementState>,
2109 Path(id): Path<String>,
2110) -> Result<Json<serde_json::Value>, StatusCode> {
2111 let proxy_config = match &state.proxy_config {
2112 Some(config) => config,
2113 None => {
2114 return Ok(Json(serde_json::json!({
2115 "error": "Proxy not configured. Proxy config not available."
2116 })));
2117 }
2118 };
2119
2120 let config = proxy_config.read().await;
2121 let rule_id: usize = match id.parse() {
2122 Ok(id) => id,
2123 Err(_) => {
2124 return Ok(Json(serde_json::json!({
2125 "error": format!("Invalid rule ID: {}", id)
2126 })));
2127 }
2128 };
2129
2130 let request_count = config.request_replacements.len();
2131
2132 if rule_id < request_count {
2133 let rule = &config.request_replacements[rule_id];
2135 Ok(Json(serde_json::json!({
2136 "id": rule_id,
2137 "pattern": rule.pattern,
2138 "type": "request",
2139 "status_codes": [],
2140 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2141 "path": t.path,
2142 "replace": t.replace,
2143 "operation": format!("{:?}", t.operation).to_lowercase()
2144 })).collect::<Vec<_>>(),
2145 "enabled": rule.enabled
2146 })))
2147 } else if rule_id < request_count + config.response_replacements.len() {
2148 let response_idx = rule_id - request_count;
2150 let rule = &config.response_replacements[response_idx];
2151 Ok(Json(serde_json::json!({
2152 "id": rule_id,
2153 "pattern": rule.pattern,
2154 "type": "response",
2155 "status_codes": rule.status_codes,
2156 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2157 "path": t.path,
2158 "replace": t.replace,
2159 "operation": format!("{:?}", t.operation).to_lowercase()
2160 })).collect::<Vec<_>>(),
2161 "enabled": rule.enabled
2162 })))
2163 } else {
2164 Ok(Json(serde_json::json!({
2165 "error": format!("Rule ID {} not found", rule_id)
2166 })))
2167 }
2168}
2169
2170async fn update_proxy_rule(
2172 State(state): State<ManagementState>,
2173 Path(id): Path<String>,
2174 Json(request): Json<ProxyRuleRequest>,
2175) -> Result<Json<serde_json::Value>, StatusCode> {
2176 let proxy_config = match &state.proxy_config {
2177 Some(config) => config,
2178 None => {
2179 return Ok(Json(serde_json::json!({
2180 "error": "Proxy not configured. Proxy config not available."
2181 })));
2182 }
2183 };
2184
2185 let mut config = proxy_config.write().await;
2186 let rule_id: usize = match id.parse() {
2187 Ok(id) => id,
2188 Err(_) => {
2189 return Ok(Json(serde_json::json!({
2190 "error": format!("Invalid rule ID: {}", id)
2191 })));
2192 }
2193 };
2194
2195 let body_transforms: Vec<BodyTransform> = request
2196 .body_transforms
2197 .iter()
2198 .map(|t| {
2199 let op = match t.operation.as_str() {
2200 "replace" => TransformOperation::Replace,
2201 "add" => TransformOperation::Add,
2202 "remove" => TransformOperation::Remove,
2203 _ => TransformOperation::Replace,
2204 };
2205 BodyTransform {
2206 path: t.path.clone(),
2207 replace: t.replace.clone(),
2208 operation: op,
2209 }
2210 })
2211 .collect();
2212
2213 let updated_rule = BodyTransformRule {
2214 pattern: request.pattern.clone(),
2215 status_codes: request.status_codes.clone(),
2216 body_transforms,
2217 enabled: request.enabled,
2218 };
2219
2220 let request_count = config.request_replacements.len();
2221
2222 if rule_id < request_count {
2223 config.request_replacements[rule_id] = updated_rule;
2225 } else if rule_id < request_count + config.response_replacements.len() {
2226 let response_idx = rule_id - request_count;
2228 config.response_replacements[response_idx] = updated_rule;
2229 } else {
2230 return Ok(Json(serde_json::json!({
2231 "error": format!("Rule ID {} not found", rule_id)
2232 })));
2233 }
2234
2235 Ok(Json(serde_json::json!({
2236 "id": rule_id,
2237 "message": "Rule updated successfully"
2238 })))
2239}
2240
2241async fn delete_proxy_rule(
2243 State(state): State<ManagementState>,
2244 Path(id): Path<String>,
2245) -> Result<Json<serde_json::Value>, StatusCode> {
2246 let proxy_config = match &state.proxy_config {
2247 Some(config) => config,
2248 None => {
2249 return Ok(Json(serde_json::json!({
2250 "error": "Proxy not configured. Proxy config not available."
2251 })));
2252 }
2253 };
2254
2255 let mut config = proxy_config.write().await;
2256 let rule_id: usize = match id.parse() {
2257 Ok(id) => id,
2258 Err(_) => {
2259 return Ok(Json(serde_json::json!({
2260 "error": format!("Invalid rule ID: {}", id)
2261 })));
2262 }
2263 };
2264
2265 let request_count = config.request_replacements.len();
2266
2267 if rule_id < request_count {
2268 config.request_replacements.remove(rule_id);
2270 } else if rule_id < request_count + config.response_replacements.len() {
2271 let response_idx = rule_id - request_count;
2273 config.response_replacements.remove(response_idx);
2274 } else {
2275 return Ok(Json(serde_json::json!({
2276 "error": format!("Rule ID {} not found", rule_id)
2277 })));
2278 }
2279
2280 Ok(Json(serde_json::json!({
2281 "id": rule_id,
2282 "message": "Rule deleted successfully"
2283 })))
2284}
2285
2286async fn get_proxy_inspect(
2288 State(state): State<ManagementState>,
2289 Query(params): Query<std::collections::HashMap<String, String>>,
2290) -> Result<Json<serde_json::Value>, StatusCode> {
2291 let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2292 let offset: usize = params.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);
2293
2294 let proxy_config = match &state.proxy_config {
2295 Some(config) => config.read().await,
2296 None => {
2297 return Ok(Json(serde_json::json!({
2298 "error": "Proxy not configured. Proxy config not available."
2299 })));
2300 }
2301 };
2302
2303 let mut rules = Vec::new();
2304 for (idx, rule) in proxy_config.request_replacements.iter().enumerate() {
2305 rules.push(serde_json::json!({
2306 "id": idx,
2307 "kind": "request",
2308 "pattern": rule.pattern,
2309 "enabled": rule.enabled,
2310 "status_codes": rule.status_codes,
2311 "transform_count": rule.body_transforms.len(),
2312 "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2313 "path": t.path,
2314 "operation": t.operation,
2315 "replace": t.replace
2316 })).collect::<Vec<_>>()
2317 }));
2318 }
2319 let request_rule_count = rules.len();
2320 for (idx, rule) in proxy_config.response_replacements.iter().enumerate() {
2321 rules.push(serde_json::json!({
2322 "id": request_rule_count + idx,
2323 "kind": "response",
2324 "pattern": rule.pattern,
2325 "enabled": rule.enabled,
2326 "status_codes": rule.status_codes,
2327 "transform_count": rule.body_transforms.len(),
2328 "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2329 "path": t.path,
2330 "operation": t.operation,
2331 "replace": t.replace
2332 })).collect::<Vec<_>>()
2333 }));
2334 }
2335
2336 let total = rules.len();
2337 let paged_rules: Vec<_> = rules.into_iter().skip(offset).take(limit).collect();
2338
2339 Ok(Json(serde_json::json!({
2340 "enabled": proxy_config.enabled,
2341 "target_url": proxy_config.target_url,
2342 "prefix": proxy_config.prefix,
2343 "timeout_seconds": proxy_config.timeout_seconds,
2344 "follow_redirects": proxy_config.follow_redirects,
2345 "passthrough_by_default": proxy_config.passthrough_by_default,
2346 "rules": paged_rules,
2347 "request_rule_count": request_rule_count,
2348 "response_rule_count": total.saturating_sub(request_rule_count),
2349 "limit": limit,
2350 "offset": offset,
2351 "total": total
2352 })))
2353}
2354
/// Build the management API router, wiring the feature-gated protocol routes
/// (SMTP, MQTT, Kafka) on top of the always-available core, migration, proxy,
/// AI, and chaos endpoints. The `#[cfg]` ladders rebind `router` so each
/// feature combination still yields a single well-typed `Router`.
pub fn management_router(state: ManagementState) -> Router {
    // Core endpoints: health, stats, config, and mock CRUD + import/export.
    let router = Router::new()
        .route("/health", get(health_check))
        .route("/stats", get(get_stats))
        .route("/config", get(get_config))
        .route("/config/validate", post(validate_config))
        .route("/config/bulk", post(bulk_update_config))
        .route("/mocks", get(list_mocks))
        .route("/mocks", post(create_mock))
        .route("/mocks/{id}", get(get_mock))
        .route("/mocks/{id}", put(update_mock))
        .route("/mocks/{id}", delete(delete_mock))
        .route("/export", get(export_mocks))
        .route("/import", post(import_mocks));

    // SMTP mailbox endpoints, only in builds with the smtp feature.
    #[cfg(feature = "smtp")]
    let router = router
        .route("/smtp/mailbox", get(list_smtp_emails))
        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
        .route("/smtp/mailbox/{id}", get(get_smtp_email))
        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
        .route("/smtp/mailbox/search", get(search_smtp_emails));

    #[cfg(not(feature = "smtp"))]
    let router = router;

    // Full MQTT surface when the feature is compiled in...
    #[cfg(feature = "mqtt")]
    let router = router
        .route("/mqtt/stats", get(get_mqtt_stats))
        .route("/mqtt/clients", get(get_mqtt_clients))
        .route("/mqtt/topics", get(get_mqtt_topics))
        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // ...otherwise only the publish routes, served by the 503 stub handlers.
    #[cfg(not(feature = "mqtt"))]
    let router = router
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Kafka endpoints, only in builds with the kafka feature.
    #[cfg(feature = "kafka")]
    let router = router
        .route("/kafka/stats", get(get_kafka_stats))
        .route("/kafka/topics", get(get_kafka_topics))
        .route("/kafka/topics/{topic}", get(get_kafka_topic))
        .route("/kafka/groups", get(get_kafka_groups))
        .route("/kafka/groups/{group_id}", get(get_kafka_group))
        .route("/kafka/produce", post(produce_kafka_message))
        .route("/kafka/produce/batch", post(produce_kafka_batch))
        .route("/kafka/messages/stream", get(kafka_messages_stream));

    #[cfg(not(feature = "kafka"))]
    let router = router;

    // Proxy migration controls (per-route and per-group modes).
    let router = router
        .route("/migration/routes", get(get_migration_routes))
        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
        .route("/migration/groups/{group}", put(set_group_migration_mode))
        .route("/migration/groups", get(get_migration_groups))
        .route("/migration/status", get(get_migration_status));

    // Proxy body-transform rule CRUD and inspection.
    let router = router
        .route("/proxy/rules", get(list_proxy_rules))
        .route("/proxy/rules", post(create_proxy_rule))
        .route("/proxy/rules/{id}", get(get_proxy_rule))
        .route("/proxy/rules/{id}", put(update_proxy_rule))
        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
        .route("/proxy/inspect", get(get_proxy_inspect));

    // AI-assisted spec generation.
    let router = router.route("/ai/generate-spec", post(generate_ai_spec));

    // Snapshot diff sub-router shares the same state.
    let router = router.nest(
        "/snapshot-diff",
        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
    );

    // Traffic-based OpenAPI generation, only with behavioral cloning support.
    #[cfg(feature = "behavioral-cloning")]
    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));

    // MockAI learning/explanation, chaos config, and network profiles.
    let router = router
        .route("/mockai/learn", post(learn_from_examples))
        .route("/mockai/rules/explanations", get(list_rule_explanations))
        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
        .route("/chaos/config", get(get_chaos_config))
        .route("/chaos/config", post(update_chaos_config))
        .route("/network/profiles", get(list_network_profiles))
        .route("/network/profile/apply", post(apply_network_profile));

    // State machine API mounted as a nested router.
    let router =
        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());

    router.with_state(state)
}
2458
/// Aggregate snapshot of broker-wide statistics, returned as JSON by
/// `GET /kafka/stats` (see `get_kafka_stats`).
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// Number of topics currently known to the broker.
    pub topics: usize,
    /// Total partition count summed across all topics.
    pub partitions: usize,
    /// Number of registered consumer groups.
    pub consumer_groups: usize,
    /// Total messages produced, taken from the broker metrics snapshot.
    pub messages_produced: u64,
    /// Total messages consumed, taken from the broker metrics snapshot.
    pub messages_consumed: u64,
}
2473
/// Summary of a single Kafka topic, as listed by `GET /kafka/topics`.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name.
    pub name: String,
    /// Number of partitions in the topic.
    pub partitions: usize,
    /// Configured replication factor (cast from the topic config).
    pub replication_factor: i32,
}
2481
/// Summary of a consumer group, as listed by `GET /kafka/groups`.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group identifier.
    pub group_id: String,
    /// Number of members currently in the group.
    pub members: usize,
    /// Group state label; the handlers always report "Stable" since the
    /// mock broker does not track rebalance state.
    pub state: String,
}
2489
#[cfg(feature = "kafka")]
/// `GET /kafka/stats` — report a snapshot of broker-wide statistics.
///
/// Responds with [`KafkaBrokerStats`] as JSON, or `503 Service Unavailable`
/// when no Kafka broker is configured on this instance.
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = state.kafka_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let consumer_groups = broker.consumer_groups.read().await;
    let metrics_snapshot = broker.metrics().snapshot();

    // Partition count is the sum over every topic's partition list.
    let total_partitions: usize = topics.values().map(|t| t.partitions.len()).sum();

    Json(KafkaBrokerStats {
        topics: topics.len(),
        partitions: total_partitions,
        consumer_groups: consumer_groups.groups().len(),
        messages_produced: metrics_snapshot.messages_produced_total,
        messages_consumed: metrics_snapshot.messages_consumed_total,
    })
    .into_response()
}
2522
#[cfg(feature = "kafka")]
/// `GET /kafka/topics` — list all topics with partition counts and
/// replication factors. Responds `503` when no Kafka broker is configured.
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = state.kafka_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let topic_list: Vec<KafkaTopicInfo> = topics
        .iter()
        .map(|(name, topic)| KafkaTopicInfo {
            name: name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor as i32,
        })
        .collect();

    Json(serde_json::json!({ "topics": topic_list })).into_response()
}
2552
#[cfg(feature = "kafka")]
/// `GET /kafka/topics/{topic}` — describe one topic, including per-partition
/// detail. Responds `404` for unknown topics and `503` when no Kafka broker
/// is configured.
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = state.kafka_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    match topics.get(&topic_name) {
        Some(topic) => {
            // Leader and replica ids are hard-coded to broker 0 here; only
            // the per-partition message count varies.
            let partitions_detail: Vec<_> = topic
                .partitions
                .iter()
                .enumerate()
                .map(|(idx, partition)| {
                    serde_json::json!({
                        "id": idx as i32,
                        "leader": 0,
                        "replicas": vec![0],
                        "message_count": partition.messages.len()
                    })
                })
                .collect();

            Json(serde_json::json!({
                "name": topic_name,
                "partitions": topic.partitions.len(),
                "replication_factor": topic.config.replication_factor,
                "partitions_detail": partitions_detail
            }))
            .into_response()
        }
        None => (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Topic not found",
                "topic": topic_name
            })),
        )
            .into_response(),
    }
}
2594
#[cfg(feature = "kafka")]
/// `GET /kafka/groups` — list consumer groups with member counts.
/// Responds `503` when no Kafka broker is configured.
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = state.kafka_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;
    let groups: Vec<KafkaConsumerGroupInfo> = consumer_groups
        .groups()
        .iter()
        .map(|(group_id, group)| KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            // Rebalance state is not tracked; every group reports "Stable".
            state: "Stable".to_string(),
        })
        .collect();

    Json(serde_json::json!({ "groups": groups })).into_response()
}
2625
#[cfg(feature = "kafka")]
/// `GET /kafka/groups/{group_id}` — describe one consumer group: members,
/// their partition assignments, and committed offsets. Responds `404` for
/// unknown groups and `503` when no Kafka broker is configured.
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = state.kafka_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let consumer_groups = broker.consumer_groups.read().await;
    let Some(group) = consumer_groups.groups().get(&group_id) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Consumer group not found",
                "group_id": group_id
            })),
        )
            .into_response();
    };

    // One entry per member, each with its topic/partition assignments.
    let members_detail: Vec<_> = group
        .members
        .iter()
        .map(|(member_id, member)| {
            let assignments: Vec<_> = member
                .assignment
                .iter()
                .map(|a| {
                    serde_json::json!({
                        "topic": a.topic,
                        "partitions": a.partitions
                    })
                })
                .collect();
            serde_json::json!({
                "member_id": member_id,
                "client_id": member.client_id,
                "assignments": assignments
            })
        })
        .collect();

    // Committed offsets are keyed by (topic, partition).
    let offsets: Vec<_> = group
        .offsets
        .iter()
        .map(|((topic, partition), offset)| {
            serde_json::json!({
                "topic": topic,
                "partition": partition,
                "offset": offset
            })
        })
        .collect();

    Json(serde_json::json!({
        "group_id": group_id,
        "members": group.members.len(),
        "state": "Stable",
        "members_detail": members_detail,
        "offsets": offsets
    }))
    .into_response()
}
2674
/// Request body for `POST /kafka/produce` (also used per-message in batch
/// produce requests).
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Target topic; auto-created with default config if it does not exist.
    pub topic: String,
    /// Optional message key; used for partition assignment when `partition`
    /// is omitted.
    #[serde(default)]
    pub key: Option<String>,
    /// Message payload, stored as UTF-8 bytes.
    pub value: String,
    /// Explicit target partition; derived from the key when omitted.
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional string headers attached to the message.
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2694
#[cfg(feature = "kafka")]
/// `POST /kafka/produce` — produce a single message to a topic.
///
/// The topic is auto-created with default configuration on first use. If
/// `partition` is omitted, one is assigned from the message key. Returns
/// the assigned partition and offset on success; `400` for an out-of-range
/// partition, `500` on produce failure, and `503` when no Kafka broker is
/// running.
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = state.kafka_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let mut topics = broker.topics.write().await;

    // Auto-create the topic with default config on first produce.
    let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
        mockforge_kafka::topics::Topic::new(
            request.topic.clone(),
            mockforge_kafka::topics::TopicConfig::default(),
        )
    });

    // Use the explicit partition if given, otherwise derive one from the key.
    let partition_id = request.partition.unwrap_or_else(|| {
        topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes()))
    });

    if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            })),
        )
            .into_response();
    }

    let message = mockforge_kafka::partitions::KafkaMessage {
        // The real offset is assigned by the partition log on append.
        offset: 0,
        timestamp: chrono::Utc::now().timestamp_millis(),
        key: request.key.as_ref().map(|k| k.as_bytes().to_vec()),
        value: request.value.as_bytes().to_vec(),
        headers: request
            .headers
            .clone()
            .unwrap_or_default()
            .into_iter()
            .map(|(k, v)| (k, v.into_bytes()))
            .collect(),
    };

    match topic_entry.produce(partition_id, message).await {
        Ok(offset) => {
            // `broker` is already in scope; no need to re-check state.kafka_broker.
            broker.metrics().record_messages_produced(1);

            // Notify SSE subscribers watching the message stream; a send
            // error just means there are no listeners.
            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: request.topic.clone(),
                key: request.key.clone(),
                value: request.value.clone(),
                partition: partition_id,
                offset,
                headers: request.headers.clone(),
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", request.topic),
                "topic": request.topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2799
/// Request body for `POST /kafka/produce/batch`.
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// Messages to produce, in order.
    pub messages: Vec<KafkaProduceRequest>,
    /// Pause between consecutive messages, in milliseconds; defaults via
    /// `default_delay`.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2809
#[cfg(feature = "kafka")]
/// `POST /kafka/produce/batch` — produce a sequence of messages, optionally
/// pausing `delay_ms` milliseconds between consecutive messages.
///
/// Per-message failures (bad partition, produce error) are reported in the
/// `results` array without aborting the batch. Responds `400` for an empty
/// batch and `503` when no Kafka broker is running.
///
/// Fix vs. previous version: the topics write lock is now scoped to each
/// message, so it is released before the inter-message delay instead of
/// blocking all other topic access for the duration of the sleep.
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = state.kafka_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    if request.messages.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        )
            .into_response();
    }

    let mut results = Vec::with_capacity(request.messages.len());

    for (index, msg_request) in request.messages.iter().enumerate() {
        // Scope the write lock to this one message so it is dropped before
        // any inter-message delay below.
        {
            let mut topics = broker.topics.write().await;

            // Auto-create the topic with default config on first produce.
            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                mockforge_kafka::topics::Topic::new(
                    msg_request.topic.clone(),
                    mockforge_kafka::topics::TopicConfig::default(),
                )
            });

            // Use the explicit partition if given, otherwise derive from the key.
            let partition_id = msg_request.partition.unwrap_or_else(|| {
                topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
            });

            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                }));
                // Matches the previous behavior: skip the delay as well.
                continue;
            }

            let message = mockforge_kafka::partitions::KafkaMessage {
                // The real offset is assigned by the partition log on append.
                offset: 0,
                timestamp: chrono::Utc::now().timestamp_millis(),
                key: msg_request.key.as_ref().map(|k| k.as_bytes().to_vec()),
                value: msg_request.value.as_bytes().to_vec(),
                headers: msg_request
                    .headers
                    .clone()
                    .unwrap_or_default()
                    .into_iter()
                    .map(|(k, v)| (k, v.into_bytes()))
                    .collect(),
            };

            match topic_entry.produce(partition_id, message).await {
                Ok(offset) => {
                    // `broker` is already in scope; no need to re-check state.
                    broker.metrics().record_messages_produced(1);

                    // Notify SSE subscribers; send errors mean no listeners.
                    let event = MessageEvent::Kafka(KafkaMessageEvent {
                        topic: msg_request.topic.clone(),
                        key: msg_request.key.clone(),
                        value: msg_request.value.clone(),
                        partition: partition_id,
                        offset,
                        headers: msg_request.headers.clone(),
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": msg_request.topic,
                        "partition": partition_id,
                        "offset": offset
                    }));
                }
                Err(e) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": e.to_string()
                    }));
                }
            }
        }

        // Optional pacing between messages; skipped after the last one.
        if index < request.messages.len() - 1 && request.delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
        }
    }

    let success_count =
        results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

    Json(serde_json::json!({
        "success": true,
        "total": request.messages.len(),
        "succeeded": success_count,
        "failed": request.messages.len() - success_count,
        "results": results
    }))
    .into_response()
}
2938
#[cfg(feature = "mqtt")]
/// `GET /mqtt/messages/stream` — SSE stream of published MQTT messages.
///
/// Subscribes to the shared in-process broadcast channel and forwards each
/// MQTT event as an SSE `mqtt_message` event. An optional `topic` query
/// parameter restricts the stream to topics containing that substring.
/// Kafka events on the shared channel are skipped; a lagged receiver logs a
/// warning and resumes from the next available message.
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    // unfold keeps the receiver as stream state; each step loops until it
    // has an MQTT event worth emitting (or the channel closes).
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => {
                        // Substring match against the optional topic filter.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "mqtt",
                            "topic": event.topic,
                            "payload": event.payload,
                            "qos": event.qos,
                            "retain": event.retain,
                            "timestamp": event.timestamp,
                        });

                        // Serialization failure just drops the event and keeps looping.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event = Event::default().event("mqtt_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    // The channel carries both protocols; ignore Kafka events here.
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => {
                        continue;
                    }
                    // All senders dropped: end the SSE stream.
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    // Receiver fell behind the channel buffer; skip and resume.
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Periodic keep-alive comments stop idle intermediaries from closing
    // the connection.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
3001
#[cfg(feature = "kafka")]
/// `GET /kafka/messages/stream` — SSE stream of produced Kafka messages.
///
/// Subscribes to the shared in-process broadcast channel and forwards each
/// Kafka event as an SSE `kafka_message` event. An optional `topic` query
/// parameter restricts the stream to topics containing that substring.
/// MQTT events on the shared channel are skipped; a lagged receiver logs a
/// warning and resumes from the next available message.
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // Fix: `rx` was declared `mut` but is immediately moved into the unfold
    // state, which triggered an unused_mut warning.
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    // unfold keeps the receiver as stream state; each step loops until it
    // has a Kafka event worth emitting (or the channel closes).
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    // The channel carries both protocols; ignore MQTT events here.
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => {
                        continue;
                    }
                    Ok(MessageEvent::Kafka(event)) => {
                        // Substring match against the optional topic filter.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "kafka",
                            "topic": event.topic,
                            "key": event.key,
                            "value": event.value,
                            "partition": event.partition,
                            "offset": event.offset,
                            "headers": event.headers,
                            "timestamp": event.timestamp,
                        });

                        // Serialization failure just drops the event and keeps looping.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event =
                                Event::default().event("kafka_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    // All senders dropped: end the SSE stream.
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    // Receiver fell behind the channel buffer; skip and resume.
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Periodic keep-alive comments stop idle intermediaries from closing
    // the connection.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
3065
/// Request body for `POST /ai/generate-spec`.
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Natural-language description of the desired API.
    pub query: String,
    /// Specification flavor: "openapi", "graphql", or "asyncapi"; unknown
    /// values fall back to OpenAPI 3.0.
    pub spec_type: String,
    /// Target spec version; defaults to "3.0.0" when omitted.
    pub api_version: Option<String>,
}
3078
/// Request body for `POST /mockai/generate-openapi`: selects which recorded
/// HTTP exchanges are used to infer an OpenAPI specification.
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to the recorder database; defaults to `./recordings.db` in the
    /// current working directory.
    #[serde(default)]
    pub database_path: Option<String>,
    /// Only include exchanges at or after this RFC 3339 timestamp.
    #[serde(default)]
    pub since: Option<String>,
    /// Only include exchanges at or before this RFC 3339 timestamp.
    #[serde(default)]
    pub until: Option<String>,
    /// Optional path pattern used to filter recorded requests.
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence for inferred paths; defaults to 0.7.
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
3098
/// Serde default for [`GenerateOpenApiFromTrafficRequest::min_confidence`].
fn default_min_confidence() -> f64 {
    0.7
}
3102
#[cfg(feature = "data-faker")]
/// `POST /ai/generate-spec` — generate an API specification (OpenAPI,
/// GraphQL, or AsyncAPI) from a natural-language description using the
/// configured LLM provider.
///
/// Provider, endpoint, model, and sampling parameters are read from
/// `MOCKFORGE_RAG_*` environment variables; the API key falls back to
/// `OPENAI_API_KEY`. Responds `503` when no key is configured and `500`
/// when engine construction or generation fails.
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::DocumentStorage,
    };
    use std::sync::Arc;

    /// Read a numeric setting from the environment, falling back to
    /// `default` when the variable is unset or unparseable.
    fn env_or<T: std::str::FromStr>(key: &str, default: T) -> T {
        std::env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
    }

    // An API key is mandatory; prefer the MockForge-specific variable.
    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    }

    // Unknown provider names fall back to OpenAI.
    let provider = match std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase()
        .as_str()
    {
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    // Per-provider endpoint and model defaults, overridable via environment.
    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    // Build the config in one expression instead of mutating a default
    // (avoids clippy::field_reassign_with_default).
    let rag_config = RagConfig {
        provider,
        api_endpoint,
        api_key,
        model,
        max_tokens: env_or("MOCKFORGE_RAG_MAX_TOKENS", 4096),
        temperature: env_or("MOCKFORGE_RAG_TEMPERATURE", 0.3),
        timeout_secs: env_or("MOCKFORGE_RAG_TIMEOUT", 60),
        max_context_length: env_or("MOCKFORGE_RAG_CONTEXT_WINDOW", 4000),
        ..RagConfig::default()
    };

    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        if request.spec_type == "graphql" {
            "GraphQL SDL"
        } else {
            "YAML"
        }
    );

    // The engine requires a document store even though generation here does
    // not retrieve any documents; an empty in-memory store suffices.
    use mockforge_data::rag::storage::InMemoryStorage;
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    // The config is not used after this point, so it can be moved instead
    // of cloned.
    let mut rag_engine = match RagEngine::new(rag_config, storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Strip any surrounding chatter/fencing from the model output.
            let spec = if request.spec_type == "graphql" {
                extract_graphql_schema(&generated_text)
            } else {
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
3273
3274#[cfg(not(feature = "data-faker"))]
3275async fn generate_ai_spec(
3276 State(_state): State<ManagementState>,
3277 Json(_request): Json<GenerateSpecRequest>,
3278) -> impl IntoResponse {
3279 (
3280 StatusCode::NOT_IMPLEMENTED,
3281 Json(serde_json::json!({
3282 "error": "AI features not enabled",
3283 "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3284 })),
3285 )
3286 .into_response()
3287}
3288
#[cfg(feature = "behavioral-cloning")]
/// `POST /mockai/generate-openapi` — infer an OpenAPI specification from
/// previously recorded HTTP traffic.
///
/// Pipeline: open the recorder database → parse optional `since`/`until`
/// RFC 3339 bounds → query matching exchanges (capped at 1000) → convert to
/// the generator's exchange type → generate the spec → return it with
/// generation metadata. Each fallible stage short-circuits with an
/// appropriate status: `400` for bad input, `404` when nothing matched,
/// `500` for internal failures.
async fn generate_openapi_from_traffic(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
) -> impl IntoResponse {
    use chrono::{DateTime, Utc};
    use mockforge_core::intelligent_behavior::{
        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
        IntelligentBehaviorConfig,
    };
    use mockforge_recorder::{
        database::RecorderDatabase,
        openapi_export::{QueryFilters, RecordingsToOpenApi},
    };
    use std::path::PathBuf;

    // Default to ./recordings.db in the current working directory.
    let db_path = if let Some(ref path) = request.database_path {
        PathBuf::from(path)
    } else {
        std::env::current_dir()
            .unwrap_or_else(|_| PathBuf::from("."))
            .join("recordings.db")
    };

    let db = match RecorderDatabase::new(&db_path).await {
        Ok(db) => db,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Database error",
                    "message": format!("Failed to open recorder database: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Both bounds are optional; each is validated as RFC 3339 and
    // normalized to UTC.
    let since_dt = if let Some(ref since_str) = request.since {
        match DateTime::parse_from_rfc3339(since_str) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
                    })),
                )
                    .into_response();
            }
        }
    } else {
        None
    };

    let until_dt = if let Some(ref until_str) = request.until {
        match DateTime::parse_from_rfc3339(until_str) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
                    })),
                )
                    .into_response();
            }
        }
    } else {
        None
    };

    let query_filters = QueryFilters {
        since: since_dt,
        until: until_dt,
        path_pattern: request.path_pattern.clone(),
        min_status_code: None,
        // Cap the analysis at 1000 exchanges.
        max_requests: Some(1000),
    };

    let exchanges_from_recorder =
        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
            Ok(exchanges) => exchanges,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Query error",
                        "message": format!("Failed to query HTTP exchanges: {}", e)
                    })),
                )
                    .into_response();
            }
        };

    if exchanges_from_recorder.is_empty() {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "No exchanges found",
                "message": "No HTTP exchanges found matching the specified filters"
            })),
        )
            .into_response();
    }

    // Bridge the recorder's exchange type to the generator's local type
    // field-by-field.
    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
        .into_iter()
        .map(|e| LocalHttpExchange {
            method: e.method,
            path: e.path,
            query_params: e.query_params,
            headers: e.headers,
            body: e.body,
            body_encoding: e.body_encoding,
            status_code: e.status_code,
            response_headers: e.response_headers,
            response_body: e.response_body,
            response_body_encoding: e.response_body_encoding,
            timestamp: e.timestamp,
        })
        .collect();

    let behavior_config = IntelligentBehaviorConfig::default();
    let gen_config = OpenApiGenerationConfig {
        min_confidence: request.min_confidence,
        behavior_model: Some(behavior_config.behavior_model),
    };

    let generator = OpenApiSpecGenerator::new(gen_config);
    let result = match generator.generate_from_exchanges(exchanges).await {
        Ok(result) => result,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Generation error",
                    "message": format!("Failed to generate OpenAPI spec: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Use the raw document when the generator provides one; otherwise
    // serialize the typed spec.
    let spec_json = if let Some(ref raw) = result.spec.raw_document {
        raw.clone()
    } else {
        match serde_json::to_value(&result.spec.spec) {
            Ok(json) => json,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Serialization error",
                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
                    })),
                )
                    .into_response();
            }
        }
    };

    let response = serde_json::json!({
        "spec": spec_json,
        "metadata": {
            "requests_analyzed": result.metadata.requests_analyzed,
            "paths_inferred": result.metadata.paths_inferred,
            "path_confidence": result.metadata.path_confidence,
            "generated_at": result.metadata.generated_at.to_rfc3339(),
            "duration_ms": result.metadata.duration_ms,
        }
    });

    Json(response).into_response()
}
3481
3482async fn list_rule_explanations(
3484 State(state): State<ManagementState>,
3485 Query(params): Query<std::collections::HashMap<String, String>>,
3486) -> impl IntoResponse {
3487 use mockforge_core::intelligent_behavior::RuleType;
3488
3489 let explanations = state.rule_explanations.read().await;
3490 let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3491
3492 if let Some(rule_type_str) = params.get("rule_type") {
3494 if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3495 explanations_vec.retain(|e| e.rule_type == rule_type);
3496 }
3497 }
3498
3499 if let Some(min_confidence_str) = params.get("min_confidence") {
3501 if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3502 explanations_vec.retain(|e| e.confidence >= min_confidence);
3503 }
3504 }
3505
3506 explanations_vec.sort_by(|a, b| {
3508 b.confidence
3509 .partial_cmp(&a.confidence)
3510 .unwrap_or(std::cmp::Ordering::Equal)
3511 .then_with(|| b.generated_at.cmp(&a.generated_at))
3512 });
3513
3514 Json(serde_json::json!({
3515 "explanations": explanations_vec,
3516 "total": explanations_vec.len(),
3517 }))
3518 .into_response()
3519}
3520
3521async fn get_rule_explanation(
3523 State(state): State<ManagementState>,
3524 Path(rule_id): Path<String>,
3525) -> impl IntoResponse {
3526 let explanations = state.rule_explanations.read().await;
3527
3528 match explanations.get(&rule_id) {
3529 Some(explanation) => Json(serde_json::json!({
3530 "explanation": explanation,
3531 }))
3532 .into_response(),
3533 None => (
3534 StatusCode::NOT_FOUND,
3535 Json(serde_json::json!({
3536 "error": "Rule explanation not found",
3537 "message": format!("No explanation found for rule ID: {}", rule_id)
3538 })),
3539 )
3540 .into_response(),
3541 }
3542}
3543
/// Request body for `POST /mockai/learn`.
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Request/response example pairs to learn rules from.
    pub examples: Vec<ExamplePairRequest>,
    /// Optional behavior-model configuration overriding the defaults;
    /// deserialized as `IntelligentBehaviorConfig` by the handler.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3553
/// One request/response example pair used for rule learning.
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request description; the handler reads `method`, `path`, `body`,
    /// `query_params`, and `headers` keys from this object.
    pub request: serde_json::Value,
    /// Response description; the handler reads `status_code` (or `status`)
    /// and `body` keys from this object.
    pub response: serde_json::Value,
}
3562
3563async fn learn_from_examples(
3568 State(state): State<ManagementState>,
3569 Json(request): Json<LearnFromExamplesRequest>,
3570) -> impl IntoResponse {
3571 use mockforge_core::intelligent_behavior::{
3572 config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3573 rule_generator::{ExamplePair, RuleGenerator},
3574 };
3575
3576 if request.examples.is_empty() {
3577 return (
3578 StatusCode::BAD_REQUEST,
3579 Json(serde_json::json!({
3580 "error": "No examples provided",
3581 "message": "At least one example pair is required"
3582 })),
3583 )
3584 .into_response();
3585 }
3586
3587 let example_pairs: Result<Vec<ExamplePair>, String> = request
3589 .examples
3590 .into_iter()
3591 .enumerate()
3592 .map(|(idx, ex)| {
3593 let method = ex
3595 .request
3596 .get("method")
3597 .and_then(|v| v.as_str())
3598 .map(|s| s.to_string())
3599 .unwrap_or_else(|| "GET".to_string());
3600 let path = ex
3601 .request
3602 .get("path")
3603 .and_then(|v| v.as_str())
3604 .map(|s| s.to_string())
3605 .unwrap_or_else(|| "/".to_string());
3606 let request_body = ex.request.get("body").cloned();
3607 let query_params = ex
3608 .request
3609 .get("query_params")
3610 .and_then(|v| v.as_object())
3611 .map(|obj| {
3612 obj.iter()
3613 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3614 .collect()
3615 })
3616 .unwrap_or_default();
3617 let headers = ex
3618 .request
3619 .get("headers")
3620 .and_then(|v| v.as_object())
3621 .map(|obj| {
3622 obj.iter()
3623 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3624 .collect()
3625 })
3626 .unwrap_or_default();
3627
3628 let status = ex
3630 .response
3631 .get("status_code")
3632 .or_else(|| ex.response.get("status"))
3633 .and_then(|v| v.as_u64())
3634 .map(|n| n as u16)
3635 .unwrap_or(200);
3636 let response_body = ex.response.get("body").cloned();
3637
3638 Ok(ExamplePair {
3639 method,
3640 path,
3641 request: request_body,
3642 status,
3643 response: response_body,
3644 query_params,
3645 headers,
3646 metadata: {
3647 let mut meta = std::collections::HashMap::new();
3648 meta.insert("source".to_string(), "api".to_string());
3649 meta.insert("example_index".to_string(), idx.to_string());
3650 meta
3651 },
3652 })
3653 })
3654 .collect();
3655
3656 let example_pairs = match example_pairs {
3657 Ok(pairs) => pairs,
3658 Err(e) => {
3659 return (
3660 StatusCode::BAD_REQUEST,
3661 Json(serde_json::json!({
3662 "error": "Invalid examples",
3663 "message": e
3664 })),
3665 )
3666 .into_response();
3667 }
3668 };
3669
3670 let behavior_config = if let Some(config_json) = request.config {
3672 serde_json::from_value(config_json)
3674 .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3675 .behavior_model
3676 } else {
3677 BehaviorModelConfig::default()
3678 };
3679
3680 let generator = RuleGenerator::new(behavior_config);
3682
3683 let (rules, explanations) =
3685 match generator.generate_rules_with_explanations(example_pairs).await {
3686 Ok(result) => result,
3687 Err(e) => {
3688 return (
3689 StatusCode::INTERNAL_SERVER_ERROR,
3690 Json(serde_json::json!({
3691 "error": "Rule generation failed",
3692 "message": format!("Failed to generate rules: {}", e)
3693 })),
3694 )
3695 .into_response();
3696 }
3697 };
3698
3699 {
3701 let mut stored_explanations = state.rule_explanations.write().await;
3702 for explanation in &explanations {
3703 stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3704 }
3705 }
3706
3707 let response = serde_json::json!({
3709 "success": true,
3710 "rules_generated": {
3711 "consistency_rules": rules.consistency_rules.len(),
3712 "schemas": rules.schemas.len(),
3713 "state_machines": rules.state_transitions.len(),
3714 "system_prompt": !rules.system_prompt.is_empty(),
3715 },
3716 "explanations": explanations.iter().map(|e| serde_json::json!({
3717 "rule_id": e.rule_id,
3718 "rule_type": e.rule_type,
3719 "confidence": e.confidence,
3720 "reasoning": e.reasoning,
3721 })).collect::<Vec<_>>(),
3722 "total_explanations": explanations.len(),
3723 });
3724
3725 Json(response).into_response()
3726}
3727
/// Pulls an OpenAPI/AsyncAPI YAML document out of (possibly fenced) text.
///
/// Preference order: a ```` ```yaml ```` fenced block, then any generic
/// ```` ``` ```` fenced block, then the raw text. The result is always
/// trimmed of surrounding whitespace.
#[cfg(feature = "data-faker")]
fn extract_yaml_spec(text: &str) -> String {
    // An explicit ```yaml fence takes priority over anything else.
    if let Some(pos) = text.find("```yaml") {
        let after_fence = text[pos + "```yaml".len()..].trim_start();
        if let Some(close) = after_fence.find("```") {
            return after_fence[..close].trim().to_string();
        }
    }

    // Otherwise accept the first generic fenced block, if it is closed.
    if let Some(pos) = text.find("```") {
        let after_fence = text[pos + "```".len()..].trim_start();
        if let Some(close) = after_fence.find("```") {
            return after_fence[..close].trim().to_string();
        }
    }

    // Bare documents (e.g. starting with `openapi:`/`asyncapi:`) and any
    // other unfenced text pass through trimmed.
    text.trim().to_string()
}
3752
/// Pulls a GraphQL schema out of (possibly fenced) text.
///
/// Preference order: a ```` ```graphql ```` fenced block, then any generic
/// ```` ``` ```` fenced block, then the raw text. The result is always
/// trimmed of surrounding whitespace.
#[cfg(feature = "data-faker")]
fn extract_graphql_schema(text: &str) -> String {
    // An explicit ```graphql fence takes priority over anything else.
    if let Some(pos) = text.find("```graphql") {
        let after_fence = text[pos + "```graphql".len()..].trim_start();
        if let Some(close) = after_fence.find("```") {
            return after_fence[..close].trim().to_string();
        }
    }

    // Otherwise accept the first generic fenced block, if it is closed.
    if let Some(pos) = text.find("```") {
        let after_fence = text[pos + "```".len()..].trim_start();
        if let Some(close) = after_fence.find("```") {
            return after_fence[..close].trim().to_string();
        }
    }

    // Bare schemas (e.g. starting with `type ` or `schema `) and any other
    // unfenced text pass through trimmed.
    text.trim().to_string()
}
3777
3778async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
3782 #[cfg(feature = "chaos")]
3783 {
3784 if let Some(chaos_state) = &_state.chaos_api_state {
3785 let config = chaos_state.config.read().await;
3786 Json(serde_json::json!({
3788 "enabled": config.enabled,
3789 "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
3790 "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
3791 "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
3792 "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
3793 }))
3794 .into_response()
3795 } else {
3796 Json(serde_json::json!({
3798 "enabled": false,
3799 "latency": null,
3800 "fault_injection": null,
3801 "rate_limit": null,
3802 "traffic_shaping": null,
3803 }))
3804 .into_response()
3805 }
3806 }
3807 #[cfg(not(feature = "chaos"))]
3808 {
3809 Json(serde_json::json!({
3811 "enabled": false,
3812 "latency": null,
3813 "fault_injection": null,
3814 "rate_limit": null,
3815 "traffic_shaping": null,
3816 }))
3817 .into_response()
3818 }
3819}
3820
/// Partial update for the chaos configuration. Every field is optional;
/// only fields that are present are applied.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Master enable/disable switch for chaos injection.
    pub enabled: Option<bool>,
    /// Latency-injection settings (deserialized as `LatencyConfig`).
    pub latency: Option<serde_json::Value>,
    /// Fault-injection settings (deserialized as `FaultInjectionConfig`).
    pub fault_injection: Option<serde_json::Value>,
    /// Rate-limit settings (deserialized as `RateLimitConfig`).
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic-shaping settings (deserialized as `TrafficShapingConfig`).
    pub traffic_shaping: Option<serde_json::Value>,
}
3835
3836async fn update_chaos_config(
3838 State(_state): State<ManagementState>,
3839 Json(_config_update): Json<ChaosConfigUpdate>,
3840) -> impl IntoResponse {
3841 #[cfg(feature = "chaos")]
3842 {
3843 if let Some(chaos_state) = &_state.chaos_api_state {
3844 use mockforge_chaos::config::{
3845 ChaosConfig, FaultInjectionConfig, LatencyConfig, RateLimitConfig,
3846 TrafficShapingConfig,
3847 };
3848
3849 let mut config = chaos_state.config.write().await;
3850
3851 if let Some(enabled) = _config_update.enabled {
3853 config.enabled = enabled;
3854 }
3855
3856 if let Some(latency_json) = _config_update.latency {
3858 if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
3859 config.latency = Some(latency);
3860 }
3861 }
3862
3863 if let Some(fault_json) = _config_update.fault_injection {
3865 if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
3866 config.fault_injection = Some(fault);
3867 }
3868 }
3869
3870 if let Some(rate_json) = _config_update.rate_limit {
3872 if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
3873 config.rate_limit = Some(rate);
3874 }
3875 }
3876
3877 if let Some(traffic_json) = _config_update.traffic_shaping {
3879 if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
3880 config.traffic_shaping = Some(traffic);
3881 }
3882 }
3883
3884 drop(config);
3887
3888 info!("Chaos configuration updated successfully");
3889 Json(serde_json::json!({
3890 "success": true,
3891 "message": "Chaos configuration updated and applied"
3892 }))
3893 .into_response()
3894 } else {
3895 (
3896 StatusCode::SERVICE_UNAVAILABLE,
3897 Json(serde_json::json!({
3898 "success": false,
3899 "error": "Chaos API not available",
3900 "message": "Chaos engineering is not enabled or configured"
3901 })),
3902 )
3903 .into_response()
3904 }
3905 }
3906 #[cfg(not(feature = "chaos"))]
3907 {
3908 (
3909 StatusCode::NOT_IMPLEMENTED,
3910 Json(serde_json::json!({
3911 "success": false,
3912 "error": "Chaos feature not enabled",
3913 "message": "Chaos engineering feature is not compiled into this build"
3914 })),
3915 )
3916 .into_response()
3917 }
3918}
3919
3920async fn list_network_profiles() -> impl IntoResponse {
3924 use mockforge_core::network_profiles::NetworkProfileCatalog;
3925
3926 let catalog = NetworkProfileCatalog::default();
3927 let profiles: Vec<serde_json::Value> = catalog
3928 .list_profiles_with_description()
3929 .iter()
3930 .map(|(name, description)| {
3931 serde_json::json!({
3932 "name": name,
3933 "description": description,
3934 })
3935 })
3936 .collect();
3937
3938 Json(serde_json::json!({
3939 "profiles": profiles
3940 }))
3941 .into_response()
3942}
3943
/// Request body for applying a built-in network profile by name.
#[derive(Debug, Deserialize)]
pub struct ApplyNetworkProfileRequest {
    /// Name of a profile in `NetworkProfileCatalog` (unknown names yield 404).
    pub profile_name: String,
}
3950
/// POST handler: apply a named network profile from the built-in catalog.
///
/// Looks up `profile_name` in `NetworkProfileCatalog`, then:
/// 1. persists the derived traffic-shaping settings into the shared server
///    configuration (when `state.server_config` is available), and
/// 2. mirrors the same settings into the live chaos API state so they take
///    effect immediately (only when the `chaos` feature is compiled in).
///
/// Returns 404 when the profile name is unknown.
async fn apply_network_profile(
    State(state): State<ManagementState>,
    Json(request): Json<ApplyNetworkProfileRequest>,
) -> impl IntoResponse {
    use mockforge_core::network_profiles::NetworkProfileCatalog;

    let catalog = NetworkProfileCatalog::default();
    if let Some(profile) = catalog.get(&request.profile_name) {
        // Step 1: persist into the server configuration, if one is attached.
        if let Some(server_config) = &state.server_config {
            let mut config = server_config.write().await;

            use mockforge_core::config::NetworkShapingConfig;

            let network_shaping = NetworkShapingConfig {
                // Shaping is considered active if either bandwidth capping
                // or burst loss is enabled in the profile.
                enabled: profile.traffic_shaping.bandwidth.enabled
                    || profile.traffic_shaping.burst_loss.enabled,
                // NOTE(review): `* 8` looks like a bytes/sec -> bits/sec
                // conversion for a `_bps` field — confirm the units.
                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8,
                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                // Fixed default; profiles do not carry a connection cap.
                max_connections: 1000, };

            // Attach to the existing chaos config, or create a minimal one
            // carrying only the traffic-shaping settings.
            if let Some(ref mut chaos) = config.observability.chaos {
                chaos.traffic_shaping = Some(network_shaping);
            } else {
                use mockforge_core::config::ChaosEngConfig;
                config.observability.chaos = Some(ChaosEngConfig {
                    enabled: true,
                    latency: None,
                    fault_injection: None,
                    rate_limit: None,
                    traffic_shaping: Some(network_shaping),
                    scenario: None,
                });
            }

            info!("Network profile '{}' applied to server configuration", request.profile_name);
        } else {
            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
        }

        // Step 2: mirror into the live chaos engine (feature-gated).
        #[cfg(feature = "chaos")]
        {
            if let Some(chaos_state) = &state.chaos_api_state {
                use mockforge_chaos::config::TrafficShapingConfig;

                let mut chaos_config = chaos_state.config.write().await;
                let chaos_traffic_shaping = TrafficShapingConfig {
                    enabled: profile.traffic_shaping.bandwidth.enabled
                        || profile.traffic_shaping.burst_loss.enabled,
                    // Same presumed bytes -> bits conversion as above.
                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8,
                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                    // NOTE(review): 0 and 30000 differ from the server-config
                    // defaults above — presumably 0 means "unlimited" here;
                    // confirm against TrafficShapingConfig's semantics.
                    max_connections: 0,
                    connection_timeout_ms: 30000,
                };
                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
                // Applying a profile implicitly enables chaos; release the
                // write lock before logging.
                chaos_config.enabled = true; drop(chaos_config);
                info!("Network profile '{}' applied to chaos API state", request.profile_name);
            }
        }

        Json(serde_json::json!({
            "success": true,
            "message": format!("Network profile '{}' applied", request.profile_name),
            "profile": {
                "name": profile.name,
                "description": profile.description,
            }
        }))
        .into_response()
    } else {
        (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Profile not found",
                "message": format!("Network profile '{}' not found", request.profile_name)
            })),
        )
            .into_response()
    }
}
4044
4045pub fn management_router_with_ui_builder(
4047 state: ManagementState,
4048 server_config: mockforge_core::config::ServerConfig,
4049) -> Router {
4050 use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
4051
4052 let management = management_router(state);
4054
4055 let ui_builder_state = UIBuilderState::new(server_config);
4057 let ui_builder = create_ui_builder_router(ui_builder_state);
4058
4059 management.nest("/ui-builder", ui_builder)
4061}
4062
4063pub fn management_router_with_spec_import(state: ManagementState) -> Router {
4065 use crate::spec_import::{spec_import_router, SpecImportState};
4066
4067 let management = management_router(state);
4069
4070 Router::new()
4072 .merge(management)
4073 .merge(spec_import_router(SpecImportState::new()))
4074}
4075
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal enabled mock (status 200, empty JSON body, no
    /// matching criteria). Tests override individual fields via
    /// struct-update syntax instead of repeating all 13 fields.
    fn base_mock(id: &str, name: &str, method: &str, path: &str) -> MockConfig {
        MockConfig {
            id: id.to_string(),
            name: name.to_string(),
            method: method.to_string(),
            path: path.to_string(),
            response: MockResponse {
                body: serde_json::json!({}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        }
    }

    /// Builds a POST /xml mock that matches on the given XPath expression.
    fn xpath_mock(id: &str, name: &str, xpath: &str) -> MockConfig {
        MockConfig {
            request_match: Some(RequestMatchCriteria {
                xpath: Some(xpath.to_string()),
                ..Default::default()
            }),
            ..base_mock(id, name, "POST", "/xml")
        }
    }

    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        let mock = MockConfig {
            response: MockResponse {
                body: serde_json::json!({"message": "test"}),
                headers: None,
            },
            ..base_mock("test-1", "Test Mock", "GET", "/test")
        };

        // Insert under a short-lived write lock.
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(mock.clone());
        }

        let mocks = state.mocks.read().await;
        let found = mocks.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        // One enabled and one disabled mock.
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(base_mock("1", "Mock 1", "GET", "/test1"));
            mocks.push(MockConfig {
                enabled: false,
                status_code: Some(201),
                ..base_mock("2", "Mock 2", "POST", "/test2")
            });
        }

        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }

    #[test]
    fn test_mock_matches_request_with_xpath_absolute_path() {
        let mock = xpath_mock("xpath-1", "XPath Match", "/root/order/id");

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }

    #[test]
    fn test_mock_matches_request_with_xpath_text_predicate() {
        let mock = xpath_mock("xpath-2", "XPath Predicate Match", "//order/id[text()='123']");

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }

    #[test]
    fn test_mock_matches_request_with_xpath_no_match() {
        // Predicate value differs from the document ("456" vs "123").
        let mock = xpath_mock("xpath-3", "XPath No Match", "//order/id[text()='456']");

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(!mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }
}