1#[cfg(any(feature = "mqtt", feature = "kafka"))]
2use axum::response::sse::{Event, Sse};
3use axum::{
8 extract::{Path, Query, State},
9 http::StatusCode,
10 response::{IntoResponse, Json},
11 routing::{delete, get, post, put},
12 Router,
13};
14#[cfg(any(feature = "mqtt", feature = "kafka"))]
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18 BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23#[cfg(any(feature = "mqtt", feature = "kafka"))]
24use std::convert::Infallible;
25use std::sync::Arc;
26use tokio::sync::{broadcast, RwLock};
27use tracing::*;
28
/// Default capacity of the MQTT/Kafka message broadcast channel.
#[cfg(any(feature = "mqtt", feature = "kafka"))]
const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;

/// Returns the broadcast-channel capacity.
///
/// Reads the `MOCKFORGE_MESSAGE_BROADCAST_CAPACITY` environment variable and
/// falls back to [`DEFAULT_MESSAGE_BROADCAST_CAPACITY`] when the variable is
/// unset or does not parse as a `usize`.
#[cfg(any(feature = "mqtt", feature = "kafka"))]
fn get_message_broadcast_capacity() -> usize {
    match std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY") {
        Ok(raw) => raw.parse().unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY),
        Err(_) => DEFAULT_MESSAGE_BROADCAST_CAPACITY,
    }
}
41
/// A message observed on one of the supported broker protocols, broadcast to
/// management-API subscribers.
///
/// Serialized as `{"protocol": "mqtt"|"kafka", "data": { … }}` via the
/// adjacently-tagged serde representation below.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    /// An MQTT publish event.
    #[cfg(feature = "mqtt")]
    Mqtt(MqttMessageEvent),
    /// A Kafka record event.
    #[cfg(feature = "kafka")]
    Kafka(KafkaMessageEvent),
}
54
/// Snapshot of a single MQTT message as exposed by the management API.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// Topic the message was published to.
    pub topic: String,
    /// Message payload rendered as text.
    pub payload: String,
    /// MQTT quality-of-service level.
    pub qos: u8,
    /// MQTT retain flag.
    pub retain: bool,
    /// Timestamp of the event, string-encoded.
    // NOTE(review): the exact timestamp format is not visible here — presumably
    // RFC 3339 like `health_check`; confirm against the producer.
    pub timestamp: String,
}
70
/// Snapshot of a single Kafka record as exposed by the management API.
#[cfg(feature = "kafka")]
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Topic the record was produced to.
    pub topic: String,
    /// Optional record key.
    pub key: Option<String>,
    /// Record value rendered as text.
    pub value: String,
    /// Partition the record landed on.
    pub partition: i32,
    /// Offset within the partition.
    pub offset: i64,
    /// Optional record headers.
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// Timestamp of the event, string-encoded.
    pub timestamp: String,
}
83
/// A single mock definition managed through the CRUD endpoints below.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockConfig {
    /// Unique id; generated server-side (UUID v4) when empty on creation.
    #[serde(skip_serializing_if = "String::is_empty")]
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// HTTP method to match (compared case-insensitively).
    pub method: String,
    /// Path to match; supports `{param}` templates and `*` wildcards.
    pub path: String,
    /// Response returned when this mock matches.
    pub response: MockResponse,
    /// Whether the mock participates in matching (defaults to `true`).
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Optional artificial latency before responding.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_ms: Option<u64>,
    /// Optional status-code override.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status_code: Option<u16>,
    /// Additional request-matching criteria beyond method + path.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_match: Option<RequestMatchCriteria>,
    /// Optional matching priority.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub priority: Option<i32>,
    /// Optional scenario this mock belongs to.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scenario: Option<String>,
    /// Scenario state required for this mock to be eligible.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub required_scenario_state: Option<String>,
    /// Scenario state to transition to after this mock is served.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub new_scenario_state: Option<String>,
}
123
/// Serde default helper: mocks are enabled unless explicitly disabled.
fn default_true() -> bool {
    true
}
127
/// The response payload a mock returns when it matches a request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Response body as arbitrary JSON.
    pub body: serde_json::Value,
    /// Optional response headers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
137
/// Optional fine-grained request matching criteria; all populated fields must
/// pass for the mock to match (see `mock_matches_request`).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RequestMatchCriteria {
    /// Required headers; values are tried as regexes, else exact match.
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub headers: std::collections::HashMap<String, String>,
    /// Required query parameters (exact match).
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub query_params: std::collections::HashMap<String, String>,
    /// Regex (or literal fallback) the request body must match.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub body_pattern: Option<String>,
    /// JSONPath-like expression that must resolve in a JSON body.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub json_path: Option<String>,
    /// Limited XPath expression that must resolve in an XML body.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub xpath: Option<String>,
    /// Custom mini-DSL expression (see `evaluate_custom_matcher`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub custom_matcher: Option<String>,
}
160
161pub fn mock_matches_request(
170 mock: &MockConfig,
171 method: &str,
172 path: &str,
173 headers: &std::collections::HashMap<String, String>,
174 query_params: &std::collections::HashMap<String, String>,
175 body: Option<&[u8]>,
176) -> bool {
177 use regex::Regex;
178
179 if !mock.enabled {
181 return false;
182 }
183
184 if mock.method.to_uppercase() != method.to_uppercase() {
186 return false;
187 }
188
189 if !path_matches_pattern(&mock.path, path) {
191 return false;
192 }
193
194 if let Some(criteria) = &mock.request_match {
196 for (key, expected_value) in &criteria.headers {
198 let header_key_lower = key.to_lowercase();
199 let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
200
201 if let Some((_, actual_value)) = found {
202 if let Ok(re) = Regex::new(expected_value) {
204 if !re.is_match(actual_value) {
205 return false;
206 }
207 } else if actual_value != expected_value {
208 return false;
209 }
210 } else {
211 return false; }
213 }
214
215 for (key, expected_value) in &criteria.query_params {
217 if let Some(actual_value) = query_params.get(key) {
218 if actual_value != expected_value {
219 return false;
220 }
221 } else {
222 return false; }
224 }
225
226 if let Some(pattern) = &criteria.body_pattern {
228 if let Some(body_bytes) = body {
229 let body_str = String::from_utf8_lossy(body_bytes);
230 if let Ok(re) = Regex::new(pattern) {
232 if !re.is_match(&body_str) {
233 return false;
234 }
235 } else if body_str.as_ref() != pattern {
236 return false;
237 }
238 } else {
239 return false; }
241 }
242
243 if let Some(json_path) = &criteria.json_path {
245 if let Some(body_bytes) = body {
246 if let Ok(body_str) = std::str::from_utf8(body_bytes) {
247 if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
248 if !json_path_exists(&json_value, json_path) {
250 return false;
251 }
252 }
253 }
254 }
255 }
256
257 if let Some(xpath) = &criteria.xpath {
259 if let Some(body_bytes) = body {
260 if let Ok(body_str) = std::str::from_utf8(body_bytes) {
261 if !xml_xpath_exists(body_str, xpath) {
262 return false;
263 }
264 } else {
265 return false;
266 }
267 } else {
268 return false; }
270 }
271
272 if let Some(custom) = &criteria.custom_matcher {
274 if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
275 return false;
276 }
277 }
278 }
279
280 true
281}
282
283fn path_matches_pattern(pattern: &str, path: &str) -> bool {
285 if pattern == path {
287 return true;
288 }
289
290 if pattern == "*" {
292 return true;
293 }
294
295 let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
297 let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
298
299 if pattern_parts.len() != path_parts.len() {
300 if pattern.contains('*') {
302 return matches_wildcard_pattern(pattern, path);
303 }
304 return false;
305 }
306
307 for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
308 if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
310 continue; }
312
313 if pattern_part != path_part {
314 return false;
315 }
316 }
317
318 true
319}
320
321fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
323 use regex::Regex;
324
325 let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
327
328 if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
329 return re.is_match(path);
330 }
331
332 false
333}
334
335fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
343 let path = if json_path == "$" {
344 return true;
345 } else if let Some(p) = json_path.strip_prefix("$.") {
346 p
347 } else if let Some(p) = json_path.strip_prefix('$') {
348 p.strip_prefix('.').unwrap_or(p)
349 } else {
350 json_path
351 };
352
353 let mut current = json;
354 for segment in split_json_path_segments(path) {
355 match segment {
356 JsonPathSegment::Field(name) => {
357 if let Some(obj) = current.as_object() {
358 if let Some(value) = obj.get(name) {
359 current = value;
360 } else {
361 return false;
362 }
363 } else {
364 return false;
365 }
366 }
367 JsonPathSegment::Index(idx) => {
368 if let Some(arr) = current.as_array() {
369 if let Some(value) = arr.get(idx) {
370 current = value;
371 } else {
372 return false;
373 }
374 } else {
375 return false;
376 }
377 }
378 JsonPathSegment::Wildcard => {
379 if let Some(arr) = current.as_array() {
380 return !arr.is_empty();
381 }
382 return false;
383 }
384 }
385 }
386 true
387}
388
/// One parsed segment of a simplified JSON path.
enum JsonPathSegment<'a> {
    /// An object field lookup.
    Field(&'a str),
    /// An array index lookup.
    Index(usize),
    /// An array wildcard (`[*]`).
    Wildcard,
}

/// Splits a dot-separated JSON path (without the `$` prefix) into segments.
///
/// `a.b[0]` yields `Field(a), Field(b), Index(0)`; `items[*]` yields
/// `Field(items), Wildcard`.
///
/// Fix: multiple bracket groups on one part (`a[0][1]`) are now parsed into
/// consecutive index segments; previously the whole bracket run was sliced as
/// one chunk (`0][1`) and silently dropped. Unterminated brackets are ignored
/// instead of mis-slicing. Unrecognized bracket contents are still skipped.
fn split_json_path_segments(path: &str) -> Vec<JsonPathSegment<'_>> {
    let mut segments = Vec::new();
    for part in path.split('.') {
        if part.is_empty() {
            continue;
        }
        // Separate the leading field name from any `[...]` suffix groups.
        let (field, mut rest) = match part.find('[') {
            Some(i) => (&part[..i], &part[i..]),
            None => (part, ""),
        };
        if !field.is_empty() {
            segments.push(JsonPathSegment::Field(field));
        }
        while let Some(open_stripped) = rest.strip_prefix('[') {
            let close = match open_stripped.find(']') {
                Some(c) => c,
                // Unterminated bracket: stop parsing this part.
                None => break,
            };
            let content = &open_stripped[..close];
            if content == "*" {
                segments.push(JsonPathSegment::Wildcard);
            } else if let Ok(idx) = content.parse::<usize>() {
                segments.push(JsonPathSegment::Index(idx));
            }
            rest = &open_stripped[close + 1..];
        }
    }
    segments
}
419
/// One parsed segment of a limited XPath expression.
#[derive(Debug, Clone, PartialEq, Eq)]
struct XPathSegment {
    /// Element tag name to match.
    name: String,
    /// Optional `[text()='…']` predicate: required trimmed element text.
    text_equals: Option<String>,
}

