1#[cfg(any(feature = "mqtt", feature = "kafka"))]
2use axum::response::sse::{Event, Sse};
3use axum::{
8 extract::{Path, Query, State},
9 http::StatusCode,
10 response::{IntoResponse, Json},
11 routing::{delete, get, post, put},
12 Router,
13};
14#[cfg(any(feature = "mqtt", feature = "kafka"))]
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18 BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23#[cfg(any(feature = "mqtt", feature = "kafka"))]
24use std::convert::Infallible;
25use std::sync::Arc;
26use tokio::sync::{broadcast, RwLock};
27use tracing::*;
28
/// Default capacity of the message-event broadcast channel used for
/// SSE fan-out; overridable via the
/// `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY` environment variable.
#[cfg(any(feature = "mqtt", feature = "kafka"))]
const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;
32
/// Resolve the broadcast channel capacity for message events.
///
/// Reads `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY` from the environment;
/// falls back to the compiled-in default when the variable is unset or
/// does not parse as a `usize`.
#[cfg(any(feature = "mqtt", feature = "kafka"))]
fn get_message_broadcast_capacity() -> usize {
    match std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY") {
        Ok(raw) => raw.parse().unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY),
        Err(_) => DEFAULT_MESSAGE_BROADCAST_CAPACITY,
    }
}
41
/// A message observed on one of the supported messaging protocols,
/// broadcast to subscribers of the management API's message stream.
///
/// Serialized with an adjacent tag: `{"protocol": "mqtt"|"kafka", "data": ...}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    /// An MQTT publish event (compiled only with the `mqtt` feature).
    #[cfg(feature = "mqtt")]
    Mqtt(MqttMessageEvent),
    /// A Kafka record event (compiled only with the `kafka` feature).
    #[cfg(feature = "kafka")]
    Kafka(KafkaMessageEvent),
}
54
/// Snapshot of a single MQTT message for event streaming.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// Topic the message was published to.
    pub topic: String,
    /// Message payload as a string — NOTE(review): binary payloads are
    /// presumably lossily converted upstream; confirm with the producer.
    pub payload: String,
    /// Quality-of-service level as captured (not validated here).
    pub qos: u8,
    /// Whether the message carried the MQTT retain flag.
    pub retain: bool,
    /// Capture timestamp, formatted as a string by the producer.
    pub timestamp: String,
}
70
/// Snapshot of a single Kafka record for event streaming.
#[cfg(feature = "kafka")]
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Topic the record was produced to.
    pub topic: String,
    /// Optional record key.
    pub key: Option<String>,
    /// Record value as a string.
    pub value: String,
    /// Partition the record landed on.
    pub partition: i32,
    /// Offset within the partition.
    pub offset: i64,
    /// Optional record headers.
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// Capture timestamp, formatted as a string by the producer.
    pub timestamp: String,
}
83
/// A single mock route definition managed through the management API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockConfig {
    /// Unique identifier; generated server-side when empty on creation.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub id: String,
    /// Human-readable name of the mock.
    pub name: String,
    /// HTTP method to match (compared case-insensitively).
    pub method: String,
    /// Path pattern; supports `{param}` placeholders and `*` wildcards
    /// (see `path_matches_pattern`).
    pub path: String,
    /// Response returned when this mock matches.
    pub response: MockResponse,
    /// Whether the mock participates in matching (defaults to true).
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Optional artificial latency in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_ms: Option<u64>,
    /// Optional HTTP status code override.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status_code: Option<u16>,
    /// Additional request-matching criteria (headers, query, body, ...).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_match: Option<RequestMatchCriteria>,
    /// Matching priority — NOTE(review): ordering semantics enforced by
    /// the router, not visible here; confirm against the caller.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub priority: Option<i32>,
    /// Scenario this mock belongs to, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scenario: Option<String>,
    /// Scenario state required for this mock to match — enforced
    /// elsewhere (presumably by the state machine manager); verify.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub required_scenario_state: Option<String>,
    /// Scenario state to transition to after this mock fires.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub new_scenario_state: Option<String>,
    /// Optimistic-locking version: 0 when unset, set to 1 on creation
    /// and incremented by every successful update (see `update_mock`).
    #[serde(default = "default_version")]
    pub version: u64,
}
128
/// Serde default for `MockConfig::enabled`: mocks are enabled unless
/// the payload says otherwise.
fn default_true() -> bool {
    true
}
132
/// Serde default for `MockConfig::version`: 0 marks "never persisted".
fn default_version() -> u64 {
    0
}
136
/// The response a mock returns when it matches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Response body as arbitrary JSON.
    pub body: serde_json::Value,
    /// Optional response headers to attach.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
146
147#[derive(Debug, Clone, Serialize, Deserialize, Default)]
149pub struct RequestMatchCriteria {
150 #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
152 pub headers: std::collections::HashMap<String, String>,
153 #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
155 pub query_params: std::collections::HashMap<String, String>,
156 #[serde(skip_serializing_if = "Option::is_none")]
158 pub body_pattern: Option<String>,
159 #[serde(skip_serializing_if = "Option::is_none")]
161 pub json_path: Option<String>,
162 #[serde(skip_serializing_if = "Option::is_none")]
164 pub xpath: Option<String>,
165 #[serde(skip_serializing_if = "Option::is_none")]
167 pub custom_matcher: Option<String>,
168}
169
170pub fn mock_matches_request(
179 mock: &MockConfig,
180 method: &str,
181 path: &str,
182 headers: &std::collections::HashMap<String, String>,
183 query_params: &std::collections::HashMap<String, String>,
184 body: Option<&[u8]>,
185) -> bool {
186 use regex::Regex;
187
188 if !mock.enabled {
190 return false;
191 }
192
193 if mock.method.to_uppercase() != method.to_uppercase() {
195 return false;
196 }
197
198 if !path_matches_pattern(&mock.path, path) {
200 return false;
201 }
202
203 if let Some(criteria) = &mock.request_match {
205 for (key, expected_value) in &criteria.headers {
207 let header_key_lower = key.to_lowercase();
208 let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
209
210 if let Some((_, actual_value)) = found {
211 if let Ok(re) = Regex::new(expected_value) {
213 if !re.is_match(actual_value) {
214 return false;
215 }
216 } else if actual_value != expected_value {
217 return false;
218 }
219 } else {
220 return false; }
222 }
223
224 for (key, expected_value) in &criteria.query_params {
226 if let Some(actual_value) = query_params.get(key) {
227 if actual_value != expected_value {
228 return false;
229 }
230 } else {
231 return false; }
233 }
234
235 if let Some(pattern) = &criteria.body_pattern {
237 if let Some(body_bytes) = body {
238 let body_str = String::from_utf8_lossy(body_bytes);
239 if let Ok(re) = Regex::new(pattern) {
241 if !re.is_match(&body_str) {
242 return false;
243 }
244 } else if body_str.as_ref() != pattern {
245 return false;
246 }
247 } else {
248 return false; }
250 }
251
252 if let Some(json_path) = &criteria.json_path {
254 if let Some(body_bytes) = body {
255 if let Ok(body_str) = std::str::from_utf8(body_bytes) {
256 if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
257 if !json_path_exists(&json_value, json_path) {
259 return false;
260 }
261 }
262 }
263 }
264 }
265
266 if let Some(xpath) = &criteria.xpath {
268 if let Some(body_bytes) = body {
269 if let Ok(body_str) = std::str::from_utf8(body_bytes) {
270 if !xml_xpath_exists(body_str, xpath) {
271 return false;
272 }
273 } else {
274 return false;
275 }
276 } else {
277 return false; }
279 }
280
281 if let Some(custom) = &criteria.custom_matcher {
283 if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
284 return false;
285 }
286 }
287 }
288
289 true
290}
291
292fn path_matches_pattern(pattern: &str, path: &str) -> bool {
294 if pattern == path {
296 return true;
297 }
298
299 if pattern == "*" {
301 return true;
302 }
303
304 let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
306 let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
307
308 if pattern_parts.len() != path_parts.len() {
309 if pattern.contains('*') {
311 return matches_wildcard_pattern(pattern, path);
312 }
313 return false;
314 }
315
316 for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
317 if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
319 continue; }
321
322 if pattern_part != path_part {
323 return false;
324 }
325 }
326
327 true
328}
329
330fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
332 use regex::Regex;
333
334 let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
336
337 if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
338 return re.is_match(path);
339 }
340
341 false
342}
343
344fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
352 let path = if json_path == "$" {
353 return true;
354 } else if let Some(p) = json_path.strip_prefix("$.") {
355 p
356 } else if let Some(p) = json_path.strip_prefix('$') {
357 p.strip_prefix('.').unwrap_or(p)
358 } else {
359 json_path
360 };
361
362 let mut current = json;
363 for segment in split_json_path_segments(path) {
364 match segment {
365 JsonPathSegment::Field(name) => {
366 if let Some(obj) = current.as_object() {
367 if let Some(value) = obj.get(name) {
368 current = value;
369 } else {
370 return false;
371 }
372 } else {
373 return false;
374 }
375 }
376 JsonPathSegment::Index(idx) => {
377 if let Some(arr) = current.as_array() {
378 if let Some(value) = arr.get(idx) {
379 current = value;
380 } else {
381 return false;
382 }
383 } else {
384 return false;
385 }
386 }
387 JsonPathSegment::Wildcard => {
388 if let Some(arr) = current.as_array() {
389 return !arr.is_empty();
390 }
391 return false;
392 }
393 }
394 }
395 true
396}
397
/// One segment of a simplified JSONPath expression.
enum JsonPathSegment<'a> {
    /// Object field access (`.name`).
    Field(&'a str),
    /// Array index access (`[3]`).
    Index(usize),
    /// Array wildcard (`[*]`).
    Wildcard,
}
403
404fn split_json_path_segments(path: &str) -> Vec<JsonPathSegment<'_>> {
406 let mut segments = Vec::new();
407 for part in path.split('.') {
408 if part.is_empty() {
409 continue;
410 }
411 if let Some(bracket_start) = part.find('[') {
412 let field_name = &part[..bracket_start];
413 if !field_name.is_empty() {
414 segments.push(JsonPathSegment::Field(field_name));
415 }
416 let bracket_content = &part[bracket_start + 1..part.len() - 1];
417 if bracket_content == "*" {
418 segments.push(JsonPathSegment::Wildcard);
419 } else if let Ok(idx) = bracket_content.parse::<usize>() {
420 segments.push(JsonPathSegment::Index(idx));
421 }
422 } else {
423 segments.push(JsonPathSegment::Field(part));
424 }
425 }
426 segments
427}
428
/// One `/`-separated step of a simplified XPath expression.
#[derive(Debug, Clone, PartialEq, Eq)]
struct XPathSegment {
    /// Element tag name to match (namespace-less).
    name: String,
    /// Optional `text()='...'` predicate: the element's trimmed text
    /// content must equal this value.
    text_equals: Option<String>,
}
434
435fn parse_xpath_segment(segment: &str) -> Option<XPathSegment> {
436 if segment.is_empty() {
437 return None;
438 }
439
440 let trimmed = segment.trim();
441 if let Some(bracket_start) = trimmed.find('[') {
442 if !trimmed.ends_with(']') {
443 return None;
444 }
445
446 let name = trimmed[..bracket_start].trim();
447 let predicate = &trimmed[bracket_start + 1..trimmed.len() - 1];
448 let predicate = predicate.trim();
449
450 if let Some(raw) = predicate.strip_prefix("text()=") {
452 let raw = raw.trim();
453 if raw.len() >= 2
454 && ((raw.starts_with('"') && raw.ends_with('"'))
455 || (raw.starts_with('\'') && raw.ends_with('\'')))
456 {
457 let text = raw[1..raw.len() - 1].to_string();
458 if !name.is_empty() {
459 return Some(XPathSegment {
460 name: name.to_string(),
461 text_equals: Some(text),
462 });
463 }
464 }
465 }
466
467 None
468 } else {
469 Some(XPathSegment {
470 name: trimmed.to_string(),
471 text_equals: None,
472 })
473 }
474}
475
476fn segment_matches(node: roxmltree::Node<'_, '_>, segment: &XPathSegment) -> bool {
477 if !node.is_element() {
478 return false;
479 }
480 if node.tag_name().name() != segment.name {
481 return false;
482 }
483 match &segment.text_equals {
484 Some(expected) => node.text().map(str::trim).unwrap_or_default() == expected,
485 None => true,
486 }
487}
488
/// Check whether a simplified XPath expression matches an XML document.
///
/// Supports only two forms: absolute paths (`/a/b`), which are anchored
/// at the document's root element, and descendant paths (`//a/b`),
/// which may start at any element in the document. Each step may carry
/// a `text()='...'` predicate (see `parse_xpath_segment`).
///
/// Returns `false` on XML parse failure, an empty/unsupported
/// expression, or no match; parse/format problems are logged at WARN.
fn xml_xpath_exists(xml_body: &str, xpath: &str) -> bool {
    let doc = match roxmltree::Document::parse(xml_body) {
        Ok(doc) => doc,
        Err(err) => {
            tracing::warn!("Failed to parse XML for XPath matching: {}", err);
            return false;
        }
    };

    let expr = xpath.trim();
    if expr.is_empty() {
        return false;
    }

    // `//` selects descendants anywhere; a single `/` anchors at the root.
    let (is_descendant, path_str) = if let Some(rest) = expr.strip_prefix("//") {
        (true, rest)
    } else if let Some(rest) = expr.strip_prefix('/') {
        (false, rest)
    } else {
        tracing::warn!("Unsupported XPath expression (must start with / or //): {}", expr);
        return false;
    };

    // Unparseable steps are silently dropped by parse_xpath_segment.
    let segments: Vec<XPathSegment> = path_str
        .split('/')
        .filter(|s| !s.trim().is_empty())
        .filter_map(parse_xpath_segment)
        .collect();

    if segments.is_empty() {
        return false;
    }

    if is_descendant {
        // Try every element that matches the first step as a starting
        // point, then walk the remaining steps through child frontiers.
        let first = &segments[0];
        for node in doc.descendants().filter(|n| segment_matches(*n, first)) {
            let mut frontier = vec![node];
            for segment in &segments[1..] {
                let mut next_frontier = Vec::new();
                for parent in &frontier {
                    for child in parent.children().filter(|n| segment_matches(*n, segment)) {
                        next_frontier.push(child);
                    }
                }
                if next_frontier.is_empty() {
                    // Dead end from this starting node; try the next one.
                    frontier.clear();
                    break;
                }
                frontier = next_frontier;
            }
            if !frontier.is_empty() {
                return true;
            }
        }
        false
    } else {
        // Absolute path: the first step must match the root element
        // itself; subsequent steps descend through children.
        let mut frontier = vec![doc.root_element()];
        for (index, segment) in segments.iter().enumerate() {
            let mut next_frontier = Vec::new();
            for parent in &frontier {
                if index == 0 {
                    if segment_matches(*parent, segment) {
                        next_frontier.push(*parent);
                    }
                    continue;
                }
                for child in parent.children().filter(|n| segment_matches(*n, segment)) {
                    next_frontier.push(child);
                }
            }
            if next_frontier.is_empty() {
                return false;
            }
            frontier = next_frontier;
        }
        !frontier.is_empty()
    }
}
573
574fn evaluate_custom_matcher(
576 expression: &str,
577 method: &str,
578 path: &str,
579 headers: &std::collections::HashMap<String, String>,
580 query_params: &std::collections::HashMap<String, String>,
581 body: Option<&[u8]>,
582) -> bool {
583 use regex::Regex;
584
585 let expr = expression.trim();
586
587 if expr.contains("==") {
589 let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
590 if parts.len() != 2 {
591 return false;
592 }
593
594 let field = parts[0];
595 let expected_value = parts[1].trim_matches('"').trim_matches('\'');
596
597 match field {
598 "method" => method == expected_value,
599 "path" => path == expected_value,
600 _ if field.starts_with("headers.") => {
601 let header_name = &field[8..];
602 headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
603 }
604 _ if field.starts_with("query.") => {
605 let param_name = &field[6..];
606 query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
607 }
608 _ => false,
609 }
610 }
611 else if expr.contains("=~") {
613 let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
614 if parts.len() != 2 {
615 return false;
616 }
617
618 let field = parts[0];
619 let pattern = parts[1].trim_matches('"').trim_matches('\'');
620
621 if let Ok(re) = Regex::new(pattern) {
622 match field {
623 "method" => re.is_match(method),
624 "path" => re.is_match(path),
625 _ if field.starts_with("headers.") => {
626 let header_name = &field[8..];
627 headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
628 }
629 _ if field.starts_with("query.") => {
630 let param_name = &field[6..];
631 query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
632 }
633 _ => false,
634 }
635 } else {
636 false
637 }
638 }
639 else if expr.contains("contains") {
641 let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
642 if parts.len() != 2 {
643 return false;
644 }
645
646 let field = parts[0];
647 let search_value = parts[1].trim_matches('"').trim_matches('\'');
648
649 match field {
650 "path" => path.contains(search_value),
651 _ if field.starts_with("headers.") => {
652 let header_name = &field[8..];
653 headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
654 }
655 _ if field.starts_with("body") => {
656 if let Some(body_bytes) = body {
657 let body_str = String::from_utf8_lossy(body_bytes);
658 body_str.contains(search_value)
659 } else {
660 false
661 }
662 }
663 _ => false,
664 }
665 } else {
666 tracing::warn!("Unknown custom matcher expression format: {}", expr);
668 false
669 }
670}
671
/// Runtime statistics reported by GET /stats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Seconds elapsed since the management state was created.
    pub uptime_seconds: u64,
    /// Total requests counted via the shared request counter.
    pub total_requests: u64,
    /// Number of configured mocks (enabled or not).
    pub active_mocks: usize,
    /// Number of mocks currently enabled.
    pub enabled_mocks: usize,
    /// Number of registered routes (mirrors the mock count here).
    pub registered_routes: usize,
}
686
/// Static server information reported by GET /config.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// Crate version (from CARGO_PKG_VERSION).
    pub version: String,
    /// Port the managed server listens on.
    pub port: u16,
    /// Whether an OpenAPI spec was loaded.
    pub has_openapi_spec: bool,
    /// Path the spec was loaded from, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