/// Parses a single XPath path segment.
///
/// Accepts a bare element name (`title`) or a name with a text-equality
/// predicate (`title[text()='Rust']`, single or double quotes). Returns
/// `None` for anything unsupported (positional predicates, attribute tests,
/// empty names, unterminated brackets).
///
/// Fix: whitespace around the predicate's `=` and inside the brackets is now
/// tolerated (`title[ text() = 'Rust' ]`); previously the parser required the
/// exact byte sequence `text()=`.
fn parse_xpath_segment(segment: &str) -> Option<XPathSegment> {
    let trimmed = segment.trim();
    if trimmed.is_empty() {
        return None;
    }

    let bracket_start = match trimmed.find('[') {
        None => {
            // Plain element name with no predicate.
            return Some(XPathSegment {
                name: trimmed.to_string(),
                text_equals: None,
            });
        }
        Some(i) => i,
    };

    if !trimmed.ends_with(']') {
        return None;
    }

    let name = trimmed[..bracket_start].trim();
    if name.is_empty() {
        return None;
    }

    // Only `text() = <quoted literal>` predicates are supported.
    let predicate = trimmed[bracket_start + 1..trimmed.len() - 1].trim();
    let raw = predicate
        .strip_prefix("text()")?
        .trim_start()
        .strip_prefix('=')?
        .trim();

    let quoted = raw.len() >= 2
        && ((raw.starts_with('"') && raw.ends_with('"'))
            || (raw.starts_with('\'') && raw.ends_with('\'')));
    if !quoted {
        return None;
    }

    Some(XPathSegment {
        name: name.to_string(),
        text_equals: Some(raw[1..raw.len() - 1].to_string()),
    })
}
466
467fn segment_matches(node: roxmltree::Node<'_, '_>, segment: &XPathSegment) -> bool {
468 if !node.is_element() {
469 return false;
470 }
471 if node.tag_name().name() != segment.name {
472 return false;
473 }
474 match &segment.text_equals {
475 Some(expected) => node.text().map(str::trim).unwrap_or_default() == expected,
476 None => true,
477 }
478}
479
480fn xml_xpath_exists(xml_body: &str, xpath: &str) -> bool {
487 let doc = match roxmltree::Document::parse(xml_body) {
488 Ok(doc) => doc,
489 Err(err) => {
490 tracing::warn!("Failed to parse XML for XPath matching: {}", err);
491 return false;
492 }
493 };
494
495 let expr = xpath.trim();
496 if expr.is_empty() {
497 return false;
498 }
499
500 let (is_descendant, path_str) = if let Some(rest) = expr.strip_prefix("//") {
501 (true, rest)
502 } else if let Some(rest) = expr.strip_prefix('/') {
503 (false, rest)
504 } else {
505 tracing::warn!("Unsupported XPath expression (must start with / or //): {}", expr);
506 return false;
507 };
508
509 let segments: Vec<XPathSegment> = path_str
510 .split('/')
511 .filter(|s| !s.trim().is_empty())
512 .filter_map(parse_xpath_segment)
513 .collect();
514
515 if segments.is_empty() {
516 return false;
517 }
518
519 if is_descendant {
520 let first = &segments[0];
521 for node in doc.descendants().filter(|n| segment_matches(*n, first)) {
522 let mut frontier = vec![node];
523 for segment in &segments[1..] {
524 let mut next_frontier = Vec::new();
525 for parent in &frontier {
526 for child in parent.children().filter(|n| segment_matches(*n, segment)) {
527 next_frontier.push(child);
528 }
529 }
530 if next_frontier.is_empty() {
531 frontier.clear();
532 break;
533 }
534 frontier = next_frontier;
535 }
536 if !frontier.is_empty() {
537 return true;
538 }
539 }
540 false
541 } else {
542 let mut frontier = vec![doc.root_element()];
543 for (index, segment) in segments.iter().enumerate() {
544 let mut next_frontier = Vec::new();
545 for parent in &frontier {
546 if index == 0 {
547 if segment_matches(*parent, segment) {
548 next_frontier.push(*parent);
549 }
550 continue;
551 }
552 for child in parent.children().filter(|n| segment_matches(*n, segment)) {
553 next_frontier.push(child);
554 }
555 }
556 if next_frontier.is_empty() {
557 return false;
558 }
559 frontier = next_frontier;
560 }
561 !frontier.is_empty()
562 }
563}
564
565fn evaluate_custom_matcher(
567 expression: &str,
568 method: &str,
569 path: &str,
570 headers: &std::collections::HashMap<String, String>,
571 query_params: &std::collections::HashMap<String, String>,
572 body: Option<&[u8]>,
573) -> bool {
574 use regex::Regex;
575
576 let expr = expression.trim();
577
578 if expr.contains("==") {
580 let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
581 if parts.len() != 2 {
582 return false;
583 }
584
585 let field = parts[0];
586 let expected_value = parts[1].trim_matches('"').trim_matches('\'');
587
588 match field {
589 "method" => method == expected_value,
590 "path" => path == expected_value,
591 _ if field.starts_with("headers.") => {
592 let header_name = &field[8..];
593 headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
594 }
595 _ if field.starts_with("query.") => {
596 let param_name = &field[6..];
597 query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
598 }
599 _ => false,
600 }
601 }
602 else if expr.contains("=~") {
604 let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
605 if parts.len() != 2 {
606 return false;
607 }
608
609 let field = parts[0];
610 let pattern = parts[1].trim_matches('"').trim_matches('\'');
611
612 if let Ok(re) = Regex::new(pattern) {
613 match field {
614 "method" => re.is_match(method),
615 "path" => re.is_match(path),
616 _ if field.starts_with("headers.") => {
617 let header_name = &field[8..];
618 headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
619 }
620 _ if field.starts_with("query.") => {
621 let param_name = &field[6..];
622 query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
623 }
624 _ => false,
625 }
626 } else {
627 false
628 }
629 }
630 else if expr.contains("contains") {
632 let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
633 if parts.len() != 2 {
634 return false;
635 }
636
637 let field = parts[0];
638 let search_value = parts[1].trim_matches('"').trim_matches('\'');
639
640 match field {
641 "path" => path.contains(search_value),
642 _ if field.starts_with("headers.") => {
643 let header_name = &field[8..];
644 headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
645 }
646 _ if field.starts_with("body") => {
647 if let Some(body_bytes) = body {
648 let body_str = String::from_utf8_lossy(body_bytes);
649 body_str.contains(search_value)
650 } else {
651 false
652 }
653 }
654 _ => false,
655 }
656 } else {
657 tracing::warn!("Unknown custom matcher expression format: {}", expr);
659 false
660 }
661}
662
/// Runtime statistics exposed by the management API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Seconds since the management state was created.
    pub uptime_seconds: u64,
    /// Total requests counted so far.
    pub total_requests: u64,
    /// Number of registered mocks.
    pub active_mocks: usize,
    /// Number of registered mocks currently enabled.
    pub enabled_mocks: usize,
    /// Number of registered routes (currently mirrors `active_mocks`).
    pub registered_routes: usize,
}
677
/// Static server information exposed by the management API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// Crate version (from `CARGO_PKG_VERSION`).
    pub version: String,
    /// Port the server listens on.
    pub port: u16,
    /// Whether an OpenAPI spec is loaded.
    pub has_openapi_spec: bool,
    /// Filesystem path the spec was loaded from, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
691
/// Shared state for the management API; cheap to clone (all fields are `Arc`s
/// or small copies).
#[derive(Clone)]
pub struct ManagementState {
    /// Registered mocks, guarded for concurrent handler access.
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// Loaded OpenAPI spec, if any.
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Path the spec was loaded from, if known.
    pub spec_path: Option<String>,
    /// Port the managed server listens on.
    pub port: u16,
    /// Startup instant, used for uptime reporting.
    pub start_time: std::time::Instant,
    /// Monotonic request counter.
    pub request_counter: Arc<RwLock<u64>>,
    /// Optional live proxy configuration.
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// Optional SMTP registry for mailbox endpoints.
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// Optional MQTT broker handle.
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Optional Kafka mock broker handle.
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast channel for MQTT/Kafka message events (SSE streaming).
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// Scenario state machines keyed per scenario.
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// Optional WebSocket broadcast for mock change events.
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Optional lifecycle hook registry (create/update/delete callbacks).
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Rule explanations keyed by rule id.
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
    /// Optional chaos-engineering API state.
    #[cfg(feature = "chaos")]
    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
    /// Optional live server configuration (required for bulk config updates).
    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
    /// Conformance-checking state.
    #[cfg(feature = "conformance")]
    pub conformance_state: crate::handlers::conformance::ConformanceState,
}
746
impl ManagementState {
    /// Creates a fresh management state for a server listening on `port`,
    /// optionally seeded with an OpenAPI spec. All optional integrations
    /// (proxy, SMTP, MQTT, Kafka, chaos, live server config, …) start unset
    /// and are attached via the `with_*` builder methods below.
    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
        Self {
            mocks: Arc::new(RwLock::new(Vec::new())),
            spec,
            spec_path,
            port,
            start_time: std::time::Instant::now(),
            request_counter: Arc::new(RwLock::new(0)),
            proxy_config: None,
            #[cfg(feature = "smtp")]
            smtp_registry: None,
            #[cfg(feature = "mqtt")]
            mqtt_broker: None,
            #[cfg(feature = "kafka")]
            kafka_broker: None,
            #[cfg(any(feature = "mqtt", feature = "kafka"))]
            message_events: {
                // Capacity is env-tunable; see get_message_broadcast_capacity().
                let capacity = get_message_broadcast_capacity();
                let (tx, _) = broadcast::channel(capacity);
                Arc::new(tx)
            },
            state_machine_manager: Arc::new(RwLock::new(
                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
            )),
            ws_broadcast: None,
            lifecycle_hooks: None,
            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
            #[cfg(feature = "chaos")]
            chaos_api_state: None,
            server_config: None,
            #[cfg(feature = "conformance")]
            conformance_state: crate::handlers::conformance::ConformanceState::new(),
        }
    }

    /// Attaches lifecycle hooks invoked on mock create/update/delete.
    pub fn with_lifecycle_hooks(
        mut self,
        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
    ) -> Self {
        self.lifecycle_hooks = Some(hooks);
        self
    }

    /// Attaches a WebSocket broadcast channel for mock change events.
    pub fn with_ws_broadcast(
        mut self,
        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
    ) -> Self {
        self.ws_broadcast = Some(ws_broadcast);
        self
    }

    /// Attaches a live proxy configuration.
    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
        self.proxy_config = Some(proxy_config);
        self
    }

    /// Attaches the SMTP registry backing the mailbox endpoints.
    #[cfg(feature = "smtp")]
    pub fn with_smtp_registry(
        mut self,
        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
    ) -> Self {
        self.smtp_registry = Some(smtp_registry);
        self
    }

    /// Attaches an MQTT broker handle.
    #[cfg(feature = "mqtt")]
    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
        self.mqtt_broker = Some(mqtt_broker);
        self
    }

    /// Attaches a Kafka mock broker handle.
    #[cfg(feature = "kafka")]
    pub fn with_kafka_broker(
        mut self,
        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
    ) -> Self {
        self.kafka_broker = Some(kafka_broker);
        self
    }

    /// Attaches the chaos-engineering API state.
    #[cfg(feature = "chaos")]
    pub fn with_chaos_api_state(
        mut self,
        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
    ) -> Self {
        self.chaos_api_state = Some(chaos_api_state);
        self
    }

    /// Attaches a live server configuration; required for bulk config
    /// updates to actually apply (see `bulk_update_config`).
    pub fn with_server_config(
        mut self,
        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
    ) -> Self {
        self.server_config = Some(server_config);
        self
    }
}
859
860async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
862 let mocks = state.mocks.read().await;
863 Json(serde_json::json!({
864 "mocks": *mocks,
865 "total": mocks.len(),
866 "enabled": mocks.iter().filter(|m| m.enabled).count()
867 }))
868}
869
870async fn get_mock(
872 State(state): State<ManagementState>,
873 Path(id): Path<String>,
874) -> Result<Json<MockConfig>, StatusCode> {
875 let mocks = state.mocks.read().await;
876 mocks
877 .iter()
878 .find(|m| m.id == id)
879 .cloned()
880 .map(Json)
881 .ok_or(StatusCode::NOT_FOUND)
882}
883
/// POST handler: create a new mock.
///
/// Generates a UUID when the client did not supply an id, rejects duplicate
/// ids with 409 CONFLICT, fires the `Created` lifecycle hook, and broadcasts
/// the change to management-WS listeners. Returns the stored mock.
async fn create_mock(
    State(state): State<ManagementState>,
    Json(mut mock): Json<MockConfig>,
) -> Result<Json<MockConfig>, StatusCode> {
    // Hold the write lock for the whole operation so the duplicate-id check
    // and the insert are atomic.
    let mut mocks = state.mocks.write().await;

    if mock.id.is_empty() {
        mock.id = uuid::Uuid::new_v4().to_string();
    }

    if mocks.iter().any(|m| m.id == mock.id) {
        return Err(StatusCode::CONFLICT);
    }

    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);

    // Hooks fire before the mock is visible in the store.
    if let Some(hooks) = &state.lifecycle_hooks {
        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
            id: mock.id.clone(),
            name: mock.name.clone(),
            config: serde_json::to_value(&mock).unwrap_or_default(),
        };
        hooks.invoke_mock_created(&event).await;
    }

    mocks.push(mock.clone());

    // Best-effort notification; a send error only means no listeners.
    if let Some(tx) = &state.ws_broadcast {
        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
    }

    Ok(Json(mock))
}
922
/// PUT handler: replace an existing mock wholesale.
///
/// Fires the `Updated` lifecycle hook and, when the enabled flag flipped,
/// an additional `Enabled`/`Disabled` state-change hook; then broadcasts the
/// update to management-WS listeners. Returns 404 for an unknown id.
async fn update_mock(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
    Json(updated_mock): Json<MockConfig>,
) -> Result<Json<MockConfig>, StatusCode> {
    let mut mocks = state.mocks.write().await;

    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;

    // Keep the previous version to detect an enabled-flag transition below.
    let old_mock = mocks[position].clone();

    info!("Updating mock: {}", id);
    mocks[position] = updated_mock.clone();

    if let Some(hooks) = &state.lifecycle_hooks {
        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
            id: updated_mock.id.clone(),
            name: updated_mock.name.clone(),
            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
        };
        hooks.invoke_mock_updated(&event).await;

        if old_mock.enabled != updated_mock.enabled {
            let state_event = if updated_mock.enabled {
                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
                    id: updated_mock.id.clone(),
                }
            } else {
                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
                    id: updated_mock.id.clone(),
                }
            };
            hooks.invoke_mock_state_changed(&state_event).await;
        }
    }

    // Best-effort notification; a send error only means no listeners.
    if let Some(tx) = &state.ws_broadcast {
        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
    }

    Ok(Json(updated_mock))
}
970
/// DELETE handler: remove a mock by id.
///
/// Fires the `Deleted` lifecycle hook, broadcasts the deletion to
/// management-WS listeners, and returns 204 NO CONTENT on success or 404 for
/// an unknown id.
async fn delete_mock(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> Result<StatusCode, StatusCode> {
    let mut mocks = state.mocks.write().await;

    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;

    // Keep a copy so the hook can report the deleted mock's identity.
    let deleted_mock = mocks[position].clone();

    info!("Deleting mock: {}", id);
    mocks.remove(position);

    if let Some(hooks) = &state.lifecycle_hooks {
        let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
            id: deleted_mock.id.clone(),
            name: deleted_mock.name.clone(),
        };
        hooks.invoke_mock_deleted(&event).await;
    }

    // Best-effort notification; a send error only means no listeners.
    if let Some(tx) = &state.ws_broadcast {
        let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
    }

    Ok(StatusCode::NO_CONTENT)
}
1002
/// Request body for `validate_config`.
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// Candidate configuration to validate.
    pub config: serde_json::Value,
    /// Declared format of `config`: `"json"` (default), `"yaml"`, or `"yml"`.
    #[serde(default = "default_format")]
    pub format: String,
}
1012
/// Serde default helper: config payloads are treated as JSON unless stated.
fn default_format() -> String {
    String::from("json")
}
1016
/// POST handler: validate a candidate server configuration without applying it.
///
/// Attempts to deserialize the payload into `ServerConfig`; responds with
/// `{"valid": true}` on success or 400 with the parse error otherwise.
async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
    use mockforge_core::config::ServerConfig;

    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
        "yaml" | "yml" => {
            // The payload arrives as a JSON value; re-serialize it to a string
            // and parse that with the YAML parser (JSON is a subset of YAML),
            // so YAML-specific deserialization rules apply.
            let yaml_str = match serde_json::to_string(&request.config) {
                Ok(s) => s,
                Err(e) => {
                    return (
                        StatusCode::BAD_REQUEST,
                        Json(serde_json::json!({
                            "valid": false,
                            "error": format!("Failed to convert to string: {}", e),
                            "message": "Configuration validation failed"
                        })),
                    )
                        .into_response();
                }
            };
            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
        }
        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
    };

    match config_result {
        Ok(_) => Json(serde_json::json!({
            "valid": true,
            "message": "Configuration is valid"
        }))
        .into_response(),
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "valid": false,
                "error": format!("Invalid configuration: {}", e),
                "message": "Configuration validation failed"
            })),
        )
            .into_response(),
    }
}
1059
/// Request body for `bulk_update_config`.
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    /// Top-level config keys to overlay on the defaults; must be a JSON object.
    pub updates: serde_json::Value,
}
1066
1067async fn bulk_update_config(
1075 State(state): State<ManagementState>,
1076 Json(request): Json<BulkConfigUpdateRequest>,
1077) -> impl IntoResponse {
1078 if !request.updates.is_object() {
1080 return (
1081 StatusCode::BAD_REQUEST,
1082 Json(serde_json::json!({
1083 "error": "Invalid request",
1084 "message": "Updates must be a JSON object"
1085 })),
1086 )
1087 .into_response();
1088 }
1089
1090 use mockforge_core::config::ServerConfig;
1092
1093 let base_config = ServerConfig::default();
1095 let base_json = match serde_json::to_value(&base_config) {
1096 Ok(v) => v,
1097 Err(e) => {
1098 return (
1099 StatusCode::INTERNAL_SERVER_ERROR,
1100 Json(serde_json::json!({
1101 "error": "Internal error",
1102 "message": format!("Failed to serialize base config: {}", e)
1103 })),
1104 )
1105 .into_response();
1106 }
1107 };
1108
1109 let mut merged = base_json.clone();
1111 if let (Some(merged_obj), Some(updates_obj)) =
1112 (merged.as_object_mut(), request.updates.as_object())
1113 {
1114 for (key, value) in updates_obj {
1115 merged_obj.insert(key.clone(), value.clone());
1116 }
1117 }
1118
1119 match serde_json::from_value::<ServerConfig>(merged) {
1121 Ok(validated_config) => {
1122 if let Some(ref config_lock) = state.server_config {
1124 let mut config = config_lock.write().await;
1125 *config = validated_config;
1126 Json(serde_json::json!({
1127 "success": true,
1128 "message": "Bulk configuration update applied successfully",
1129 "updates_received": request.updates,
1130 "validated": true,
1131 "applied": true
1132 }))
1133 .into_response()
1134 } else {
1135 Json(serde_json::json!({
1136 "success": true,
1137 "message": "Bulk configuration update validated but not applied (no server config in state). Use .with_server_config() when building ManagementState.",
1138 "updates_received": request.updates,
1139 "validated": true,
1140 "applied": false
1141 }))
1142 .into_response()
1143 }
1144 }
1145 Err(e) => (
1146 StatusCode::BAD_REQUEST,
1147 Json(serde_json::json!({
1148 "error": "Invalid configuration",
1149 "message": format!("Configuration validation failed: {}", e),
1150 "validated": false
1151 })),
1152 )
1153 .into_response(),
1154 }
1155}
1156
1157async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
1159 let mocks = state.mocks.read().await;
1160 let request_count = *state.request_counter.read().await;
1161
1162 Json(ServerStats {
1163 uptime_seconds: state.start_time.elapsed().as_secs(),
1164 total_requests: request_count,
1165 active_mocks: mocks.len(),
1166 enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
1167 registered_routes: mocks.len(), })
1169}
1170
/// GET handler: report static server information (version, port, spec status).
async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
    Json(ServerConfig {
        version: env!("CARGO_PKG_VERSION").to_string(),
        port: state.port,
        has_openapi_spec: state.spec.is_some(),
        spec_path: state.spec_path.clone(),
    })
}
1180
1181async fn get_openapi_spec(
1183 State(state): State<ManagementState>,
1184) -> Result<Json<serde_json::Value>, StatusCode> {
1185 match &state.spec {
1186 Some(spec) => match &spec.raw_document {
1187 Some(doc) => Ok(Json(doc.clone())),
1188 None => {
1189 match serde_json::to_value(&spec.spec) {
1191 Ok(val) => Ok(Json(val)),
1192 Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
1193 }
1194 }
1195 },
1196 None => Err(StatusCode::NOT_FOUND),
1197 }
1198}
1199
1200async fn health_check() -> Json<serde_json::Value> {
1202 Json(serde_json::json!({
1203 "status": "healthy",
1204 "service": "mockforge-management",
1205 "timestamp": chrono::Utc::now().to_rfc3339()
1206 }))
1207}
1208
/// Serialization format for mock export.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// Pretty-printed JSON.
    Json,
    /// YAML.
    Yaml,
}
1218
1219async fn export_mocks(
1221 State(state): State<ManagementState>,
1222 Query(params): Query<std::collections::HashMap<String, String>>,
1223) -> Result<(StatusCode, String), StatusCode> {
1224 let mocks = state.mocks.read().await;
1225
1226 let format = params
1227 .get("format")
1228 .map(|f| match f.as_str() {
1229 "yaml" | "yml" => ExportFormat::Yaml,
1230 _ => ExportFormat::Json,
1231 })
1232 .unwrap_or(ExportFormat::Json);
1233
1234 match format {
1235 ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
1236 .map(|json| (StatusCode::OK, json))
1237 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1238 ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
1239 .map(|yaml| (StatusCode::OK, yaml))
1240 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1241 }
1242}
1243
1244async fn import_mocks(
1246 State(state): State<ManagementState>,
1247 Json(mocks): Json<Vec<MockConfig>>,
1248) -> impl IntoResponse {
1249 let mut current_mocks = state.mocks.write().await;
1250 current_mocks.clear();
1251 current_mocks.extend(mocks);
1252 Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1253}
1254
/// GET handler: list all captured SMTP emails.
///
/// Returns 501 NOT IMPLEMENTED when no SMTP registry is attached to the state.
#[cfg(feature = "smtp")]
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        match smtp_registry.get_emails() {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve emails",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1279
/// GET handler: fetch one captured SMTP email by id.
///
/// 404 for an unknown id; 501 NOT IMPLEMENTED when no SMTP registry is
/// attached to the state.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    if let Some(ref smtp_registry) = state.smtp_registry {
        match smtp_registry.get_email_by_id(&id) {
            Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
            Ok(None) => (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({
                    "error": "Email not found",
                    "id": id
                })),
            ),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve email",
                    "message": e.to_string()
                })),
            ),
        }
    } else {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        )
    }
}
1314
#[cfg(feature = "smtp")]
/// Delete every email in the captured SMTP mailbox.
///
/// Returns 501 when the SMTP registry is not attached to the management state.
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(registry) = state.smtp_registry.as_ref() else {
        return (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        );
    };

    match registry.clear_mailbox() {
        Ok(()) => (
            StatusCode::OK,
            Json(serde_json::json!({
                "message": "Mailbox cleared successfully"
            })),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to clear mailbox",
                "message": e.to_string()
            })),
        ),
    }
}
1344
#[cfg(feature = "smtp")]
/// Export the SMTP mailbox in the requested format.
///
/// Accepts an optional `format` query parameter (defaults to `"json"`).
/// Mailbox export over HTTP is not implemented; this endpoint always answers
/// 501 NOT IMPLEMENTED and echoes the requested format back so clients can
/// tell their parameter was understood.
async fn export_smtp_mailbox(
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    // Borrow the parameter instead of allocating a default String and cloning
    // it on every request (clippy::or_fun_call); the json! macro accepts &str.
    let format = params.get("format").map(String::as_str).unwrap_or("json");
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "SMTP mailbox management not available via HTTP API",
            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
            "requested_format": format
        })),
    )
}
1360
#[cfg(feature = "smtp")]
/// Search captured emails using optional query-string filters.
///
/// Supported parameters: `sender`, `recipient`, `subject`, `body`,
/// `since`/`until` (RFC 3339 timestamps), `regex` and `case_sensitive`
/// (booleans that are true only for the literal string "true").
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let Some(registry) = state.smtp_registry.as_ref() else {
        return (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        );
    };

    // Optional RFC 3339 timestamp, normalized to UTC; malformed values are
    // silently ignored (same as the established behavior of this endpoint).
    let timestamp = |key: &str| {
        params
            .get(key)
            .and_then(|raw| chrono::DateTime::parse_from_rfc3339(raw).ok())
            .map(|dt| dt.with_timezone(&chrono::Utc))
    };
    // Boolean flag that is true only for the exact string "true".
    let flag = |key: &str| params.get(key).map(|raw| raw == "true").unwrap_or(false);

    let filters = EmailSearchFilters {
        sender: params.get("sender").cloned(),
        recipient: params.get("recipient").cloned(),
        subject: params.get("subject").cloned(),
        body: params.get("body").cloned(),
        since: timestamp("since"),
        until: timestamp("until"),
        use_regex: flag("regex"),
        case_sensitive: flag("case_sensitive"),
    };

    match registry.search_emails(filters) {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to search emails",
                "message": e.to_string()
            })),
        ),
    }
}
1405
#[cfg(feature = "mqtt")]
/// Snapshot of broker-wide MQTT statistics returned by `/mqtt/stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of currently connected MQTT clients.
    pub connected_clients: usize,
    /// Number of topics the broker currently reports as active.
    pub active_topics: usize,
    /// Number of retained messages held by the broker.
    pub retained_messages: usize,
    /// Total subscription count reported by the broker.
    pub total_subscriptions: usize,
}
1419
#[cfg(feature = "mqtt")]
/// Report broker-wide MQTT statistics as JSON, or 503 when no broker is attached.
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    let connected_clients = broker.get_connected_clients().await.len();
    let active_topics = broker.get_active_topics().await.len();
    let topic_stats = broker.get_topic_stats().await;

    Json(MqttBrokerStats {
        connected_clients,
        active_topics,
        retained_messages: topic_stats.retained_messages,
        total_subscriptions: topic_stats.total_subscriptions,
    })
    .into_response()
}
1440
#[cfg(feature = "mqtt")]
/// List currently connected MQTT clients, or 503 when no broker is attached.
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.mqtt_broker.as_ref() {
        Some(broker) => {
            let clients = broker.get_connected_clients().await;
            Json(serde_json::json!({ "clients": clients })).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1453
#[cfg(feature = "mqtt")]
/// List the topics the broker currently reports as active, or 503 when no
/// broker is attached.
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.mqtt_broker.as_ref() {
        Some(broker) => {
            let topics = broker.get_active_topics().await;
            Json(serde_json::json!({ "topics": topics })).into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response(),
    }
}
1466
#[cfg(feature = "mqtt")]
/// Forcibly disconnect a connected MQTT client by its client id.
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response(),
        Err(e) => {
            (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
                .into_response()
        }
    }
}
1486
#[cfg(feature = "mqtt")]
/// JSON body for the single-message MQTT publish endpoint.
///
/// NOTE(review): the publish handler currently parses a raw
/// `serde_json::Value` by hand instead of deserializing into this type —
/// keep the two in sync if either changes.
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Topic to publish to.
    pub topic: String,
    /// Message payload as a UTF-8 string.
    pub payload: String,
    /// Quality-of-service level (handlers accept 0, 1, or 2); defaults to 0.
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Whether the broker should retain the message; defaults to false.
    #[serde(default)]
    pub retain: bool,
}
1504
#[cfg(feature = "mqtt")]
/// Serde default for `MqttPublishRequest::qos`: QoS level 0.
fn default_qos() -> u8 {
    0
}
1509
#[cfg(feature = "mqtt")]
/// Publish a single MQTT message through the embedded broker.
///
/// The JSON body must contain `topic` and `payload`; `qos` (0-2, default 0)
/// and `retain` (default false) are optional. On success the message is also
/// broadcast to SSE stream subscribers. Returns 400 on invalid input, 503
/// when no broker is attached, 500 when the broker rejects the publish.
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let topic = request.get("topic").and_then(serde_json::Value::as_str);
    let payload = request.get("payload").and_then(serde_json::Value::as_str);
    let qos = request.get("qos").and_then(serde_json::Value::as_u64).unwrap_or(0) as u8;
    let retain = request.get("retain").and_then(serde_json::Value::as_bool).unwrap_or(false);

    // Both topic and payload are mandatory string fields.
    let (Some(topic), Some(payload)) = (topic, payload) else {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required fields: topic and payload"
            })),
        );
    };

    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        );
    };

    // MQTT defines exactly three QoS levels.
    if qos > 2 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid QoS",
                "message": "QoS must be 0, 1, or 2"
            })),
        );
    }

    let client_id = "mockforge-management-api";
    let publish_result = broker
        .handle_publish(client_id, topic, payload.as_bytes().to_vec(), qos, retain)
        .await;

    match publish_result {
        Ok(_) => {
            // Mirror the publish onto the SSE broadcast channel; a send error
            // only means no subscriber is currently listening, so ignore it.
            let event = MessageEvent::Mqtt(MqttMessageEvent {
                topic: topic.to_string(),
                payload: payload.to_string(),
                qos,
                retain,
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            (
                StatusCode::OK,
                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message published to topic '{}'", topic),
                    "topic": topic,
                    "qos": qos,
                    "retain": retain
                })),
            )
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to publish message",
                "message": e.to_string()
            })),
        ),
    }
}
1597
1598#[cfg(not(feature = "mqtt"))]
1599async fn publish_mqtt_message_handler(
1601 State(_state): State<ManagementState>,
1602 Json(_request): Json<serde_json::Value>,
1603) -> impl IntoResponse {
1604 (
1605 StatusCode::SERVICE_UNAVAILABLE,
1606 Json(serde_json::json!({
1607 "error": "MQTT feature not enabled",
1608 "message": "MQTT support is not compiled into this build"
1609 })),
1610 )
1611}
1612
#[cfg(feature = "mqtt")]
/// JSON body for the batch MQTT publish endpoint.
///
/// NOTE(review): like `MqttPublishRequest`, the batch handler currently
/// parses a raw `serde_json::Value` by hand instead of using this type.
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// Messages to publish, in order.
    pub messages: Vec<MqttPublishRequest>,
    /// Delay between consecutive publishes, in milliseconds; defaults to 100.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1623