700
/// Shared state for the management API router.
///
/// Cheap to clone: shared data lives behind `Arc`s, so axum can clone
/// the state per request.
#[derive(Clone)]
pub struct ManagementState {
    /// All configured mocks, guarded for concurrent handler access.
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// Parsed OpenAPI spec, when the server was started with one.
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Filesystem path the spec was loaded from, if known.
    pub spec_path: Option<String>,
    /// Port the managed server listens on (reported by GET /config).
    pub port: u16,
    /// Creation time of this state; used to compute uptime.
    pub start_time: std::time::Instant,
    /// Total request counter surfaced in GET /stats.
    pub request_counter: Arc<RwLock<u64>>,
    /// Optional proxy configuration shared with the proxy layer.
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// SMTP mailbox registry (only with the `smtp` feature).
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// MQTT broker handle (only with the `mqtt` feature).
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Kafka mock broker handle (only with the `kafka` feature).
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel fanning MQTT/Kafka message events out to
    /// subscribers; capacity set by `get_message_broadcast_capacity`.
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// Scenario state machines — NOTE(review): keying/lifecycle managed
    /// by mockforge_scenarios; not visible from this file.
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// Broadcast channel for mock CRUD events pushed to WebSocket clients.
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Hooks invoked on mock create/update/delete/enable/disable.
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Rule explanations keyed by string id — NOTE(review): key
    /// semantics inferred; verify against the producer of this map.
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
    /// Chaos-engineering API state (only with the `chaos` feature).
    #[cfg(feature = "chaos")]
    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
    /// Live server configuration; required for bulk config updates to
    /// actually be applied (see `bulk_update_config`).
    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
    /// Conformance-checking state (only with the `conformance` feature).
    #[cfg(feature = "conformance")]
    pub conformance_state: crate::handlers::conformance::ConformanceState,
}
755
impl ManagementState {
    /// Create a new management state with empty mocks and all optional
    /// integrations (proxy, SMTP, brokers, hooks, chaos, ...) unset.
    /// Use the `with_*` builder methods to attach them.
    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
        Self {
            mocks: Arc::new(RwLock::new(Vec::new())),
            spec,
            spec_path,
            port,
            start_time: std::time::Instant::now(),
            request_counter: Arc::new(RwLock::new(0)),
            proxy_config: None,
            #[cfg(feature = "smtp")]
            smtp_registry: None,
            #[cfg(feature = "mqtt")]
            mqtt_broker: None,
            #[cfg(feature = "kafka")]
            kafka_broker: None,
            #[cfg(any(feature = "mqtt", feature = "kafka"))]
            message_events: {
                // Capacity is env-configurable; the receiver half is
                // dropped — subscribers call subscribe() on the sender.
                let capacity = get_message_broadcast_capacity();
                let (tx, _) = broadcast::channel(capacity);
                Arc::new(tx)
            },
            state_machine_manager: Arc::new(RwLock::new(
                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
            )),
            ws_broadcast: None,
            lifecycle_hooks: None,
            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
            #[cfg(feature = "chaos")]
            chaos_api_state: None,
            server_config: None,
            #[cfg(feature = "conformance")]
            conformance_state: crate::handlers::conformance::ConformanceState::new(),
        }
    }

    /// Attach a lifecycle-hook registry (builder style).
    pub fn with_lifecycle_hooks(
        mut self,
        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
    ) -> Self {
        self.lifecycle_hooks = Some(hooks);
        self
    }

    /// Attach a WebSocket broadcast channel for mock CRUD events.
    pub fn with_ws_broadcast(
        mut self,
        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
    ) -> Self {
        self.ws_broadcast = Some(ws_broadcast);
        self
    }

    /// Attach a shared proxy configuration.
    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
        self.proxy_config = Some(proxy_config);
        self
    }

    /// Attach the SMTP mailbox registry (smtp feature only).
    #[cfg(feature = "smtp")]
    pub fn with_smtp_registry(
        mut self,
        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
    ) -> Self {
        self.smtp_registry = Some(smtp_registry);
        self
    }

    /// Attach the MQTT broker handle (mqtt feature only).
    #[cfg(feature = "mqtt")]
    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
        self.mqtt_broker = Some(mqtt_broker);
        self
    }

    /// Attach the Kafka mock broker handle (kafka feature only).
    #[cfg(feature = "kafka")]
    pub fn with_kafka_broker(
        mut self,
        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
    ) -> Self {
        self.kafka_broker = Some(kafka_broker);
        self
    }

    /// Attach the chaos API state (chaos feature only).
    #[cfg(feature = "chaos")]
    pub fn with_chaos_api_state(
        mut self,
        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
    ) -> Self {
        self.chaos_api_state = Some(chaos_api_state);
        self
    }

    /// Attach the live server configuration; required for bulk config
    /// updates to be applied rather than merely validated.
    pub fn with_server_config(
        mut self,
        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
    ) -> Self {
        self.server_config = Some(server_config);
        self
    }
}
868
869async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
871 let mocks = state.mocks.read().await;
872 Json(serde_json::json!({
873 "mocks": *mocks,
874 "total": mocks.len(),
875 "enabled": mocks.iter().filter(|m| m.enabled).count()
876 }))
877}
878
879async fn get_mock(
881 State(state): State<ManagementState>,
882 Path(id): Path<String>,
883) -> Result<Json<MockConfig>, StatusCode> {
884 let mocks = state.mocks.read().await;
885 mocks
886 .iter()
887 .find(|m| m.id == id)
888 .cloned()
889 .map(Json)
890 .ok_or(StatusCode::NOT_FOUND)
891}
892
893async fn create_mock(
895 State(state): State<ManagementState>,
896 Json(mut mock): Json<MockConfig>,
897) -> Result<Json<MockConfig>, StatusCode> {
898 let mut mocks = state.mocks.write().await;
899
900 if mock.id.is_empty() {
902 mock.id = uuid::Uuid::new_v4().to_string();
903 }
904
905 if mocks.iter().any(|m| m.id == mock.id) {
907 return Err(StatusCode::CONFLICT);
908 }
909
910 mock.version = 1;
912
913 info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);
914
915 if let Some(hooks) = &state.lifecycle_hooks {
917 let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
918 id: mock.id.clone(),
919 name: mock.name.clone(),
920 config: serde_json::to_value(&mock).unwrap_or_default(),
921 };
922 hooks.invoke_mock_created(&event).await;
923 }
924
925 mocks.push(mock.clone());
926
927 if let Some(tx) = &state.ws_broadcast {
929 if let Err(e) = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone())) {
930 tracing::warn!("Failed to broadcast mock_created event: no active WebSocket receivers (event: {:?})", std::mem::discriminant(&e.0));
931 }
932 }
933
934 Ok(Json(mock))
935}
936
937async fn update_mock(
939 State(state): State<ManagementState>,
940 Path(id): Path<String>,
941 Json(mut updated_mock): Json<MockConfig>,
942) -> Result<Json<MockConfig>, (StatusCode, Json<serde_json::Value>)> {
943 let mut mocks = state.mocks.write().await;
944
945 let position = mocks
946 .iter()
947 .position(|m| m.id == id)
948 .ok_or((StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Mock not found"}))))?;
949
950 let old_mock = mocks[position].clone();
952
953 if updated_mock.version != old_mock.version {
955 return Err((
956 StatusCode::CONFLICT,
957 Json(serde_json::json!({
958 "error": "Version conflict",
959 "message": format!(
960 "Mock '{}' has been modified by another request. \
961 Expected version {}, but current version is {}. \
962 Please fetch the latest version and retry.",
963 id, updated_mock.version, old_mock.version
964 ),
965 "current_version": old_mock.version
966 })),
967 ));
968 }
969
970 updated_mock.version = old_mock.version + 1;
972
973 info!(
974 "Updating mock: {} (version {} -> {})",
975 id, old_mock.version, updated_mock.version
976 );
977 mocks[position] = updated_mock.clone();
978
979 if let Some(hooks) = &state.lifecycle_hooks {
981 let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
982 id: updated_mock.id.clone(),
983 name: updated_mock.name.clone(),
984 config: serde_json::to_value(&updated_mock).unwrap_or_default(),
985 };
986 hooks.invoke_mock_updated(&event).await;
987
988 if old_mock.enabled != updated_mock.enabled {
990 let state_event = if updated_mock.enabled {
991 mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
992 id: updated_mock.id.clone(),
993 }
994 } else {
995 mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
996 id: updated_mock.id.clone(),
997 }
998 };
999 hooks.invoke_mock_state_changed(&state_event).await;
1000 }
1001 }
1002
1003 if let Some(tx) = &state.ws_broadcast {
1005 if let Err(e) = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()))
1006 {
1007 tracing::warn!("Failed to broadcast mock_updated event: no active WebSocket receivers (event: {:?})", std::mem::discriminant(&e.0));
1008 }
1009 }
1010
1011 Ok(Json(updated_mock))
1012}
1013
1014async fn delete_mock(
1016 State(state): State<ManagementState>,
1017 Path(id): Path<String>,
1018) -> Result<StatusCode, StatusCode> {
1019 let mut mocks = state.mocks.write().await;
1020
1021 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
1022
1023 let deleted_mock = mocks[position].clone();
1025
1026 info!("Deleting mock: {}", id);
1027 mocks.remove(position);
1028
1029 if let Some(hooks) = &state.lifecycle_hooks {
1031 let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
1032 id: deleted_mock.id.clone(),
1033 name: deleted_mock.name.clone(),
1034 };
1035 hooks.invoke_mock_deleted(&event).await;
1036 }
1037
1038 if let Some(tx) = &state.ws_broadcast {
1040 if let Err(e) = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone())) {
1041 tracing::warn!("Failed to broadcast mock_deleted event: no active WebSocket receivers (event: {:?})", std::mem::discriminant(&e.0));
1042 }
1043 }
1044
1045 Ok(StatusCode::NO_CONTENT)
1046}
1047
/// Request body for POST /config/validate.
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// Candidate configuration document.
    pub config: serde_json::Value,
    /// Declared format of `config`: "yaml"/"yml" or anything else for JSON.
    #[serde(default = "default_format")]
    pub format: String,
}
1057
/// Serde default for `ValidateConfigRequest::format`: JSON.
fn default_format() -> String {
    String::from("json")
}
1061
1062async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
1064 use mockforge_core::config::ServerConfig;
1065
1066 let config_result: Result<ServerConfig, String> = match request.format.as_str() {
1067 "yaml" | "yml" => {
1068 let yaml_str = match serde_json::to_string(&request.config) {
1069 Ok(s) => s,
1070 Err(e) => {
1071 return (
1072 StatusCode::BAD_REQUEST,
1073 Json(serde_json::json!({
1074 "valid": false,
1075 "error": format!("Failed to convert to string: {}", e),
1076 "message": "Configuration validation failed"
1077 })),
1078 )
1079 .into_response();
1080 }
1081 };
1082 serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
1083 }
1084 _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
1085 };
1086
1087 match config_result {
1088 Ok(_) => Json(serde_json::json!({
1089 "valid": true,
1090 "message": "Configuration is valid"
1091 }))
1092 .into_response(),
1093 Err(e) => (
1094 StatusCode::BAD_REQUEST,
1095 Json(serde_json::json!({
1096 "valid": false,
1097 "error": format!("Invalid configuration: {}", e),
1098 "message": "Configuration validation failed"
1099 })),
1100 )
1101 .into_response(),
1102 }
1103}
1104
/// Request body for the bulk configuration update endpoint.
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    /// JSON object of top-level config keys to overwrite.
    pub updates: serde_json::Value,
}
1111
/// Validate and apply a bulk configuration update.
///
/// The supplied `updates` object is shallow-merged over a default
/// `ServerConfig`: top-level keys replace wholesale, nested objects
/// are NOT merged recursively. The merged document is then validated
/// by deserializing into `ServerConfig`, and applied only when the
/// state carries a live server config (see `with_server_config`).
async fn bulk_update_config(
    State(state): State<ManagementState>,
    Json(request): Json<BulkConfigUpdateRequest>,
) -> impl IntoResponse {
    // Reject non-object payloads up front.
    if !request.updates.is_object() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Updates must be a JSON object"
            })),
        )
            .into_response();
    }

    use mockforge_core::config::ServerConfig;

    // Start from the default config so unspecified keys keep defaults.
    let base_config = ServerConfig::default();
    let base_json = match serde_json::to_value(&base_config) {
        Ok(v) => v,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Internal error",
                    "message": format!("Failed to serialize base config: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Shallow merge: each top-level update key overwrites the base key.
    let mut merged = base_json.clone();
    if let (Some(merged_obj), Some(updates_obj)) =
        (merged.as_object_mut(), request.updates.as_object())
    {
        for (key, value) in updates_obj {
            merged_obj.insert(key.clone(), value.clone());
        }
    }

    // Validate the merged document; apply it only when a live config
    // is attached to the state.
    match serde_json::from_value::<ServerConfig>(merged) {
        Ok(validated_config) => {
            if let Some(ref config_lock) = state.server_config {
                let mut config = config_lock.write().await;
                *config = validated_config;
                Json(serde_json::json!({
                    "success": true,
                    "message": "Bulk configuration update applied successfully",
                    "updates_received": request.updates,
                    "validated": true,
                    "applied": true
                }))
                .into_response()
            } else {
                Json(serde_json::json!({
                    "success": true,
                    "message": "Bulk configuration update validated but not applied (no server config in state). Use .with_server_config() when building ManagementState.",
                    "updates_received": request.updates,
                    "validated": true,
                    "applied": false
                }))
                .into_response()
            }
        }
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid configuration",
                "message": format!("Configuration validation failed: {}", e),
                "validated": false
            })),
        )
            .into_response(),
    }
}
1201
1202async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
1204 let mocks = state.mocks.read().await;
1205 let request_count = *state.request_counter.read().await;
1206
1207 Json(ServerStats {
1208 uptime_seconds: state.start_time.elapsed().as_secs(),
1209 total_requests: request_count,
1210 active_mocks: mocks.len(),
1211 enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
1212 registered_routes: mocks.len(), })
1214}
1215
1216async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
1218 Json(ServerConfig {
1219 version: env!("CARGO_PKG_VERSION").to_string(),
1220 port: state.port,
1221 has_openapi_spec: state.spec.is_some(),
1222 spec_path: state.spec_path.clone(),
1223 })
1224}
1225
1226async fn get_openapi_spec(
1228 State(state): State<ManagementState>,
1229) -> Result<Json<serde_json::Value>, StatusCode> {
1230 match &state.spec {
1231 Some(spec) => match &spec.raw_document {
1232 Some(doc) => Ok(Json(doc.clone())),
1233 None => {
1234 match serde_json::to_value(&spec.spec) {
1236 Ok(val) => Ok(Json(val)),
1237 Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
1238 }
1239 }
1240 },
1241 None => Err(StatusCode::NOT_FOUND),
1242 }
1243}
1244
1245async fn health_check() -> Json<serde_json::Value> {
1247 Json(serde_json::json!({
1248 "status": "healthy",
1249 "service": "mockforge-management",
1250 "timestamp": chrono::Utc::now().to_rfc3339()
1251 }))
1252}
1253
/// Serialization format for the mock export endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// Pretty-printed JSON (the default).
    Json,
    /// YAML.
    Yaml,
}
1263
1264async fn export_mocks(
1266 State(state): State<ManagementState>,
1267 Query(params): Query<std::collections::HashMap<String, String>>,
1268) -> Result<(StatusCode, String), StatusCode> {
1269 let mocks = state.mocks.read().await;
1270
1271 let format = params
1272 .get("format")
1273 .map(|f| match f.as_str() {
1274 "yaml" | "yml" => ExportFormat::Yaml,
1275 _ => ExportFormat::Json,
1276 })
1277 .unwrap_or(ExportFormat::Json);
1278
1279 match format {
1280 ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
1281 .map(|json| (StatusCode::OK, json))
1282 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1283 ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
1284 .map(|yaml| (StatusCode::OK, yaml))
1285 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1286 }
1287}
1288
1289async fn import_mocks(
1291 State(state): State<ManagementState>,
1292 Json(mocks): Json<Vec<MockConfig>>,
1293) -> impl IntoResponse {
1294 let mut current_mocks = state.mocks.write().await;
1295 current_mocks.clear();
1296 current_mocks.extend(mocks);
1297 Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1298}
1299
/// GET captured SMTP emails; 501 when no SMTP registry is configured.
#[cfg(feature = "smtp")]
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.smtp_registry.as_ref() {
        Some(smtp_registry) => match smtp_registry.get_emails() {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve emails",
                    "message": e.to_string()
                })),
            ),
        },
        None => (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        ),
    }
}
1324
#[cfg(feature = "smtp")]
/// GET /smtp/mailbox/{id} — fetch a single captured email by id.
///
/// 404 when the id is unknown; 501 when no SMTP registry is attached.
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let Some(smtp_registry) = state.smtp_registry.as_ref() else {
        return (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        );
    };

    match smtp_registry.get_email_by_id(&id) {
        Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Email not found",
                "id": id
            })),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to retrieve email",
                "message": e.to_string()
            })),
        ),
    }
}
1359
#[cfg(feature = "smtp")]
/// DELETE /smtp/mailbox — drop every captured email.
///
/// Responds 501 when no SMTP registry is attached.
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(smtp_registry) = state.smtp_registry.as_ref() else {
        return (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        );
    };

    match smtp_registry.clear_mailbox() {
        Ok(()) => (
            StatusCode::OK,
            Json(serde_json::json!({
                "message": "Mailbox cleared successfully"
            })),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to clear mailbox",
                "message": e.to_string()
            })),
        ),
    }
}
1389
#[cfg(feature = "smtp")]
/// GET /smtp/mailbox/export — placeholder for mailbox export.
///
/// Always responds 501: the mailbox is not reachable through the HTTP
/// management API; the CLI must be used instead. The requested `format`
/// query parameter (default `"json"`) is echoed back for diagnostics.
async fn export_smtp_mailbox(
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    // Only allocate the default string when the parameter is absent; the
    // previous `unwrap_or(&"json".to_string()).clone()` built and dropped a
    // temporary String on every request.
    let format = params.get("format").cloned().unwrap_or_else(|| "json".to_string());
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "SMTP mailbox management not available via HTTP API",
            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
            "requested_format": format
        })),
    )
}
1405
#[cfg(feature = "smtp")]
/// GET /smtp/mailbox/search — filter captured emails by query parameters.
///
/// Supported parameters: `sender`, `recipient`, `subject`, `body`,
/// `since`/`until` (RFC 3339; unparseable values are silently ignored),
/// `regex`, and `case_sensitive` (the flags are true only for the literal
/// string "true"). Responds 501 when no SMTP registry is attached.
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let Some(smtp_registry) = state.smtp_registry.as_ref() else {
        return (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        );
    };

    // Parse an RFC 3339 parameter into UTC, dropping unparseable values.
    let parse_ts = |key: &str| {
        params
            .get(key)
            .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
            .map(|dt| dt.with_timezone(&chrono::Utc))
    };
    // Boolean flag: true only when the parameter equals "true".
    let flag = |key: &str| params.get(key).map(|s| s == "true").unwrap_or(false);

    let filters = EmailSearchFilters {
        sender: params.get("sender").cloned(),
        recipient: params.get("recipient").cloned(),
        subject: params.get("subject").cloned(),
        body: params.get("body").cloned(),
        since: parse_ts("since"),
        until: parse_ts("until"),
        use_regex: flag("regex"),
        case_sensitive: flag("case_sensitive"),
    };

    match smtp_registry.search_emails(filters) {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to search emails",
                "message": e.to_string()
            })),
        ),
    }
}
1450
#[cfg(feature = "mqtt")]
/// Aggregate statistics reported by the embedded MQTT broker
/// via GET /mqtt/stats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of currently connected clients.
    pub connected_clients: usize,
    /// Number of active topics.
    pub active_topics: usize,
    /// Number of retained messages held by the broker.
    pub retained_messages: usize,
    /// Total subscriptions across all clients.
    pub total_subscriptions: usize,
}
1464
#[cfg(feature = "mqtt")]
/// GET /mqtt/stats — aggregate broker statistics, or 503 when no broker
/// is configured.
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    let stats = broker.get_topic_stats().await;
    Json(MqttBrokerStats {
        connected_clients: broker.get_connected_clients().await.len(),
        active_topics: broker.get_active_topics().await.len(),
        retained_messages: stats.retained_messages,
        total_subscriptions: stats.total_subscriptions,
    })
    .into_response()
}
1485
#[cfg(feature = "mqtt")]
/// GET /mqtt/clients — list currently connected MQTT clients.
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.mqtt_broker.as_ref() {
        Some(broker) => {
            let clients = broker.get_connected_clients().await;
            Json(serde_json::json!({ "clients": clients })).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1498
#[cfg(feature = "mqtt")]
/// GET /mqtt/topics — list topics currently active on the broker.
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.mqtt_broker.as_ref() {
        Some(broker) => {
            let topics = broker.get_active_topics().await;
            Json(serde_json::json!({ "topics": topics })).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1511
#[cfg(feature = "mqtt")]
/// DELETE /mqtt/clients/{client_id} — force-disconnect a connected client.
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    match broker.disconnect_client(&client_id).await {
        Ok(_) => {
            (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response()
        }
        Err(e) => {
            (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
                .into_response()
        }
    }
}
1531
#[cfg(feature = "mqtt")]
/// A single MQTT publish request (element of a batch publish body).
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Topic to publish to.
    pub topic: String,
    /// Message payload as a UTF-8 string.
    pub payload: String,
    /// Quality-of-service level; defaults to 0 via `default_qos`.
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Whether the broker should retain the message; defaults to false.
    #[serde(default)]
    pub retain: bool,
}
1549
#[cfg(feature = "mqtt")]
/// Serde default for [`MqttPublishRequest::qos`]: QoS 0 (at most once).
fn default_qos() -> u8 {
    0
}
1554
#[cfg(feature = "mqtt")]
/// POST /mqtt/publish — publish a single message through the broker.
///
/// The body is untyped JSON with required `topic` and `payload` strings,
/// plus optional `qos` (default 0, must be <= 2) and `retain` (default
/// false). Successful publishes are also fanned out to SSE subscribers via
/// the shared message-event channel. Responds 503 when no broker is
/// configured.
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let field_str = |key: &str| request.get(key).and_then(|v| v.as_str()).map(str::to_string);
    let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    let (Some(topic), Some(payload)) = (field_str("topic"), field_str("payload")) else {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required fields: topic and payload"
            })),
        );
    };

    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        );
    };

    if qos > 2 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid QoS",
                "message": "QoS must be 0, 1, or 2"
            })),
        );
    }

    let client_id = "mockforge-management-api".to_string();
    match broker
        .handle_publish(&client_id, &topic, payload.as_bytes().to_vec(), qos, retain)
        .await
    {
        Ok(_) => {
            // Notify SSE stream subscribers; a send error just means there
            // are no active listeners, which is fine.
            let _ = state.message_events.send(MessageEvent::Mqtt(MqttMessageEvent {
                topic: topic.clone(),
                payload: payload.clone(),
                qos,
                retain,
                timestamp: chrono::Utc::now().to_rfc3339(),
            }));

            (
                StatusCode::OK,
                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message published to topic '{}'", topic),
                    "topic": topic,
                    "qos": qos,
                    "retain": retain
                })),
            )
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to publish message",
                "message": e.to_string()
            })),
        ),
    }
}
1642
#[cfg(not(feature = "mqtt"))]
/// Fallback handler compiled when the `mqtt` feature is disabled; always
/// responds 503 with an explanatory JSON body.
async fn publish_mqtt_message_handler(
    State(_state): State<ManagementState>,
    Json(_request): Json<serde_json::Value>,
) -> impl IntoResponse {
    (
        StatusCode::SERVICE_UNAVAILABLE,
        Json(serde_json::json!({
            "error": "MQTT feature not enabled",
            "message": "MQTT support is not compiled into this build"
        })),
    )
}
1657
#[cfg(feature = "mqtt")]
/// Request body for publishing a batch of MQTT messages.
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// Messages to publish, in order.
    pub messages: Vec<MqttPublishRequest>,
    /// Delay between consecutive messages in milliseconds; defaults to 100
    /// via `default_delay`.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1668