#[cfg(feature = "mqtt")]
/// Serde default for `MqttBatchPublishRequest::delay_ms`: 100 milliseconds.
fn default_delay() -> u64 {
    100
}
1628
#[cfg(feature = "mqtt")]
/// Publish a batch of MQTT messages through the embedded broker.
///
/// Body shape: `{ "messages": [ { "topic", "payload", "qos"?, "retain"? },
/// ... ], "delay_ms"?: u64 }`. Messages are published in order; a failing or
/// invalid message is recorded per index instead of aborting the batch, and
/// the handler sleeps `delay_ms` milliseconds (default 100) between
/// consecutive messages. Returns 400 for a missing/empty `messages` array,
/// 503 when no broker is attached, otherwise 200 with per-message results.
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // The envelope is parsed manually from a raw JSON value so that invalid
    // entries can be reported per index rather than failing deserialization.
    let messages_json = request.get("messages").and_then(|v| v.as_array());
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    if messages_json.is_none() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required field: messages"
            })),
        );
    }

    let messages_json = messages_json.unwrap();

    if let Some(broker) = &state.mqtt_broker {
        if messages_json.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            );
        }

        let mut results = Vec::new();
        let client_id = "mockforge-management-api".to_string();

        for (index, msg_json) in messages_json.iter().enumerate() {
            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
            let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

            // Per-message validation: record the failure and keep going.
            if topic.is_none() || payload.is_none() {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }

            let topic = topic.unwrap();
            let payload = payload.unwrap();

            // MQTT defines exactly three QoS levels.
            if qos > 2 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Invalid QoS (must be 0, 1, or 2)"
                }));
                continue;
            }

            let payload_bytes = payload.as_bytes().to_vec();

            let publish_result = broker
                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
                .await
                .map_err(|e| format!("{}", e));

            match publish_result {
                Ok(_) => {
                    // Mirror the publish onto the SSE broadcast channel; a send
                    // error only means no subscriber is currently listening.
                    let event = MessageEvent::Mqtt(MqttMessageEvent {
                        topic: topic.clone(),
                        payload: payload.clone(),
                        qos,
                        retain,
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": topic,
                        "qos": qos
                    }));
                }
                Err(error_msg) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": error_msg
                    }));
                }
            }

            // Throttle between consecutive messages; no sleep after the last.
            // `len() - 1` cannot underflow: the empty batch was rejected above.
            // NOTE(review): messages rejected by validation `continue` past
            // this sleep, so the message following an invalid one is published
            // without the inter-message delay — confirm this is intended.
            if index < messages_json.len() - 1 && delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        // NOTE(review): the top-level "success" flag is always true here, even
        // when every message failed; callers must inspect "failed"/"results".
        (
            StatusCode::OK,
            Json(serde_json::json!({
                "success": true,
                "total": messages_json.len(),
                "succeeded": success_count,
                "failed": messages_json.len() - success_count,
                "results": results
            })),
        )
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1758
1759#[cfg(not(feature = "mqtt"))]
1760async fn publish_mqtt_batch_handler(
1762 State(_state): State<ManagementState>,
1763 Json(_request): Json<serde_json::Value>,
1764) -> impl IntoResponse {
1765 (
1766 StatusCode::SERVICE_UNAVAILABLE,
1767 Json(serde_json::json!({
1768 "error": "MQTT feature not enabled",
1769 "message": "MQTT support is not compiled into this build"
1770 })),
1771 )
1772}
1773
/// Request body for explicitly setting a migration mode on a route or group.
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    // Accepted (case-insensitive): "mock", "shadow", "real", "auto".
    mode: String,
}
1781
1782async fn get_migration_routes(
1784 State(state): State<ManagementState>,
1785) -> Result<Json<serde_json::Value>, StatusCode> {
1786 let proxy_config = match &state.proxy_config {
1787 Some(config) => config,
1788 None => {
1789 return Ok(Json(serde_json::json!({
1790 "error": "Migration not configured. Proxy config not available."
1791 })));
1792 }
1793 };
1794
1795 let config = proxy_config.read().await;
1796 let routes = config.get_migration_routes();
1797
1798 Ok(Json(serde_json::json!({
1799 "routes": routes
1800 })))
1801}
1802
1803async fn toggle_route_migration(
1805 State(state): State<ManagementState>,
1806 Path(pattern): Path<String>,
1807) -> Result<Json<serde_json::Value>, StatusCode> {
1808 let proxy_config = match &state.proxy_config {
1809 Some(config) => config,
1810 None => {
1811 return Ok(Json(serde_json::json!({
1812 "error": "Migration not configured. Proxy config not available."
1813 })));
1814 }
1815 };
1816
1817 let mut config = proxy_config.write().await;
1818 let new_mode = match config.toggle_route_migration(&pattern) {
1819 Some(mode) => mode,
1820 None => {
1821 return Ok(Json(serde_json::json!({
1822 "error": format!("Route pattern not found: {}", pattern)
1823 })));
1824 }
1825 };
1826
1827 Ok(Json(serde_json::json!({
1828 "pattern": pattern,
1829 "mode": format!("{:?}", new_mode).to_lowercase()
1830 })))
1831}
1832
1833async fn set_route_migration_mode(
1835 State(state): State<ManagementState>,
1836 Path(pattern): Path<String>,
1837 Json(request): Json<SetMigrationModeRequest>,
1838) -> Result<Json<serde_json::Value>, StatusCode> {
1839 let proxy_config = match &state.proxy_config {
1840 Some(config) => config,
1841 None => {
1842 return Ok(Json(serde_json::json!({
1843 "error": "Migration not configured. Proxy config not available."
1844 })));
1845 }
1846 };
1847
1848 use mockforge_core::proxy::config::MigrationMode;
1849 let mode = match request.mode.to_lowercase().as_str() {
1850 "mock" => MigrationMode::Mock,
1851 "shadow" => MigrationMode::Shadow,
1852 "real" => MigrationMode::Real,
1853 "auto" => MigrationMode::Auto,
1854 _ => {
1855 return Ok(Json(serde_json::json!({
1856 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1857 })));
1858 }
1859 };
1860
1861 let mut config = proxy_config.write().await;
1862 let updated = config.update_rule_migration_mode(&pattern, mode);
1863
1864 if !updated {
1865 return Ok(Json(serde_json::json!({
1866 "error": format!("Route pattern not found: {}", pattern)
1867 })));
1868 }
1869
1870 Ok(Json(serde_json::json!({
1871 "pattern": pattern,
1872 "mode": format!("{:?}", mode).to_lowercase()
1873 })))
1874}
1875
1876async fn toggle_group_migration(
1878 State(state): State<ManagementState>,
1879 Path(group): Path<String>,
1880) -> Result<Json<serde_json::Value>, StatusCode> {
1881 let proxy_config = match &state.proxy_config {
1882 Some(config) => config,
1883 None => {
1884 return Ok(Json(serde_json::json!({
1885 "error": "Migration not configured. Proxy config not available."
1886 })));
1887 }
1888 };
1889
1890 let mut config = proxy_config.write().await;
1891 let new_mode = config.toggle_group_migration(&group);
1892
1893 Ok(Json(serde_json::json!({
1894 "group": group,
1895 "mode": format!("{:?}", new_mode).to_lowercase()
1896 })))
1897}
1898
1899async fn set_group_migration_mode(
1901 State(state): State<ManagementState>,
1902 Path(group): Path<String>,
1903 Json(request): Json<SetMigrationModeRequest>,
1904) -> Result<Json<serde_json::Value>, StatusCode> {
1905 let proxy_config = match &state.proxy_config {
1906 Some(config) => config,
1907 None => {
1908 return Ok(Json(serde_json::json!({
1909 "error": "Migration not configured. Proxy config not available."
1910 })));
1911 }
1912 };
1913
1914 use mockforge_core::proxy::config::MigrationMode;
1915 let mode = match request.mode.to_lowercase().as_str() {
1916 "mock" => MigrationMode::Mock,
1917 "shadow" => MigrationMode::Shadow,
1918 "real" => MigrationMode::Real,
1919 "auto" => MigrationMode::Auto,
1920 _ => {
1921 return Ok(Json(serde_json::json!({
1922 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1923 })));
1924 }
1925 };
1926
1927 let mut config = proxy_config.write().await;
1928 config.update_group_migration_mode(&group, mode);
1929
1930 Ok(Json(serde_json::json!({
1931 "group": group,
1932 "mode": format!("{:?}", mode).to_lowercase()
1933 })))
1934}
1935
1936async fn get_migration_groups(
1938 State(state): State<ManagementState>,
1939) -> Result<Json<serde_json::Value>, StatusCode> {
1940 let proxy_config = match &state.proxy_config {
1941 Some(config) => config,
1942 None => {
1943 return Ok(Json(serde_json::json!({
1944 "error": "Migration not configured. Proxy config not available."
1945 })));
1946 }
1947 };
1948
1949 let config = proxy_config.read().await;
1950 let groups = config.get_migration_groups();
1951
1952 let groups_json: serde_json::Map<String, serde_json::Value> = groups
1954 .into_iter()
1955 .map(|(name, info)| {
1956 (
1957 name,
1958 serde_json::json!({
1959 "name": info.name,
1960 "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1961 "route_count": info.route_count
1962 }),
1963 )
1964 })
1965 .collect();
1966
1967 Ok(Json(serde_json::json!(groups_json)))
1968}
1969
1970async fn get_migration_status(
1972 State(state): State<ManagementState>,
1973) -> Result<Json<serde_json::Value>, StatusCode> {
1974 let proxy_config = match &state.proxy_config {
1975 Some(config) => config,
1976 None => {
1977 return Ok(Json(serde_json::json!({
1978 "error": "Migration not configured. Proxy config not available."
1979 })));
1980 }
1981 };
1982
1983 let config = proxy_config.read().await;
1984 let routes = config.get_migration_routes();
1985 let groups = config.get_migration_groups();
1986
1987 let mut mock_count = 0;
1988 let mut shadow_count = 0;
1989 let mut real_count = 0;
1990 let mut auto_count = 0;
1991
1992 for route in &routes {
1993 match route.migration_mode {
1994 mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1995 mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1996 mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1997 mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1998 }
1999 }
2000
2001 Ok(Json(serde_json::json!({
2002 "total_routes": routes.len(),
2003 "mock_routes": mock_count,
2004 "shadow_routes": shadow_count,
2005 "real_routes": real_count,
2006 "auto_routes": auto_count,
2007 "total_groups": groups.len(),
2008 "migration_enabled": config.migration_enabled
2009 })))
2010}
2011
/// Request body for creating or updating a proxy body-transform rule.
#[derive(Debug, Deserialize, Serialize)]
pub struct ProxyRuleRequest {
    /// Route pattern the rule applies to.
    pub pattern: String,
    /// Rule kind: "request" or "response" (serialized as "type").
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status codes the rule applies to; defaults to empty. Only used for
    /// response rules (request rules always report an empty list).
    #[serde(default)]
    pub status_codes: Vec<u16>,
    /// Body transforms applied by this rule; create requires at least one.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active; defaults to true.
    #[serde(default = "default_true")]
    pub enabled: bool,
}
2031
/// Wire (API) representation of a single body transform.
#[derive(Debug, Deserialize, Serialize)]
pub struct BodyTransformRequest {
    /// Path within the body that the transform targets.
    pub path: String,
    /// Replacement value applied at `path`.
    pub replace: String,
    /// Operation name: "replace", "add", or "remove". Unknown values are
    /// treated as "replace" by the handlers. Defaults to the empty string.
    #[serde(default)]
    pub operation: String,
}
2043
/// API representation of a configured proxy rule, as returned by list/get.
#[derive(Debug, Serialize)]
pub struct ProxyRuleResponse {
    /// Index-based id: request rules come first, then response rules offset
    /// by the number of request rules.
    pub id: usize,
    /// Route pattern the rule applies to.
    pub pattern: String,
    /// Rule kind: "request" or "response" (serialized as "type").
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status codes the rule applies to (empty for request rules).
    pub status_codes: Vec<u16>,
    /// Body transforms applied by this rule, in wire form.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active.
    pub enabled: bool,
}
2061
2062async fn list_proxy_rules(
2064 State(state): State<ManagementState>,
2065) -> Result<Json<serde_json::Value>, StatusCode> {
2066 let proxy_config = match &state.proxy_config {
2067 Some(config) => config,
2068 None => {
2069 return Ok(Json(serde_json::json!({
2070 "error": "Proxy not configured. Proxy config not available."
2071 })));
2072 }
2073 };
2074
2075 let config = proxy_config.read().await;
2076
2077 let mut rules: Vec<ProxyRuleResponse> = Vec::new();
2078
2079 for (idx, rule) in config.request_replacements.iter().enumerate() {
2081 rules.push(ProxyRuleResponse {
2082 id: idx,
2083 pattern: rule.pattern.clone(),
2084 rule_type: "request".to_string(),
2085 status_codes: Vec::new(),
2086 body_transforms: rule
2087 .body_transforms
2088 .iter()
2089 .map(|t| BodyTransformRequest {
2090 path: t.path.clone(),
2091 replace: t.replace.clone(),
2092 operation: format!("{:?}", t.operation).to_lowercase(),
2093 })
2094 .collect(),
2095 enabled: rule.enabled,
2096 });
2097 }
2098
2099 let request_count = config.request_replacements.len();
2101 for (idx, rule) in config.response_replacements.iter().enumerate() {
2102 rules.push(ProxyRuleResponse {
2103 id: request_count + idx,
2104 pattern: rule.pattern.clone(),
2105 rule_type: "response".to_string(),
2106 status_codes: rule.status_codes.clone(),
2107 body_transforms: rule
2108 .body_transforms
2109 .iter()
2110 .map(|t| BodyTransformRequest {
2111 path: t.path.clone(),
2112 replace: t.replace.clone(),
2113 operation: format!("{:?}", t.operation).to_lowercase(),
2114 })
2115 .collect(),
2116 enabled: rule.enabled,
2117 });
2118 }
2119
2120 Ok(Json(serde_json::json!({
2121 "rules": rules
2122 })))
2123}
2124
2125async fn create_proxy_rule(
2127 State(state): State<ManagementState>,
2128 Json(request): Json<ProxyRuleRequest>,
2129) -> Result<Json<serde_json::Value>, StatusCode> {
2130 let proxy_config = match &state.proxy_config {
2131 Some(config) => config,
2132 None => {
2133 return Ok(Json(serde_json::json!({
2134 "error": "Proxy not configured. Proxy config not available."
2135 })));
2136 }
2137 };
2138
2139 if request.body_transforms.is_empty() {
2141 return Ok(Json(serde_json::json!({
2142 "error": "At least one body transform is required"
2143 })));
2144 }
2145
2146 let body_transforms: Vec<BodyTransform> = request
2147 .body_transforms
2148 .iter()
2149 .map(|t| {
2150 let op = match t.operation.as_str() {
2151 "replace" => TransformOperation::Replace,
2152 "add" => TransformOperation::Add,
2153 "remove" => TransformOperation::Remove,
2154 _ => TransformOperation::Replace,
2155 };
2156 BodyTransform {
2157 path: t.path.clone(),
2158 replace: t.replace.clone(),
2159 operation: op,
2160 }
2161 })
2162 .collect();
2163
2164 let new_rule = BodyTransformRule {
2165 pattern: request.pattern.clone(),
2166 status_codes: request.status_codes.clone(),
2167 body_transforms,
2168 enabled: request.enabled,
2169 };
2170
2171 let mut config = proxy_config.write().await;
2172
2173 let rule_id = if request.rule_type == "request" {
2174 config.request_replacements.push(new_rule);
2175 config.request_replacements.len() - 1
2176 } else if request.rule_type == "response" {
2177 config.response_replacements.push(new_rule);
2178 config.request_replacements.len() + config.response_replacements.len() - 1
2179 } else {
2180 return Ok(Json(serde_json::json!({
2181 "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
2182 })));
2183 };
2184
2185 Ok(Json(serde_json::json!({
2186 "id": rule_id,
2187 "message": "Rule created successfully"
2188 })))
2189}
2190
2191async fn get_proxy_rule(
2193 State(state): State<ManagementState>,
2194 Path(id): Path<String>,
2195) -> Result<Json<serde_json::Value>, StatusCode> {
2196 let proxy_config = match &state.proxy_config {
2197 Some(config) => config,
2198 None => {
2199 return Ok(Json(serde_json::json!({
2200 "error": "Proxy not configured. Proxy config not available."
2201 })));
2202 }
2203 };
2204
2205 let config = proxy_config.read().await;
2206 let rule_id: usize = match id.parse() {
2207 Ok(id) => id,
2208 Err(_) => {
2209 return Ok(Json(serde_json::json!({
2210 "error": format!("Invalid rule ID: {}", id)
2211 })));
2212 }
2213 };
2214
2215 let request_count = config.request_replacements.len();
2216
2217 if rule_id < request_count {
2218 let rule = &config.request_replacements[rule_id];
2220 Ok(Json(serde_json::json!({
2221 "id": rule_id,
2222 "pattern": rule.pattern,
2223 "type": "request",
2224 "status_codes": [],
2225 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2226 "path": t.path,
2227 "replace": t.replace,
2228 "operation": format!("{:?}", t.operation).to_lowercase()
2229 })).collect::<Vec<_>>(),
2230 "enabled": rule.enabled
2231 })))
2232 } else if rule_id < request_count + config.response_replacements.len() {
2233 let response_idx = rule_id - request_count;
2235 let rule = &config.response_replacements[response_idx];
2236 Ok(Json(serde_json::json!({
2237 "id": rule_id,
2238 "pattern": rule.pattern,
2239 "type": "response",
2240 "status_codes": rule.status_codes,
2241 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2242 "path": t.path,
2243 "replace": t.replace,
2244 "operation": format!("{:?}", t.operation).to_lowercase()
2245 })).collect::<Vec<_>>(),
2246 "enabled": rule.enabled
2247 })))
2248 } else {
2249 Ok(Json(serde_json::json!({
2250 "error": format!("Rule ID {} not found", rule_id)
2251 })))
2252 }
2253}
2254
2255async fn update_proxy_rule(
2257 State(state): State<ManagementState>,
2258 Path(id): Path<String>,
2259 Json(request): Json<ProxyRuleRequest>,
2260) -> Result<Json<serde_json::Value>, StatusCode> {
2261 let proxy_config = match &state.proxy_config {
2262 Some(config) => config,
2263 None => {
2264 return Ok(Json(serde_json::json!({
2265 "error": "Proxy not configured. Proxy config not available."
2266 })));
2267 }
2268 };
2269
2270 let mut config = proxy_config.write().await;
2271 let rule_id: usize = match id.parse() {
2272 Ok(id) => id,
2273 Err(_) => {
2274 return Ok(Json(serde_json::json!({
2275 "error": format!("Invalid rule ID: {}", id)
2276 })));
2277 }
2278 };
2279
2280 let body_transforms: Vec<BodyTransform> = request
2281 .body_transforms
2282 .iter()
2283 .map(|t| {
2284 let op = match t.operation.as_str() {
2285 "replace" => TransformOperation::Replace,
2286 "add" => TransformOperation::Add,
2287 "remove" => TransformOperation::Remove,
2288 _ => TransformOperation::Replace,
2289 };
2290 BodyTransform {
2291 path: t.path.clone(),
2292 replace: t.replace.clone(),
2293 operation: op,
2294 }
2295 })
2296 .collect();
2297
2298 let updated_rule = BodyTransformRule {
2299 pattern: request.pattern.clone(),
2300 status_codes: request.status_codes.clone(),
2301 body_transforms,
2302 enabled: request.enabled,
2303 };
2304
2305 let request_count = config.request_replacements.len();
2306
2307 if rule_id < request_count {
2308 config.request_replacements[rule_id] = updated_rule;
2310 } else if rule_id < request_count + config.response_replacements.len() {
2311 let response_idx = rule_id - request_count;
2313 config.response_replacements[response_idx] = updated_rule;
2314 } else {
2315 return Ok(Json(serde_json::json!({
2316 "error": format!("Rule ID {} not found", rule_id)
2317 })));
2318 }
2319
2320 Ok(Json(serde_json::json!({
2321 "id": rule_id,
2322 "message": "Rule updated successfully"
2323 })))
2324}
2325
2326async fn delete_proxy_rule(
2328 State(state): State<ManagementState>,
2329 Path(id): Path<String>,
2330) -> Result<Json<serde_json::Value>, StatusCode> {
2331 let proxy_config = match &state.proxy_config {
2332 Some(config) => config,
2333 None => {
2334 return Ok(Json(serde_json::json!({
2335 "error": "Proxy not configured. Proxy config not available."
2336 })));
2337 }
2338 };
2339
2340 let mut config = proxy_config.write().await;
2341 let rule_id: usize = match id.parse() {
2342 Ok(id) => id,
2343 Err(_) => {
2344 return Ok(Json(serde_json::json!({
2345 "error": format!("Invalid rule ID: {}", id)
2346 })));
2347 }
2348 };
2349
2350 let request_count = config.request_replacements.len();
2351
2352 if rule_id < request_count {
2353 config.request_replacements.remove(rule_id);
2355 } else if rule_id < request_count + config.response_replacements.len() {
2356 let response_idx = rule_id - request_count;
2358 config.response_replacements.remove(response_idx);
2359 } else {
2360 return Ok(Json(serde_json::json!({
2361 "error": format!("Rule ID {} not found", rule_id)
2362 })));
2363 }
2364
2365 Ok(Json(serde_json::json!({
2366 "id": rule_id,
2367 "message": "Rule deleted successfully"
2368 })))
2369}
2370
2371async fn get_proxy_inspect(
2373 State(state): State<ManagementState>,
2374 Query(params): Query<std::collections::HashMap<String, String>>,
2375) -> Result<Json<serde_json::Value>, StatusCode> {
2376 let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2377 let offset: usize = params.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);
2378
2379 let proxy_config = match &state.proxy_config {
2380 Some(config) => config.read().await,
2381 None => {
2382 return Ok(Json(serde_json::json!({
2383 "error": "Proxy not configured. Proxy config not available."
2384 })));
2385 }
2386 };
2387
2388 let mut rules = Vec::new();
2389 for (idx, rule) in proxy_config.request_replacements.iter().enumerate() {
2390 rules.push(serde_json::json!({
2391 "id": idx,
2392 "kind": "request",
2393 "pattern": rule.pattern,
2394 "enabled": rule.enabled,
2395 "status_codes": rule.status_codes,
2396 "transform_count": rule.body_transforms.len(),
2397 "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2398 "path": t.path,
2399 "operation": t.operation,
2400 "replace": t.replace
2401 })).collect::<Vec<_>>()
2402 }));
2403 }
2404 let request_rule_count = rules.len();
2405 for (idx, rule) in proxy_config.response_replacements.iter().enumerate() {
2406 rules.push(serde_json::json!({
2407 "id": request_rule_count + idx,
2408 "kind": "response",
2409 "pattern": rule.pattern,
2410 "enabled": rule.enabled,
2411 "status_codes": rule.status_codes,
2412 "transform_count": rule.body_transforms.len(),
2413 "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2414 "path": t.path,
2415 "operation": t.operation,
2416 "replace": t.replace
2417 })).collect::<Vec<_>>()
2418 }));
2419 }
2420
2421 let total = rules.len();
2422 let paged_rules: Vec<_> = rules.into_iter().skip(offset).take(limit).collect();
2423
2424 Ok(Json(serde_json::json!({
2425 "enabled": proxy_config.enabled,
2426 "target_url": proxy_config.target_url,
2427 "prefix": proxy_config.prefix,
2428 "timeout_seconds": proxy_config.timeout_seconds,
2429 "follow_redirects": proxy_config.follow_redirects,
2430 "passthrough_by_default": proxy_config.passthrough_by_default,
2431 "rules": paged_rules,
2432 "request_rule_count": request_rule_count,
2433 "response_rule_count": total.saturating_sub(request_rule_count),
2434 "limit": limit,
2435 "offset": offset,
2436 "total": total
2437 })))
2438}
2439
2440pub fn management_router(state: ManagementState) -> Router {
2442 let router = Router::new()
2443 .route("/health", get(health_check))
2444 .route("/stats", get(get_stats))
2445 .route("/config", get(get_config))
2446 .route("/config/validate", post(validate_config))
2447 .route("/config/bulk", post(bulk_update_config))
2448 .route("/mocks", get(list_mocks))
2449 .route("/mocks", post(create_mock))
2450 .route("/mocks/{id}", get(get_mock))
2451 .route("/mocks/{id}", put(update_mock))
2452 .route("/mocks/{id}", delete(delete_mock))
2453 .route("/export", get(export_mocks))
2454 .route("/import", post(import_mocks))
2455 .route("/spec", get(get_openapi_spec));
2456
2457 #[cfg(feature = "smtp")]
2458 let router = router
2459 .route("/smtp/mailbox", get(list_smtp_emails))
2460 .route("/smtp/mailbox", delete(clear_smtp_mailbox))
2461 .route("/smtp/mailbox/{id}", get(get_smtp_email))
2462 .route("/smtp/mailbox/export", get(export_smtp_mailbox))
2463 .route("/smtp/mailbox/search", get(search_smtp_emails));
2464
2465 #[cfg(not(feature = "smtp"))]
2466 let router = router;
2467
2468 #[cfg(feature = "mqtt")]
2470 let router = router
2471 .route("/mqtt/stats", get(get_mqtt_stats))
2472 .route("/mqtt/clients", get(get_mqtt_clients))
2473 .route("/mqtt/topics", get(get_mqtt_topics))
2474 .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
2475 .route("/mqtt/messages/stream", get(mqtt_messages_stream))
2476 .route("/mqtt/publish", post(publish_mqtt_message_handler))
2477 .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2478
2479 #[cfg(not(feature = "mqtt"))]
2480 let router = router
2481 .route("/mqtt/publish", post(publish_mqtt_message_handler))
2482 .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));
2483
2484 #[cfg(feature = "kafka")]
2485 let router = router
2486 .route("/kafka/stats", get(get_kafka_stats))
2487 .route("/kafka/topics", get(get_kafka_topics))
2488 .route("/kafka/topics/{topic}", get(get_kafka_topic))
2489 .route("/kafka/groups", get(get_kafka_groups))
2490 .route("/kafka/groups/{group_id}", get(get_kafka_group))
2491 .route("/kafka/produce", post(produce_kafka_message))
2492 .route("/kafka/produce/batch", post(produce_kafka_batch))
2493 .route("/kafka/messages/stream", get(kafka_messages_stream));
2494
2495 #[cfg(not(feature = "kafka"))]
2496 let router = router;
2497
2498 let router = router
2500 .route("/migration/routes", get(get_migration_routes))
2501 .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
2502 .route("/migration/routes/{pattern}", put(set_route_migration_mode))
2503 .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
2504 .route("/migration/groups/{group}", put(set_group_migration_mode))
2505 .route("/migration/groups", get(get_migration_groups))
2506 .route("/migration/status", get(get_migration_status));
2507
2508 let router = router
2510 .route("/proxy/rules", get(list_proxy_rules))
2511 .route("/proxy/rules", post(create_proxy_rule))
2512 .route("/proxy/rules/{id}", get(get_proxy_rule))
2513 .route("/proxy/rules/{id}", put(update_proxy_rule))
2514 .route("/proxy/rules/{id}", delete(delete_proxy_rule))
2515 .route("/proxy/inspect", get(get_proxy_inspect));
2516
2517 let router = router.route("/ai/generate-spec", post(generate_ai_spec));
2519
2520 let router = router.nest(
2522 "/snapshot-diff",
2523 crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
2524 );
2525
2526 #[cfg(feature = "behavioral-cloning")]
2527 let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));
2528
2529 let router = router
2530 .route("/mockai/learn", post(learn_from_examples))
2531 .route("/mockai/rules/explanations", get(list_rule_explanations))
2532 .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
2533 .route("/chaos/config", get(get_chaos_config))
2534 .route("/chaos/config", post(update_chaos_config))
2535 .route("/network/profiles", get(list_network_profiles))
2536 .route("/network/profile/apply", post(apply_network_profile));
2537
2538 let router =
2540 router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());
2541
2542 #[cfg(feature = "conformance")]
2544 let router = router.nest_service(
2545 "/conformance",
2546 crate::handlers::conformance::conformance_router(state.conformance_state.clone()),
2547 );
2548 #[cfg(not(feature = "conformance"))]
2549 let router = router;
2550
2551 router.with_state(state)
2552}
2553
/// Aggregate statistics for the mock Kafka broker, returned by `GET /kafka/stats`.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// Number of topics currently held by the broker.
    pub topics: usize,
    /// Total partition count summed across all topics.
    pub partitions: usize,
    /// Number of registered consumer groups.
    pub consumer_groups: usize,
    /// Total messages produced, from the broker metrics snapshot.
    pub messages_produced: u64,
    /// Total messages consumed, from the broker metrics snapshot.
    pub messages_consumed: u64,
}
2569
/// Summary of a single Kafka topic, returned by `GET /kafka/topics`.
// `#[allow(missing_docs)]` removed: all items are now documented.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name.
    pub name: String,
    /// Number of partitions in the topic.
    pub partitions: usize,
    /// Configured replication factor.
    pub replication_factor: i32,
}
2578
/// Summary of a Kafka consumer group, returned by `GET /kafka/groups`.
// `#[allow(missing_docs)]` removed: all items are now documented.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group identifier.
    pub group_id: String,
    /// Number of members currently in the group.
    pub members: usize,
    /// Reported group state (the handlers in this module always report "Stable").
    pub state: String,
}
2587
/// GET /kafka/stats — aggregate broker statistics, or 503 when no Kafka
/// broker is running.
#[cfg(feature = "kafka")]
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = state.kafka_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topic_map = broker.topics.read().await;
    let group_registry = broker.consumer_groups.read().await;

    // Partition total across every topic.
    let partition_total = topic_map.values().map(|t| t.partitions.len()).sum::<usize>();

    let metrics = broker.metrics().snapshot();

    Json(KafkaBrokerStats {
        topics: topic_map.len(),
        partitions: partition_total,
        consumer_groups: group_registry.groups().len(),
        messages_produced: metrics.messages_produced_total,
        messages_consumed: metrics.messages_consumed_total,
    })
    .into_response()
}
2620
/// GET /kafka/topics — list every topic with its partition count and
/// replication factor, or 503 when no Kafka broker is running.
#[cfg(feature = "kafka")]
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = state.kafka_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topic_map = broker.topics.read().await;
    let mut topic_list = Vec::with_capacity(topic_map.len());
    for (topic_name, topic) in topic_map.iter() {
        topic_list.push(KafkaTopicInfo {
            name: topic_name.clone(),
            partitions: topic.partitions.len(),
            replication_factor: topic.config.replication_factor as i32,
        });
    }

    Json(serde_json::json!({ "topics": topic_list })).into_response()
}
2650
/// GET /kafka/topics/{topic} — per-topic detail including a per-partition
/// breakdown. 404 when the topic is unknown, 503 when no broker is running.
#[cfg(feature = "kafka")]
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = state.kafka_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let topic_map = broker.topics.read().await;
    let Some(topic) = topic_map.get(&topic_name) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Topic not found",
                "topic": topic_name
            })),
        )
            .into_response();
    };

    // One entry per partition; leader/replicas are fixed to broker 0.
    let partition_details: Vec<_> = topic
        .partitions
        .iter()
        .enumerate()
        .map(|(idx, partition)| {
            serde_json::json!({
                "id": idx as i32,
                "leader": 0,
                "replicas": vec![0],
                "message_count": partition.messages.len()
            })
        })
        .collect();

    Json(serde_json::json!({
        "name": topic_name,
        "partitions": topic.partitions.len(),
        "replication_factor": topic.config.replication_factor,
        "partitions_detail": partition_details
    }))
    .into_response()
}
2692
/// GET /kafka/groups — list all consumer groups, or 503 when no Kafka broker
/// is running.
#[cfg(feature = "kafka")]
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = state.kafka_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let group_registry = broker.consumer_groups.read().await;
    let groups: Vec<KafkaConsumerGroupInfo> = group_registry
        .groups()
        .iter()
        .map(|(id, grp)| KafkaConsumerGroupInfo {
            group_id: id.clone(),
            members: grp.members.len(),
            // The group state is hard-coded here; no real state is tracked.
            state: String::from("Stable"),
        })
        .collect();

    Json(serde_json::json!({ "groups": groups })).into_response()
}
2723
/// GET /kafka/groups/{group_id} — consumer-group detail with member
/// assignments and committed offsets. 404 when the group is unknown, 503 when
/// no broker is running.
#[cfg(feature = "kafka")]
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = state.kafka_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let group_registry = broker.consumer_groups.read().await;
    let Some(group) = group_registry.groups().get(&group_id) else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Consumer group not found",
                "group_id": group_id
            })),
        )
            .into_response();
    };

    // Per-member detail, including the topic/partition assignments.
    let members_detail: Vec<_> = group
        .members
        .iter()
        .map(|(member_id, member)| {
            let assignments: Vec<_> = member
                .assignment
                .iter()
                .map(|a| {
                    serde_json::json!({
                        "topic": a.topic,
                        "partitions": a.partitions
                    })
                })
                .collect();
            serde_json::json!({
                "member_id": member_id,
                "client_id": member.client_id,
                "assignments": assignments
            })
        })
        .collect();

    // Committed offsets, keyed by (topic, partition).
    let offsets: Vec<_> = group
        .offsets
        .iter()
        .map(|((topic, partition), offset)| {
            serde_json::json!({
                "topic": topic,
                "partition": partition,
                "offset": offset
            })
        })
        .collect();

    Json(serde_json::json!({
        "group_id": group_id,
        "members": group.members.len(),
        "state": "Stable",
        "members_detail": members_detail,
        "offsets": offsets
    }))
    .into_response()
}
2772
/// Request body for `POST /kafka/produce` (also used per-message in batches).
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Target topic; auto-created with the default config if missing.
    pub topic: String,
    /// Optional message key, used for partition assignment when `partition`
    /// is absent.
    #[serde(default)]
    pub key: Option<String>,
    /// Message payload; stored as raw bytes.
    pub value: String,
    /// Explicit target partition; validated against the topic's partition count.
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional string headers attached to the message.
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2793
/// POST /kafka/produce — produce a single message to a topic.
///
/// The topic is auto-created on first use. The partition is either taken from
/// the request (and validated) or derived from the message key. On success the
/// broker metrics are updated and the message is broadcast to SSE subscribers.
#[cfg(feature = "kafka")]
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = state.kafka_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    let mut topics = broker.topics.write().await;

    // Auto-create the topic with default configuration on first produce.
    let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
        mockforge_kafka::topics::Topic::new(
            request.topic.clone(),
            mockforge_kafka::topics::TopicConfig::default(),
        )
    });

    // Use the explicit partition when provided, otherwise derive one from the key.
    let partition_id = match request.partition {
        Some(partition) => partition,
        None => topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes())),
    };

    if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid partition",
                "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
            })),
        )
            .into_response();
    }

    // Offset 0 is a placeholder; the real offset is assigned on produce.
    let message = mockforge_kafka::partitions::KafkaMessage {
        offset: 0,
        timestamp: chrono::Utc::now().timestamp_millis(),
        key: request.key.as_ref().map(|k| k.as_bytes().to_vec()),
        value: request.value.as_bytes().to_vec(),
        headers: request
            .headers
            .clone()
            .unwrap_or_default()
            .into_iter()
            .map(|(k, v)| (k, v.as_bytes().to_vec()))
            .collect(),
    };

    match topic_entry.produce(partition_id, message).await {
        Ok(offset) => {
            // `broker` is already bound above; the previous re-check of
            // `state.kafka_broker` (and inner `#[cfg(feature = "kafka")]`
            // block inside an already kafka-gated fn) were redundant.
            broker.metrics().record_messages_produced(1);

            // Notify SSE subscribers; a send error (no receivers) is ignored.
            let event = MessageEvent::Kafka(KafkaMessageEvent {
                topic: request.topic.clone(),
                key: request.key.clone(),
                value: request.value.clone(),
                partition: partition_id,
                offset,
                headers: request.headers.clone(),
                timestamp: chrono::Utc::now().to_rfc3339(),
            });
            let _ = state.message_events.send(event);

            Json(serde_json::json!({
                "success": true,
                "message": format!("Message produced to topic '{}'", request.topic),
                "topic": request.topic,
                "partition": partition_id,
                "offset": offset
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to produce message",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
2898
/// Request body for `POST /kafka/produce/batch`.
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// Messages to produce, in order; must be non-empty.
    pub messages: Vec<KafkaProduceRequest>,
    /// Delay between consecutive messages, in milliseconds.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2909
/// POST /kafka/produce/batch — produce a sequence of messages, optionally
/// pausing `delay_ms` milliseconds between consecutive messages.
///
/// Returns per-message results plus aggregate success/failure counts.
/// Individual message failures do not abort the batch.
#[cfg(feature = "kafka")]
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    let Some(broker) = state.kafka_broker.as_ref() else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response();
    };

    if request.messages.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Empty batch",
                "message": "At least one message is required"
            })),
        )
            .into_response();
    }

    let mut results = Vec::with_capacity(request.messages.len());

    for (index, msg_request) in request.messages.iter().enumerate() {
        // Scope the topics write lock so it is released before the
        // inter-message delay below (previously the guard was held across the
        // sleep, blocking every other topic operation for the whole delay).
        {
            let mut topics = broker.topics.write().await;

            // Auto-create the topic with default configuration on first use.
            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                mockforge_kafka::topics::Topic::new(
                    msg_request.topic.clone(),
                    mockforge_kafka::topics::TopicConfig::default(),
                )
            });

            // Explicit partition when provided, otherwise key-based assignment.
            let partition_id = match msg_request.partition {
                Some(partition) => partition,
                None => {
                    topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
                }
            };

            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                }));
                // Skips the pacing delay, matching the original control flow.
                continue;
            }

            // Offset 0 is a placeholder; the real offset is assigned on produce.
            let message = mockforge_kafka::partitions::KafkaMessage {
                offset: 0,
                timestamp: chrono::Utc::now().timestamp_millis(),
                key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
                value: msg_request.value.as_bytes().to_vec(),
                headers: msg_request
                    .headers
                    .clone()
                    .unwrap_or_default()
                    .into_iter()
                    .map(|(k, v)| (k, v.as_bytes().to_vec()))
                    .collect(),
            };

            match topic_entry.produce(partition_id, message).await {
                Ok(offset) => {
                    // `broker` is already bound above; the previous re-check of
                    // `state.kafka_broker` was redundant.
                    broker.metrics().record_messages_produced(1);

                    // Notify SSE subscribers; send errors (no receivers) are ignored.
                    let event = MessageEvent::Kafka(KafkaMessageEvent {
                        topic: msg_request.topic.clone(),
                        key: msg_request.key.clone(),
                        value: msg_request.value.clone(),
                        partition: partition_id,
                        offset,
                        headers: msg_request.headers.clone(),
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": msg_request.topic,
                        "partition": partition_id,
                        "offset": offset
                    }));
                }
                Err(e) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": e.to_string()
                    }));
                }
            }
        }

        // Optional pacing delay between messages (not after the last one).
        if index < request.messages.len() - 1 && request.delay_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
        }
    }

    let success_count =
        results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

    Json(serde_json::json!({
        "success": true,
        "total": request.messages.len(),
        "succeeded": success_count,
        "failed": request.messages.len() - success_count,
        "results": results
    }))
    .into_response()
}
3038
/// GET /mqtt/messages/stream — SSE stream of MQTT messages flowing through the
/// shared broadcast channel.
///
/// Optional `topic` query parameter: only events whose topic contains the
/// given substring are emitted. Kafka events on the shared channel are
/// skipped. Emits `mqtt_message` SSE events with a 15-second keep-alive.
#[cfg(feature = "mqtt")]
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // Subscribe to the process-wide message broadcast channel.
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    // Unfold the broadcast receiver into a stream: each step loops until the
    // next matching MQTT event arrives or the channel closes.
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => {
                        // Substring match against the optional topic filter.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "mqtt",
                            "topic": event.topic,
                            "payload": event.payload,
                            "qos": event.qos,
                            "retain": event.retain,
                            "timestamp": event.timestamp,
                        });

                        // A serialization failure silently skips the event and
                        // keeps looping.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event = Event::default().event("mqtt_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    // Kafka events share the channel but are not ours to emit.
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => {
                        continue;
                    }
                    // Channel closed: end the SSE stream.
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    // Receiver fell behind the broadcast buffer; log and resume.
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
3101
/// GET /kafka/messages/stream — SSE stream of Kafka messages flowing through
/// the shared broadcast channel.
///
/// Optional `topic` query parameter: only events whose topic contains the
/// given substring are emitted. MQTT events on the shared channel are
/// skipped. Emits `kafka_message` SSE events with a 15-second keep-alive.
#[cfg(feature = "kafka")]
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // Subscribe to the process-wide message broadcast channel.
    let rx = state.message_events.subscribe();
    let topic_filter = params.get("topic").cloned();

    // Unfold the broadcast receiver into a stream: each step loops until the
    // next matching Kafka event arrives or the channel closes.
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    // MQTT events share the channel but are not ours to emit.
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => {
                        continue;
                    }
                    Ok(MessageEvent::Kafka(event)) => {
                        // Substring match against the optional topic filter.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "kafka",
                            "topic": event.topic,
                            "key": event.key,
                            "value": event.value,
                            "partition": event.partition,
                            "offset": event.offset,
                            "headers": event.headers,
                            "timestamp": event.timestamp,
                        });

                        // A serialization failure silently skips the event and
                        // keeps looping.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event =
                                Event::default().event("kafka_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    // Channel closed: end the SSE stream.
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    // Receiver fell behind the broadcast buffer; log and resume.
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
3165
/// Request body for `POST /ai/generate-spec`.
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Natural-language description of the desired API.
    pub query: String,
    /// Specification flavor: "openapi", "graphql", or "asyncapi"
    /// (anything else falls back to OpenAPI 3.0).
    pub spec_type: String,
    /// OpenAPI version to target; defaults to "3.0.0" when absent.
    pub api_version: Option<String>,
}
3178
/// Request body for `POST /mockai/generate-openapi`.
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to the recorder database; defaults to `./recordings.db`.
    #[serde(default)]
    pub database_path: Option<String>,
    /// Lower time bound (RFC 3339) for recorded exchanges to include.
    #[serde(default)]
    pub since: Option<String>,
    /// Upper time bound (RFC 3339) for recorded exchanges to include.
    #[serde(default)]
    pub until: Option<String>,
    /// Optional path pattern used to filter recorded exchanges.
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence for inferred paths; defaults to 0.7.
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
3198
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`.
fn default_min_confidence() -> f64 {
    const DEFAULT: f64 = 0.7;
    DEFAULT
}
3202
/// POST /ai/generate-spec — generate an API specification from a natural
/// language description using the RAG engine.
///
/// Configuration is taken from environment variables: `MOCKFORGE_RAG_API_KEY`
/// (or `OPENAI_API_KEY`), `MOCKFORGE_RAG_PROVIDER`, `MOCKFORGE_RAG_API_ENDPOINT`,
/// `MOCKFORGE_RAG_MODEL`, `MOCKFORGE_RAG_MAX_TOKENS`, `MOCKFORGE_RAG_TEMPERATURE`,
/// `MOCKFORGE_RAG_TIMEOUT`, and `MOCKFORGE_RAG_CONTEXT_WINDOW`. Returns 503
/// when no API key is configured.
#[cfg(feature = "data-faker")]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::DocumentStorage,
    };
    use std::sync::Arc;

    // Prefer the MockForge-specific key; fall back to the OpenAI convention.
    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
        .into_response();
    }

    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    // Unknown provider names fall back to OpenAI.
    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    // Provider-specific default endpoints, overridable via env var.
    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    // Provider-specific default models, overridable via env var.
    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    // Numeric env overrides fall back to their defaults on parse failure.
    let rag_config = RagConfig {
        provider,
        api_endpoint,
        api_key,
        model,
        max_tokens: std::env::var("MOCKFORGE_RAG_MAX_TOKENS")
            .unwrap_or_else(|_| "4096".to_string())
            .parse()
            .unwrap_or(4096),
        temperature: std::env::var("MOCKFORGE_RAG_TEMPERATURE")
            .unwrap_or_else(|_| "0.3".to_string())
            .parse()
            .unwrap_or(0.3), timeout_secs: std::env::var("MOCKFORGE_RAG_TIMEOUT")
            .unwrap_or_else(|_| "60".to_string())
            .parse()
            .unwrap_or(60),
        max_context_length: std::env::var("MOCKFORGE_RAG_CONTEXT_WINDOW")
            .unwrap_or_else(|_| "4000".to_string())
            .parse()
            .unwrap_or(4000),
        ..Default::default()
    };

    // Unrecognized spec types fall back to OpenAPI 3.0.
    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        if request.spec_type == "graphql" {
            "GraphQL SDL"
        } else {
            "YAML"
        }
    );

    // The engine requires a document store; none is needed for one-shot
    // generation, so an in-memory store is used.
    use mockforge_data::rag::storage::InMemoryStorage;
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    let mut rag_engine = match RagEngine::new(rag_config.clone(), storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
            .into_response();
        }
    };

    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Strip surrounding chatter from the model output, keeping only
            // the schema/spec body.
            let spec = if request.spec_type == "graphql" {
                extract_graphql_schema(&generated_text)
            } else {
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
        .into_response(),
    }
}
3375
3376#[cfg(not(feature = "data-faker"))]
3377async fn generate_ai_spec(
3378 State(_state): State<ManagementState>,
3379 Json(_request): Json<GenerateSpecRequest>,
3380) -> impl IntoResponse {
3381 (
3382 StatusCode::NOT_IMPLEMENTED,
3383 Json(serde_json::json!({
3384 "error": "AI features not enabled",
3385 "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3386 })),
3387 )
3388 .into_response()
3389}
3390
/// POST /mockai/generate-openapi — infer an OpenAPI spec from recorded traffic.
///
/// Reads HTTP exchanges from the recorder database (optionally filtered by a
/// time window and path pattern), feeds them to the OpenAPI spec generator,
/// and returns the generated spec together with generation metadata.
#[cfg(feature = "behavioral-cloning")]
async fn generate_openapi_from_traffic(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
) -> impl IntoResponse {
    use chrono::{DateTime, Utc};
    use mockforge_core::intelligent_behavior::{
        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
        IntelligentBehaviorConfig,
    };
    use mockforge_recorder::{
        database::RecorderDatabase,
        openapi_export::{QueryFilters, RecordingsToOpenApi},
    };
    use std::path::PathBuf;

    // Database path: explicit request value, else ./recordings.db in the CWD.
    let db_path = if let Some(ref path) = request.database_path {
        PathBuf::from(path)
    } else {
        std::env::current_dir()
            .unwrap_or_else(|_| PathBuf::from("."))
            .join("recordings.db")
    };

    let db = match RecorderDatabase::new(&db_path).await {
        Ok(db) => db,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Database error",
                    "message": format!("Failed to open recorder database: {}", e)
                })),
            )
            .into_response();
        }
    };

    // Parse the optional `since` bound (RFC 3339, normalized to UTC).
    let since_dt = if let Some(ref since_str) = request.since {
        match DateTime::parse_from_rfc3339(since_str) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid --since format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
                    })),
                )
                .into_response();
            }
        }
    } else {
        None
    };

    // Parse the optional `until` bound (RFC 3339, normalized to UTC).
    let until_dt = if let Some(ref until_str) = request.until {
        match DateTime::parse_from_rfc3339(until_str) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid --until format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
                    })),
                )
                .into_response();
            }
        }
    } else {
        None
    };

    // Cap the query at 1000 exchanges.
    let query_filters = QueryFilters {
        since: since_dt,
        until: until_dt,
        path_pattern: request.path_pattern.clone(),
        min_status_code: None,
        max_requests: Some(1000),
    };

    let exchanges_from_recorder =
        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
            Ok(exchanges) => exchanges,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Query error",
                        "message": format!("Failed to query HTTP exchanges: {}", e)
                    })),
                )
                .into_response();
            }
        };

    if exchanges_from_recorder.is_empty() {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "No exchanges found",
                "message": "No HTTP exchanges found matching the specified filters"
            })),
        )
        .into_response();
    }

    // Map recorder exchanges into the generator's local exchange type
    // (field-for-field copy).
    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
        .into_iter()
        .map(|e| LocalHttpExchange {
            method: e.method,
            path: e.path,
            query_params: e.query_params,
            headers: e.headers,
            body: e.body,
            body_encoding: e.body_encoding,
            status_code: e.status_code,
            response_headers: e.response_headers,
            response_body: e.response_body,
            response_body_encoding: e.response_body_encoding,
            timestamp: e.timestamp,
        })
        .collect();

    let behavior_config = IntelligentBehaviorConfig::default();
    let gen_config = OpenApiGenerationConfig {
        min_confidence: request.min_confidence,
        behavior_model: Some(behavior_config.behavior_model),
    };

    let generator = OpenApiSpecGenerator::new(gen_config);
    let result = match generator.generate_from_exchanges(exchanges).await {
        Ok(result) => result,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Generation error",
                    "message": format!("Failed to generate OpenAPI spec: {}", e)
                })),
            )
            .into_response();
        }
    };

    // Prefer the raw generated document when present; otherwise serialize the
    // typed spec.
    let spec_json = if let Some(ref raw) = result.spec.raw_document {
        raw.clone()
    } else {
        match serde_json::to_value(&result.spec.spec) {
            Ok(json) => json,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Serialization error",
                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
                    })),
                )
                .into_response();
            }
        }
    };

    let response = serde_json::json!({
        "spec": spec_json,
        "metadata": {
            "requests_analyzed": result.metadata.requests_analyzed,
            "paths_inferred": result.metadata.paths_inferred,
            "path_confidence": result.metadata.path_confidence,
            "generated_at": result.metadata.generated_at.to_rfc3339(),
            "duration_ms": result.metadata.duration_ms,
        }
    });

    Json(response).into_response()
}
3583
3584async fn list_rule_explanations(
3586 State(state): State<ManagementState>,
3587 Query(params): Query<std::collections::HashMap<String, String>>,
3588) -> impl IntoResponse {
3589 use mockforge_core::intelligent_behavior::RuleType;
3590
3591 let explanations = state.rule_explanations.read().await;
3592 let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3593
3594 if let Some(rule_type_str) = params.get("rule_type") {
3596 if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3597 explanations_vec.retain(|e| e.rule_type == rule_type);
3598 }
3599 }
3600
3601 if let Some(min_confidence_str) = params.get("min_confidence") {
3603 if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3604 explanations_vec.retain(|e| e.confidence >= min_confidence);
3605 }
3606 }
3607
3608 explanations_vec.sort_by(|a, b| {
3610 b.confidence
3611 .partial_cmp(&a.confidence)
3612 .unwrap_or(std::cmp::Ordering::Equal)
3613 .then_with(|| b.generated_at.cmp(&a.generated_at))
3614 });
3615
3616 Json(serde_json::json!({
3617 "explanations": explanations_vec,
3618 "total": explanations_vec.len(),
3619 }))
3620 .into_response()
3621}
3622
3623async fn get_rule_explanation(
3625 State(state): State<ManagementState>,
3626 Path(rule_id): Path<String>,
3627) -> impl IntoResponse {
3628 let explanations = state.rule_explanations.read().await;
3629
3630 match explanations.get(&rule_id) {
3631 Some(explanation) => Json(serde_json::json!({
3632 "explanation": explanation,
3633 }))
3634 .into_response(),
3635 None => (
3636 StatusCode::NOT_FOUND,
3637 Json(serde_json::json!({
3638 "error": "Rule explanation not found",
3639 "message": format!("No explanation found for rule ID: {}", rule_id)
3640 })),
3641 )
3642 .into_response(),
3643 }
3644}
3645
/// Request body for the learn-from-examples endpoint: a set of
/// request/response example pairs plus optional behavior configuration.
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example pairs to learn rules from; must be non-empty.
    pub examples: Vec<ExamplePairRequest>,
    /// Optional `IntelligentBehaviorConfig` as raw JSON; defaults are used
    /// when absent or when deserialization fails.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3655
/// One request/response example used for rule learning.
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request as loose JSON; the handler reads the keys "method", "path",
    /// "body", "query_params", and "headers" when present.
    pub request: serde_json::Value,
    /// Response as loose JSON; the handler reads "status_code" (or
    /// "status") and "body" when present.
    pub response: serde_json::Value,
}
3664
/// POST handler: learn intelligent-behavior rules from example
/// request/response pairs.
///
/// Flow: validate the payload, convert each loose-JSON example into an
/// `ExamplePair`, generate rules plus human-readable explanations, persist
/// the explanations in shared state, and return a summary.
async fn learn_from_examples(
    State(state): State<ManagementState>,
    Json(request): Json<LearnFromExamplesRequest>,
) -> impl IntoResponse {
    use mockforge_core::intelligent_behavior::{
        config::{BehaviorModelConfig, IntelligentBehaviorConfig},
        rule_generator::{ExamplePair, RuleGenerator},
    };

    // At least one example is required to learn anything.
    if request.examples.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "No examples provided",
                "message": "At least one example pair is required"
            })),
        )
            .into_response();
    }

    // Convert loose JSON examples into typed ExamplePair values. Missing
    // fields fall back to permissive defaults (GET, "/", status 200).
    // NOTE(review): the closure only ever returns Ok, so the Result
    // collection (and the Err arm below) is currently unreachable.
    let example_pairs: Result<Vec<ExamplePair>, String> = request
        .examples
        .into_iter()
        .enumerate()
        .map(|(idx, ex)| {
            // Request side: method and path default to "GET" and "/".
            let method = ex
                .request
                .get("method")
                .and_then(|v| v.as_str())
                .map(|s| s.to_string())
                .unwrap_or_else(|| "GET".to_string());
            let path = ex
                .request
                .get("path")
                .and_then(|v| v.as_str())
                .map(|s| s.to_string())
                .unwrap_or_else(|| "/".to_string());
            let request_body = ex.request.get("body").cloned();
            // Query params and headers: only string-valued entries are kept;
            // anything else is silently dropped.
            let query_params = ex
                .request
                .get("query_params")
                .and_then(|v| v.as_object())
                .map(|obj| {
                    obj.iter()
                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
                        .collect()
                })
                .unwrap_or_default();
            let headers = ex
                .request
                .get("headers")
                .and_then(|v| v.as_object())
                .map(|obj| {
                    obj.iter()
                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
                        .collect()
                })
                .unwrap_or_default();

            // Response side: accept either "status_code" or "status"; default
            // to 200 when neither is a number.
            let status = ex
                .response
                .get("status_code")
                .or_else(|| ex.response.get("status"))
                .and_then(|v| v.as_u64())
                .map(|n| n as u16)
                .unwrap_or(200);
            let response_body = ex.response.get("body").cloned();

            Ok(ExamplePair {
                method,
                path,
                request: request_body,
                status,
                response: response_body,
                query_params,
                headers,
                metadata: {
                    // Tag each pair with its origin and position for tracing.
                    let mut meta = std::collections::HashMap::new();
                    meta.insert("source".to_string(), "api".to_string());
                    meta.insert("example_index".to_string(), idx.to_string());
                    meta
                },
            })
        })
        .collect();

    let example_pairs = match example_pairs {
        Ok(pairs) => pairs,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid examples",
                    "message": e
                })),
            )
                .into_response();
        }
    };

    // Optional caller-supplied config: fall back to defaults on any
    // deserialization failure rather than rejecting the request.
    let behavior_config = if let Some(config_json) = request.config {
        serde_json::from_value(config_json)
            .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
            .behavior_model
    } else {
        BehaviorModelConfig::default()
    };

    let generator = RuleGenerator::new(behavior_config);

    let (rules, explanations) =
        match generator.generate_rules_with_explanations(example_pairs).await {
            Ok(result) => result,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Rule generation failed",
                        "message": format!("Failed to generate rules: {}", e)
                    })),
                )
                    .into_response();
            }
        };

    // Persist explanations keyed by rule ID. Scoped block so the write lock
    // is released before the response is built.
    {
        let mut stored_explanations = state.rule_explanations.write().await;
        for explanation in &explanations {
            stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
        }
    }

    // Summary: per-category rule counts plus a trimmed view of each
    // explanation (full explanations are retrievable via the GET endpoints).
    let response = serde_json::json!({
        "success": true,
        "rules_generated": {
            "consistency_rules": rules.consistency_rules.len(),
            "schemas": rules.schemas.len(),
            "state_machines": rules.state_transitions.len(),
            "system_prompt": !rules.system_prompt.is_empty(),
        },
        "explanations": explanations.iter().map(|e| serde_json::json!({
            "rule_id": e.rule_id,
            "rule_type": e.rule_type,
            "confidence": e.confidence,
            "reasoning": e.reasoning,
        })).collect::<Vec<_>>(),
        "total_explanations": explanations.len(),
    });

    Json(response).into_response()
}
3829
3830#[cfg(feature = "data-faker")]
/// Extract a YAML document from free-form (e.g. LLM-produced) text.
///
/// Preference order: the contents of a ```yaml fenced block, then any
/// generic ``` fenced block, and finally the raw text trimmed — the last
/// case also covers documents that already begin with `openapi:` or
/// `asyncapi:`.
fn extract_yaml_spec(text: &str) -> String {
    // Return the content between the first occurrence of `marker` and the
    // next closing ``` fence, if both exist.
    let fenced = |marker: &str| -> Option<String> {
        let open = text.find(marker)?;
        let body = text[open + marker.len()..].trim_start();
        let close = body.find("```")?;
        Some(body[..close].trim().to_string())
    };

    if let Some(spec) = fenced("```yaml").or_else(|| fenced("```")) {
        return spec;
    }

    // No complete fence: hand back the trimmed input as-is.
    text.trim().to_string()
}
3854
3855#[cfg(feature = "data-faker")]
/// Extract a GraphQL SDL schema from free-form (e.g. LLM-produced) text.
///
/// Preference order: the contents of a ```graphql fenced block, then any
/// generic ``` fenced block, and finally the raw text trimmed — the last
/// case also covers text that already starts with `type ` or `schema `.
fn extract_graphql_schema(text: &str) -> String {
    // Return the content between the first occurrence of `marker` and the
    // next closing ``` fence, if both exist.
    let fenced = |marker: &str| -> Option<String> {
        let open = text.find(marker)?;
        let body = text[open + marker.len()..].trim_start();
        let close = body.find("```")?;
        Some(body[..close].trim().to_string())
    };

    if let Some(schema) = fenced("```graphql").or_else(|| fenced("```")) {
        return schema;
    }

    // No complete fence: hand back the trimmed input as-is.
    text.trim().to_string()
}
3879
/// GET handler: report the current chaos-engineering configuration.
///
/// With the `chaos` feature compiled in and a chaos API state wired up,
/// reads the live config. In every other case it returns the same JSON
/// shape with `enabled: false` and null sections, so clients always see a
/// stable schema.
async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
    #[cfg(feature = "chaos")]
    {
        if let Some(chaos_state) = &_state.chaos_api_state {
            let config = chaos_state.config.read().await;
            // Each optional section is serialized independently; a failed
            // serialization degrades to null instead of failing the request.
            Json(serde_json::json!({
                "enabled": config.enabled,
                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
            }))
            .into_response()
        } else {
            // Feature compiled in, but no chaos API state configured.
            Json(serde_json::json!({
                "enabled": false,
                "latency": null,
                "fault_injection": null,
                "rate_limit": null,
                "traffic_shaping": null,
            }))
            .into_response()
        }
    }
    #[cfg(not(feature = "chaos"))]
    {
        // Chaos support not compiled into this build.
        Json(serde_json::json!({
            "enabled": false,
            "latency": null,
            "fault_injection": null,
            "rate_limit": null,
            "traffic_shaping": null,
        }))
        .into_response()
    }
}
3922
/// Partial update payload for the chaos-engineering configuration.
///
/// Every field is optional: only the sections present in the request are
/// applied; the rest of the live config is left untouched.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Master on/off switch for chaos behavior.
    pub enabled: Option<bool>,
    /// Latency-injection settings as raw JSON (typed server-side).
    pub latency: Option<serde_json::Value>,
    /// Fault-injection settings as raw JSON.
    pub fault_injection: Option<serde_json::Value>,
    /// Rate-limit settings as raw JSON.
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic-shaping settings as raw JSON.
    pub traffic_shaping: Option<serde_json::Value>,
}
3937
/// Handler: apply a partial chaos-configuration update.
///
/// Each section present in the payload is deserialized into its typed
/// config and replaces the current value. Returns 503 when the chaos API
/// state is absent and 501 when the `chaos` feature is not compiled in.
async fn update_chaos_config(
    State(_state): State<ManagementState>,
    Json(_config_update): Json<ChaosConfigUpdate>,
) -> impl IntoResponse {
    #[cfg(feature = "chaos")]
    {
        if let Some(chaos_state) = &_state.chaos_api_state {
            use mockforge_chaos::config::{
                FaultInjectionConfig, LatencyConfig, RateLimitConfig, TrafficShapingConfig,
            };

            let mut config = chaos_state.config.write().await;

            if let Some(enabled) = _config_update.enabled {
                config.enabled = enabled;
            }

            // For each section: apply only when it deserializes cleanly.
            // NOTE(review): malformed section JSON is silently skipped while
            // the response still reports success — confirm this is intended.
            if let Some(latency_json) = _config_update.latency {
                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
                    config.latency = Some(latency);
                }
            }

            if let Some(fault_json) = _config_update.fault_injection {
                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
                    config.fault_injection = Some(fault);
                }
            }

            if let Some(rate_json) = _config_update.rate_limit {
                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
                    config.rate_limit = Some(rate);
                }
            }

            if let Some(traffic_json) = _config_update.traffic_shaping {
                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
                    config.traffic_shaping = Some(traffic);
                }
            }

            // Release the write lock before logging and building the response.
            drop(config);

            info!("Chaos configuration updated successfully");
            Json(serde_json::json!({
                "success": true,
                "message": "Chaos configuration updated and applied"
            }))
            .into_response()
        } else {
            // Feature compiled in, but chaos state was never configured.
            (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "success": false,
                    "error": "Chaos API not available",
                    "message": "Chaos engineering is not enabled or configured"
                })),
            )
                .into_response()
        }
    }
    #[cfg(not(feature = "chaos"))]
    {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "success": false,
                "error": "Chaos feature not enabled",
                "message": "Chaos engineering feature is not compiled into this build"
            })),
        )
            .into_response()
    }
}
4020
4021async fn list_network_profiles() -> impl IntoResponse {
4025 use mockforge_core::network_profiles::NetworkProfileCatalog;
4026
4027 let catalog = NetworkProfileCatalog::default();
4028 let profiles: Vec<serde_json::Value> = catalog
4029 .list_profiles_with_description()
4030 .iter()
4031 .map(|(name, description)| {
4032 serde_json::json!({
4033 "name": name,
4034 "description": description,
4035 })
4036 })
4037 .collect();
4038
4039 Json(serde_json::json!({
4040 "profiles": profiles
4041 }))
4042 .into_response()
4043}
4044
/// Request body for applying a named network profile.
#[derive(Debug, Deserialize)]
pub struct ApplyNetworkProfileRequest {
    /// Name of a profile from the built-in `NetworkProfileCatalog`.
    pub profile_name: String,
}
4051
/// POST handler: apply a named network profile.
///
/// Looks the profile up in the built-in catalog and, when found, writes its
/// traffic-shaping parameters into (a) the shared server configuration, if
/// available, and (b) the chaos API state, when the `chaos` feature is
/// enabled. Returns 404 for an unknown profile name.
async fn apply_network_profile(
    State(state): State<ManagementState>,
    Json(request): Json<ApplyNetworkProfileRequest>,
) -> impl IntoResponse {
    use mockforge_core::network_profiles::NetworkProfileCatalog;

    let catalog = NetworkProfileCatalog::default();
    if let Some(profile) = catalog.get(&request.profile_name) {
        if let Some(server_config) = &state.server_config {
            let mut config = server_config.write().await;

            use mockforge_core::config::NetworkShapingConfig;

            let network_shaping = NetworkShapingConfig {
                // Shaping is active if either bandwidth limiting or burst
                // loss is enabled in the profile.
                enabled: profile.traffic_shaping.bandwidth.enabled
                    || profile.traffic_shaping.burst_loss.enabled,
                // `* 8` converts the profile's bytes/sec into the bits/sec
                // the `_bps` field expects — TODO confirm units match.
                // NOTE(review): hard-coded 1000 connection cap here vs 0 in
                // the chaos-side config below — confirm intended semantics.
                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8,
                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                max_connections: 1000,
            };

            // Merge into the existing chaos section, or create one that
            // carries only the traffic-shaping settings.
            if let Some(ref mut chaos) = config.observability.chaos {
                chaos.traffic_shaping = Some(network_shaping);
            } else {
                use mockforge_core::config::ChaosEngConfig;
                config.observability.chaos = Some(ChaosEngConfig {
                    enabled: true,
                    latency: None,
                    fault_injection: None,
                    rate_limit: None,
                    traffic_shaping: Some(network_shaping),
                    scenario: None,
                });
            }

            info!("Network profile '{}' applied to server configuration", request.profile_name);
        } else {
            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
        }

        // Mirror the profile into the live chaos engine state as well.
        #[cfg(feature = "chaos")]
        {
            if let Some(chaos_state) = &state.chaos_api_state {
                use mockforge_chaos::config::TrafficShapingConfig;

                let mut chaos_config = chaos_state.config.write().await;
                let chaos_traffic_shaping = TrafficShapingConfig {
                    enabled: profile.traffic_shaping.bandwidth.enabled
                        || profile.traffic_shaping.burst_loss.enabled,
                    // Same bytes/sec -> bits/sec conversion as above.
                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8,
                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                    max_connections: 0,
                    connection_timeout_ms: 30000,
                };
                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
                // Applying a profile force-enables chaos; release the lock
                // before logging.
                chaos_config.enabled = true;
                drop(chaos_config);
                info!("Network profile '{}' applied to chaos API state", request.profile_name);
            }
        }

        Json(serde_json::json!({
            "success": true,
            "message": format!("Network profile '{}' applied", request.profile_name),
            "profile": {
                "name": profile.name,
                "description": profile.description,
            }
        }))
        .into_response()
    } else {
        (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Profile not found",
                "message": format!("Network profile '{}' not found", request.profile_name)
            })),
        )
            .into_response()
    }
}
4145
4146pub fn management_router_with_ui_builder(
4148 state: ManagementState,
4149 server_config: mockforge_core::config::ServerConfig,
4150) -> Router {
4151 use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
4152
4153 let management = management_router(state);
4155
4156 let ui_builder_state = UIBuilderState::new(server_config);
4158 let ui_builder = create_ui_builder_router(ui_builder_state);
4159
4160 management.nest("/ui-builder", ui_builder)
4162}
4163
4164pub fn management_router_with_spec_import(state: ManagementState) -> Router {
4166 use crate::spec_import::{spec_import_router, SpecImportState};
4167
4168 let management = management_router(state);
4170
4171 Router::new()
4173 .merge(management)
4174 .merge(spec_import_router(SpecImportState::new()))
4175}
4176
#[cfg(test)]
mod tests {
    use super::*;

    /// A mock pushed into shared state is retrievable by its ID.
    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        let mock = MockConfig {
            id: "test-1".to_string(),
            name: "Test Mock".to_string(),
            method: "GET".to_string(),
            path: "/test".to_string(),
            response: MockResponse {
                body: serde_json::json!({"message": "test"}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        };

        // Scoped write so the lock is released before reading back.
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(mock.clone());
        }

        let mocks = state.mocks.read().await;
        let found = mocks.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    /// Total and enabled counts reflect the stored mocks.
    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        // One enabled and one disabled mock.
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(MockConfig {
                id: "1".to_string(),
                name: "Mock 1".to_string(),
                method: "GET".to_string(),
                path: "/test1".to_string(),
                response: MockResponse {
                    body: serde_json::json!({}),
                    headers: None,
                },
                enabled: true,
                latency_ms: None,
                status_code: Some(200),
                request_match: None,
                priority: None,
                scenario: None,
                required_scenario_state: None,
                new_scenario_state: None,
            });
            mocks.push(MockConfig {
                id: "2".to_string(),
                name: "Mock 2".to_string(),
                method: "POST".to_string(),
                path: "/test2".to_string(),
                response: MockResponse {
                    body: serde_json::json!({}),
                    headers: None,
                },
                enabled: false,
                latency_ms: None,
                status_code: Some(201),
                request_match: None,
                priority: None,
                scenario: None,
                required_scenario_state: None,
                new_scenario_state: None,
            });
        }

        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }

    /// An absolute XPath that exists in the XML body matches.
    #[test]
    fn test_mock_matches_request_with_xpath_absolute_path() {
        let mock = MockConfig {
            id: "xpath-1".to_string(),
            name: "XPath Match".to_string(),
            method: "POST".to_string(),
            path: "/xml".to_string(),
            response: MockResponse {
                body: serde_json::json!({"ok": true}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: Some(RequestMatchCriteria {
                xpath: Some("/root/order/id".to_string()),
                ..Default::default()
            }),
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        };

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }

    /// An XPath with a text() predicate matches when the value agrees.
    #[test]
    fn test_mock_matches_request_with_xpath_text_predicate() {
        let mock = MockConfig {
            id: "xpath-2".to_string(),
            name: "XPath Predicate Match".to_string(),
            method: "POST".to_string(),
            path: "/xml".to_string(),
            response: MockResponse {
                body: serde_json::json!({"ok": true}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: Some(RequestMatchCriteria {
                xpath: Some("//order/id[text()='123']".to_string()),
                ..Default::default()
            }),
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        };

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }

    /// An XPath predicate with a non-matching value must not match.
    #[test]
    fn test_mock_matches_request_with_xpath_no_match() {
        let mock = MockConfig {
            id: "xpath-3".to_string(),
            name: "XPath No Match".to_string(),
            method: "POST".to_string(),
            path: "/xml".to_string(),
            response: MockResponse {
                body: serde_json::json!({"ok": true}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: Some(RequestMatchCriteria {
                xpath: Some("//order/id[text()='456']".to_string()),
                ..Default::default()
            }),
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        };

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(!mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }
}