#[cfg(feature = "mqtt")]
/// Serde default for [`MqttBatchPublishRequest::delay_ms`]: 100 ms between
/// consecutive messages.
fn default_delay() -> u64 {
    100
}
1673
#[cfg(feature = "mqtt")]
/// POST /mqtt/publish/batch — publish several messages in sequence.
///
/// The body is untyped JSON: a required `messages` array (each element with
/// `topic`/`payload` strings and optional `qos`/`retain`) and an optional
/// `delay_ms` (default 100) applied between consecutive messages.
/// Per-message failures do not abort the batch; each message gets an entry
/// in the `results` array and the response summarizes success/failure
/// counts. Responds 400 for a missing or empty `messages` array and 503
/// when no broker is configured.
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let messages_json = request.get("messages").and_then(|v| v.as_array());
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    if messages_json.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required field: messages"
            })),
        );
    }

    let messages_json = messages_json.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        if messages_json.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            );
        }

        let mut results = Vec::new();
        let client_id = "mockforge-management-api".to_string();

        for (index, msg_json) in messages_json.iter().enumerate() {
            // Extract per-message fields; qos defaults to 0, retain to false.
            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

            // Record a per-message error and move on rather than failing
            // the whole batch.
            if topic.is_none() || payload.is_none() {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }

            let topic = topic.unwrap();
            let payload = payload.unwrap();

            if qos > 2 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Invalid QoS (must be 0, 1, or 2)"
                }));
                continue;
            }

            let payload_bytes = payload.as_bytes().to_vec();

            let publish_result = broker
                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
                .await
                .map_err(|e| format!("{}", e));

            match publish_result {
                Ok(_) => {
                    // Fan the message out to SSE subscribers; a send error
                    // just means there are no active listeners.
                    let event = MessageEvent::Mqtt(MqttMessageEvent {
                        topic: topic.clone(),
                        payload: payload.clone(),
                        qos,
                        retain,
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": topic,
                        "qos": qos
                    }));
                }
                Err(error_msg) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": error_msg
                    }));
                }
            }

            // Pace the batch: sleep between messages but not after the last
            // one. (messages_json is non-empty here, so len() - 1 is safe.)
            if index < messages_json.len() - 1 && delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        (
            StatusCode::OK,
            Json(serde_json::json!({
                "success": true,
                "total": messages_json.len(),
                "succeeded": success_count,
                "failed": messages_json.len() - success_count,
                "results": results
            })),
        )
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1803
#[cfg(not(feature = "mqtt"))]
/// Fallback batch handler compiled when the `mqtt` feature is disabled;
/// always responds 503 with an explanatory JSON body.
async fn publish_mqtt_batch_handler(
    State(_state): State<ManagementState>,
    Json(_request): Json<serde_json::Value>,
) -> impl IntoResponse {
    (
        StatusCode::SERVICE_UNAVAILABLE,
        Json(serde_json::json!({
            "error": "MQTT feature not enabled",
            "message": "MQTT support is not compiled into this build"
        })),
    )
}
1818
/// Request body for setting a route's or group's migration mode.
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    /// Desired mode: "mock", "shadow", "real", or "auto" (case-insensitive).
    mode: String,
}
1826
1827async fn get_migration_routes(
1829 State(state): State<ManagementState>,
1830) -> Result<Json<serde_json::Value>, StatusCode> {
1831 let proxy_config = match &state.proxy_config {
1832 Some(config) => config,
1833 None => {
1834 return Ok(Json(serde_json::json!({
1835 "error": "Migration not configured. Proxy config not available."
1836 })));
1837 }
1838 };
1839
1840 let config = proxy_config.read().await;
1841 let routes = config.get_migration_routes();
1842
1843 Ok(Json(serde_json::json!({
1844 "routes": routes
1845 })))
1846}
1847
1848async fn toggle_route_migration(
1850 State(state): State<ManagementState>,
1851 Path(pattern): Path<String>,
1852) -> Result<Json<serde_json::Value>, StatusCode> {
1853 let proxy_config = match &state.proxy_config {
1854 Some(config) => config,
1855 None => {
1856 return Ok(Json(serde_json::json!({
1857 "error": "Migration not configured. Proxy config not available."
1858 })));
1859 }
1860 };
1861
1862 let mut config = proxy_config.write().await;
1863 let new_mode = match config.toggle_route_migration(&pattern) {
1864 Some(mode) => mode,
1865 None => {
1866 return Ok(Json(serde_json::json!({
1867 "error": format!("Route pattern not found: {}", pattern)
1868 })));
1869 }
1870 };
1871
1872 Ok(Json(serde_json::json!({
1873 "pattern": pattern,
1874 "mode": format!("{:?}", new_mode).to_lowercase()
1875 })))
1876}
1877
1878async fn set_route_migration_mode(
1880 State(state): State<ManagementState>,
1881 Path(pattern): Path<String>,
1882 Json(request): Json<SetMigrationModeRequest>,
1883) -> Result<Json<serde_json::Value>, StatusCode> {
1884 let proxy_config = match &state.proxy_config {
1885 Some(config) => config,
1886 None => {
1887 return Ok(Json(serde_json::json!({
1888 "error": "Migration not configured. Proxy config not available."
1889 })));
1890 }
1891 };
1892
1893 use mockforge_core::proxy::config::MigrationMode;
1894 let mode = match request.mode.to_lowercase().as_str() {
1895 "mock" => MigrationMode::Mock,
1896 "shadow" => MigrationMode::Shadow,
1897 "real" => MigrationMode::Real,
1898 "auto" => MigrationMode::Auto,
1899 _ => {
1900 return Ok(Json(serde_json::json!({
1901 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1902 })));
1903 }
1904 };
1905
1906 let mut config = proxy_config.write().await;
1907 let updated = config.update_rule_migration_mode(&pattern, mode);
1908
1909 if !updated {
1910 return Ok(Json(serde_json::json!({
1911 "error": format!("Route pattern not found: {}", pattern)
1912 })));
1913 }
1914
1915 Ok(Json(serde_json::json!({
1916 "pattern": pattern,
1917 "mode": format!("{:?}", mode).to_lowercase()
1918 })))
1919}
1920
1921async fn toggle_group_migration(
1923 State(state): State<ManagementState>,
1924 Path(group): Path<String>,
1925) -> Result<Json<serde_json::Value>, StatusCode> {
1926 let proxy_config = match &state.proxy_config {
1927 Some(config) => config,
1928 None => {
1929 return Ok(Json(serde_json::json!({
1930 "error": "Migration not configured. Proxy config not available."
1931 })));
1932 }
1933 };
1934
1935 let mut config = proxy_config.write().await;
1936 let new_mode = config.toggle_group_migration(&group);
1937
1938 Ok(Json(serde_json::json!({
1939 "group": group,
1940 "mode": format!("{:?}", new_mode).to_lowercase()
1941 })))
1942}
1943
1944async fn set_group_migration_mode(
1946 State(state): State<ManagementState>,
1947 Path(group): Path<String>,
1948 Json(request): Json<SetMigrationModeRequest>,
1949) -> Result<Json<serde_json::Value>, StatusCode> {
1950 let proxy_config = match &state.proxy_config {
1951 Some(config) => config,
1952 None => {
1953 return Ok(Json(serde_json::json!({
1954 "error": "Migration not configured. Proxy config not available."
1955 })));
1956 }
1957 };
1958
1959 use mockforge_core::proxy::config::MigrationMode;
1960 let mode = match request.mode.to_lowercase().as_str() {
1961 "mock" => MigrationMode::Mock,
1962 "shadow" => MigrationMode::Shadow,
1963 "real" => MigrationMode::Real,
1964 "auto" => MigrationMode::Auto,
1965 _ => {
1966 return Ok(Json(serde_json::json!({
1967 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1968 })));
1969 }
1970 };
1971
1972 let mut config = proxy_config.write().await;
1973 config.update_group_migration_mode(&group, mode);
1974
1975 Ok(Json(serde_json::json!({
1976 "group": group,
1977 "mode": format!("{:?}", mode).to_lowercase()
1978 })))
1979}
1980
1981async fn get_migration_groups(
1983 State(state): State<ManagementState>,
1984) -> Result<Json<serde_json::Value>, StatusCode> {
1985 let proxy_config = match &state.proxy_config {
1986 Some(config) => config,
1987 None => {
1988 return Ok(Json(serde_json::json!({
1989 "error": "Migration not configured. Proxy config not available."
1990 })));
1991 }
1992 };
1993
1994 let config = proxy_config.read().await;
1995 let groups = config.get_migration_groups();
1996
1997 let groups_json: serde_json::Map<String, serde_json::Value> = groups
1999 .into_iter()
2000 .map(|(name, info)| {
2001 (
2002 name,
2003 serde_json::json!({
2004 "name": info.name,
2005 "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
2006 "route_count": info.route_count
2007 }),
2008 )
2009 })
2010 .collect();
2011
2012 Ok(Json(serde_json::json!(groups_json)))
2013}
2014
2015async fn get_migration_status(
2017 State(state): State<ManagementState>,
2018) -> Result<Json<serde_json::Value>, StatusCode> {
2019 let proxy_config = match &state.proxy_config {
2020 Some(config) => config,
2021 None => {
2022 return Ok(Json(serde_json::json!({
2023 "error": "Migration not configured. Proxy config not available."
2024 })));
2025 }
2026 };
2027
2028 let config = proxy_config.read().await;
2029 let routes = config.get_migration_routes();
2030 let groups = config.get_migration_groups();
2031
2032 let mut mock_count = 0;
2033 let mut shadow_count = 0;
2034 let mut real_count = 0;
2035 let mut auto_count = 0;
2036
2037 for route in &routes {
2038 match route.migration_mode {
2039 mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
2040 mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
2041 mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
2042 mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
2043 }
2044 }
2045
2046 Ok(Json(serde_json::json!({
2047 "total_routes": routes.len(),
2048 "mock_routes": mock_count,
2049 "shadow_routes": shadow_count,
2050 "real_routes": real_count,
2051 "auto_routes": auto_count,
2052 "total_groups": groups.len(),
2053 "migration_enabled": config.migration_enabled
2054 })))
2055}
2056
/// Request body for creating or updating a proxy body-transform rule.
#[derive(Debug, Deserialize, Serialize)]
pub struct ProxyRuleRequest {
    /// URL pattern the rule applies to.
    pub pattern: String,
    /// Rule kind: "request" or "response" (serialized as `type`).
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status codes the rule matches (used by response rules); defaults to
    /// an empty list.
    #[serde(default)]
    pub status_codes: Vec<u16>,
    /// Transforms to apply to the body; must be non-empty on create.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active; defaults to true via `default_true`.
    #[serde(default = "default_true")]
    pub enabled: bool,
}
2076
/// Wire representation of a single body transform.
#[derive(Debug, Deserialize, Serialize)]
pub struct BodyTransformRequest {
    /// Path of the field to transform.
    pub path: String,
    /// Replacement value.
    pub replace: String,
    /// Operation name: "replace", "add", or "remove". Defaults to an empty
    /// string, which the rule handlers treat as "replace".
    #[serde(default)]
    pub operation: String,
}
2088
/// API representation of a proxy rule as returned by the list endpoint.
#[derive(Debug, Serialize)]
pub struct ProxyRuleResponse {
    /// Positional rule id (request rules first, then response rules).
    pub id: usize,
    /// URL pattern the rule applies to.
    pub pattern: String,
    /// Rule kind: "request" or "response" (serialized as `type`).
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Matched status codes (empty for request rules).
    pub status_codes: Vec<u16>,
    /// Transforms applied by the rule.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active.
    pub enabled: bool,
}
2106
2107async fn list_proxy_rules(
2109 State(state): State<ManagementState>,
2110) -> Result<Json<serde_json::Value>, StatusCode> {
2111 let proxy_config = match &state.proxy_config {
2112 Some(config) => config,
2113 None => {
2114 return Ok(Json(serde_json::json!({
2115 "error": "Proxy not configured. Proxy config not available."
2116 })));
2117 }
2118 };
2119
2120 let config = proxy_config.read().await;
2121
2122 let mut rules: Vec<ProxyRuleResponse> = Vec::new();
2123
2124 for (idx, rule) in config.request_replacements.iter().enumerate() {
2126 rules.push(ProxyRuleResponse {
2127 id: idx,
2128 pattern: rule.pattern.clone(),
2129 rule_type: "request".to_string(),
2130 status_codes: Vec::new(),
2131 body_transforms: rule
2132 .body_transforms
2133 .iter()
2134 .map(|t| BodyTransformRequest {
2135 path: t.path.clone(),
2136 replace: t.replace.clone(),
2137 operation: format!("{:?}", t.operation).to_lowercase(),
2138 })
2139 .collect(),
2140 enabled: rule.enabled,
2141 });
2142 }
2143
2144 let request_count = config.request_replacements.len();
2146 for (idx, rule) in config.response_replacements.iter().enumerate() {
2147 rules.push(ProxyRuleResponse {
2148 id: request_count + idx,
2149 pattern: rule.pattern.clone(),
2150 rule_type: "response".to_string(),
2151 status_codes: rule.status_codes.clone(),
2152 body_transforms: rule
2153 .body_transforms
2154 .iter()
2155 .map(|t| BodyTransformRequest {
2156 path: t.path.clone(),
2157 replace: t.replace.clone(),
2158 operation: format!("{:?}", t.operation).to_lowercase(),
2159 })
2160 .collect(),
2161 enabled: rule.enabled,
2162 });
2163 }
2164
2165 Ok(Json(serde_json::json!({
2166 "rules": rules
2167 })))
2168}
2169
2170async fn create_proxy_rule(
2172 State(state): State<ManagementState>,
2173 Json(request): Json<ProxyRuleRequest>,
2174) -> Result<Json<serde_json::Value>, StatusCode> {
2175 let proxy_config = match &state.proxy_config {
2176 Some(config) => config,
2177 None => {
2178 return Ok(Json(serde_json::json!({
2179 "error": "Proxy not configured. Proxy config not available."
2180 })));
2181 }
2182 };
2183
2184 if request.body_transforms.is_empty() {
2186 return Ok(Json(serde_json::json!({
2187 "error": "At least one body transform is required"
2188 })));
2189 }
2190
2191 let body_transforms: Vec<BodyTransform> = request
2192 .body_transforms
2193 .iter()
2194 .map(|t| {
2195 let op = match t.operation.as_str() {
2196 "replace" => TransformOperation::Replace,
2197 "add" => TransformOperation::Add,
2198 "remove" => TransformOperation::Remove,
2199 _ => TransformOperation::Replace,
2200 };
2201 BodyTransform {
2202 path: t.path.clone(),
2203 replace: t.replace.clone(),
2204 operation: op,
2205 }
2206 })
2207 .collect();
2208
2209 let new_rule = BodyTransformRule {
2210 pattern: request.pattern.clone(),
2211 status_codes: request.status_codes.clone(),
2212 body_transforms,
2213 enabled: request.enabled,
2214 };
2215
2216 let mut config = proxy_config.write().await;
2217
2218 let rule_id = if request.rule_type == "request" {
2219 config.request_replacements.push(new_rule);
2220 config.request_replacements.len() - 1
2221 } else if request.rule_type == "response" {
2222 config.response_replacements.push(new_rule);
2223 config.request_replacements.len() + config.response_replacements.len() - 1
2224 } else {
2225 return Ok(Json(serde_json::json!({
2226 "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
2227 })));
2228 };
2229
2230 Ok(Json(serde_json::json!({
2231 "id": rule_id,
2232 "message": "Rule created successfully"
2233 })))
2234}
2235
2236async fn get_proxy_rule(
2238 State(state): State<ManagementState>,
2239 Path(id): Path<String>,
2240) -> Result<Json<serde_json::Value>, StatusCode> {
2241 let proxy_config = match &state.proxy_config {
2242 Some(config) => config,
2243 None => {
2244 return Ok(Json(serde_json::json!({
2245 "error": "Proxy not configured. Proxy config not available."
2246 })));
2247 }
2248 };
2249
2250 let config = proxy_config.read().await;
2251 let rule_id: usize = match id.parse() {
2252 Ok(id) => id,
2253 Err(_) => {
2254 return Ok(Json(serde_json::json!({
2255 "error": format!("Invalid rule ID: {}", id)
2256 })));
2257 }
2258 };
2259
2260 let request_count = config.request_replacements.len();
2261
2262 if rule_id < request_count {
2263 let rule = &config.request_replacements[rule_id];
2265 Ok(Json(serde_json::json!({
2266 "id": rule_id,
2267 "pattern": rule.pattern,
2268 "type": "request",
2269 "status_codes": [],
2270 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2271 "path": t.path,
2272 "replace": t.replace,
2273 "operation": format!("{:?}", t.operation).to_lowercase()
2274 })).collect::<Vec<_>>(),
2275 "enabled": rule.enabled
2276 })))
2277 } else if rule_id < request_count + config.response_replacements.len() {
2278 let response_idx = rule_id - request_count;
2280 let rule = &config.response_replacements[response_idx];
2281 Ok(Json(serde_json::json!({
2282 "id": rule_id,
2283 "pattern": rule.pattern,
2284 "type": "response",
2285 "status_codes": rule.status_codes,
2286 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2287 "path": t.path,
2288 "replace": t.replace,
2289 "operation": format!("{:?}", t.operation).to_lowercase()
2290 })).collect::<Vec<_>>(),
2291 "enabled": rule.enabled
2292 })))
2293 } else {
2294 Ok(Json(serde_json::json!({
2295 "error": format!("Rule ID {} not found", rule_id)
2296 })))
2297 }
2298}
2299
2300async fn update_proxy_rule(
2302 State(state): State<ManagementState>,
2303 Path(id): Path<String>,
2304 Json(request): Json<ProxyRuleRequest>,
2305) -> Result<Json<serde_json::Value>, StatusCode> {
2306 let proxy_config = match &state.proxy_config {
2307 Some(config) => config,
2308 None => {
2309 return Ok(Json(serde_json::json!({
2310 "error": "Proxy not configured. Proxy config not available."
2311 })));
2312 }
2313 };
2314
2315 let mut config = proxy_config.write().await;
2316 let rule_id: usize = match id.parse() {
2317 Ok(id) => id,
2318 Err(_) => {
2319 return Ok(Json(serde_json::json!({
2320 "error": format!("Invalid rule ID: {}", id)
2321 })));
2322 }
2323 };
2324
2325 let body_transforms: Vec<BodyTransform> = request
2326 .body_transforms
2327 .iter()
2328 .map(|t| {
2329 let op = match t.operation.as_str() {
2330 "replace" => TransformOperation::Replace,
2331 "add" => TransformOperation::Add,
2332 "remove" => TransformOperation::Remove,
2333 _ => TransformOperation::Replace,
2334 };
2335 BodyTransform {
2336 path: t.path.clone(),
2337 replace: t.replace.clone(),
2338 operation: op,
2339 }
2340 })
2341 .collect();
2342
2343 let updated_rule = BodyTransformRule {
2344 pattern: request.pattern.clone(),
2345 status_codes: request.status_codes.clone(),
2346 body_transforms,
2347 enabled: request.enabled,
2348 };
2349
2350 let request_count = config.request_replacements.len();
2351
2352 if rule_id < request_count {
2353 config.request_replacements[rule_id] = updated_rule;
2355 } else if rule_id < request_count + config.response_replacements.len() {
2356 let response_idx = rule_id - request_count;
2358 config.response_replacements[response_idx] = updated_rule;
2359 } else {
2360 return Ok(Json(serde_json::json!({
2361 "error": format!("Rule ID {} not found", rule_id)
2362 })));
2363 }
2364
2365 Ok(Json(serde_json::json!({
2366 "id": rule_id,
2367 "message": "Rule updated successfully"
2368 })))
2369}
2370
2371async fn delete_proxy_rule(
2373 State(state): State<ManagementState>,
2374 Path(id): Path<String>,
2375) -> Result<Json<serde_json::Value>, StatusCode> {
2376 let proxy_config = match &state.proxy_config {
2377 Some(config) => config,
2378 None => {
2379 return Ok(Json(serde_json::json!({
2380 "error": "Proxy not configured. Proxy config not available."
2381 })));
2382 }
2383 };
2384
2385 let mut config = proxy_config.write().await;
2386 let rule_id: usize = match id.parse() {
2387 Ok(id) => id,
2388 Err(_) => {
2389 return Ok(Json(serde_json::json!({
2390 "error": format!("Invalid rule ID: {}", id)
2391 })));
2392 }
2393 };
2394
2395 let request_count = config.request_replacements.len();
2396
2397 if rule_id < request_count {
2398 config.request_replacements.remove(rule_id);
2400 } else if rule_id < request_count + config.response_replacements.len() {
2401 let response_idx = rule_id - request_count;
2403 config.response_replacements.remove(response_idx);
2404 } else {
2405 return Ok(Json(serde_json::json!({
2406 "error": format!("Rule ID {} not found", rule_id)
2407 })));
2408 }
2409
2410 Ok(Json(serde_json::json!({
2411 "id": rule_id,
2412 "message": "Rule deleted successfully"
2413 })))
2414}
2415
2416async fn get_proxy_inspect(
2418 State(state): State<ManagementState>,
2419 Query(params): Query<std::collections::HashMap<String, String>>,
2420) -> Result<Json<serde_json::Value>, StatusCode> {
2421 let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2422 let offset: usize = params.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);
2423
2424 let proxy_config = match &state.proxy_config {
2425 Some(config) => config.read().await,
2426 None => {
2427 return Ok(Json(serde_json::json!({
2428 "error": "Proxy not configured. Proxy config not available."
2429 })));
2430 }
2431 };
2432
2433 let mut rules = Vec::new();
2434 for (idx, rule) in proxy_config.request_replacements.iter().enumerate() {
2435 rules.push(serde_json::json!({
2436 "id": idx,
2437 "kind": "request",
2438 "pattern": rule.pattern,
2439 "enabled": rule.enabled,
2440 "status_codes": rule.status_codes,
2441 "transform_count": rule.body_transforms.len(),
2442 "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2443 "path": t.path,
2444 "operation": t.operation,
2445 "replace": t.replace
2446 })).collect::<Vec<_>>()
2447 }));
2448 }
2449 let request_rule_count = rules.len();
2450 for (idx, rule) in proxy_config.response_replacements.iter().enumerate() {
2451 rules.push(serde_json::json!({
2452 "id": request_rule_count + idx,
2453 "kind": "response",
2454 "pattern": rule.pattern,
2455 "enabled": rule.enabled,
2456 "status_codes": rule.status_codes,
2457 "transform_count": rule.body_transforms.len(),
2458 "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2459 "path": t.path,
2460 "operation": t.operation,
2461 "replace": t.replace
2462 })).collect::<Vec<_>>()
2463 }));
2464 }
2465
2466 let total = rules.len();
2467 let paged_rules: Vec<_> = rules.into_iter().skip(offset).take(limit).collect();
2468
2469 Ok(Json(serde_json::json!({
2470 "enabled": proxy_config.enabled,
2471 "target_url": proxy_config.target_url,
2472 "prefix": proxy_config.prefix,
2473 "timeout_seconds": proxy_config.timeout_seconds,
2474 "follow_redirects": proxy_config.follow_redirects,
2475 "passthrough_by_default": proxy_config.passthrough_by_default,
2476 "rules": paged_rules,
2477 "request_rule_count": request_rule_count,
2478 "response_rule_count": total.saturating_sub(request_rule_count),
2479 "limit": limit,
2480 "offset": offset,
2481 "total": total
2482 })))
2483}
2484
/// Build the management API [`Router`], wiring in the core endpoints plus all
/// feature-gated route groups (SMTP, MQTT, Kafka, conformance, behavioral
/// cloning), then attaching the shared [`ManagementState`].
///
/// The repeated `#[cfg(...)] let router = ...;` pattern rebinds `router` under
/// each feature combination so the chain compiles whether or not a feature is
/// enabled.
pub fn management_router(state: ManagementState) -> Router {
    // Core endpoints: health/stats, config management, mock CRUD, import/export.
    let router = Router::new()
        .route("/health", get(health_check))
        .route("/stats", get(get_stats))
        .route("/config", get(get_config))
        .route("/config/validate", post(validate_config))
        .route("/config/bulk", post(bulk_update_config))
        .route("/mocks", get(list_mocks))
        .route("/mocks", post(create_mock))
        .route("/mocks/{id}", get(get_mock))
        .route("/mocks/{id}", put(update_mock))
        .route("/mocks/{id}", delete(delete_mock))
        .route("/export", get(export_mocks))
        .route("/import", post(import_mocks))
        .route("/spec", get(get_openapi_spec));

    // SMTP mailbox inspection endpoints (feature-gated).
    #[cfg(feature = "smtp")]
    let router = router
        .route("/smtp/mailbox", get(list_smtp_emails))
        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
        .route("/smtp/mailbox/{id}", get(get_smtp_email))
        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
        .route("/smtp/mailbox/search", get(search_smtp_emails));

    #[cfg(not(feature = "smtp"))]
    let router = router;

    // MQTT broker endpoints, including the SSE message stream (feature-gated).
    #[cfg(feature = "mqtt")]
    let router = router
        .route("/mqtt/stats", get(get_mqtt_stats))
        .route("/mqtt/clients", get(get_mqtt_clients))
        .route("/mqtt/topics", get(get_mqtt_topics))
        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Without the feature, only the publish endpoints are mounted (their
    // handlers have non-MQTT fallback implementations).
    #[cfg(not(feature = "mqtt"))]
    let router = router
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Kafka broker endpoints, including the SSE message stream (feature-gated).
    #[cfg(feature = "kafka")]
    let router = router
        .route("/kafka/stats", get(get_kafka_stats))
        .route("/kafka/topics", get(get_kafka_topics))
        .route("/kafka/topics/{topic}", get(get_kafka_topic))
        .route("/kafka/groups", get(get_kafka_groups))
        .route("/kafka/groups/{group_id}", get(get_kafka_group))
        .route("/kafka/produce", post(produce_kafka_message))
        .route("/kafka/produce/batch", post(produce_kafka_batch))
        .route("/kafka/messages/stream", get(kafka_messages_stream));

    #[cfg(not(feature = "kafka"))]
    let router = router;

    // Route-migration control endpoints (always mounted).
    let router = router
        .route("/migration/routes", get(get_migration_routes))
        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
        .route("/migration/groups/{group}", put(set_group_migration_mode))
        .route("/migration/groups", get(get_migration_groups))
        .route("/migration/status", get(get_migration_status));

    // Proxy replacement-rule CRUD and inspection.
    let router = router
        .route("/proxy/rules", get(list_proxy_rules))
        .route("/proxy/rules", post(create_proxy_rule))
        .route("/proxy/rules/{id}", get(get_proxy_rule))
        .route("/proxy/rules/{id}", put(update_proxy_rule))
        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
        .route("/proxy/inspect", get(get_proxy_inspect));

    // AI-powered spec generation (handler is itself feature-gated internally).
    let router = router.route("/ai/generate-spec", post(generate_ai_spec));

    // Snapshot-diff sub-router shares the management state.
    let router = router.nest(
        "/snapshot-diff",
        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
    );

    #[cfg(feature = "behavioral-cloning")]
    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));

    // MockAI learning/explanations plus chaos and network-profile controls.
    let router = router
        .route("/mockai/learn", post(learn_from_examples))
        .route("/mockai/rules/explanations", get(list_rule_explanations))
        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
        .route("/chaos/config", get(get_chaos_config))
        .route("/chaos/config", post(update_chaos_config))
        .route("/network/profiles", get(list_network_profiles))
        .route("/network/profile/apply", post(apply_network_profile));

    let router =
        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());

    // Conformance checking sub-router carries its own state.
    #[cfg(feature = "conformance")]
    let router = router.nest_service(
        "/conformance",
        crate::handlers::conformance::conformance_router(state.conformance_state.clone()),
    );
    #[cfg(not(feature = "conformance"))]
    let router = router;

    router.with_state(state)
}
2598
/// Aggregate counters reported by GET /kafka/stats.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// Number of topics currently known to the broker.
    pub topics: usize,
    /// Total partition count summed over all topics.
    pub partitions: usize,
    /// Number of registered consumer groups.
    pub consumer_groups: usize,
    /// Total messages produced (from the broker metrics snapshot).
    pub messages_produced: u64,
    /// Total messages consumed (from the broker metrics snapshot).
    pub messages_consumed: u64,
}
2614
/// Per-topic summary returned by GET /kafka/topics.
#[cfg(feature = "kafka")]
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name.
    pub name: String,
    /// Number of partitions in this topic.
    pub partitions: usize,
    /// Configured replication factor (cast to i32 for the wire format).
    pub replication_factor: i32,
}
2623
/// Per-consumer-group summary returned by GET /kafka/groups.
#[cfg(feature = "kafka")]
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group identifier.
    pub group_id: String,
    /// Number of members currently in the group.
    pub members: usize,
    /// Group state string (the mock broker always reports "Stable").
    pub state: String,
}
2632
#[cfg(feature = "kafka")]
/// GET /kafka/stats — snapshot of broker-wide counters, or 503 when the
/// Kafka broker is not running.
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    // Read locks: topics first, then consumer groups (same order as elsewhere).
    let topics = broker.topics.read().await;
    let groups = broker.consumer_groups.read().await;

    let partition_total = topics.values().map(|t| t.partitions.len()).sum::<usize>();
    let metrics = broker.metrics().snapshot();

    Json(KafkaBrokerStats {
        topics: topics.len(),
        partitions: partition_total,
        consumer_groups: groups.groups().len(),
        messages_produced: metrics.messages_produced_total,
        messages_consumed: metrics.messages_consumed_total,
    })
    .into_response()
}
2665
#[cfg(feature = "kafka")]
/// GET /kafka/topics — name, partition count, and replication factor for
/// every topic, or 503 when the Kafka broker is not running.
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let mut topic_list = Vec::with_capacity(topics.len());
    for (name, topic) in topics.iter() {
        topic_list.push(KafkaTopicInfo {
            name: name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor as i32,
        });
    }

    Json(serde_json::json!({ "topics": topic_list })).into_response()
}
2695
#[cfg(feature = "kafka")]
/// GET /kafka/topics/{topic} — detail view for one topic, including a
/// per-partition breakdown with message counts. 404 when the topic does not
/// exist; 503 when the broker is not running.
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topics = broker.topics.read().await;
    let Some(topic) = topics.get(&topic_name) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Topic not found",
                "topic": topic_name
            })),
        )
            .into_response();
    };

    // The mock broker is single-node: partition 0 is always leader and the
    // replica set is fixed.
    let partition_detail: Vec<_> = topic
        .partitions
        .iter()
        .enumerate()
        .map(|(idx, partition)| {
            serde_json::json!({
                "id": idx as i32,
                "leader": 0,
                "replicas": vec![0],
                "message_count": partition.messages.len()
            })
        })
        .collect();

    Json(serde_json::json!({
        "name": topic_name,
        "partitions": topic.partitions.len(),
        "replication_factor": topic.config.replication_factor,
        "partitions_detail": partition_detail
    }))
    .into_response()
}
2737
#[cfg(feature = "kafka")]
/// GET /kafka/groups — summary list of all consumer groups known to the
/// broker, or 503 when the broker is not running.
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let registry = broker.consumer_groups.read().await;
    let summaries: Vec<KafkaConsumerGroupInfo> = registry
        .groups()
        .iter()
        .map(|(group_id, group)| KafkaConsumerGroupInfo {
            group_id: group_id.clone(),
            members: group.members.len(),
            // The mock broker does not track rebalance state, so every group
            // is reported as "Stable".
            state: "Stable".to_string(),
        })
        .collect();

    Json(serde_json::json!({ "groups": summaries })).into_response()
}
2768
#[cfg(feature = "kafka")]
/// GET /kafka/groups/{group_id} — detail view for one consumer group:
/// per-member client ids and partition assignments, plus the group's
/// committed offsets. 404 when the group is unknown; 503 when the broker is
/// not running.
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let consumer_groups = broker.consumer_groups.read().await;
        if let Some(group) = consumer_groups.groups().get(&group_id) {
            Json(serde_json::json!({
                "group_id": group_id,
                "members": group.members.len(),
                // Hard-coded: the mock broker does not track rebalance state.
                "state": "Stable",
                "members_detail": group.members.iter().map(|(member_id, member)| serde_json::json!({
                    "member_id": member_id,
                    "client_id": member.client_id,
                    "assignments": member.assignment.iter().map(|a| serde_json::json!({
                        "topic": a.topic,
                        "partitions": a.partitions
                    })).collect::<Vec<_>>()
                })).collect::<Vec<_>>(),
                // Offsets are keyed by (topic, partition) pairs.
                "offsets": group.offsets.iter().map(|((topic, partition), offset)| serde_json::json!({
                    "topic": topic,
                    "partition": partition,
                    "offset": offset
                })).collect::<Vec<_>>()
            })).into_response()
        } else {
            (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Consumer group not found",
                    "group_id": group_id
                })),
            )
                .into_response()
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2817
/// Request body for POST /kafka/produce (also used for each entry of the
/// batch endpoint).
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Target topic; auto-created with default config if it does not exist.
    pub topic: String,
    /// Optional message key; used for partition assignment when no explicit
    /// partition is given.
    #[serde(default)]
    pub key: Option<String>,
    /// Message payload (stored as raw bytes).
    pub value: String,
    /// Optional explicit partition; validated against the topic's partition
    /// count.
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional string headers, converted to byte values on produce.
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2838
#[cfg(feature = "kafka")]
/// POST /kafka/produce — produce a single message to a topic.
///
/// The topic is auto-created with default configuration on first use. An
/// explicit `partition` is validated against the topic's partition count;
/// otherwise one is assigned from the message key. On success the broker
/// metrics are updated and the message is broadcast to SSE stream
/// subscribers.
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let mut topics = broker.topics.write().await;

    // Auto-create the topic with default configuration on first produce.
    let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
        mockforge_kafka::topics::Topic::new(
            request.topic.clone(),
            mockforge_kafka::topics::TopicConfig::default(),
        )
    });

    // Honor an explicit partition, otherwise derive one from the key.
    let partition_id = match request.partition {
        Some(partition) => partition,
        None => topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes())),
    };

    if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            })),
        )
            .into_response();
    }

    let message = mockforge_kafka::partitions::KafkaMessage {
        offset: 0, // actual offset is assigned by the partition on append
        timestamp: chrono::Utc::now().timestamp_millis(),
        key: request.key.as_deref().map(|k| k.as_bytes().to_vec()),
        value: request.value.as_bytes().to_vec(),
        headers: request
            .headers
            .clone()
            .unwrap_or_default()
            .into_iter()
            .map(|(k, v)| (k, v.as_bytes().to_vec()))
            .collect(),
    };

    match topic_entry.produce(partition_id, message).await {
        Ok(offset) => {
            // `broker` is already in scope here; the previous version
            // re-checked `state.kafka_broker` redundantly.
            broker.metrics().record_messages_produced(1);

            // Notify SSE subscribers; a send error only means that nobody is
            // currently listening, so it is deliberately ignored.
            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: request.topic.clone(),
                key: request.key.clone(),
                value: request.value.clone(),
                partition: partition_id,
                offset,
                headers: request.headers.clone(),
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", request.topic),
                "topic": request.topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2943
/// Request body for POST /kafka/produce/batch.
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// Messages to produce, in order; must be non-empty.
    pub messages: Vec<KafkaProduceRequest>,
    /// Delay between consecutive messages, in milliseconds (serde default
    /// supplied by `default_delay`, defined elsewhere in this file).
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2954
#[cfg(feature = "kafka")]
/// POST /kafka/produce/batch — produce a sequence of messages with an
/// optional inter-message delay, returning a per-message result list.
///
/// Each message is handled like the single-produce endpoint: topics are
/// auto-created, partitions are validated or assigned from the key, and every
/// successful produce updates metrics and is broadcast to SSE subscribers.
/// Individual failures are recorded in `results` without aborting the batch.
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = &state.kafka_broker else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    if request.messages.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        )
            .into_response();
    }

    let mut results = Vec::with_capacity(request.messages.len());

    for (index, msg_request) in request.messages.iter().enumerate() {
        // Scope the topics write lock to a single message so it is released
        // before the inter-message sleep below. (Previously the guard stayed
        // alive across the sleep, blocking every other broker user for the
        // whole delay.)
        {
            let mut topics = broker.topics.write().await;

            // Auto-create the topic with default configuration on first use.
            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                mockforge_kafka::topics::Topic::new(
                    msg_request.topic.clone(),
                    mockforge_kafka::topics::TopicConfig::default(),
                )
            });

            // Honor an explicit partition, otherwise derive one from the key.
            let partition_id = match msg_request.partition {
                Some(partition) => partition,
                None => {
                    topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
                }
            };

            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                }));
                continue;
            }

            let message = mockforge_kafka::partitions::KafkaMessage {
                offset: 0, // actual offset is assigned by the partition on append
                timestamp: chrono::Utc::now().timestamp_millis(),
                key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
                value: msg_request.value.as_bytes().to_vec(),
                headers: msg_request
                    .headers
                    .clone()
                    .unwrap_or_default()
                    .into_iter()
                    .map(|(k, v)| (k, v.as_bytes().to_vec()))
                    .collect(),
            };

            match topic_entry.produce(partition_id, message).await {
                Ok(offset) => {
                    // `broker` is already in scope; no need to re-check state.
                    broker.metrics().record_messages_produced(1);

                    // Broadcast to SSE subscribers; a send error only means
                    // nobody is listening.
                    let event = MessageEvent::Kafka(KafkaMessageEvent {
                        topic: msg_request.topic.clone(),
                        key: msg_request.key.clone(),
                        value: msg_request.value.clone(),
                        partition: partition_id,
                        offset,
                        headers: msg_request.headers.clone(),
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": msg_request.topic,
                        "partition": partition_id,
                        "offset": offset
                    }));
                }
                Err(e) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": e.to_string()
                    }));
                }
            }
        }

        // Optional pacing between messages; skipped after the last one.
        if index < request.messages.len() - 1 && request.delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
        }
    }

    let success_count =
        results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

    Json(serde_json::json!({
        "success": true,
        "total": request.messages.len(),
        "succeeded": success_count,
        "failed": request.messages.len() - success_count,
        "results": results
    }))
    .into_response()
}
3083
#[cfg(feature = "mqtt")]
/// GET /mqtt/messages/stream — SSE stream of MQTT messages observed on the
/// shared broadcast channel, optionally filtered by a `topic` substring
/// (query parameter; matched with `contains`, not MQTT wildcards).
///
/// The stream ends when the broadcast channel is closed; lagged receivers log
/// a warning and keep going. A keep-alive comment is emitted every 15s.
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    // unfold carries the receiver through each step; each invocation loops
    // until it has an MQTT event to emit (or the channel closes).
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => {
                        // Substring topic filter: skip non-matching events.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "mqtt",
                            "topic": event.topic,
                            "payload": event.payload,
                            "qos": event.qos,
                            "retain": event.retain,
                            "timestamp": event.timestamp,
                        });

                        // On serialization failure the event is silently
                        // dropped and the loop continues.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event = Event::default().event("mqtt_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    // Kafka events share the channel but are not ours.
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => {
                        continue;
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
3146
#[cfg(feature = "kafka")]
/// GET /kafka/messages/stream — SSE stream of Kafka messages observed on the
/// shared broadcast channel, optionally filtered by a `topic` substring
/// (query parameter; matched with `contains`).
///
/// Mirrors `mqtt_messages_stream`: ends when the channel closes, logs and
/// continues on lag, and emits a keep-alive comment every 15s.
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    // unfold carries the receiver through each step; each invocation loops
    // until it has a Kafka event to emit (or the channel closes).
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    // MQTT events share the channel but are not ours.
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => {
                        continue;
                    }
                    Ok(MessageEvent::Kafka(event)) => {
                        // Substring topic filter: skip non-matching events.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "kafka",
                            "topic": event.topic,
                            "key": event.key,
                            "value": event.value,
                            "partition": event.partition,
                            "offset": event.offset,
                            "headers": event.headers,
                            "timestamp": event.timestamp,
                        });

                        // On serialization failure the event is silently
                        // dropped and the loop continues.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event =
                                Event::default().event("kafka_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
3210
/// Request body for POST /ai/generate-spec.
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Natural-language description of the desired API.
    pub query: String,
    /// Target spec flavor: "openapi", "graphql", or "asyncapi" (anything else
    /// falls back to OpenAPI).
    pub spec_type: String,
    /// Optional OpenAPI version string; defaults to "3.0.0".
    pub api_version: Option<String>,
}
3223
/// Request body for POST /mockai/generate-openapi.
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to the recorder database; defaults to `recordings.db` in the
    /// current working directory.
    #[serde(default)]
    pub database_path: Option<String>,
    /// Optional RFC 3339 lower bound on exchange timestamps.
    #[serde(default)]
    pub since: Option<String>,
    /// Optional RFC 3339 upper bound on exchange timestamps.
    #[serde(default)]
    pub until: Option<String>,
    /// Optional path pattern used to filter recorded exchanges.
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence threshold for inferred paths (default 0.7).
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
3243
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`.
fn default_min_confidence() -> f64 {
    const DEFAULT_MIN_CONFIDENCE: f64 = 0.7;
    DEFAULT_MIN_CONFIDENCE
}
3247
#[cfg(feature = "data-faker")]
/// POST /ai/generate-spec — generate an API specification (OpenAPI, GraphQL,
/// or AsyncAPI) from a natural-language description via the RAG engine.
///
/// Provider, endpoint, model, and generation parameters come from
/// `MOCKFORGE_RAG_*` environment variables (`OPENAI_API_KEY` is the key
/// fallback). Returns 503 when no API key is configured.
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::DocumentStorage,
    };
    use std::sync::Arc;

    // API key: MOCKFORGE_RAG_API_KEY wins, OPENAI_API_KEY is the fallback.
    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    }

    // Provider selection; unknown values silently fall back to OpenAI.
    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    // Per-provider default endpoints, overridable via env.
    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    // Per-provider default models, overridable via env.
    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    // Numeric knobs parse leniently, falling back to sensible defaults.
    let rag_config = RagConfig {
        provider,
        api_endpoint,
        api_key,
        model,
        max_tokens: std::env::var("MOCKFORGE_RAG_MAX_TOKENS")
            .unwrap_or_else(|_| "4096".to_string())
            .parse()
            .unwrap_or(4096),
        temperature: std::env::var("MOCKFORGE_RAG_TEMPERATURE")
            .unwrap_or_else(|_| "0.3".to_string())
            .parse()
            .unwrap_or(0.3),
        timeout_secs: std::env::var("MOCKFORGE_RAG_TIMEOUT")
            .unwrap_or_else(|_| "60".to_string())
            .parse()
            .unwrap_or(60),
        max_context_length: std::env::var("MOCKFORGE_RAG_CONTEXT_WINDOW")
            .unwrap_or_else(|_| "4000".to_string())
            .parse()
            .unwrap_or(4000),
        ..Default::default()
    };

    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        if request.spec_type == "graphql" {
            "GraphQL SDL"
        } else {
            "YAML"
        }
    );

    // The engine requires a document store; spec generation needs no
    // retrieval corpus, so an empty in-memory store suffices.
    use mockforge_data::rag::storage::InMemoryStorage;
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    // `rag_config` is not used after this point, so it is moved into the
    // engine (the previous version cloned it needlessly).
    let mut rag_engine = match RagEngine::new(rag_config, storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Post-process the raw LLM output with the format-specific
            // extractors defined elsewhere in this file.
            let spec = if request.spec_type == "graphql" {
                extract_graphql_schema(&generated_text)
            } else {
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
3420
3421#[cfg(not(feature = "data-faker"))]
3422async fn generate_ai_spec(
3423 State(_state): State<ManagementState>,
3424 Json(_request): Json<GenerateSpecRequest>,
3425) -> impl IntoResponse {
3426 (
3427 StatusCode::NOT_IMPLEMENTED,
3428 Json(serde_json::json!({
3429 "error": "AI features not enabled",
3430 "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3431 })),
3432 )
3433 .into_response()
3434}
3435
#[cfg(feature = "behavioral-cloning")]
/// POST /mockai/generate-openapi — infer an OpenAPI spec from recorded HTTP
/// traffic.
///
/// Reads exchanges from the recorder database (default `./recordings.db`),
/// optionally filtered by `since`/`until` RFC 3339 timestamps and a path
/// pattern, and feeds them (capped at 1000) to the intelligent-behavior
/// OpenAPI generator.
async fn generate_openapi_from_traffic(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
) -> impl IntoResponse {
    use chrono::{DateTime, Utc};
    use mockforge_core::intelligent_behavior::{
        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
        IntelligentBehaviorConfig,
    };
    use mockforge_recorder::{
        database::RecorderDatabase,
        openapi_export::{QueryFilters, RecordingsToOpenApi},
    };
    use std::path::PathBuf;

    // Resolve the recorder database path: request override, else CWD default.
    let db_path = if let Some(ref path) = request.database_path {
        PathBuf::from(path)
    } else {
        std::env::current_dir()
            .unwrap_or_else(|_| PathBuf::from("."))
            .join("recordings.db")
    };

    let db = match RecorderDatabase::new(&db_path).await {
        Ok(db) => db,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Database error",
                    "message": format!("Failed to open recorder database: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Parse the optional time-range bounds. The previous error text referred
    // to CLI flags ("--since"/"--until"); this is a JSON API, so the messages
    // now name the request fields.
    let since_dt = if let Some(ref since_str) = request.since {
        match DateTime::parse_from_rfc3339(since_str) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid 'since' format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
                    })),
                )
                    .into_response();
            }
        }
    } else {
        None
    };

    let until_dt = if let Some(ref until_str) = request.until {
        match DateTime::parse_from_rfc3339(until_str) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid 'until' format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
                    })),
                )
                    .into_response();
            }
        }
    } else {
        None
    };

    let query_filters = QueryFilters {
        since: since_dt,
        until: until_dt,
        path_pattern: request.path_pattern.clone(),
        min_status_code: None,
        max_requests: Some(1000), // cap the analysis workload
    };

    let exchanges_from_recorder =
        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
            Ok(exchanges) => exchanges,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Query error",
                        "message": format!("Failed to query HTTP exchanges: {}", e)
                    })),
                )
                    .into_response();
            }
        };

    if exchanges_from_recorder.is_empty() {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "No exchanges found",
                "message": "No HTTP exchanges found matching the specified filters"
            })),
        )
            .into_response();
    }

    // Convert recorder exchanges into the generator's local exchange type
    // (field-for-field move).
    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
        .into_iter()
        .map(|e| LocalHttpExchange {
            method: e.method,
            path: e.path,
            query_params: e.query_params,
            headers: e.headers,
            body: e.body,
            body_encoding: e.body_encoding,
            status_code: e.status_code,
            response_headers: e.response_headers,
            response_body: e.response_body,
            response_body_encoding: e.response_body_encoding,
            timestamp: e.timestamp,
        })
        .collect();

    let behavior_config = IntelligentBehaviorConfig::default();
    let gen_config = OpenApiGenerationConfig {
        min_confidence: request.min_confidence,
        behavior_model: Some(behavior_config.behavior_model),
    };

    let generator = OpenApiSpecGenerator::new(gen_config);
    let result = match generator.generate_from_exchanges(exchanges).await {
        Ok(result) => result,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Generation error",
                    "message": format!("Failed to generate OpenAPI spec: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Prefer the raw generated document when available; otherwise serialize
    // the typed spec.
    let spec_json = if let Some(ref raw) = result.spec.raw_document {
        raw.clone()
    } else {
        match serde_json::to_value(&result.spec.spec) {
            Ok(json) => json,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Serialization error",
                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
                    })),
                )
                    .into_response();
            }
        }
    };

    let response = serde_json::json!({
        "spec": spec_json,
        "metadata": {
            "requests_analyzed": result.metadata.requests_analyzed,
            "paths_inferred": result.metadata.paths_inferred,
            "path_confidence": result.metadata.path_confidence,
            "generated_at": result.metadata.generated_at.to_rfc3339(),
            "duration_ms": result.metadata.duration_ms,
        }
    });

    Json(response).into_response()
}
3628
3629async fn list_rule_explanations(
3631 State(state): State<ManagementState>,
3632 Query(params): Query<std::collections::HashMap<String, String>>,
3633) -> impl IntoResponse {
3634 use mockforge_core::intelligent_behavior::RuleType;
3635
3636 let explanations = state.rule_explanations.read().await;
3637 let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3638
3639 if let Some(rule_type_str) = params.get("rule_type") {
3641 if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3642 explanations_vec.retain(|e| e.rule_type == rule_type);
3643 }
3644 }
3645
3646 if let Some(min_confidence_str) = params.get("min_confidence") {
3648 if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3649 explanations_vec.retain(|e| e.confidence >= min_confidence);
3650 }
3651 }
3652
3653 explanations_vec.sort_by(|a, b| {
3655 b.confidence
3656 .partial_cmp(&a.confidence)
3657 .unwrap_or(std::cmp::Ordering::Equal)
3658 .then_with(|| b.generated_at.cmp(&a.generated_at))
3659 });
3660
3661 Json(serde_json::json!({
3662 "explanations": explanations_vec,
3663 "total": explanations_vec.len(),
3664 }))
3665 .into_response()
3666}
3667
3668async fn get_rule_explanation(
3670 State(state): State<ManagementState>,
3671 Path(rule_id): Path<String>,
3672) -> impl IntoResponse {
3673 let explanations = state.rule_explanations.read().await;
3674
3675 match explanations.get(&rule_id) {
3676 Some(explanation) => Json(serde_json::json!({
3677 "explanation": explanation,
3678 }))
3679 .into_response(),
3680 None => (
3681 StatusCode::NOT_FOUND,
3682 Json(serde_json::json!({
3683 "error": "Rule explanation not found",
3684 "message": format!("No explanation found for rule ID: {}", rule_id)
3685 })),
3686 )
3687 .into_response(),
3688 }
3689}
3690
/// Request body for the learn-from-examples endpoint: a set of example
/// request/response pairs used to generate behavior rules.
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example request/response pairs to learn from; must be non-empty.
    pub examples: Vec<ExamplePairRequest>,
    /// Optional intelligent-behavior configuration (JSON form of
    /// `IntelligentBehaviorConfig`); values that fail to deserialize fall
    /// back to the default configuration.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3700
/// One request/response example pair, supplied as loosely-typed JSON.
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request description; recognized keys include `method`, `path`,
    /// `body`, `query_params`, and `headers`.
    pub request: serde_json::Value,
    /// Response description; recognized keys include `status_code`
    /// (or `status`) and `body`.
    pub response: serde_json::Value,
}
3709
3710async fn learn_from_examples(
3715 State(state): State<ManagementState>,
3716 Json(request): Json<LearnFromExamplesRequest>,
3717) -> impl IntoResponse {
3718 use mockforge_core::intelligent_behavior::{
3719 config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3720 rule_generator::{ExamplePair, RuleGenerator},
3721 };
3722
3723 if request.examples.is_empty() {
3724 return (
3725 StatusCode::BAD_REQUEST,
3726 Json(serde_json::json!({
3727 "error": "No examples provided",
3728 "message": "At least one example pair is required"
3729 })),
3730 )
3731 .into_response();
3732 }
3733
3734 let example_pairs: Result<Vec<ExamplePair>, String> = request
3736 .examples
3737 .into_iter()
3738 .enumerate()
3739 .map(|(idx, ex)| {
3740 let method = ex
3742 .request
3743 .get("method")
3744 .and_then(|v| v.as_str())
3745 .map(|s| s.to_string())
3746 .unwrap_or_else(|| "GET".to_string());
3747 let path = ex
3748 .request
3749 .get("path")
3750 .and_then(|v| v.as_str())
3751 .map(|s| s.to_string())
3752 .unwrap_or_else(|| "/".to_string());
3753 let request_body = ex.request.get("body").cloned();
3754 let query_params = ex
3755 .request
3756 .get("query_params")
3757 .and_then(|v| v.as_object())
3758 .map(|obj| {
3759 obj.iter()
3760 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3761 .collect()
3762 })
3763 .unwrap_or_default();
3764 let headers = ex
3765 .request
3766 .get("headers")
3767 .and_then(|v| v.as_object())
3768 .map(|obj| {
3769 obj.iter()
3770 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3771 .collect()
3772 })
3773 .unwrap_or_default();
3774
3775 let status = ex
3777 .response
3778 .get("status_code")
3779 .or_else(|| ex.response.get("status"))
3780 .and_then(|v| v.as_u64())
3781 .map(|n| n as u16)
3782 .unwrap_or(200);
3783 let response_body = ex.response.get("body").cloned();
3784
3785 Ok(ExamplePair {
3786 method,
3787 path,
3788 request: request_body,
3789 status,
3790 response: response_body,
3791 query_params,
3792 headers,
3793 metadata: {
3794 let mut meta = std::collections::HashMap::new();
3795 meta.insert("source".to_string(), "api".to_string());
3796 meta.insert("example_index".to_string(), idx.to_string());
3797 meta
3798 },
3799 })
3800 })
3801 .collect();
3802
3803 let example_pairs = match example_pairs {
3804 Ok(pairs) => pairs,
3805 Err(e) => {
3806 return (
3807 StatusCode::BAD_REQUEST,
3808 Json(serde_json::json!({
3809 "error": "Invalid examples",
3810 "message": e
3811 })),
3812 )
3813 .into_response();
3814 }
3815 };
3816
3817 let behavior_config = if let Some(config_json) = request.config {
3819 serde_json::from_value(config_json)
3821 .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3822 .behavior_model
3823 } else {
3824 BehaviorModelConfig::default()
3825 };
3826
3827 let generator = RuleGenerator::new(behavior_config);
3829
3830 let (rules, explanations) =
3832 match generator.generate_rules_with_explanations(example_pairs).await {
3833 Ok(result) => result,
3834 Err(e) => {
3835 return (
3836 StatusCode::INTERNAL_SERVER_ERROR,
3837 Json(serde_json::json!({
3838 "error": "Rule generation failed",
3839 "message": format!("Failed to generate rules: {}", e)
3840 })),
3841 )
3842 .into_response();
3843 }
3844 };
3845
3846 {
3848 let mut stored_explanations = state.rule_explanations.write().await;
3849 for explanation in &explanations {
3850 stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3851 }
3852 }
3853
3854 let response = serde_json::json!({
3856 "success": true,
3857 "rules_generated": {
3858 "consistency_rules": rules.consistency_rules.len(),
3859 "schemas": rules.schemas.len(),
3860 "state_machines": rules.state_transitions.len(),
3861 "system_prompt": !rules.system_prompt.is_empty(),
3862 },
3863 "explanations": explanations.iter().map(|e| serde_json::json!({
3864 "rule_id": e.rule_id,
3865 "rule_type": e.rule_type,
3866 "confidence": e.confidence,
3867 "reasoning": e.reasoning,
3868 })).collect::<Vec<_>>(),
3869 "total_explanations": explanations.len(),
3870 });
3871
3872 Json(response).into_response()
3873}
3874
3875#[cfg(feature = "data-faker")]
/// Extract a YAML specification from free-form (e.g. LLM-produced) text.
///
/// Precedence:
/// 1. Contents of the first closed ```yaml fenced block.
/// 2. Contents of the first closed generic ``` fenced block.
/// 3. Otherwise the whole text, trimmed.
///
/// NOTE(review): the generic-fence fallback does not strip a language tag on
/// the opening fence (e.g. ```yml), so such a tag would remain in the output
/// — confirm whether inputs ever use tags other than `yaml`.
fn extract_yaml_spec(text: &str) -> String {
    // A tagged ```yaml fence takes priority ("```yaml" is 7 bytes).
    if let Some(start) = text.find("```yaml") {
        let yaml_start = text[start + 7..].trim_start();
        if let Some(end) = yaml_start.find("```") {
            return yaml_start[..end].trim().to_string();
        }
    }
    // Fall back to the first generic closed code fence.
    if let Some(start) = text.find("```") {
        let content_start = text[start + 3..].trim_start();
        if let Some(end) = content_start.find("```") {
            return content_start[..end].trim().to_string();
        }
    }

    // No usable fence: return the trimmed text as-is. (The previous version
    // also special-cased `openapi:`/`asyncapi:` prefixes, but both that
    // branch and this fallthrough returned the identical trimmed text, so
    // the check was redundant and has been removed.)
    text.trim().to_string()
}
3899
3900#[cfg(feature = "data-faker")]
/// Extract a GraphQL schema from free-form (e.g. LLM-produced) text.
///
/// Precedence:
/// 1. Contents of the first closed ```graphql fenced block.
/// 2. Contents of the first closed generic ``` fenced block.
/// 3. Otherwise the whole text, trimmed.
///
/// NOTE(review): the generic-fence fallback keeps any language tag on the
/// opening fence in the returned text — confirm inputs only use `graphql`
/// or untagged fences.
fn extract_graphql_schema(text: &str) -> String {
    // A tagged ```graphql fence takes priority ("```graphql" is 10 bytes).
    if let Some(start) = text.find("```graphql") {
        let schema_start = text[start + 10..].trim_start();
        if let Some(end) = schema_start.find("```") {
            return schema_start[..end].trim().to_string();
        }
    }
    // Fall back to the first generic closed code fence.
    if let Some(start) = text.find("```") {
        let content_start = text[start + 3..].trim_start();
        if let Some(end) = content_start.find("```") {
            return content_start[..end].trim().to_string();
        }
    }

    // No usable fence: return the trimmed text as-is. (The previous version
    // also special-cased `type `/`schema ` prefixes, but both that branch
    // and this fallthrough returned the identical trimmed text, so the
    // check was redundant and has been removed.)
    text.trim().to_string()
}
3924
/// Return the current chaos-engineering configuration as JSON.
///
/// With the `chaos` feature compiled in and a chaos API state present, the
/// live config is read and each optional section is serialized (falling back
/// to `null` if serialization fails). In every other case an all-disabled
/// config with null sections is reported.
async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
    #[cfg(feature = "chaos")]
    {
        if let Some(chaos_state) = &_state.chaos_api_state {
            let config = chaos_state.config.read().await;
            Json(serde_json::json!({
                "enabled": config.enabled,
                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
            }))
            .into_response()
        } else {
            // Chaos feature is compiled in but no chaos API state was wired
            // up: report a disabled config rather than erroring.
            Json(serde_json::json!({
                "enabled": false,
                "latency": null,
                "fault_injection": null,
                "rate_limit": null,
                "traffic_shaping": null,
            }))
            .into_response()
        }
    }
    #[cfg(not(feature = "chaos"))]
    {
        // Chaos support not compiled into this build: always disabled.
        Json(serde_json::json!({
            "enabled": false,
            "latency": null,
            "fault_injection": null,
            "rate_limit": null,
            "traffic_shaping": null,
        }))
        .into_response()
    }
}
3967
/// Partial update for the chaos-engineering configuration. Every section is
/// optional; only sections present in the request are applied.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Master on/off switch for chaos behavior.
    pub enabled: Option<bool>,
    /// Latency injection settings (JSON form of `LatencyConfig`).
    pub latency: Option<serde_json::Value>,
    /// Fault injection settings (JSON form of `FaultInjectionConfig`).
    pub fault_injection: Option<serde_json::Value>,
    /// Rate limiting settings (JSON form of `RateLimitConfig`).
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic shaping settings (JSON form of `TrafficShapingConfig`).
    pub traffic_shaping: Option<serde_json::Value>,
}
3982
/// Apply a partial chaos-configuration update to the live chaos API state.
///
/// Only sections present in the request body are touched; absent sections
/// keep their current values. Returns 503 when the chaos API state is not
/// configured, and 501 when the `chaos` feature is not compiled in.
///
/// NOTE(review): sections that fail to deserialize are silently skipped and
/// the handler still reports success — confirm this lenient behavior is
/// intended rather than returning 400 for malformed sections.
async fn update_chaos_config(
    State(_state): State<ManagementState>,
    Json(_config_update): Json<ChaosConfigUpdate>,
) -> impl IntoResponse {
    #[cfg(feature = "chaos")]
    {
        if let Some(chaos_state) = &_state.chaos_api_state {
            use mockforge_chaos::config::{
                FaultInjectionConfig, LatencyConfig, RateLimitConfig, TrafficShapingConfig,
            };

            let mut config = chaos_state.config.write().await;

            if let Some(enabled) = _config_update.enabled {
                config.enabled = enabled;
            }

            // Each section below: apply only when present AND deserializable.
            if let Some(latency_json) = _config_update.latency {
                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
                    config.latency = Some(latency);
                }
            }

            if let Some(fault_json) = _config_update.fault_injection {
                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
                    config.fault_injection = Some(fault);
                }
            }

            if let Some(rate_json) = _config_update.rate_limit {
                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
                    config.rate_limit = Some(rate);
                }
            }

            if let Some(traffic_json) = _config_update.traffic_shaping {
                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
                    config.traffic_shaping = Some(traffic);
                }
            }

            // Release the write lock before logging/responding.
            drop(config);

            info!("Chaos configuration updated successfully");
            Json(serde_json::json!({
                "success": true,
                "message": "Chaos configuration updated and applied"
            }))
            .into_response()
        } else {
            (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "success": false,
                    "error": "Chaos API not available",
                    "message": "Chaos engineering is not enabled or configured"
                })),
            )
                .into_response()
        }
    }
    #[cfg(not(feature = "chaos"))]
    {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "success": false,
                "error": "Chaos feature not enabled",
                "message": "Chaos engineering feature is not compiled into this build"
            })),
        )
            .into_response()
    }
}
4065
4066async fn list_network_profiles() -> impl IntoResponse {
4070 use mockforge_core::network_profiles::NetworkProfileCatalog;
4071
4072 let catalog = NetworkProfileCatalog::default();
4073 let profiles: Vec<serde_json::Value> = catalog
4074 .list_profiles_with_description()
4075 .iter()
4076 .map(|(name, description)| {
4077 serde_json::json!({
4078 "name": name,
4079 "description": description,
4080 })
4081 })
4082 .collect();
4083
4084 Json(serde_json::json!({
4085 "profiles": profiles
4086 }))
4087 .into_response()
4088}
4089
/// Request body for applying a named network profile.
#[derive(Debug, Deserialize)]
pub struct ApplyNetworkProfileRequest {
    /// Name of the profile to apply; must exist in the default
    /// `NetworkProfileCatalog`, otherwise the endpoint returns 404.
    pub profile_name: String,
}
4096
/// Apply a named network profile from the built-in catalog.
///
/// The profile's traffic-shaping settings are written into the persistent
/// server configuration (when available on the management state) and, with
/// the `chaos` feature, also into the live chaos API state. Returns 404 if
/// the profile name is unknown.
async fn apply_network_profile(
    State(state): State<ManagementState>,
    Json(request): Json<ApplyNetworkProfileRequest>,
) -> impl IntoResponse {
    use mockforge_core::network_profiles::NetworkProfileCatalog;

    let catalog = NetworkProfileCatalog::default();
    if let Some(profile) = catalog.get(&request.profile_name) {
        if let Some(server_config) = &state.server_config {
            let mut config = server_config.write().await;

            use mockforge_core::config::NetworkShapingConfig;

            let network_shaping = NetworkShapingConfig {
                // Shaping is active when either bandwidth limiting or burst
                // loss is enabled in the profile.
                enabled: profile.traffic_shaping.bandwidth.enabled
                    || profile.traffic_shaping.burst_loss.enabled,
                // * 8 converts the profile's bytes/sec into bits/sec (bps).
                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8,
                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                // NOTE(review): hard-coded connection cap — confirm whether
                // this should instead come from the profile.
                max_connections: 1000,
            };

            // Merge into an existing chaos config, or create a fresh one
            // carrying only the traffic-shaping section.
            if let Some(ref mut chaos) = config.observability.chaos {
                chaos.traffic_shaping = Some(network_shaping);
            } else {
                use mockforge_core::config::ChaosEngConfig;
                config.observability.chaos = Some(ChaosEngConfig {
                    enabled: true,
                    latency: None,
                    fault_injection: None,
                    rate_limit: None,
                    traffic_shaping: Some(network_shaping),
                    scenario: None,
                });
            }

            info!("Network profile '{}' applied to server configuration", request.profile_name);
        } else {
            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
        }

        #[cfg(feature = "chaos")]
        {
            if let Some(chaos_state) = &state.chaos_api_state {
                use mockforge_chaos::config::TrafficShapingConfig;

                let mut chaos_config = chaos_state.config.write().await;
                let chaos_traffic_shaping = TrafficShapingConfig {
                    enabled: profile.traffic_shaping.bandwidth.enabled
                        || profile.traffic_shaping.burst_loss.enabled,
                    // * 8 converts the profile's bytes/sec into bits/sec.
                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8,
                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                    max_connections: 0,
                    connection_timeout_ms: 30000,
                };
                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
                chaos_config.enabled = true;
                // Release the write lock before logging.
                drop(chaos_config);
                info!("Network profile '{}' applied to chaos API state", request.profile_name);
            }
        }

        Json(serde_json::json!({
            "success": true,
            "message": format!("Network profile '{}' applied", request.profile_name),
            "profile": {
                "name": profile.name,
                "description": profile.description,
            }
        }))
        .into_response()
    } else {
        (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Profile not found",
                "message": format!("Network profile '{}' not found", request.profile_name)
            })),
        )
            .into_response()
    }
}
4190
4191pub fn management_router_with_ui_builder(
4193 state: ManagementState,
4194 server_config: mockforge_core::config::ServerConfig,
4195) -> Router {
4196 use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
4197
4198 let management = management_router(state);
4200
4201 let ui_builder_state = UIBuilderState::new(server_config);
4203 let ui_builder = create_ui_builder_router(ui_builder_state);
4204
4205 management.nest("/ui-builder", ui_builder)
4207}
4208
4209pub fn management_router_with_spec_import(state: ManagementState) -> Router {
4211 use crate::spec_import::{spec_import_router, SpecImportState};
4212
4213 let management = management_router(state);
4215
4216 Router::new()
4218 .merge(management)
4219 .merge(spec_import_router(SpecImportState::new()))
4220}
4221
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a minimal enabled `GET /test` mock; tests tweak only the fields
    /// they care about. Version starts at 0 so creation tests can verify the
    /// server bumps it to 1.
    fn test_mock(id: &str, name: &str) -> MockConfig {
        MockConfig {
            id: id.to_string(),
            name: name.to_string(),
            method: "GET".to_string(),
            path: "/test".to_string(),
            response: MockResponse {
                body: serde_json::json!({"message": "test"}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
            version: 0,
        }
    }

    /// Build a `POST /xml` mock whose request matching uses the given XPath
    /// expression. Consolidates the fixture previously duplicated across the
    /// three XPath tests.
    fn xpath_mock(id: &str, name: &str, xpath: &str) -> MockConfig {
        let mut mock = test_mock(id, name);
        mock.method = "POST".to_string();
        mock.path = "/xml".to_string();
        mock.response = MockResponse {
            body: serde_json::json!({"ok": true}),
            headers: None,
        };
        mock.request_match = Some(RequestMatchCriteria {
            xpath: Some(xpath.to_string()),
            ..Default::default()
        });
        mock.version = 1;
        mock
    }

    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        let mut mock = test_mock("test-1", "Test Mock");
        mock.version = 1;

        {
            let mut mocks = state.mocks.write().await;
            mocks.push(mock.clone());
        }

        let mocks = state.mocks.read().await;
        let found = mocks.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        {
            let mut mocks = state.mocks.write().await;

            let mut enabled = test_mock("1", "Mock 1");
            enabled.path = "/test1".to_string();
            mocks.push(enabled);

            let mut disabled = test_mock("2", "Mock 2");
            disabled.method = "POST".to_string();
            disabled.path = "/test2".to_string();
            disabled.status_code = Some(201);
            disabled.enabled = false;
            mocks.push(disabled);
        }

        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }

    #[test]
    fn test_mock_matches_request_with_xpath_absolute_path() {
        let mock = xpath_mock("xpath-1", "XPath Match", "/root/order/id");

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }

    #[test]
    fn test_mock_matches_request_with_xpath_text_predicate() {
        let mock = xpath_mock("xpath-2", "XPath Predicate Match", "//order/id[text()='123']");

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }

    #[test]
    fn test_mock_matches_request_with_xpath_no_match() {
        // Body contains id 123, so an XPath requiring 456 must not match.
        let mock = xpath_mock("xpath-3", "XPath No Match", "//order/id[text()='456']");

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(!mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }

    #[tokio::test]
    async fn test_create_mock_sets_version_to_1() {
        use axum::http::Request;
        use tower::ServiceExt;

        let state = ManagementState::new(None, None, 3000);
        let app = management_router(state.clone());

        let mock = test_mock("", "Version Test");
        let body = serde_json::to_string(&mock).unwrap();

        let response = app
            .oneshot(
                Request::builder()
                    .method("POST")
                    .uri("/mocks")
                    .header("content-type", "application/json")
                    .body(axum::body::Body::from(body))
                    .unwrap(),
            )
            .await
            .unwrap();

        assert_eq!(response.status(), StatusCode::OK);

        let body_bytes = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
        let created: MockConfig = serde_json::from_slice(&body_bytes).unwrap();
        assert_eq!(created.version, 1, "Newly created mock should have version 1");
        assert!(!created.id.is_empty(), "Mock should get an auto-generated ID");
    }

    #[tokio::test]
    async fn test_update_mock_with_correct_version_succeeds() {
        use axum::http::Request;
        use tower::ServiceExt;

        let state = ManagementState::new(None, None, 3000);

        // Seed a mock at version 1.
        {
            let mut mocks = state.mocks.write().await;
            let mut mock = test_mock("v-test-1", "Original");
            mock.version = 1;
            mocks.push(mock);
        }

        let app = management_router(state.clone());

        // Update carries the matching version, so it should be accepted.
        let mut updated = test_mock("v-test-1", "Updated");
        updated.version = 1;
        let body = serde_json::to_string(&updated).unwrap();

        let response = app
            .oneshot(
                Request::builder()
                    .method("PUT")
                    .uri("/mocks/v-test-1")
                    .header("content-type", "application/json")
                    .body(axum::body::Body::from(body))
                    .unwrap(),
            )
            .await
            .unwrap();

        assert_eq!(response.status(), StatusCode::OK);

        let body_bytes = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
        let result: MockConfig = serde_json::from_slice(&body_bytes).unwrap();
        assert_eq!(result.version, 2, "Version should be incremented to 2");
        assert_eq!(result.name, "Updated");
    }

    #[tokio::test]
    async fn test_update_mock_with_stale_version_returns_409() {
        use axum::http::Request;
        use tower::ServiceExt;

        let state = ManagementState::new(None, None, 3000);

        // Seed a mock at version 3.
        {
            let mut mocks = state.mocks.write().await;
            let mut mock = test_mock("v-test-2", "Current");
            mock.version = 3;
            mocks.push(mock);
        }

        let app = management_router(state.clone());

        // Update carries an outdated version (1 < 3) and must be rejected.
        let mut stale = test_mock("v-test-2", "Stale Update");
        stale.version = 1;

        let body = serde_json::to_string(&stale).unwrap();

        let response = app
            .oneshot(
                Request::builder()
                    .method("PUT")
                    .uri("/mocks/v-test-2")
                    .header("content-type", "application/json")
                    .body(axum::body::Body::from(body))
                    .unwrap(),
            )
            .await
            .unwrap();

        assert_eq!(
            response.status(),
            StatusCode::CONFLICT,
            "Stale version should be rejected with 409 Conflict"
        );

        let body_bytes = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
        let error: serde_json::Value = serde_json::from_slice(&body_bytes).unwrap();
        assert_eq!(error["error"], "Version conflict");
        assert_eq!(error["current_version"], 3);

        // The stored mock must be untouched after the rejected update.
        let mocks = state.mocks.read().await;
        let mock = mocks.iter().find(|m| m.id == "v-test-2").unwrap();
        assert_eq!(mock.name, "Current", "Mock should not have been modified");
        assert_eq!(mock.version, 3, "Version should remain unchanged");
    }

    #[tokio::test]
    async fn test_get_mock_includes_version() {
        use axum::http::Request;
        use tower::ServiceExt;

        let state = ManagementState::new(None, None, 3000);

        {
            let mut mocks = state.mocks.write().await;
            let mut mock = test_mock("v-get-1", "Get Test");
            mock.version = 5;
            mocks.push(mock);
        }

        let app = management_router(state.clone());

        let response = app
            .oneshot(
                Request::builder()
                    .method("GET")
                    .uri("/mocks/v-get-1")
                    .body(axum::body::Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();

        assert_eq!(response.status(), StatusCode::OK);

        let body_bytes = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
        let mock: MockConfig = serde_json::from_slice(&body_bytes).unwrap();
        assert_eq!(mock.version, 5, "GET should return the stored version");
    }
}