1#[cfg(any(feature = "mqtt", feature = "kafka"))]
2use axum::response::sse::{Event, Sse};
3use axum::{
8 extract::{Path, Query, State},
9 http::StatusCode,
10 response::{IntoResponse, Json},
11 routing::{delete, get, post, put},
12 Router,
13};
14#[cfg(any(feature = "mqtt", feature = "kafka"))]
15use futures::stream::{self, Stream};
16use mockforge_core::openapi::OpenApiSpec;
17use mockforge_core::proxy::config::{
18 BodyTransform, BodyTransformRule, ProxyConfig, TransformOperation,
19};
20#[cfg(feature = "smtp")]
21use mockforge_smtp::EmailSearchFilters;
22use serde::{Deserialize, Serialize};
23#[cfg(any(feature = "mqtt", feature = "kafka"))]
24use std::convert::Infallible;
25use std::sync::Arc;
26use tokio::sync::{broadcast, RwLock};
27use tracing::*;
28
/// Default capacity of the broadcast channel that fans out broker message
/// events to SSE subscribers (see `get_message_broadcast_capacity`).
#[cfg(any(feature = "mqtt", feature = "kafka"))]
const DEFAULT_MESSAGE_BROADCAST_CAPACITY: usize = 1000;
32
33#[cfg(any(feature = "mqtt", feature = "kafka"))]
35fn get_message_broadcast_capacity() -> usize {
36 std::env::var("MOCKFORGE_MESSAGE_BROADCAST_CAPACITY")
37 .ok()
38 .and_then(|s| s.parse().ok())
39 .unwrap_or(DEFAULT_MESSAGE_BROADCAST_CAPACITY)
40}
41
/// A message observed on one of the mock message brokers, tagged by protocol.
///
/// Serialized as `{"protocol": "mqtt"|"kafka", "data": { ... }}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", content = "data")]
#[serde(rename_all = "lowercase")]
pub enum MessageEvent {
    /// An MQTT publish observed by the mock broker.
    #[cfg(feature = "mqtt")]
    Mqtt(MqttMessageEvent),
    /// A Kafka record observed by the mock broker.
    #[cfg(feature = "kafka")]
    Kafka(KafkaMessageEvent),
}
54
/// Payload of an MQTT message event.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageEvent {
    /// Topic the message was published to.
    pub topic: String,
    /// Message payload rendered as a string.
    pub payload: String,
    /// MQTT quality-of-service level (0, 1, or 2).
    pub qos: u8,
    /// Whether the broker retains this message for new subscribers.
    pub retain: bool,
    /// Timestamp of the event — presumably RFC 3339; confirm with the producer.
    pub timestamp: String,
}
70
/// Payload of a Kafka message event.
#[cfg(feature = "kafka")]
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageEvent {
    /// Topic the record was produced to.
    pub topic: String,
    /// Optional record key.
    pub key: Option<String>,
    /// Record value rendered as a string.
    pub value: String,
    /// Partition the record landed on.
    pub partition: i32,
    /// Offset within the partition.
    pub offset: i64,
    /// Optional record headers.
    pub headers: Option<std::collections::HashMap<String, String>>,
    /// Timestamp of the event — presumably RFC 3339; confirm with the producer.
    pub timestamp: String,
}
83
/// A single configured mock endpoint managed through the HTTP API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockConfig {
    /// Unique identifier; generated server-side when empty on create.
    #[serde(skip_serializing_if = "String::is_empty")]
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// HTTP method to match (compared case-insensitively).
    pub method: String,
    /// Path pattern; supports `{param}` placeholders and `*` wildcards
    /// (see `path_matches_pattern`).
    pub path: String,
    /// Response returned when this mock matches.
    pub response: MockResponse,
    /// Whether the mock participates in matching; defaults to `true`.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Optional artificial latency in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_ms: Option<u64>,
    /// Optional HTTP status code override.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status_code: Option<u16>,
    /// Additional request-matching criteria beyond method/path.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_match: Option<RequestMatchCriteria>,
    /// Relative priority among competing mocks — TODO confirm ordering semantics with the router.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub priority: Option<i32>,
    /// Scenario this mock belongs to, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scenario: Option<String>,
    /// Scenario state required for this mock to be eligible — presumably consumed
    /// by the scenario state machine; confirm against the scenarios crate.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub required_scenario_state: Option<String>,
    /// Scenario state to transition to after serving — presumably consumed
    /// by the scenario state machine; confirm against the scenarios crate.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub new_scenario_state: Option<String>,
}
123
/// Serde default for `MockConfig::enabled`: mocks are enabled unless stated otherwise.
fn default_true() -> bool {
    true
}
127
/// Response a mock serves when matched.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockResponse {
    /// Response body as arbitrary JSON.
    pub body: serde_json::Value,
    /// Optional response headers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
137
/// Extra request-matching criteria; all configured criteria must hold
/// (logical AND — see `mock_matches_request`).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RequestMatchCriteria {
    /// Header name → expected value. Names match case-insensitively; values
    /// are tried as regexes, falling back to literal comparison.
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub headers: std::collections::HashMap<String, String>,
    /// Query parameter name → expected value (exact, case-sensitive equality).
    #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
    pub query_params: std::collections::HashMap<String, String>,
    /// Body pattern: regex when it compiles, literal comparison otherwise.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub body_pattern: Option<String>,
    /// Simplified JSONPath that must exist in a JSON body (see `json_path_exists`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub json_path: Option<String>,
    /// Simplified XPath that must exist in an XML body (see `xml_xpath_exists`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub xpath: Option<String>,
    /// Mini-DSL expression (see `evaluate_custom_matcher`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub custom_matcher: Option<String>,
}
160
161pub fn mock_matches_request(
170 mock: &MockConfig,
171 method: &str,
172 path: &str,
173 headers: &std::collections::HashMap<String, String>,
174 query_params: &std::collections::HashMap<String, String>,
175 body: Option<&[u8]>,
176) -> bool {
177 use regex::Regex;
178
179 if !mock.enabled {
181 return false;
182 }
183
184 if mock.method.to_uppercase() != method.to_uppercase() {
186 return false;
187 }
188
189 if !path_matches_pattern(&mock.path, path) {
191 return false;
192 }
193
194 if let Some(criteria) = &mock.request_match {
196 for (key, expected_value) in &criteria.headers {
198 let header_key_lower = key.to_lowercase();
199 let found = headers.iter().find(|(k, _)| k.to_lowercase() == header_key_lower);
200
201 if let Some((_, actual_value)) = found {
202 if let Ok(re) = Regex::new(expected_value) {
204 if !re.is_match(actual_value) {
205 return false;
206 }
207 } else if actual_value != expected_value {
208 return false;
209 }
210 } else {
211 return false; }
213 }
214
215 for (key, expected_value) in &criteria.query_params {
217 if let Some(actual_value) = query_params.get(key) {
218 if actual_value != expected_value {
219 return false;
220 }
221 } else {
222 return false; }
224 }
225
226 if let Some(pattern) = &criteria.body_pattern {
228 if let Some(body_bytes) = body {
229 let body_str = String::from_utf8_lossy(body_bytes);
230 if let Ok(re) = Regex::new(pattern) {
232 if !re.is_match(&body_str) {
233 return false;
234 }
235 } else if body_str.as_ref() != pattern {
236 return false;
237 }
238 } else {
239 return false; }
241 }
242
243 if let Some(json_path) = &criteria.json_path {
245 if let Some(body_bytes) = body {
246 if let Ok(body_str) = std::str::from_utf8(body_bytes) {
247 if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(body_str) {
248 if !json_path_exists(&json_value, json_path) {
250 return false;
251 }
252 }
253 }
254 }
255 }
256
257 if let Some(xpath) = &criteria.xpath {
259 if let Some(body_bytes) = body {
260 if let Ok(body_str) = std::str::from_utf8(body_bytes) {
261 if !xml_xpath_exists(body_str, xpath) {
262 return false;
263 }
264 } else {
265 return false;
266 }
267 } else {
268 return false; }
270 }
271
272 if let Some(custom) = &criteria.custom_matcher {
274 if !evaluate_custom_matcher(custom, method, path, headers, query_params, body) {
275 return false;
276 }
277 }
278 }
279
280 true
281}
282
283fn path_matches_pattern(pattern: &str, path: &str) -> bool {
285 if pattern == path {
287 return true;
288 }
289
290 if pattern == "*" {
292 return true;
293 }
294
295 let pattern_parts: Vec<&str> = pattern.split('/').filter(|s| !s.is_empty()).collect();
297 let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
298
299 if pattern_parts.len() != path_parts.len() {
300 if pattern.contains('*') {
302 return matches_wildcard_pattern(pattern, path);
303 }
304 return false;
305 }
306
307 for (pattern_part, path_part) in pattern_parts.iter().zip(path_parts.iter()) {
308 if pattern_part.starts_with('{') && pattern_part.ends_with('}') {
310 continue; }
312
313 if pattern_part != path_part {
314 return false;
315 }
316 }
317
318 true
319}
320
321fn matches_wildcard_pattern(pattern: &str, path: &str) -> bool {
323 use regex::Regex;
324
325 let regex_pattern = pattern.replace('*', ".*").replace('?', ".?");
327
328 if let Ok(re) = Regex::new(&format!("^{}$", regex_pattern)) {
329 return re.is_match(path);
330 }
331
332 false
333}
334
335fn json_path_exists(json: &serde_json::Value, json_path: &str) -> bool {
343 let path = if json_path == "$" {
344 return true;
345 } else if let Some(p) = json_path.strip_prefix("$.") {
346 p
347 } else if let Some(p) = json_path.strip_prefix('$') {
348 p.strip_prefix('.').unwrap_or(p)
349 } else {
350 json_path
351 };
352
353 let mut current = json;
354 for segment in split_json_path_segments(path) {
355 match segment {
356 JsonPathSegment::Field(name) => {
357 if let Some(obj) = current.as_object() {
358 if let Some(value) = obj.get(name) {
359 current = value;
360 } else {
361 return false;
362 }
363 } else {
364 return false;
365 }
366 }
367 JsonPathSegment::Index(idx) => {
368 if let Some(arr) = current.as_array() {
369 if let Some(value) = arr.get(idx) {
370 current = value;
371 } else {
372 return false;
373 }
374 } else {
375 return false;
376 }
377 }
378 JsonPathSegment::Wildcard => {
379 if let Some(arr) = current.as_array() {
380 return !arr.is_empty();
381 }
382 return false;
383 }
384 }
385 }
386 true
387}
388
/// One parsed step of a simplified JSONPath expression.
enum JsonPathSegment<'a> {
    /// An object field access, e.g. `foo` in `foo.bar`.
    Field(&'a str),
    /// A numeric array index, e.g. `[2]`.
    Index(usize),
    /// The `[*]` wildcard (any element).
    Wildcard,
}

/// Split a dot-separated JSONPath (root `$` already stripped) into segments.
///
/// Supports `field`, `field[N]`, and `field[*]`. Bracket content that is
/// neither `*` nor a `usize` is ignored; only the first bracket in a part
/// is interpreted.
fn split_json_path_segments(path: &str) -> Vec<JsonPathSegment<'_>> {
    let mut segments = Vec::new();
    for part in path.split('.') {
        if part.is_empty() {
            continue;
        }
        match part.find('[') {
            // BUG FIX: a malformed part such as "a[" or "a[0" previously
            // produced an out-of-bounds slice and panicked. Only interpret
            // the bracket when it is properly closed; otherwise fall through
            // and treat the whole part as a literal field name.
            Some(bracket_start) if part.ends_with(']') && bracket_start + 1 < part.len() => {
                let field_name = &part[..bracket_start];
                if !field_name.is_empty() {
                    segments.push(JsonPathSegment::Field(field_name));
                }
                let bracket_content = &part[bracket_start + 1..part.len() - 1];
                if bracket_content == "*" {
                    segments.push(JsonPathSegment::Wildcard);
                } else if let Ok(idx) = bracket_content.parse::<usize>() {
                    segments.push(JsonPathSegment::Index(idx));
                }
            }
            _ => segments.push(JsonPathSegment::Field(part)),
        }
    }
    segments
}
419
/// A single step in a simplified XPath, e.g. `item` or `item[text()="x"]`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct XPathSegment {
    /// Element tag name to match.
    name: String,
    /// If set, the element's trimmed text content must equal this value.
    text_equals: Option<String>,
}

/// Parse one path step. Supports a bare element name and the single
/// predicate form `name[text()="literal"]` (double or single quotes).
/// Anything else yields `None`.
fn parse_xpath_segment(segment: &str) -> Option<XPathSegment> {
    if segment.is_empty() {
        return None;
    }

    let trimmed = segment.trim();

    let bracket_start = match trimmed.find('[') {
        // No predicate: the whole (trimmed) step is the element name.
        None => {
            return Some(XPathSegment {
                name: trimmed.to_string(),
                text_equals: None,
            })
        }
        Some(pos) => pos,
    };

    if !trimmed.ends_with(']') {
        return None;
    }

    let name = trimmed[..bracket_start].trim();
    if name.is_empty() {
        return None;
    }

    let predicate = trimmed[bracket_start + 1..trimmed.len() - 1].trim();

    // Only the text()= predicate is supported; anything else is rejected.
    let raw = predicate.strip_prefix("text()=")?.trim();

    let is_quoted = raw.len() >= 2
        && ((raw.starts_with('"') && raw.ends_with('"'))
            || (raw.starts_with('\'') && raw.ends_with('\'')));
    if !is_quoted {
        return None;
    }

    Some(XPathSegment {
        name: name.to_string(),
        text_equals: Some(raw[1..raw.len() - 1].to_string()),
    })
}
466
467fn segment_matches(node: roxmltree::Node<'_, '_>, segment: &XPathSegment) -> bool {
468 if !node.is_element() {
469 return false;
470 }
471 if node.tag_name().name() != segment.name {
472 return false;
473 }
474 match &segment.text_equals {
475 Some(expected) => node.text().map(str::trim).unwrap_or_default() == expected,
476 None => true,
477 }
478}
479
/// Check whether a simplified XPath expression matches anywhere in `xml_body`.
///
/// Supports absolute paths (`/a/b`) rooted at the document element and
/// descendant-rooted paths (`//a/b`) anchored at any matching node. Steps
/// are parsed by `parse_xpath_segment`; unparsable steps are silently
/// dropped by the `filter_map` below. Returns `false` (with a warning) for
/// unparsable XML or expressions that do not start with `/` or `//`.
fn xml_xpath_exists(xml_body: &str, xpath: &str) -> bool {
    let doc = match roxmltree::Document::parse(xml_body) {
        Ok(doc) => doc,
        Err(err) => {
            tracing::warn!("Failed to parse XML for XPath matching: {}", err);
            return false;
        }
    };

    let expr = xpath.trim();
    if expr.is_empty() {
        return false;
    }

    // "//" means "start at any matching descendant"; "/" means "start at root".
    let (is_descendant, path_str) = if let Some(rest) = expr.strip_prefix("//") {
        (true, rest)
    } else if let Some(rest) = expr.strip_prefix('/') {
        (false, rest)
    } else {
        tracing::warn!("Unsupported XPath expression (must start with / or //): {}", expr);
        return false;
    };

    // NOTE(review): filter_map drops steps parse_xpath_segment rejects,
    // which silently shortens the path — confirm this leniency is intended.
    let segments: Vec<XPathSegment> = path_str
        .split('/')
        .filter(|s| !s.trim().is_empty())
        .filter_map(parse_xpath_segment)
        .collect();

    if segments.is_empty() {
        return false;
    }

    if is_descendant {
        // Try every node matching the first step as an anchor, then walk the
        // remaining steps breadth-first through its children.
        let first = &segments[0];
        for node in doc.descendants().filter(|n| segment_matches(*n, first)) {
            let mut frontier = vec![node];
            for segment in &segments[1..] {
                let mut next_frontier = Vec::new();
                for parent in &frontier {
                    for child in parent.children().filter(|n| segment_matches(*n, segment)) {
                        next_frontier.push(child);
                    }
                }
                if next_frontier.is_empty() {
                    // Dead end under this anchor; try the next one.
                    frontier.clear();
                    break;
                }
                frontier = next_frontier;
            }
            if !frontier.is_empty() {
                return true;
            }
        }
        false
    } else {
        // Absolute path: the first step must match the document root element
        // itself; later steps descend through children.
        let mut frontier = vec![doc.root_element()];
        for (index, segment) in segments.iter().enumerate() {
            let mut next_frontier = Vec::new();
            for parent in &frontier {
                if index == 0 {
                    if segment_matches(*parent, segment) {
                        next_frontier.push(*parent);
                    }
                    continue;
                }
                for child in parent.children().filter(|n| segment_matches(*n, segment)) {
                    next_frontier.push(child);
                }
            }
            if next_frontier.is_empty() {
                return false;
            }
            frontier = next_frontier;
        }
        !frontier.is_empty()
    }
}
564
/// Evaluate a tiny custom-matcher DSL expression against the request.
///
/// Supported forms, tried in this order (first operator whose token appears
/// in the expression wins):
/// - `field == "value"`   — exact equality
/// - `field =~ "pattern"` — regex match
/// - `field contains "s"` — substring containment
///
/// `field` is `method`, `path`, `headers.<name>`, `query.<name>`, or (for
/// `contains` only) `body`. Unknown fields or malformed expressions
/// evaluate to `false`.
///
/// NOTE(review): operators are located by plain substring `split`, so a
/// value that itself contains `==`, `=~`, or the word `contains` will
/// produce more than two parts and evaluate to `false` — confirm this
/// limitation is acceptable.
fn evaluate_custom_matcher(
    expression: &str,
    method: &str,
    path: &str,
    headers: &std::collections::HashMap<String, String>,
    query_params: &std::collections::HashMap<String, String>,
    body: Option<&[u8]>,
) -> bool {
    use regex::Regex;

    let expr = expression.trim();

    // Exact equality: field == "value"
    if expr.contains("==") {
        let parts: Vec<&str> = expr.split("==").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        // Strip surrounding double or single quotes from the literal.
        let expected_value = parts[1].trim_matches('"').trim_matches('\'');

        match field {
            "method" => method == expected_value,
            "path" => path == expected_value,
            _ if field.starts_with("headers.") => {
                // Skip the "headers." prefix (8 bytes, ASCII).
                let header_name = &field[8..];
                headers.get(header_name).map(|v| v == expected_value).unwrap_or(false)
            }
            _ if field.starts_with("query.") => {
                // Skip the "query." prefix (6 bytes, ASCII).
                let param_name = &field[6..];
                query_params.get(param_name).map(|v| v == expected_value).unwrap_or(false)
            }
            _ => false,
        }
    }
    // Regex match: field =~ "pattern"
    else if expr.contains("=~") {
        let parts: Vec<&str> = expr.split("=~").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        let pattern = parts[1].trim_matches('"').trim_matches('\'');

        // An invalid pattern evaluates to false rather than erroring.
        if let Ok(re) = Regex::new(pattern) {
            match field {
                "method" => re.is_match(method),
                "path" => re.is_match(path),
                _ if field.starts_with("headers.") => {
                    let header_name = &field[8..];
                    headers.get(header_name).map(|v| re.is_match(v)).unwrap_or(false)
                }
                _ if field.starts_with("query.") => {
                    let param_name = &field[6..];
                    query_params.get(param_name).map(|v| re.is_match(v)).unwrap_or(false)
                }
                _ => false,
            }
        } else {
            false
        }
    }
    // Substring containment: field contains "value"
    else if expr.contains("contains") {
        let parts: Vec<&str> = expr.split("contains").map(|s| s.trim()).collect();
        if parts.len() != 2 {
            return false;
        }

        let field = parts[0];
        let search_value = parts[1].trim_matches('"').trim_matches('\'');

        match field {
            "path" => path.contains(search_value),
            _ if field.starts_with("headers.") => {
                let header_name = &field[8..];
                headers.get(header_name).map(|v| v.contains(search_value)).unwrap_or(false)
            }
            _ if field.starts_with("body") => {
                // Lossy UTF-8 conversion so binary bodies still get a best-effort check.
                if let Some(body_bytes) = body {
                    let body_str = String::from_utf8_lossy(body_bytes);
                    body_str.contains(search_value)
                } else {
                    false
                }
            }
            _ => false,
        }
    } else {
        tracing::warn!("Unknown custom matcher expression format: {}", expr);
        false
    }
}
662
/// Runtime statistics reported by the stats endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerStats {
    /// Seconds elapsed since the management state was created.
    pub uptime_seconds: u64,
    /// Total requests counted via `ManagementState::request_counter`.
    pub total_requests: u64,
    /// Number of configured mocks.
    pub active_mocks: usize,
    /// Number of configured mocks that are enabled.
    pub enabled_mocks: usize,
    /// Route count — currently mirrors the mock count (see `get_stats`).
    pub registered_routes: usize,
}
677
/// Static configuration snapshot reported by the config endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
    /// Crate version (from `CARGO_PKG_VERSION`).
    pub version: String,
    /// Port the managed server listens on.
    pub port: u16,
    /// Whether an OpenAPI spec was loaded.
    pub has_openapi_spec: bool,
    /// Filesystem path the spec was loaded from, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub spec_path: Option<String>,
}
691
/// Shared state backing the management HTTP API.
///
/// Cheap to clone: every mutable field is behind an `Arc`.
#[derive(Clone)]
pub struct ManagementState {
    /// In-memory collection of configured mocks.
    pub mocks: Arc<RwLock<Vec<MockConfig>>>,
    /// Loaded OpenAPI spec, if the server was started with one.
    pub spec: Option<Arc<OpenApiSpec>>,
    /// Filesystem path the spec was loaded from, if any.
    pub spec_path: Option<String>,
    /// Port the managed server listens on.
    pub port: u16,
    /// Creation instant, used for uptime reporting.
    pub start_time: std::time::Instant,
    /// Total request count, read by the stats endpoint (incremented elsewhere — not shown here).
    pub request_counter: Arc<RwLock<u64>>,
    /// Optional proxy configuration shared with the proxy layer.
    pub proxy_config: Option<Arc<RwLock<ProxyConfig>>>,
    /// Registry of captured SMTP emails, when the SMTP server is enabled.
    #[cfg(feature = "smtp")]
    pub smtp_registry: Option<Arc<mockforge_smtp::SmtpSpecRegistry>>,
    /// Handle to the mock MQTT broker, when enabled.
    #[cfg(feature = "mqtt")]
    pub mqtt_broker: Option<Arc<mockforge_mqtt::MqttBroker>>,
    /// Handle to the mock Kafka broker, when enabled.
    #[cfg(feature = "kafka")]
    pub kafka_broker: Option<Arc<mockforge_kafka::KafkaMockBroker>>,
    /// Broadcast sender fanning out broker message events.
    #[cfg(any(feature = "mqtt", feature = "kafka"))]
    pub message_events: Arc<broadcast::Sender<MessageEvent>>,
    /// Scenario state machines (required/new scenario states on mocks).
    pub state_machine_manager:
        Arc<RwLock<mockforge_scenarios::state_machine::ScenarioStateMachineManager>>,
    /// Broadcast sender for mock CRUD events pushed over WebSocket.
    pub ws_broadcast: Option<Arc<broadcast::Sender<crate::management_ws::MockEvent>>>,
    /// Registry of lifecycle hooks invoked on mock create/update/delete.
    pub lifecycle_hooks: Option<Arc<mockforge_core::lifecycle::LifecycleHookRegistry>>,
    /// Rule explanations keyed by string id — presumably a mock/rule id; confirm with callers.
    pub rule_explanations: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                mockforge_core::intelligent_behavior::RuleExplanation,
            >,
        >,
    >,
    /// Chaos-engineering API state, when enabled.
    #[cfg(feature = "chaos")]
    pub chaos_api_state: Option<Arc<mockforge_chaos::api::ChaosApiState>>,
    /// Live server configuration; bulk updates apply here when present.
    pub server_config: Option<Arc<RwLock<mockforge_core::config::ServerConfig>>>,
    /// Conformance-checking state, when enabled.
    #[cfg(feature = "conformance")]
    pub conformance_state: crate::handlers::conformance::ConformanceState,
}
746
impl ManagementState {
    /// Create a fresh management state with empty mock storage, zeroed
    /// counters, and all optional integrations unset.
    pub fn new(spec: Option<Arc<OpenApiSpec>>, spec_path: Option<String>, port: u16) -> Self {
        Self {
            mocks: Arc::new(RwLock::new(Vec::new())),
            spec,
            spec_path,
            port,
            start_time: std::time::Instant::now(),
            request_counter: Arc::new(RwLock::new(0)),
            proxy_config: None,
            #[cfg(feature = "smtp")]
            smtp_registry: None,
            #[cfg(feature = "mqtt")]
            mqtt_broker: None,
            #[cfg(feature = "kafka")]
            kafka_broker: None,
            #[cfg(any(feature = "mqtt", feature = "kafka"))]
            message_events: {
                // Capacity is env-tunable; see get_message_broadcast_capacity().
                let capacity = get_message_broadcast_capacity();
                let (tx, _) = broadcast::channel(capacity);
                Arc::new(tx)
            },
            state_machine_manager: Arc::new(RwLock::new(
                mockforge_scenarios::state_machine::ScenarioStateMachineManager::new(),
            )),
            ws_broadcast: None,
            lifecycle_hooks: None,
            rule_explanations: Arc::new(RwLock::new(std::collections::HashMap::new())),
            #[cfg(feature = "chaos")]
            chaos_api_state: None,
            server_config: None,
            #[cfg(feature = "conformance")]
            conformance_state: crate::handlers::conformance::ConformanceState::new(),
        }
    }

    /// Builder: attach a lifecycle-hook registry.
    pub fn with_lifecycle_hooks(
        mut self,
        hooks: Arc<mockforge_core::lifecycle::LifecycleHookRegistry>,
    ) -> Self {
        self.lifecycle_hooks = Some(hooks);
        self
    }

    /// Builder: attach a WebSocket broadcast channel for mock CRUD events.
    pub fn with_ws_broadcast(
        mut self,
        ws_broadcast: Arc<broadcast::Sender<crate::management_ws::MockEvent>>,
    ) -> Self {
        self.ws_broadcast = Some(ws_broadcast);
        self
    }

    /// Builder: attach a shared proxy configuration.
    pub fn with_proxy_config(mut self, proxy_config: Arc<RwLock<ProxyConfig>>) -> Self {
        self.proxy_config = Some(proxy_config);
        self
    }

    /// Builder: attach the SMTP registry backing the mailbox endpoints.
    #[cfg(feature = "smtp")]
    pub fn with_smtp_registry(
        mut self,
        smtp_registry: Arc<mockforge_smtp::SmtpSpecRegistry>,
    ) -> Self {
        self.smtp_registry = Some(smtp_registry);
        self
    }

    /// Builder: attach a mock MQTT broker handle.
    #[cfg(feature = "mqtt")]
    pub fn with_mqtt_broker(mut self, mqtt_broker: Arc<mockforge_mqtt::MqttBroker>) -> Self {
        self.mqtt_broker = Some(mqtt_broker);
        self
    }

    /// Builder: attach a mock Kafka broker handle.
    #[cfg(feature = "kafka")]
    pub fn with_kafka_broker(
        mut self,
        kafka_broker: Arc<mockforge_kafka::KafkaMockBroker>,
    ) -> Self {
        self.kafka_broker = Some(kafka_broker);
        self
    }

    /// Builder: attach chaos-engineering API state.
    #[cfg(feature = "chaos")]
    pub fn with_chaos_api_state(
        mut self,
        chaos_api_state: Arc<mockforge_chaos::api::ChaosApiState>,
    ) -> Self {
        self.chaos_api_state = Some(chaos_api_state);
        self
    }

    /// Builder: attach a live server configuration so bulk config updates
    /// can be applied (see `bulk_update_config`).
    pub fn with_server_config(
        mut self,
        server_config: Arc<RwLock<mockforge_core::config::ServerConfig>>,
    ) -> Self {
        self.server_config = Some(server_config);
        self
    }
}
859
860async fn list_mocks(State(state): State<ManagementState>) -> Json<serde_json::Value> {
862 let mocks = state.mocks.read().await;
863 Json(serde_json::json!({
864 "mocks": *mocks,
865 "total": mocks.len(),
866 "enabled": mocks.iter().filter(|m| m.enabled).count()
867 }))
868}
869
870async fn get_mock(
872 State(state): State<ManagementState>,
873 Path(id): Path<String>,
874) -> Result<Json<MockConfig>, StatusCode> {
875 let mocks = state.mocks.read().await;
876 mocks
877 .iter()
878 .find(|m| m.id == id)
879 .cloned()
880 .map(Json)
881 .ok_or(StatusCode::NOT_FOUND)
882}
883
/// POST handler: register a new mock.
///
/// Generates a UUID when `id` is empty and rejects duplicate ids with 409.
/// Fires the `Created` lifecycle hook (before insertion) and broadcasts a
/// WebSocket event on success.
///
/// NOTE(review): the mocks write lock is held across the lifecycle-hook
/// await; a hook that re-enters this state's mock collection would
/// deadlock — confirm hooks never do that.
async fn create_mock(
    State(state): State<ManagementState>,
    Json(mut mock): Json<MockConfig>,
) -> Result<Json<MockConfig>, StatusCode> {
    let mut mocks = state.mocks.write().await;

    // Server-assigned id for payloads that omit one.
    if mock.id.is_empty() {
        mock.id = uuid::Uuid::new_v4().to_string();
    }

    // Duplicate ids are a conflict.
    if mocks.iter().any(|m| m.id == mock.id) {
        return Err(StatusCode::CONFLICT);
    }

    info!("Creating mock: {} {} {}", mock.method, mock.path, mock.id);

    // Lifecycle hook fires before the mock is inserted into the collection.
    if let Some(hooks) = &state.lifecycle_hooks {
        let event = mockforge_core::lifecycle::MockLifecycleEvent::Created {
            id: mock.id.clone(),
            name: mock.name.clone(),
            config: serde_json::to_value(&mock).unwrap_or_default(),
        };
        hooks.invoke_mock_created(&event).await;
    }

    mocks.push(mock.clone());

    // Best-effort WebSocket notification; send errors are ignored.
    if let Some(tx) = &state.ws_broadcast {
        let _ = tx.send(crate::management_ws::MockEvent::mock_created(mock.clone()));
    }

    Ok(Json(mock))
}
922
/// PUT handler: replace an existing mock wholesale.
///
/// Fires the `Updated` lifecycle hook, plus an `Enabled`/`Disabled` hook
/// when the `enabled` flag flipped, then broadcasts a WebSocket event.
///
/// NOTE(review): the mocks write lock is held across the hook awaits —
/// same re-entrancy caveat as `create_mock`. Also note the stored mock is
/// replaced *before* hooks run.
async fn update_mock(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
    Json(updated_mock): Json<MockConfig>,
) -> Result<Json<MockConfig>, StatusCode> {
    let mut mocks = state.mocks.write().await;

    let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;

    // Keep the previous version to detect an enabled-flag transition below.
    let old_mock = mocks[position].clone();

    info!("Updating mock: {}", id);
    mocks[position] = updated_mock.clone();

    if let Some(hooks) = &state.lifecycle_hooks {
        let event = mockforge_core::lifecycle::MockLifecycleEvent::Updated {
            id: updated_mock.id.clone(),
            name: updated_mock.name.clone(),
            config: serde_json::to_value(&updated_mock).unwrap_or_default(),
        };
        hooks.invoke_mock_updated(&event).await;

        // Fire a separate state-change event when enabled flipped.
        if old_mock.enabled != updated_mock.enabled {
            let state_event = if updated_mock.enabled {
                mockforge_core::lifecycle::MockLifecycleEvent::Enabled {
                    id: updated_mock.id.clone(),
                }
            } else {
                mockforge_core::lifecycle::MockLifecycleEvent::Disabled {
                    id: updated_mock.id.clone(),
                }
            };
            hooks.invoke_mock_state_changed(&state_event).await;
        }
    }

    // Best-effort WebSocket notification; send errors are ignored.
    if let Some(tx) = &state.ws_broadcast {
        let _ = tx.send(crate::management_ws::MockEvent::mock_updated(updated_mock.clone()));
    }

    Ok(Json(updated_mock))
}
970
971async fn delete_mock(
973 State(state): State<ManagementState>,
974 Path(id): Path<String>,
975) -> Result<StatusCode, StatusCode> {
976 let mut mocks = state.mocks.write().await;
977
978 let position = mocks.iter().position(|m| m.id == id).ok_or(StatusCode::NOT_FOUND)?;
979
980 let deleted_mock = mocks[position].clone();
982
983 info!("Deleting mock: {}", id);
984 mocks.remove(position);
985
986 if let Some(hooks) = &state.lifecycle_hooks {
988 let event = mockforge_core::lifecycle::MockLifecycleEvent::Deleted {
989 id: deleted_mock.id.clone(),
990 name: deleted_mock.name.clone(),
991 };
992 hooks.invoke_mock_deleted(&event).await;
993 }
994
995 if let Some(tx) = &state.ws_broadcast {
997 let _ = tx.send(crate::management_ws::MockEvent::mock_deleted(id.clone()));
998 }
999
1000 Ok(StatusCode::NO_CONTENT)
1001}
1002
/// Request body for the config-validation endpoint.
#[derive(Debug, Deserialize)]
pub struct ValidateConfigRequest {
    /// Configuration payload to validate.
    pub config: serde_json::Value,
    /// Parse mode: "yaml"/"yml" or anything else for JSON (default "json").
    #[serde(default = "default_format")]
    pub format: String,
}
1012
/// Serde default for `ValidateConfigRequest::format`: "json".
fn default_format() -> String {
    String::from("json")
}
1016
/// POST handler: validate a configuration payload against `ServerConfig`.
///
/// The "yaml"/"yml" path re-serializes the JSON payload to a string and
/// parses it with serde_yaml (JSON is a YAML subset, so this round-trip
/// validates the same structure); any other format deserializes the JSON
/// value directly. Responds 200 with `valid: true` or 400 with the error.
async fn validate_config(Json(request): Json<ValidateConfigRequest>) -> impl IntoResponse {
    use mockforge_core::config::ServerConfig;

    let config_result: Result<ServerConfig, String> = match request.format.as_str() {
        "yaml" | "yml" => {
            let yaml_str = match serde_json::to_string(&request.config) {
                Ok(s) => s,
                Err(e) => {
                    return (
                        StatusCode::BAD_REQUEST,
                        Json(serde_json::json!({
                            "valid": false,
                            "error": format!("Failed to convert to string: {}", e),
                            "message": "Configuration validation failed"
                        })),
                    )
                    .into_response();
                }
            };
            serde_yaml::from_str(&yaml_str).map_err(|e| format!("YAML parse error: {}", e))
        }
        _ => serde_json::from_value(request.config).map_err(|e| format!("JSON parse error: {}", e)),
    };

    match config_result {
        Ok(_) => Json(serde_json::json!({
            "valid": true,
            "message": "Configuration is valid"
        }))
        .into_response(),
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "valid": false,
                "error": format!("Invalid configuration: {}", e),
                "message": "Configuration validation failed"
            })),
        )
        .into_response(),
    }
}
1059
/// Request body for the bulk config-update endpoint: a JSON object of
/// top-level `ServerConfig` keys to override.
#[derive(Debug, Deserialize)]
pub struct BulkConfigUpdateRequest {
    /// Top-level key → replacement value map (must be a JSON object).
    pub updates: serde_json::Value,
}
1066
1067async fn bulk_update_config(
1075 State(state): State<ManagementState>,
1076 Json(request): Json<BulkConfigUpdateRequest>,
1077) -> impl IntoResponse {
1078 if !request.updates.is_object() {
1080 return (
1081 StatusCode::BAD_REQUEST,
1082 Json(serde_json::json!({
1083 "error": "Invalid request",
1084 "message": "Updates must be a JSON object"
1085 })),
1086 )
1087 .into_response();
1088 }
1089
1090 use mockforge_core::config::ServerConfig;
1092
1093 let base_config = ServerConfig::default();
1095 let base_json = match serde_json::to_value(&base_config) {
1096 Ok(v) => v,
1097 Err(e) => {
1098 return (
1099 StatusCode::INTERNAL_SERVER_ERROR,
1100 Json(serde_json::json!({
1101 "error": "Internal error",
1102 "message": format!("Failed to serialize base config: {}", e)
1103 })),
1104 )
1105 .into_response();
1106 }
1107 };
1108
1109 let mut merged = base_json.clone();
1111 if let (Some(merged_obj), Some(updates_obj)) =
1112 (merged.as_object_mut(), request.updates.as_object())
1113 {
1114 for (key, value) in updates_obj {
1115 merged_obj.insert(key.clone(), value.clone());
1116 }
1117 }
1118
1119 match serde_json::from_value::<ServerConfig>(merged) {
1121 Ok(validated_config) => {
1122 if let Some(ref config_lock) = state.server_config {
1124 let mut config = config_lock.write().await;
1125 *config = validated_config;
1126 Json(serde_json::json!({
1127 "success": true,
1128 "message": "Bulk configuration update applied successfully",
1129 "updates_received": request.updates,
1130 "validated": true,
1131 "applied": true
1132 }))
1133 .into_response()
1134 } else {
1135 Json(serde_json::json!({
1136 "success": true,
1137 "message": "Bulk configuration update validated but not applied (no server config in state). Use .with_server_config() when building ManagementState.",
1138 "updates_received": request.updates,
1139 "validated": true,
1140 "applied": false
1141 }))
1142 .into_response()
1143 }
1144 }
1145 Err(e) => (
1146 StatusCode::BAD_REQUEST,
1147 Json(serde_json::json!({
1148 "error": "Invalid configuration",
1149 "message": format!("Configuration validation failed: {}", e),
1150 "validated": false
1151 })),
1152 )
1153 .into_response(),
1154 }
1155}
1156
1157async fn get_stats(State(state): State<ManagementState>) -> Json<ServerStats> {
1159 let mocks = state.mocks.read().await;
1160 let request_count = *state.request_counter.read().await;
1161
1162 Json(ServerStats {
1163 uptime_seconds: state.start_time.elapsed().as_secs(),
1164 total_requests: request_count,
1165 active_mocks: mocks.len(),
1166 enabled_mocks: mocks.iter().filter(|m| m.enabled).count(),
1167 registered_routes: mocks.len(), })
1169}
1170
1171async fn get_config(State(state): State<ManagementState>) -> Json<ServerConfig> {
1173 Json(ServerConfig {
1174 version: env!("CARGO_PKG_VERSION").to_string(),
1175 port: state.port,
1176 has_openapi_spec: state.spec.is_some(),
1177 spec_path: state.spec_path.clone(),
1178 })
1179}
1180
1181async fn health_check() -> Json<serde_json::Value> {
1183 Json(serde_json::json!({
1184 "status": "healthy",
1185 "service": "mockforge-management",
1186 "timestamp": chrono::Utc::now().to_rfc3339()
1187 }))
1188}
1189
/// Serialization format for mock export.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// Pretty-printed JSON (the default).
    Json,
    /// YAML.
    Yaml,
}
1199
1200async fn export_mocks(
1202 State(state): State<ManagementState>,
1203 Query(params): Query<std::collections::HashMap<String, String>>,
1204) -> Result<(StatusCode, String), StatusCode> {
1205 let mocks = state.mocks.read().await;
1206
1207 let format = params
1208 .get("format")
1209 .map(|f| match f.as_str() {
1210 "yaml" | "yml" => ExportFormat::Yaml,
1211 _ => ExportFormat::Json,
1212 })
1213 .unwrap_or(ExportFormat::Json);
1214
1215 match format {
1216 ExportFormat::Json => serde_json::to_string_pretty(&*mocks)
1217 .map(|json| (StatusCode::OK, json))
1218 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1219 ExportFormat::Yaml => serde_yaml::to_string(&*mocks)
1220 .map(|yaml| (StatusCode::OK, yaml))
1221 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR),
1222 }
1223}
1224
1225async fn import_mocks(
1227 State(state): State<ManagementState>,
1228 Json(mocks): Json<Vec<MockConfig>>,
1229) -> impl IntoResponse {
1230 let mut current_mocks = state.mocks.write().await;
1231 current_mocks.clear();
1232 current_mocks.extend(mocks);
1233 Json(serde_json::json!({ "status": "imported", "count": current_mocks.len() }))
1234}
1235
/// GET handler: list all captured SMTP emails, or 501 when no registry is wired in.
#[cfg(feature = "smtp")]
async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.smtp_registry.as_ref() {
        Some(registry) => match registry.get_emails() {
            Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to retrieve emails",
                    "message": e.to_string()
                })),
            ),
        },
        None => (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        ),
    }
}
1260
/// GET handler: fetch a single captured SMTP email by id; 404 when unknown,
/// 501 when no registry is wired in.
#[cfg(feature = "smtp")]
async fn get_smtp_email(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let registry = match state.smtp_registry.as_ref() {
        Some(r) => r,
        None => {
            return (
                StatusCode::NOT_IMPLEMENTED,
                Json(serde_json::json!({
                    "error": "SMTP mailbox management not available",
                    "message": "SMTP server is not enabled or registry not available."
                })),
            )
        }
    };

    match registry.get_email_by_id(&id) {
        Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Email not found",
                "id": id
            })),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to retrieve email",
                "message": e.to_string()
            })),
        ),
    }
}
1295
/// POST/DELETE handler: wipe all captured SMTP emails; 501 when no registry
/// is wired in.
#[cfg(feature = "smtp")]
async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.smtp_registry.as_ref() {
        Some(registry) => match registry.clear_mailbox() {
            Ok(()) => (
                StatusCode::OK,
                Json(serde_json::json!({
                    "message": "Mailbox cleared successfully"
                })),
            ),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to clear mailbox",
                    "message": e.to_string()
                })),
            ),
        },
        None => (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        ),
    }
}
1325
#[cfg(feature = "smtp")]
/// Placeholder for mailbox export; the feature is CLI-only.
///
/// Always answers 501, echoing the requested `format` (default "json") so
/// clients can see what was asked for.
async fn export_smtp_mailbox(
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    // `.cloned().unwrap_or_else(...)` only allocates the default when "format"
    // is absent; the previous `unwrap_or(&"json".to_string()).clone()` built a
    // throwaway String on every call.
    let format = params.get("format").cloned().unwrap_or_else(|| "json".to_string());
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "SMTP mailbox management not available via HTTP API",
            "message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
            "requested_format": format
        })),
    )
}
1341
#[cfg(feature = "smtp")]
/// Filter captured emails by query parameters.
///
/// Supported params: `sender`, `recipient`, `subject`, `body`, `since`/`until`
/// (RFC 3339 timestamps), `regex=true`, and `case_sensitive=true`.
async fn search_smtp_emails(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let Some(registry) = state.smtp_registry.as_ref() else {
        return (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": "SMTP mailbox management not available",
                "message": "SMTP server is not enabled or registry not available."
            })),
        );
    };

    // Timestamps that fail RFC 3339 parsing are treated as absent.
    let parse_ts = |key: &str| {
        params
            .get(key)
            .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
            .map(|dt| dt.with_timezone(&chrono::Utc))
    };
    // Boolean flags are true only for the literal string "true".
    let flag = |key: &str| params.get(key).map(|s| s == "true").unwrap_or(false);

    let filters = EmailSearchFilters {
        sender: params.get("sender").cloned(),
        recipient: params.get("recipient").cloned(),
        subject: params.get("subject").cloned(),
        body: params.get("body").cloned(),
        since: parse_ts("since"),
        until: parse_ts("until"),
        use_regex: flag("regex"),
        case_sensitive: flag("case_sensitive"),
    };

    match registry.search_emails(filters) {
        Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to search emails",
                "message": e.to_string()
            })),
        ),
    }
}
1386
#[cfg(feature = "mqtt")]
/// Snapshot of MQTT broker runtime statistics, returned by `GET /mqtt/stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
    /// Number of currently connected MQTT clients.
    pub connected_clients: usize,
    /// Number of topics currently known to the broker.
    pub active_topics: usize,
    /// Number of retained messages held by the broker.
    pub retained_messages: usize,
    /// Total subscription count across all clients.
    pub total_subscriptions: usize,
}
1400
#[cfg(feature = "mqtt")]
/// Report current broker statistics, or 503 when no broker is attached.
async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    let topic_stats = broker.get_topic_stats().await;
    let body = MqttBrokerStats {
        connected_clients: broker.get_connected_clients().await.len(),
        active_topics: broker.get_active_topics().await.len(),
        retained_messages: topic_stats.retained_messages,
        total_subscriptions: topic_stats.total_subscriptions,
    };

    Json(body).into_response()
}
1421
#[cfg(feature = "mqtt")]
/// List identifiers of currently connected MQTT clients.
async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };
    let clients = broker.get_connected_clients().await;
    Json(serde_json::json!({ "clients": clients })).into_response()
}
1434
#[cfg(feature = "mqtt")]
/// List topics currently active on the MQTT broker.
async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };
    let topics = broker.get_active_topics().await;
    Json(serde_json::json!({ "topics": topics })).into_response()
}
1447
#[cfg(feature = "mqtt")]
/// Force-disconnect one MQTT client by its client id.
async fn disconnect_mqtt_client(
    State(state): State<ManagementState>,
    Path(client_id): Path<String>,
) -> impl IntoResponse {
    let Some(broker) = state.mqtt_broker.as_ref() else {
        return (StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response();
    };

    match broker.disconnect_client(&client_id).await {
        Ok(_) => (StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response(),
        Err(e) => {
            (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to disconnect client: {}", e))
                .into_response()
        }
    }
}
1467
#[cfg(feature = "mqtt")]
/// Request body for publishing a single MQTT message.
#[derive(Debug, Deserialize)]
pub struct MqttPublishRequest {
    /// Topic to publish to.
    pub topic: String,
    /// Message payload (UTF-8 text).
    pub payload: String,
    /// Quality of service (0, 1, or 2); defaults to 0 when omitted.
    #[serde(default = "default_qos")]
    pub qos: u8,
    /// Whether the broker should retain the message; defaults to false.
    #[serde(default)]
    pub retain: bool,
}
1485
#[cfg(feature = "mqtt")]
/// Serde default for [`MqttPublishRequest::qos`]: QoS 0 (at most once).
fn default_qos() -> u8 {
    0
}
1490
#[cfg(feature = "mqtt")]
/// Publish a single message through the embedded MQTT broker.
///
/// Accepts a JSON body with `topic` and `payload` (required), plus optional
/// `qos` (0-2, default 0) and `retain` (default false). On success the message
/// is also broadcast to SSE subscribers via `state.message_events`.
async fn publish_mqtt_message_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let topic = request.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
    let payload = request.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
    // Keep the raw u64: casting with `as u8` before validation let values like
    // 256 wrap to 0 and slip past the QoS range check.
    let qos_raw = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0);
    let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

    let (Some(topic), Some(payload)) = (topic, payload) else {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required fields: topic and payload"
            })),
        );
    };

    if let Some(broker) = &state.mqtt_broker {
        // Validate before narrowing so out-of-range values are rejected too.
        if qos_raw > 2 {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid QoS",
                    "message": "QoS must be 0, 1, or 2"
                })),
            );
        }
        let qos = qos_raw as u8; // safe: qos_raw <= 2

        let payload_bytes = payload.as_bytes().to_vec();
        let client_id = "mockforge-management-api".to_string();

        let publish_result = broker
            .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
            .await
            .map_err(|e| format!("{}", e));

        match publish_result {
            Ok(_) => {
                // Mirror the message to SSE stream subscribers; a send failure
                // just means nobody is listening.
                let event = MessageEvent::Mqtt(MqttMessageEvent {
                    topic: topic.clone(),
                    payload: payload.clone(),
                    qos,
                    retain,
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                (
                    StatusCode::OK,
                    Json(serde_json::json!({
                        "success": true,
                        "message": format!("Message published to topic '{}'", topic),
                        "topic": topic,
                        "qos": qos,
                        "retain": retain
                    })),
                )
            }
            Err(error_msg) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to publish message",
                    "message": error_msg
                })),
            ),
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1578
1579#[cfg(not(feature = "mqtt"))]
1580async fn publish_mqtt_message_handler(
1582 State(_state): State<ManagementState>,
1583 Json(_request): Json<serde_json::Value>,
1584) -> impl IntoResponse {
1585 (
1586 StatusCode::SERVICE_UNAVAILABLE,
1587 Json(serde_json::json!({
1588 "error": "MQTT feature not enabled",
1589 "message": "MQTT support is not compiled into this build"
1590 })),
1591 )
1592}
1593
#[cfg(feature = "mqtt")]
/// Request body for publishing a batch of MQTT messages.
#[derive(Debug, Deserialize)]
pub struct MqttBatchPublishRequest {
    /// Messages to publish, in order.
    pub messages: Vec<MqttPublishRequest>,
    /// Delay between consecutive messages in milliseconds; defaults to 100.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
1604
#[cfg(feature = "mqtt")]
/// Serde default for [`MqttBatchPublishRequest::delay_ms`]: 100 ms between messages.
fn default_delay() -> u64 {
    100
}
1609
#[cfg(feature = "mqtt")]
/// Publish a sequence of MQTT messages with an optional inter-message delay.
///
/// The JSON body must contain a non-empty `messages` array; each entry takes
/// the same fields as a single publish (`topic`, `payload`, optional `qos`,
/// `retain`). `delay_ms` (default 100) is slept between consecutive messages.
/// Per-message failures are reported in `results` without aborting the batch.
async fn publish_mqtt_batch_handler(
    State(state): State<ManagementState>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    let messages_json = request.get("messages").and_then(|v| v.as_array());
    let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);

    let Some(messages_json) = messages_json else {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "Invalid request",
                "message": "Missing required field: messages"
            })),
        );
    };

    if let Some(broker) = &state.mqtt_broker {
        if messages_json.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            );
        }

        let mut results = Vec::new();
        let client_id = "mockforge-management-api".to_string();

        for (index, msg_json) in messages_json.iter().enumerate() {
            let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
            let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
            // Validate QoS on the raw u64: narrowing first with `as u8` would
            // let out-of-range values like 256 wrap to 0 and pass the check.
            let qos_raw = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0);
            let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);

            if topic.is_none() || payload.is_none() {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Missing required fields: topic and payload"
                }));
                continue;
            }

            let topic = topic.unwrap();
            let payload = payload.unwrap();

            if qos_raw > 2 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": "Invalid QoS (must be 0, 1, or 2)"
                }));
                continue;
            }
            let qos = qos_raw as u8; // safe: qos_raw <= 2

            let payload_bytes = payload.as_bytes().to_vec();

            let publish_result = broker
                .handle_publish(&client_id, &topic, payload_bytes, qos, retain)
                .await
                .map_err(|e| format!("{}", e));

            match publish_result {
                Ok(_) => {
                    // Mirror the message to SSE stream subscribers (best effort).
                    let event = MessageEvent::Mqtt(MqttMessageEvent {
                        topic: topic.clone(),
                        payload: payload.clone(),
                        qos,
                        retain,
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": topic,
                        "qos": qos
                    }));
                }
                Err(error_msg) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": error_msg
                    }));
                }
            }

            // Pace the batch; no sleep after the final message.
            if index < messages_json.len() - 1 && delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        (
            StatusCode::OK,
            Json(serde_json::json!({
                "success": true,
                "total": messages_json.len(),
                "succeeded": success_count,
                "failed": messages_json.len() - success_count,
                "results": results
            })),
        )
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "MQTT broker not available",
                "message": "MQTT broker is not enabled or not available."
            })),
        )
    }
}
1739
1740#[cfg(not(feature = "mqtt"))]
1741async fn publish_mqtt_batch_handler(
1743 State(_state): State<ManagementState>,
1744 Json(_request): Json<serde_json::Value>,
1745) -> impl IntoResponse {
1746 (
1747 StatusCode::SERVICE_UNAVAILABLE,
1748 Json(serde_json::json!({
1749 "error": "MQTT feature not enabled",
1750 "message": "MQTT support is not compiled into this build"
1751 })),
1752 )
1753}
1754
/// Request body for explicitly setting a migration mode.
#[derive(Debug, Deserialize)]
struct SetMigrationModeRequest {
    /// One of: "mock", "shadow", "real", "auto" (matched case-insensitively).
    mode: String,
}
1762
1763async fn get_migration_routes(
1765 State(state): State<ManagementState>,
1766) -> Result<Json<serde_json::Value>, StatusCode> {
1767 let proxy_config = match &state.proxy_config {
1768 Some(config) => config,
1769 None => {
1770 return Ok(Json(serde_json::json!({
1771 "error": "Migration not configured. Proxy config not available."
1772 })));
1773 }
1774 };
1775
1776 let config = proxy_config.read().await;
1777 let routes = config.get_migration_routes();
1778
1779 Ok(Json(serde_json::json!({
1780 "routes": routes
1781 })))
1782}
1783
1784async fn toggle_route_migration(
1786 State(state): State<ManagementState>,
1787 Path(pattern): Path<String>,
1788) -> Result<Json<serde_json::Value>, StatusCode> {
1789 let proxy_config = match &state.proxy_config {
1790 Some(config) => config,
1791 None => {
1792 return Ok(Json(serde_json::json!({
1793 "error": "Migration not configured. Proxy config not available."
1794 })));
1795 }
1796 };
1797
1798 let mut config = proxy_config.write().await;
1799 let new_mode = match config.toggle_route_migration(&pattern) {
1800 Some(mode) => mode,
1801 None => {
1802 return Ok(Json(serde_json::json!({
1803 "error": format!("Route pattern not found: {}", pattern)
1804 })));
1805 }
1806 };
1807
1808 Ok(Json(serde_json::json!({
1809 "pattern": pattern,
1810 "mode": format!("{:?}", new_mode).to_lowercase()
1811 })))
1812}
1813
1814async fn set_route_migration_mode(
1816 State(state): State<ManagementState>,
1817 Path(pattern): Path<String>,
1818 Json(request): Json<SetMigrationModeRequest>,
1819) -> Result<Json<serde_json::Value>, StatusCode> {
1820 let proxy_config = match &state.proxy_config {
1821 Some(config) => config,
1822 None => {
1823 return Ok(Json(serde_json::json!({
1824 "error": "Migration not configured. Proxy config not available."
1825 })));
1826 }
1827 };
1828
1829 use mockforge_core::proxy::config::MigrationMode;
1830 let mode = match request.mode.to_lowercase().as_str() {
1831 "mock" => MigrationMode::Mock,
1832 "shadow" => MigrationMode::Shadow,
1833 "real" => MigrationMode::Real,
1834 "auto" => MigrationMode::Auto,
1835 _ => {
1836 return Ok(Json(serde_json::json!({
1837 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1838 })));
1839 }
1840 };
1841
1842 let mut config = proxy_config.write().await;
1843 let updated = config.update_rule_migration_mode(&pattern, mode);
1844
1845 if !updated {
1846 return Ok(Json(serde_json::json!({
1847 "error": format!("Route pattern not found: {}", pattern)
1848 })));
1849 }
1850
1851 Ok(Json(serde_json::json!({
1852 "pattern": pattern,
1853 "mode": format!("{:?}", mode).to_lowercase()
1854 })))
1855}
1856
1857async fn toggle_group_migration(
1859 State(state): State<ManagementState>,
1860 Path(group): Path<String>,
1861) -> Result<Json<serde_json::Value>, StatusCode> {
1862 let proxy_config = match &state.proxy_config {
1863 Some(config) => config,
1864 None => {
1865 return Ok(Json(serde_json::json!({
1866 "error": "Migration not configured. Proxy config not available."
1867 })));
1868 }
1869 };
1870
1871 let mut config = proxy_config.write().await;
1872 let new_mode = config.toggle_group_migration(&group);
1873
1874 Ok(Json(serde_json::json!({
1875 "group": group,
1876 "mode": format!("{:?}", new_mode).to_lowercase()
1877 })))
1878}
1879
1880async fn set_group_migration_mode(
1882 State(state): State<ManagementState>,
1883 Path(group): Path<String>,
1884 Json(request): Json<SetMigrationModeRequest>,
1885) -> Result<Json<serde_json::Value>, StatusCode> {
1886 let proxy_config = match &state.proxy_config {
1887 Some(config) => config,
1888 None => {
1889 return Ok(Json(serde_json::json!({
1890 "error": "Migration not configured. Proxy config not available."
1891 })));
1892 }
1893 };
1894
1895 use mockforge_core::proxy::config::MigrationMode;
1896 let mode = match request.mode.to_lowercase().as_str() {
1897 "mock" => MigrationMode::Mock,
1898 "shadow" => MigrationMode::Shadow,
1899 "real" => MigrationMode::Real,
1900 "auto" => MigrationMode::Auto,
1901 _ => {
1902 return Ok(Json(serde_json::json!({
1903 "error": format!("Invalid migration mode: {}. Must be one of: mock, shadow, real, auto", request.mode)
1904 })));
1905 }
1906 };
1907
1908 let mut config = proxy_config.write().await;
1909 config.update_group_migration_mode(&group, mode);
1910
1911 Ok(Json(serde_json::json!({
1912 "group": group,
1913 "mode": format!("{:?}", mode).to_lowercase()
1914 })))
1915}
1916
1917async fn get_migration_groups(
1919 State(state): State<ManagementState>,
1920) -> Result<Json<serde_json::Value>, StatusCode> {
1921 let proxy_config = match &state.proxy_config {
1922 Some(config) => config,
1923 None => {
1924 return Ok(Json(serde_json::json!({
1925 "error": "Migration not configured. Proxy config not available."
1926 })));
1927 }
1928 };
1929
1930 let config = proxy_config.read().await;
1931 let groups = config.get_migration_groups();
1932
1933 let groups_json: serde_json::Map<String, serde_json::Value> = groups
1935 .into_iter()
1936 .map(|(name, info)| {
1937 (
1938 name,
1939 serde_json::json!({
1940 "name": info.name,
1941 "migration_mode": format!("{:?}", info.migration_mode).to_lowercase(),
1942 "route_count": info.route_count
1943 }),
1944 )
1945 })
1946 .collect();
1947
1948 Ok(Json(serde_json::json!(groups_json)))
1949}
1950
1951async fn get_migration_status(
1953 State(state): State<ManagementState>,
1954) -> Result<Json<serde_json::Value>, StatusCode> {
1955 let proxy_config = match &state.proxy_config {
1956 Some(config) => config,
1957 None => {
1958 return Ok(Json(serde_json::json!({
1959 "error": "Migration not configured. Proxy config not available."
1960 })));
1961 }
1962 };
1963
1964 let config = proxy_config.read().await;
1965 let routes = config.get_migration_routes();
1966 let groups = config.get_migration_groups();
1967
1968 let mut mock_count = 0;
1969 let mut shadow_count = 0;
1970 let mut real_count = 0;
1971 let mut auto_count = 0;
1972
1973 for route in &routes {
1974 match route.migration_mode {
1975 mockforge_core::proxy::config::MigrationMode::Mock => mock_count += 1,
1976 mockforge_core::proxy::config::MigrationMode::Shadow => shadow_count += 1,
1977 mockforge_core::proxy::config::MigrationMode::Real => real_count += 1,
1978 mockforge_core::proxy::config::MigrationMode::Auto => auto_count += 1,
1979 }
1980 }
1981
1982 Ok(Json(serde_json::json!({
1983 "total_routes": routes.len(),
1984 "mock_routes": mock_count,
1985 "shadow_routes": shadow_count,
1986 "real_routes": real_count,
1987 "auto_routes": auto_count,
1988 "total_groups": groups.len(),
1989 "migration_enabled": config.migration_enabled
1990 })))
1991}
1992
/// Request body for creating or updating a proxy body-transform rule.
#[derive(Debug, Deserialize, Serialize)]
pub struct ProxyRuleRequest {
    /// URL pattern the rule applies to.
    pub pattern: String,
    /// Rule kind: "request" or "response".
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Response status codes the rule is restricted to (used by response rules).
    #[serde(default)]
    pub status_codes: Vec<u16>,
    /// Transforms applied to the body; create requires at least one.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active; defaults to true when omitted.
    #[serde(default = "default_true")]
    pub enabled: bool,
}
2012
/// Wire representation of a single body transform.
#[derive(Debug, Deserialize, Serialize)]
pub struct BodyTransformRequest {
    /// Path within the body targeted by the transform.
    pub path: String,
    /// Replacement value.
    pub replace: String,
    /// Operation name: "replace", "add", or "remove" (handlers treat unknown
    /// values as "replace").
    #[serde(default)]
    pub operation: String,
}
2024
/// Rule as returned by the proxy-rule list endpoint.
#[derive(Debug, Serialize)]
pub struct ProxyRuleResponse {
    /// Combined index: request rules first, then response rules offset by the
    /// request-rule count.
    pub id: usize,
    /// URL pattern the rule applies to.
    pub pattern: String,
    /// Rule kind: "request" or "response".
    #[serde(rename = "type")]
    pub rule_type: String,
    /// Status codes the rule is restricted to (empty for request rules).
    pub status_codes: Vec<u16>,
    /// The rule's body transforms.
    pub body_transforms: Vec<BodyTransformRequest>,
    /// Whether the rule is active.
    pub enabled: bool,
}
2042
2043async fn list_proxy_rules(
2045 State(state): State<ManagementState>,
2046) -> Result<Json<serde_json::Value>, StatusCode> {
2047 let proxy_config = match &state.proxy_config {
2048 Some(config) => config,
2049 None => {
2050 return Ok(Json(serde_json::json!({
2051 "error": "Proxy not configured. Proxy config not available."
2052 })));
2053 }
2054 };
2055
2056 let config = proxy_config.read().await;
2057
2058 let mut rules: Vec<ProxyRuleResponse> = Vec::new();
2059
2060 for (idx, rule) in config.request_replacements.iter().enumerate() {
2062 rules.push(ProxyRuleResponse {
2063 id: idx,
2064 pattern: rule.pattern.clone(),
2065 rule_type: "request".to_string(),
2066 status_codes: Vec::new(),
2067 body_transforms: rule
2068 .body_transforms
2069 .iter()
2070 .map(|t| BodyTransformRequest {
2071 path: t.path.clone(),
2072 replace: t.replace.clone(),
2073 operation: format!("{:?}", t.operation).to_lowercase(),
2074 })
2075 .collect(),
2076 enabled: rule.enabled,
2077 });
2078 }
2079
2080 let request_count = config.request_replacements.len();
2082 for (idx, rule) in config.response_replacements.iter().enumerate() {
2083 rules.push(ProxyRuleResponse {
2084 id: request_count + idx,
2085 pattern: rule.pattern.clone(),
2086 rule_type: "response".to_string(),
2087 status_codes: rule.status_codes.clone(),
2088 body_transforms: rule
2089 .body_transforms
2090 .iter()
2091 .map(|t| BodyTransformRequest {
2092 path: t.path.clone(),
2093 replace: t.replace.clone(),
2094 operation: format!("{:?}", t.operation).to_lowercase(),
2095 })
2096 .collect(),
2097 enabled: rule.enabled,
2098 });
2099 }
2100
2101 Ok(Json(serde_json::json!({
2102 "rules": rules
2103 })))
2104}
2105
2106async fn create_proxy_rule(
2108 State(state): State<ManagementState>,
2109 Json(request): Json<ProxyRuleRequest>,
2110) -> Result<Json<serde_json::Value>, StatusCode> {
2111 let proxy_config = match &state.proxy_config {
2112 Some(config) => config,
2113 None => {
2114 return Ok(Json(serde_json::json!({
2115 "error": "Proxy not configured. Proxy config not available."
2116 })));
2117 }
2118 };
2119
2120 if request.body_transforms.is_empty() {
2122 return Ok(Json(serde_json::json!({
2123 "error": "At least one body transform is required"
2124 })));
2125 }
2126
2127 let body_transforms: Vec<BodyTransform> = request
2128 .body_transforms
2129 .iter()
2130 .map(|t| {
2131 let op = match t.operation.as_str() {
2132 "replace" => TransformOperation::Replace,
2133 "add" => TransformOperation::Add,
2134 "remove" => TransformOperation::Remove,
2135 _ => TransformOperation::Replace,
2136 };
2137 BodyTransform {
2138 path: t.path.clone(),
2139 replace: t.replace.clone(),
2140 operation: op,
2141 }
2142 })
2143 .collect();
2144
2145 let new_rule = BodyTransformRule {
2146 pattern: request.pattern.clone(),
2147 status_codes: request.status_codes.clone(),
2148 body_transforms,
2149 enabled: request.enabled,
2150 };
2151
2152 let mut config = proxy_config.write().await;
2153
2154 let rule_id = if request.rule_type == "request" {
2155 config.request_replacements.push(new_rule);
2156 config.request_replacements.len() - 1
2157 } else if request.rule_type == "response" {
2158 config.response_replacements.push(new_rule);
2159 config.request_replacements.len() + config.response_replacements.len() - 1
2160 } else {
2161 return Ok(Json(serde_json::json!({
2162 "error": format!("Invalid rule type: {}. Must be 'request' or 'response'", request.rule_type)
2163 })));
2164 };
2165
2166 Ok(Json(serde_json::json!({
2167 "id": rule_id,
2168 "message": "Rule created successfully"
2169 })))
2170}
2171
2172async fn get_proxy_rule(
2174 State(state): State<ManagementState>,
2175 Path(id): Path<String>,
2176) -> Result<Json<serde_json::Value>, StatusCode> {
2177 let proxy_config = match &state.proxy_config {
2178 Some(config) => config,
2179 None => {
2180 return Ok(Json(serde_json::json!({
2181 "error": "Proxy not configured. Proxy config not available."
2182 })));
2183 }
2184 };
2185
2186 let config = proxy_config.read().await;
2187 let rule_id: usize = match id.parse() {
2188 Ok(id) => id,
2189 Err(_) => {
2190 return Ok(Json(serde_json::json!({
2191 "error": format!("Invalid rule ID: {}", id)
2192 })));
2193 }
2194 };
2195
2196 let request_count = config.request_replacements.len();
2197
2198 if rule_id < request_count {
2199 let rule = &config.request_replacements[rule_id];
2201 Ok(Json(serde_json::json!({
2202 "id": rule_id,
2203 "pattern": rule.pattern,
2204 "type": "request",
2205 "status_codes": [],
2206 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2207 "path": t.path,
2208 "replace": t.replace,
2209 "operation": format!("{:?}", t.operation).to_lowercase()
2210 })).collect::<Vec<_>>(),
2211 "enabled": rule.enabled
2212 })))
2213 } else if rule_id < request_count + config.response_replacements.len() {
2214 let response_idx = rule_id - request_count;
2216 let rule = &config.response_replacements[response_idx];
2217 Ok(Json(serde_json::json!({
2218 "id": rule_id,
2219 "pattern": rule.pattern,
2220 "type": "response",
2221 "status_codes": rule.status_codes,
2222 "body_transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2223 "path": t.path,
2224 "replace": t.replace,
2225 "operation": format!("{:?}", t.operation).to_lowercase()
2226 })).collect::<Vec<_>>(),
2227 "enabled": rule.enabled
2228 })))
2229 } else {
2230 Ok(Json(serde_json::json!({
2231 "error": format!("Rule ID {} not found", rule_id)
2232 })))
2233 }
2234}
2235
/// Replace an existing body-transform rule in place.
///
/// The id uses the combined numbering from `list_proxy_rules`: request rules
/// first, then response rules offset by the request-rule count. Errors are
/// reported as 200 + JSON error body, matching the other proxy endpoints.
async fn update_proxy_rule(
    State(state): State<ManagementState>,
    Path(id): Path<String>,
    Json(request): Json<ProxyRuleRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
    let proxy_config = match &state.proxy_config {
        Some(config) => config,
        None => {
            return Ok(Json(serde_json::json!({
                "error": "Proxy not configured. Proxy config not available."
            })));
        }
    };

    let mut config = proxy_config.write().await;
    let rule_id: usize = match id.parse() {
        Ok(id) => id,
        Err(_) => {
            return Ok(Json(serde_json::json!({
                "error": format!("Invalid rule ID: {}", id)
            })));
        }
    };

    // Convert the wire transforms to core ones; unknown operation strings
    // silently fall back to Replace.
    let body_transforms: Vec<BodyTransform> = request
        .body_transforms
        .iter()
        .map(|t| {
            let op = match t.operation.as_str() {
                "replace" => TransformOperation::Replace,
                "add" => TransformOperation::Add,
                "remove" => TransformOperation::Remove,
                _ => TransformOperation::Replace,
            };
            BodyTransform {
                path: t.path.clone(),
                replace: t.replace.clone(),
                operation: op,
            }
        })
        .collect();

    let updated_rule = BodyTransformRule {
        pattern: request.pattern.clone(),
        status_codes: request.status_codes.clone(),
        body_transforms,
        enabled: request.enabled,
    };

    let request_count = config.request_replacements.len();

    // NOTE(review): `request.rule_type` is not consulted here — the rule stays
    // in whichever list the id addresses. Confirm that changing a rule's type
    // via update is intentionally unsupported.
    if rule_id < request_count {
        config.request_replacements[rule_id] = updated_rule;
    } else if rule_id < request_count + config.response_replacements.len() {
        let response_idx = rule_id - request_count;
        config.response_replacements[response_idx] = updated_rule;
    } else {
        return Ok(Json(serde_json::json!({
            "error": format!("Rule ID {} not found", rule_id)
        })));
    }

    Ok(Json(serde_json::json!({
        "id": rule_id,
        "message": "Rule updated successfully"
    })))
}
2306
2307async fn delete_proxy_rule(
2309 State(state): State<ManagementState>,
2310 Path(id): Path<String>,
2311) -> Result<Json<serde_json::Value>, StatusCode> {
2312 let proxy_config = match &state.proxy_config {
2313 Some(config) => config,
2314 None => {
2315 return Ok(Json(serde_json::json!({
2316 "error": "Proxy not configured. Proxy config not available."
2317 })));
2318 }
2319 };
2320
2321 let mut config = proxy_config.write().await;
2322 let rule_id: usize = match id.parse() {
2323 Ok(id) => id,
2324 Err(_) => {
2325 return Ok(Json(serde_json::json!({
2326 "error": format!("Invalid rule ID: {}", id)
2327 })));
2328 }
2329 };
2330
2331 let request_count = config.request_replacements.len();
2332
2333 if rule_id < request_count {
2334 config.request_replacements.remove(rule_id);
2336 } else if rule_id < request_count + config.response_replacements.len() {
2337 let response_idx = rule_id - request_count;
2339 config.response_replacements.remove(response_idx);
2340 } else {
2341 return Ok(Json(serde_json::json!({
2342 "error": format!("Rule ID {} not found", rule_id)
2343 })));
2344 }
2345
2346 Ok(Json(serde_json::json!({
2347 "id": rule_id,
2348 "message": "Rule deleted successfully"
2349 })))
2350}
2351
2352async fn get_proxy_inspect(
2354 State(state): State<ManagementState>,
2355 Query(params): Query<std::collections::HashMap<String, String>>,
2356) -> Result<Json<serde_json::Value>, StatusCode> {
2357 let limit: usize = params.get("limit").and_then(|s| s.parse().ok()).unwrap_or(50);
2358 let offset: usize = params.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);
2359
2360 let proxy_config = match &state.proxy_config {
2361 Some(config) => config.read().await,
2362 None => {
2363 return Ok(Json(serde_json::json!({
2364 "error": "Proxy not configured. Proxy config not available."
2365 })));
2366 }
2367 };
2368
2369 let mut rules = Vec::new();
2370 for (idx, rule) in proxy_config.request_replacements.iter().enumerate() {
2371 rules.push(serde_json::json!({
2372 "id": idx,
2373 "kind": "request",
2374 "pattern": rule.pattern,
2375 "enabled": rule.enabled,
2376 "status_codes": rule.status_codes,
2377 "transform_count": rule.body_transforms.len(),
2378 "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2379 "path": t.path,
2380 "operation": t.operation,
2381 "replace": t.replace
2382 })).collect::<Vec<_>>()
2383 }));
2384 }
2385 let request_rule_count = rules.len();
2386 for (idx, rule) in proxy_config.response_replacements.iter().enumerate() {
2387 rules.push(serde_json::json!({
2388 "id": request_rule_count + idx,
2389 "kind": "response",
2390 "pattern": rule.pattern,
2391 "enabled": rule.enabled,
2392 "status_codes": rule.status_codes,
2393 "transform_count": rule.body_transforms.len(),
2394 "transforms": rule.body_transforms.iter().map(|t| serde_json::json!({
2395 "path": t.path,
2396 "operation": t.operation,
2397 "replace": t.replace
2398 })).collect::<Vec<_>>()
2399 }));
2400 }
2401
2402 let total = rules.len();
2403 let paged_rules: Vec<_> = rules.into_iter().skip(offset).take(limit).collect();
2404
2405 Ok(Json(serde_json::json!({
2406 "enabled": proxy_config.enabled,
2407 "target_url": proxy_config.target_url,
2408 "prefix": proxy_config.prefix,
2409 "timeout_seconds": proxy_config.timeout_seconds,
2410 "follow_redirects": proxy_config.follow_redirects,
2411 "passthrough_by_default": proxy_config.passthrough_by_default,
2412 "rules": paged_rules,
2413 "request_rule_count": request_rule_count,
2414 "response_rule_count": total.saturating_sub(request_rule_count),
2415 "limit": limit,
2416 "offset": offset,
2417 "total": total
2418 })))
2419}
2420
/// Build the management API [`Router`] with all admin endpoints registered.
///
/// Core routes (health, stats, config, mock CRUD, export/import, migration,
/// proxy rules, AI spec generation, snapshot diff, MockAI, chaos/network,
/// state machines) are always present; the SMTP, MQTT, Kafka,
/// behavioral-cloning and conformance groups are compiled in only when the
/// corresponding Cargo feature is enabled. The shared `state` is attached at
/// the end via `with_state`.
pub fn management_router(state: ManagementState) -> Router {
    // Core endpoints available in every build.
    let router = Router::new()
        .route("/health", get(health_check))
        .route("/stats", get(get_stats))
        .route("/config", get(get_config))
        .route("/config/validate", post(validate_config))
        .route("/config/bulk", post(bulk_update_config))
        .route("/mocks", get(list_mocks))
        .route("/mocks", post(create_mock))
        .route("/mocks/{id}", get(get_mock))
        .route("/mocks/{id}", put(update_mock))
        .route("/mocks/{id}", delete(delete_mock))
        .route("/export", get(export_mocks))
        .route("/import", post(import_mocks));

    // SMTP mailbox inspection (feature-gated).
    #[cfg(feature = "smtp")]
    let router = router
        .route("/smtp/mailbox", get(list_smtp_emails))
        .route("/smtp/mailbox", delete(clear_smtp_mailbox))
        .route("/smtp/mailbox/{id}", get(get_smtp_email))
        .route("/smtp/mailbox/export", get(export_smtp_mailbox))
        .route("/smtp/mailbox/search", get(search_smtp_emails));

    // No-op rebinding keeps the `let router` shadowing chain valid when the
    // feature is compiled out.
    #[cfg(not(feature = "smtp"))]
    let router = router;

    // MQTT broker introspection, SSE streaming, and publishing (feature-gated).
    #[cfg(feature = "mqtt")]
    let router = router
        .route("/mqtt/stats", get(get_mqtt_stats))
        .route("/mqtt/clients", get(get_mqtt_clients))
        .route("/mqtt/topics", get(get_mqtt_topics))
        .route("/mqtt/clients/{client_id}", delete(disconnect_mqtt_client))
        .route("/mqtt/messages/stream", get(mqtt_messages_stream))
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // NOTE(review): unlike the SMTP/Kafka fallbacks, this branch still
    // registers the publish endpoints, which requires non-MQTT builds to
    // provide fallback `publish_mqtt_*_handler` implementations — confirm
    // this asymmetry is intentional.
    #[cfg(not(feature = "mqtt"))]
    let router = router
        .route("/mqtt/publish", post(publish_mqtt_message_handler))
        .route("/mqtt/publish/batch", post(publish_mqtt_batch_handler));

    // Kafka broker introspection, producing, and SSE streaming (feature-gated).
    #[cfg(feature = "kafka")]
    let router = router
        .route("/kafka/stats", get(get_kafka_stats))
        .route("/kafka/topics", get(get_kafka_topics))
        .route("/kafka/topics/{topic}", get(get_kafka_topic))
        .route("/kafka/groups", get(get_kafka_groups))
        .route("/kafka/groups/{group_id}", get(get_kafka_group))
        .route("/kafka/produce", post(produce_kafka_message))
        .route("/kafka/produce/batch", post(produce_kafka_batch))
        .route("/kafka/messages/stream", get(kafka_messages_stream));

    #[cfg(not(feature = "kafka"))]
    let router = router;

    // Incremental migration controls (always available).
    let router = router
        .route("/migration/routes", get(get_migration_routes))
        .route("/migration/routes/{pattern}/toggle", post(toggle_route_migration))
        .route("/migration/routes/{pattern}", put(set_route_migration_mode))
        .route("/migration/groups/{group}/toggle", post(toggle_group_migration))
        .route("/migration/groups/{group}", put(set_group_migration_mode))
        .route("/migration/groups", get(get_migration_groups))
        .route("/migration/status", get(get_migration_status));

    // Proxy rule CRUD and traffic inspection.
    let router = router
        .route("/proxy/rules", get(list_proxy_rules))
        .route("/proxy/rules", post(create_proxy_rule))
        .route("/proxy/rules/{id}", get(get_proxy_rule))
        .route("/proxy/rules/{id}", put(update_proxy_rule))
        .route("/proxy/rules/{id}", delete(delete_proxy_rule))
        .route("/proxy/inspect", get(get_proxy_inspect));

    // AI-assisted spec generation (the handler itself is feature-aware and
    // returns 501 when "data-faker" is off).
    let router = router.route("/ai/generate-spec", post(generate_ai_spec));

    // Snapshot diffing sub-router shares the same state.
    let router = router.nest(
        "/snapshot-diff",
        crate::handlers::snapshot_diff::snapshot_diff_router(state.clone()),
    );

    // Traffic-based OpenAPI generation requires behavioral cloning support.
    #[cfg(feature = "behavioral-cloning")]
    let router = router.route("/mockai/generate-openapi", post(generate_openapi_from_traffic));

    // MockAI learning/explanations plus chaos and network-profile controls.
    let router = router
        .route("/mockai/learn", post(learn_from_examples))
        .route("/mockai/rules/explanations", get(list_rule_explanations))
        .route("/mockai/rules/{id}/explanation", get(get_rule_explanation))
        .route("/chaos/config", get(get_chaos_config))
        .route("/chaos/config", post(update_chaos_config))
        .route("/network/profiles", get(list_network_profiles))
        .route("/network/profile/apply", post(apply_network_profile));

    // State-machine management sub-router.
    let router =
        router.nest("/state-machines", crate::state_machine_api::create_state_machine_routes());

    // Conformance checking sub-router (feature-gated).
    #[cfg(feature = "conformance")]
    let router = router.nest_service(
        "/conformance",
        crate::handlers::conformance::conformance_router(state.conformance_state.clone()),
    );
    #[cfg(not(feature = "conformance"))]
    let router = router;

    router.with_state(state)
}
2533
/// Summary statistics returned by `GET /kafka/stats`.
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
    /// Number of topics currently known to the broker.
    pub topics: usize,
    /// Total partition count across all topics.
    pub partitions: usize,
    /// Number of registered consumer groups.
    pub consumer_groups: usize,
    /// Total messages produced, from the broker's metrics snapshot.
    pub messages_produced: u64,
    /// Total messages consumed, from the broker's metrics snapshot.
    pub messages_consumed: u64,
}
2549
/// Per-topic summary row returned by `GET /kafka/topics`.
#[cfg(feature = "kafka")]
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
    /// Topic name.
    pub name: String,
    /// Number of partitions in the topic.
    pub partitions: usize,
    /// Configured replication factor.
    pub replication_factor: i32,
}
2558
/// Per-group summary row returned by `GET /kafka/groups`.
#[cfg(feature = "kafka")]
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
    /// Consumer group identifier.
    pub group_id: String,
    /// Number of members currently in the group.
    pub members: usize,
    /// Group state label (the handlers always report "Stable").
    pub state: String,
}
2567
/// GET /kafka/stats — summary statistics for the embedded Kafka broker.
///
/// Returns a [`KafkaBrokerStats`] JSON body, or 503 when no broker is
/// configured on this instance.
#[cfg(feature = "kafka")]
async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.kafka_broker.as_ref() {
        Some(broker) => {
            let topics = broker.topics.read().await;
            let consumer_groups = broker.consumer_groups.read().await;

            // Partition count summed over every topic.
            let partition_total = topics.values().map(|topic| topic.partitions.len()).sum();
            let metrics = broker.metrics().snapshot();

            Json(KafkaBrokerStats {
                topics: topics.len(),
                partitions: partition_total,
                consumer_groups: consumer_groups.groups().len(),
                messages_produced: metrics.messages_produced_total,
                messages_consumed: metrics.messages_consumed_total,
            })
            .into_response()
        }
        None => (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response(),
    }
}
2600
/// GET /kafka/topics — list every topic with its partition count and
/// replication factor; 503 when no broker is configured.
#[cfg(feature = "kafka")]
async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.kafka_broker.as_ref() {
        Some(broker) => {
            let topics = broker.topics.read().await;

            // Build one summary row per topic.
            let mut topic_list = Vec::with_capacity(topics.len());
            for (name, topic) in topics.iter() {
                topic_list.push(KafkaTopicInfo {
                    name: name.clone(),
                    partitions: topic.partitions.len(),
                    replication_factor: topic.config.replication_factor as i32,
                });
            }

            Json(serde_json::json!({
                "topics": topic_list
            }))
            .into_response()
        }
        None => (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response(),
    }
}
2630
/// GET /kafka/topics/{topic} — partition-level detail for a single topic.
///
/// Responds 404 when the topic does not exist and 503 when no broker is
/// configured.
#[cfg(feature = "kafka")]
async fn get_kafka_topic(
    State(state): State<ManagementState>,
    Path(topic_name): Path<String>,
) -> impl IntoResponse {
    match state.kafka_broker.as_ref() {
        Some(broker) => {
            let topics = broker.topics.read().await;
            match topics.get(&topic_name) {
                Some(topic) => {
                    // Single-node mock: partition ids are their indices and
                    // broker 0 is always leader/sole replica.
                    let mut partitions_detail = Vec::with_capacity(topic.partitions.len());
                    for (idx, partition) in topic.partitions.iter().enumerate() {
                        partitions_detail.push(serde_json::json!({
                            "id": idx as i32,
                            "leader": 0,
                            "replicas": vec![0],
                            "message_count": partition.messages.len()
                        }));
                    }

                    Json(serde_json::json!({
                        "name": topic_name,
                        "partitions": topic.partitions.len(),
                        "replication_factor": topic.config.replication_factor,
                        "partitions_detail": partitions_detail
                    }))
                    .into_response()
                }
                None => (
                    StatusCode::NOT_FOUND,
                    Json(serde_json::json!({
                        "error": "Topic not found",
                        "topic": topic_name
                    })),
                )
                    .into_response(),
            }
        }
        None => (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response(),
    }
}
2672
/// GET /kafka/groups — list all consumer groups known to the broker;
/// 503 when no broker is configured.
#[cfg(feature = "kafka")]
async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
    match state.kafka_broker.as_ref() {
        Some(broker) => {
            let consumer_groups = broker.consumer_groups.read().await;

            // One summary row per group; the state label is always "Stable".
            let mut groups = Vec::new();
            for (group_id, group) in consumer_groups.groups().iter() {
                groups.push(KafkaConsumerGroupInfo {
                    group_id: group_id.clone(),
                    members: group.members.len(),
                    state: "Stable".to_string(),
                });
            }

            Json(serde_json::json!({
                "groups": groups
            }))
            .into_response()
        }
        None => (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response(),
    }
}
2703
/// GET /kafka/groups/{group_id} — detailed view of one consumer group:
/// member list with partition assignments, plus committed offsets.
///
/// Responds 404 when the group does not exist and 503 when no broker is
/// configured.
#[cfg(feature = "kafka")]
async fn get_kafka_group(
    State(state): State<ManagementState>,
    Path(group_id): Path<String>,
) -> impl IntoResponse {
    match state.kafka_broker.as_ref() {
        Some(broker) => {
            let consumer_groups = broker.consumer_groups.read().await;
            match consumer_groups.groups().get(&group_id) {
                Some(group) => {
                    // Per-member detail, including partition assignments.
                    let mut members_detail = Vec::with_capacity(group.members.len());
                    for (member_id, member) in group.members.iter() {
                        let assignments: Vec<_> = member
                            .assignment
                            .iter()
                            .map(|a| {
                                serde_json::json!({
                                    "topic": a.topic,
                                    "partitions": a.partitions
                                })
                            })
                            .collect();
                        members_detail.push(serde_json::json!({
                            "member_id": member_id,
                            "client_id": member.client_id,
                            "assignments": assignments
                        }));
                    }

                    // Committed offsets, keyed by (topic, partition).
                    let offsets: Vec<_> = group
                        .offsets
                        .iter()
                        .map(|((topic, partition), offset)| {
                            serde_json::json!({
                                "topic": topic,
                                "partition": partition,
                                "offset": offset
                            })
                        })
                        .collect();

                    Json(serde_json::json!({
                        "group_id": group_id,
                        "members": group.members.len(),
                        "state": "Stable",
                        "members_detail": members_detail,
                        "offsets": offsets
                    }))
                    .into_response()
                }
                None => (
                    StatusCode::NOT_FOUND,
                    Json(serde_json::json!({
                        "error": "Consumer group not found",
                        "group_id": group_id
                    })),
                )
                    .into_response(),
            }
        }
        None => (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response(),
    }
}
2752
/// Request body for `POST /kafka/produce`.
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
    /// Target topic; created with default config if it does not exist yet.
    pub topic: String,
    /// Optional message key, used for partition assignment when no explicit
    /// partition is given.
    #[serde(default)]
    pub key: Option<String>,
    /// Message payload (UTF-8 text).
    pub value: String,
    /// Explicit target partition; when absent the topic assigns one from the
    /// key (see `assign_partition`).
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional message headers.
    #[serde(default)]
    pub headers: Option<std::collections::HashMap<String, String>>,
}
2773
/// POST /kafka/produce — produce a single message into the embedded broker.
///
/// Creates the topic on demand, resolves the target partition (explicit or
/// key-derived), appends the message, records metrics, and broadcasts a
/// [`MessageEvent`] for SSE subscribers. Responds 400 for an invalid
/// partition and 503 when no broker is running.
#[cfg(feature = "kafka")]
async fn produce_kafka_message(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        let mut topics = broker.topics.write().await;

        // Auto-create the topic with default config on first use.
        let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
            mockforge_kafka::topics::Topic::new(
                request.topic.clone(),
                mockforge_kafka::topics::TopicConfig::default(),
            )
        });

        // An explicit partition wins; otherwise derive one from the key.
        let partition_id = if let Some(partition) = request.partition {
            partition
        } else {
            topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes()))
        };

        if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Invalid partition",
                    "message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                })),
            )
                .into_response();
        }

        // Keep copies for the SSE event emitted after a successful produce.
        let key_clone = request.key.clone();
        let headers_clone = request.headers.clone();
        let message = mockforge_kafka::partitions::KafkaMessage {
            offset: 0, // assigned by the partition on append
            timestamp: chrono::Utc::now().timestamp_millis(),
            key: key_clone.clone().map(|k| k.as_bytes().to_vec()),
            value: request.value.as_bytes().to_vec(),
            headers: headers_clone
                .clone()
                .unwrap_or_default()
                .into_iter()
                .map(|(k, v)| (k, v.as_bytes().to_vec()))
                .collect(),
        };

        match topic_entry.produce(partition_id, message).await {
            Ok(offset) => {
                // `broker` is already in scope here — the previous code
                // redundantly re-checked `state.kafka_broker`.
                broker.metrics().record_messages_produced(1);

                // Notify SSE stream subscribers; a send error just means
                // nobody is currently listening.
                let event = MessageEvent::Kafka(KafkaMessageEvent {
                    topic: request.topic.clone(),
                    key: key_clone,
                    value: request.value.clone(),
                    partition: partition_id,
                    offset,
                    headers: headers_clone,
                    timestamp: chrono::Utc::now().to_rfc3339(),
                });
                let _ = state.message_events.send(event);

                Json(serde_json::json!({
                    "success": true,
                    "message": format!("Message produced to topic '{}'", request.topic),
                    "topic": request.topic,
                    "partition": partition_id,
                    "offset": offset
                }))
                .into_response()
            }
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to produce message",
                    "message": e.to_string()
                })),
            )
                .into_response(),
        }
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
2878
/// Request body for `POST /kafka/produce/batch`.
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
    /// Messages to produce, in order.
    pub messages: Vec<KafkaProduceRequest>,
    /// Delay between consecutive messages in milliseconds; the default comes
    /// from `default_delay`, defined elsewhere in this file.
    #[serde(default = "default_delay")]
    pub delay_ms: u64,
}
2889
/// POST /kafka/produce/batch — produce a sequence of messages, optionally
/// pausing `delay_ms` milliseconds between consecutive messages.
///
/// Each message is processed independently: invalid partitions or produce
/// failures are reported per-entry in `results` and do not abort the batch.
/// Responds 400 for an empty batch and 503 when no broker is running.
#[cfg(feature = "kafka")]
async fn produce_kafka_batch(
    State(state): State<ManagementState>,
    Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
    if let Some(broker) = &state.kafka_broker {
        if request.messages.is_empty() {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Empty batch",
                    "message": "At least one message is required"
                })),
            )
                .into_response();
        }

        let mut results = Vec::with_capacity(request.messages.len());

        for (index, msg_request) in request.messages.iter().enumerate() {
            // Lock is re-acquired per message and released before any delay
            // (see the explicit drop below).
            let mut topics = broker.topics.write().await;

            // Auto-create the topic with default config on first use.
            let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
                mockforge_kafka::topics::Topic::new(
                    msg_request.topic.clone(),
                    mockforge_kafka::topics::TopicConfig::default(),
                )
            });

            // An explicit partition wins; otherwise derive one from the key.
            let partition_id = if let Some(partition) = msg_request.partition {
                partition
            } else {
                topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
            };

            if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
                results.push(serde_json::json!({
                    "index": index,
                    "success": false,
                    "error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
                }));
                continue;
            }

            let message = mockforge_kafka::partitions::KafkaMessage {
                offset: 0, // assigned by the partition on append
                timestamp: chrono::Utc::now().timestamp_millis(),
                key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
                value: msg_request.value.as_bytes().to_vec(),
                headers: msg_request
                    .headers
                    .clone()
                    .unwrap_or_default()
                    .into_iter()
                    .map(|(k, v)| (k, v.as_bytes().to_vec()))
                    .collect(),
            };

            match topic_entry.produce(partition_id, message).await {
                Ok(offset) => {
                    // `broker` is already in scope — the previous code
                    // redundantly re-checked `state.kafka_broker` here.
                    broker.metrics().record_messages_produced(1);

                    // Notify SSE subscribers; a send error just means nobody
                    // is currently listening.
                    let event = MessageEvent::Kafka(KafkaMessageEvent {
                        topic: msg_request.topic.clone(),
                        key: msg_request.key.clone(),
                        value: msg_request.value.clone(),
                        partition: partition_id,
                        offset,
                        headers: msg_request.headers.clone(),
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    });
                    let _ = state.message_events.send(event);

                    results.push(serde_json::json!({
                        "index": index,
                        "success": true,
                        "topic": msg_request.topic,
                        "partition": partition_id,
                        "offset": offset
                    }));
                }
                Err(e) => {
                    results.push(serde_json::json!({
                        "index": index,
                        "success": false,
                        "error": e.to_string()
                    }));
                }
            }

            // Release the topics write lock before sleeping so other
            // producers are not blocked during the inter-message delay (the
            // previous code held the lock across the sleep).
            drop(topics);

            if index + 1 < request.messages.len() && request.delay_ms > 0 {
                tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
            }
        }

        let success_count =
            results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();

        Json(serde_json::json!({
            "success": true,
            "total": request.messages.len(),
            "succeeded": success_count,
            "failed": request.messages.len() - success_count,
            "results": results
        }))
        .into_response()
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "Kafka broker not available",
                "message": "Kafka broker is not enabled or not available."
            })),
        )
            .into_response()
    }
}
3018
/// GET /mqtt/messages/stream — Server-Sent Events stream of MQTT messages.
///
/// Subscribes to the in-process broadcast channel and forwards each MQTT
/// message as an SSE `mqtt_message` event. An optional `topic` query
/// parameter restricts the stream to topics containing that substring.
/// Kafka events on the shared channel are skipped; lagged receivers log a
/// warning and keep reading. The stream ends when the channel is closed.
#[cfg(feature = "mqtt")]
async fn mqtt_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let rx = state.message_events.subscribe();
    // Optional substring filter on the MQTT topic.
    let topic_filter = params.get("topic").cloned();

    // Drive the broadcast receiver as a Stream: each unfold step loops until
    // it has an event worth emitting, or ends the stream on channel close.
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    Ok(MessageEvent::Mqtt(event)) => {
                        // Apply the optional topic substring filter.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "mqtt",
                            "topic": event.topic,
                            "payload": event.payload,
                            "qos": event.qos,
                            "retain": event.retain,
                            "timestamp": event.timestamp,
                        });

                        // A serialization failure silently drops the event
                        // and keeps reading.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event = Event::default().event("mqtt_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    // Kafka events share the channel but belong to the Kafka
                    // stream endpoint; skip them here.
                    #[cfg(feature = "kafka")]
                    Ok(MessageEvent::Kafka(_)) => {
                        continue;
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("MQTT message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Periodic keep-alive so idle connections are not dropped by proxies.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
3081
/// GET /kafka/messages/stream — Server-Sent Events stream of produced Kafka
/// messages.
///
/// Mirrors `mqtt_messages_stream`: subscribes to the shared broadcast
/// channel, forwards Kafka events as SSE `kafka_message` events, and accepts
/// an optional `topic` query parameter as a substring filter. MQTT events
/// are skipped; lagged receivers log a warning and continue. The stream ends
/// when the channel is closed.
#[cfg(feature = "kafka")]
async fn kafka_messages_stream(
    State(state): State<ManagementState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let rx = state.message_events.subscribe();
    // Optional substring filter on the Kafka topic.
    let topic_filter = params.get("topic").cloned();

    // Drive the broadcast receiver as a Stream: loop until an emittable
    // event arrives, or end the stream on channel close.
    let stream = stream::unfold(rx, move |mut rx| {
        let topic_filter = topic_filter.clone();

        async move {
            loop {
                match rx.recv().await {
                    // MQTT events share the channel but belong to the MQTT
                    // stream endpoint; skip them here.
                    #[cfg(feature = "mqtt")]
                    Ok(MessageEvent::Mqtt(_)) => {
                        continue;
                    }
                    Ok(MessageEvent::Kafka(event)) => {
                        // Apply the optional topic substring filter.
                        if let Some(filter) = &topic_filter {
                            if !event.topic.contains(filter) {
                                continue;
                            }
                        }

                        let event_json = serde_json::json!({
                            "protocol": "kafka",
                            "topic": event.topic,
                            "key": event.key,
                            "value": event.value,
                            "partition": event.partition,
                            "offset": event.offset,
                            "headers": event.headers,
                            "timestamp": event.timestamp,
                        });

                        // A serialization failure silently drops the event
                        // and keeps reading.
                        if let Ok(event_data) = serde_json::to_string(&event_json) {
                            let sse_event =
                                Event::default().event("kafka_message").data(event_data);
                            return Some((Ok(sse_event), rx));
                        }
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        return None;
                    }
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Kafka message stream lagged, skipped {} messages", skipped);
                        continue;
                    }
                }
            }
        }
    });

    // Periodic keep-alive so idle connections are not dropped by proxies.
    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(std::time::Duration::from_secs(15))
            .text("keep-alive-text"),
    )
}
3145
/// Request body for `POST /ai/generate-spec`.
#[derive(Debug, Deserialize)]
pub struct GenerateSpecRequest {
    /// Natural-language description of the desired API.
    pub query: String,
    /// Specification family: "openapi", "graphql", or "asyncapi" (anything
    /// else falls back to OpenAPI 3.0 in the handler).
    pub spec_type: String,
    /// OpenAPI version to target; the handler defaults to "3.0.0".
    pub api_version: Option<String>,
}
3158
/// Request body for `POST /mockai/generate-openapi` — generate an OpenAPI
/// spec from previously recorded HTTP traffic.
#[derive(Debug, Deserialize)]
pub struct GenerateOpenApiFromTrafficRequest {
    /// Path to the recorder database; defaults to `./recordings.db`.
    #[serde(default)]
    pub database_path: Option<String>,
    /// Only include exchanges from this RFC 3339 timestamp onward.
    #[serde(default)]
    pub since: Option<String>,
    /// Only include exchanges up to this RFC 3339 timestamp.
    #[serde(default)]
    pub until: Option<String>,
    /// Optional path pattern filter for recorded requests.
    #[serde(default)]
    pub path_pattern: Option<String>,
    /// Minimum confidence for inferred paths (defaults to 0.7).
    #[serde(default = "default_min_confidence")]
    pub min_confidence: f64,
}
3178
/// Serde default for `GenerateOpenApiFromTrafficRequest::min_confidence`.
fn default_min_confidence() -> f64 {
    0.7
}
3182
/// POST /ai/generate-spec — generate an API specification (OpenAPI, GraphQL,
/// or AsyncAPI) from a natural-language description via the RAG engine.
///
/// Provider, endpoint, model, and generation parameters are configured from
/// `MOCKFORGE_RAG_*` environment variables (with `OPENAI_API_KEY` as an
/// API-key fallback). Responds 503 when no API key is configured and 500
/// when the engine cannot be initialized or generation fails.
#[cfg(feature = "data-faker")]
async fn generate_ai_spec(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateSpecRequest>,
) -> impl IntoResponse {
    use mockforge_data::rag::{
        config::{LlmProvider, RagConfig},
        engine::RagEngine,
        storage::DocumentStorage,
    };

    // API key: dedicated variable first, then the conventional OpenAI one.
    let api_key = std::env::var("MOCKFORGE_RAG_API_KEY")
        .ok()
        .or_else(|| std::env::var("OPENAI_API_KEY").ok());

    if api_key.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "AI service not configured",
                "message": "Please provide an API key via MOCKFORGE_RAG_API_KEY or OPENAI_API_KEY"
            })),
        )
            .into_response();
    }

    // Unknown provider names fall back to OpenAI.
    let provider_str = std::env::var("MOCKFORGE_RAG_PROVIDER")
        .unwrap_or_else(|_| "openai".to_string())
        .to_lowercase();

    let provider = match provider_str.as_str() {
        "openai" => LlmProvider::OpenAI,
        "anthropic" => LlmProvider::Anthropic,
        "ollama" => LlmProvider::Ollama,
        "openai-compatible" | "openai_compatible" => LlmProvider::OpenAICompatible,
        _ => LlmProvider::OpenAI,
    };

    // Per-provider default endpoints and models, each overridable via env.
    let api_endpoint =
        std::env::var("MOCKFORGE_RAG_API_ENDPOINT").unwrap_or_else(|_| match provider {
            LlmProvider::OpenAI => "https://api.openai.com/v1".to_string(),
            LlmProvider::Anthropic => "https://api.anthropic.com/v1".to_string(),
            LlmProvider::Ollama => "http://localhost:11434/api".to_string(),
            LlmProvider::OpenAICompatible => "http://localhost:8000/v1".to_string(),
        });

    let model = std::env::var("MOCKFORGE_RAG_MODEL").unwrap_or_else(|_| match provider {
        LlmProvider::OpenAI => "gpt-3.5-turbo".to_string(),
        LlmProvider::Anthropic => "claude-3-sonnet-20240229".to_string(),
        LlmProvider::Ollama => "llama2".to_string(),
        LlmProvider::OpenAICompatible => "gpt-3.5-turbo".to_string(),
    });

    // Generation knobs; unparsable env values fall back to the defaults.
    let rag_config = RagConfig {
        provider,
        api_endpoint,
        api_key,
        model,
        max_tokens: std::env::var("MOCKFORGE_RAG_MAX_TOKENS")
            .unwrap_or_else(|_| "4096".to_string())
            .parse()
            .unwrap_or(4096),
        temperature: std::env::var("MOCKFORGE_RAG_TEMPERATURE")
            .unwrap_or_else(|_| "0.3".to_string())
            .parse()
            .unwrap_or(0.3),
        timeout_secs: std::env::var("MOCKFORGE_RAG_TIMEOUT")
            .unwrap_or_else(|_| "60".to_string())
            .parse()
            .unwrap_or(60),
        max_context_length: std::env::var("MOCKFORGE_RAG_CONTEXT_WINDOW")
            .unwrap_or_else(|_| "4000".to_string())
            .parse()
            .unwrap_or(4000),
        ..Default::default()
    };

    // Human-readable spec family for the prompt; unknown types default to
    // OpenAPI 3.0.
    let spec_type_label = match request.spec_type.as_str() {
        "openapi" => "OpenAPI 3.0",
        "graphql" => "GraphQL",
        "asyncapi" => "AsyncAPI",
        _ => "OpenAPI 3.0",
    };

    let api_version = request.api_version.as_deref().unwrap_or("3.0.0");

    let prompt = format!(
        r#"You are an expert API architect. Generate a complete {} specification based on the following user requirements.

User Requirements:
{}

Instructions:
1. Generate a complete, valid {} specification
2. Include all paths, operations, request/response schemas, and components
3. Use realistic field names and data types
4. Include proper descriptions and examples
5. Follow {} best practices
6. Return ONLY the specification, no additional explanation
7. For OpenAPI, use version {}

Return the specification in {} format."#,
        spec_type_label,
        request.query,
        spec_type_label,
        spec_type_label,
        api_version,
        if request.spec_type == "graphql" {
            "GraphQL SDL"
        } else {
            "YAML"
        }
    );

    // The engine needs a document store; an empty in-memory one suffices for
    // pure prompt-driven generation. `Arc` comes from the file-level import.
    use mockforge_data::rag::storage::InMemoryStorage;
    let storage: Arc<dyn DocumentStorage> = Arc::new(InMemoryStorage::new());

    // `rag_config` is not used again, so move it instead of cloning (the
    // previous code cloned it unnecessarily).
    let mut rag_engine = match RagEngine::new(rag_config, storage) {
        Ok(engine) => engine,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Failed to initialize RAG engine",
                    "message": e.to_string()
                })),
            )
                .into_response();
        }
    };

    match rag_engine.generate(&prompt, None).await {
        Ok(generated_text) => {
            // Strip any surrounding chatter from the model output.
            let spec = if request.spec_type == "graphql" {
                extract_graphql_schema(&generated_text)
            } else {
                extract_yaml_spec(&generated_text)
            };

            Json(serde_json::json!({
                "success": true,
                "spec": spec,
                "spec_type": request.spec_type,
            }))
            .into_response()
        }
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "AI generation failed",
                "message": e.to_string()
            })),
        )
            .into_response(),
    }
}
3355
3356#[cfg(not(feature = "data-faker"))]
3357async fn generate_ai_spec(
3358 State(_state): State<ManagementState>,
3359 Json(_request): Json<GenerateSpecRequest>,
3360) -> impl IntoResponse {
3361 (
3362 StatusCode::NOT_IMPLEMENTED,
3363 Json(serde_json::json!({
3364 "error": "AI features not enabled",
3365 "message": "Please enable the 'data-faker' feature to use AI-powered specification generation"
3366 })),
3367 )
3368 .into_response()
3369}
3370
/// POST /mockai/generate-openapi — infer an OpenAPI spec from HTTP traffic
/// recorded in the recorder database.
///
/// `since`/`until` bound the time window (RFC 3339), `path_pattern` filters
/// paths, and `min_confidence` controls path inference. Responds 400 on bad
/// input, 404 when no exchanges match, and 500 on generation or
/// serialization failures.
#[cfg(feature = "behavioral-cloning")]
async fn generate_openapi_from_traffic(
    State(_state): State<ManagementState>,
    Json(request): Json<GenerateOpenApiFromTrafficRequest>,
) -> impl IntoResponse {
    use chrono::{DateTime, Utc};
    use mockforge_core::intelligent_behavior::{
        openapi_generator::{OpenApiGenerationConfig, OpenApiSpecGenerator},
        IntelligentBehaviorConfig,
    };
    use mockforge_recorder::{
        database::RecorderDatabase,
        openapi_export::{QueryFilters, RecordingsToOpenApi},
    };
    use std::path::PathBuf;

    // Default database location: ./recordings.db in the current directory.
    let db_path = if let Some(ref path) = request.database_path {
        PathBuf::from(path)
    } else {
        std::env::current_dir()
            .unwrap_or_else(|_| PathBuf::from("."))
            .join("recordings.db")
    };

    let db = match RecorderDatabase::new(&db_path).await {
        Ok(db) => db,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "Database error",
                    "message": format!("Failed to open recorder database: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Parse the optional time-window bounds. Error messages reference the
    // JSON field names ('since'/'until'); the previous wording referenced
    // CLI flags ("--since"/"--until"), which do not exist on this HTTP API.
    let since_dt = if let Some(ref since_str) = request.since {
        match DateTime::parse_from_rfc3339(since_str) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid 'since' format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
                    })),
                )
                    .into_response();
            }
        }
    } else {
        None
    };

    let until_dt = if let Some(ref until_str) = request.until {
        match DateTime::parse_from_rfc3339(until_str) {
            Ok(dt) => Some(dt.with_timezone(&Utc)),
            Err(e) => {
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": "Invalid date format",
                        "message": format!("Invalid 'until' format: {}. Use ISO 8601 format (e.g., 2025-01-01T00:00:00Z)", e)
                    })),
                )
                    .into_response();
            }
        }
    } else {
        None
    };

    // Cap the query at 1000 exchanges to bound generation time.
    let query_filters = QueryFilters {
        since: since_dt,
        until: until_dt,
        path_pattern: request.path_pattern.clone(),
        min_status_code: None,
        max_requests: Some(1000),
    };

    let exchanges_from_recorder =
        match RecordingsToOpenApi::query_http_exchanges(&db, Some(query_filters)).await {
            Ok(exchanges) => exchanges,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Query error",
                        "message": format!("Failed to query HTTP exchanges: {}", e)
                    })),
                )
                    .into_response();
            }
        };

    if exchanges_from_recorder.is_empty() {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "No exchanges found",
                "message": "No HTTP exchanges found matching the specified filters"
            })),
        )
            .into_response();
    }

    // Convert the recorder's exchange type into the generator's local type.
    use mockforge_core::intelligent_behavior::openapi_generator::HttpExchange as LocalHttpExchange;
    let exchanges: Vec<LocalHttpExchange> = exchanges_from_recorder
        .into_iter()
        .map(|e| LocalHttpExchange {
            method: e.method,
            path: e.path,
            query_params: e.query_params,
            headers: e.headers,
            body: e.body,
            body_encoding: e.body_encoding,
            status_code: e.status_code,
            response_headers: e.response_headers,
            response_body: e.response_body,
            response_body_encoding: e.response_body_encoding,
            timestamp: e.timestamp,
        })
        .collect();

    let behavior_config = IntelligentBehaviorConfig::default();
    let gen_config = OpenApiGenerationConfig {
        min_confidence: request.min_confidence,
        behavior_model: Some(behavior_config.behavior_model),
    };

    let generator = OpenApiSpecGenerator::new(gen_config);
    let result = match generator.generate_from_exchanges(exchanges).await {
        Ok(result) => result,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": "Generation error",
                    "message": format!("Failed to generate OpenAPI spec: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Prefer the raw document when the generator kept one; otherwise
    // serialize the typed spec.
    let spec_json = if let Some(ref raw) = result.spec.raw_document {
        raw.clone()
    } else {
        match serde_json::to_value(&result.spec.spec) {
            Ok(json) => json,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({
                        "error": "Serialization error",
                        "message": format!("Failed to serialize OpenAPI spec: {}", e)
                    })),
                )
                    .into_response();
            }
        }
    };

    let response = serde_json::json!({
        "spec": spec_json,
        "metadata": {
            "requests_analyzed": result.metadata.requests_analyzed,
            "paths_inferred": result.metadata.paths_inferred,
            "path_confidence": result.metadata.path_confidence,
            "generated_at": result.metadata.generated_at.to_rfc3339(),
            "duration_ms": result.metadata.duration_ms,
        }
    });

    Json(response).into_response()
}
3563
3564async fn list_rule_explanations(
3566 State(state): State<ManagementState>,
3567 Query(params): Query<std::collections::HashMap<String, String>>,
3568) -> impl IntoResponse {
3569 use mockforge_core::intelligent_behavior::RuleType;
3570
3571 let explanations = state.rule_explanations.read().await;
3572 let mut explanations_vec: Vec<_> = explanations.values().cloned().collect();
3573
3574 if let Some(rule_type_str) = params.get("rule_type") {
3576 if let Ok(rule_type) = serde_json::from_str::<RuleType>(&format!("\"{}\"", rule_type_str)) {
3577 explanations_vec.retain(|e| e.rule_type == rule_type);
3578 }
3579 }
3580
3581 if let Some(min_confidence_str) = params.get("min_confidence") {
3583 if let Ok(min_confidence) = min_confidence_str.parse::<f64>() {
3584 explanations_vec.retain(|e| e.confidence >= min_confidence);
3585 }
3586 }
3587
3588 explanations_vec.sort_by(|a, b| {
3590 b.confidence
3591 .partial_cmp(&a.confidence)
3592 .unwrap_or(std::cmp::Ordering::Equal)
3593 .then_with(|| b.generated_at.cmp(&a.generated_at))
3594 });
3595
3596 Json(serde_json::json!({
3597 "explanations": explanations_vec,
3598 "total": explanations_vec.len(),
3599 }))
3600 .into_response()
3601}
3602
3603async fn get_rule_explanation(
3605 State(state): State<ManagementState>,
3606 Path(rule_id): Path<String>,
3607) -> impl IntoResponse {
3608 let explanations = state.rule_explanations.read().await;
3609
3610 match explanations.get(&rule_id) {
3611 Some(explanation) => Json(serde_json::json!({
3612 "explanation": explanation,
3613 }))
3614 .into_response(),
3615 None => (
3616 StatusCode::NOT_FOUND,
3617 Json(serde_json::json!({
3618 "error": "Rule explanation not found",
3619 "message": format!("No explanation found for rule ID: {}", rule_id)
3620 })),
3621 )
3622 .into_response(),
3623 }
3624}
3625
/// Request body for the learn-from-examples endpoint: a set of example
/// request/response pairs plus an optional behavior-config override.
#[derive(Debug, Deserialize)]
pub struct LearnFromExamplesRequest {
    /// Example request/response pairs to learn rules from (must be non-empty).
    pub examples: Vec<ExamplePairRequest>,
    /// Optional intelligent-behavior configuration as raw JSON; defaults are
    /// used when absent or when deserialization fails.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
}
3635
/// One example pair: free-form JSON describing a request and its response.
#[derive(Debug, Deserialize)]
pub struct ExamplePairRequest {
    /// Request description; recognized keys include `method`, `path`, `body`,
    /// `query_params`, and `headers`.
    pub request: serde_json::Value,
    /// Response description; recognized keys include `status_code` (or
    /// `status`) and `body`.
    pub response: serde_json::Value,
}
3644
3645async fn learn_from_examples(
3650 State(state): State<ManagementState>,
3651 Json(request): Json<LearnFromExamplesRequest>,
3652) -> impl IntoResponse {
3653 use mockforge_core::intelligent_behavior::{
3654 config::{BehaviorModelConfig, IntelligentBehaviorConfig},
3655 rule_generator::{ExamplePair, RuleGenerator},
3656 };
3657
3658 if request.examples.is_empty() {
3659 return (
3660 StatusCode::BAD_REQUEST,
3661 Json(serde_json::json!({
3662 "error": "No examples provided",
3663 "message": "At least one example pair is required"
3664 })),
3665 )
3666 .into_response();
3667 }
3668
3669 let example_pairs: Result<Vec<ExamplePair>, String> = request
3671 .examples
3672 .into_iter()
3673 .enumerate()
3674 .map(|(idx, ex)| {
3675 let method = ex
3677 .request
3678 .get("method")
3679 .and_then(|v| v.as_str())
3680 .map(|s| s.to_string())
3681 .unwrap_or_else(|| "GET".to_string());
3682 let path = ex
3683 .request
3684 .get("path")
3685 .and_then(|v| v.as_str())
3686 .map(|s| s.to_string())
3687 .unwrap_or_else(|| "/".to_string());
3688 let request_body = ex.request.get("body").cloned();
3689 let query_params = ex
3690 .request
3691 .get("query_params")
3692 .and_then(|v| v.as_object())
3693 .map(|obj| {
3694 obj.iter()
3695 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3696 .collect()
3697 })
3698 .unwrap_or_default();
3699 let headers = ex
3700 .request
3701 .get("headers")
3702 .and_then(|v| v.as_object())
3703 .map(|obj| {
3704 obj.iter()
3705 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
3706 .collect()
3707 })
3708 .unwrap_or_default();
3709
3710 let status = ex
3712 .response
3713 .get("status_code")
3714 .or_else(|| ex.response.get("status"))
3715 .and_then(|v| v.as_u64())
3716 .map(|n| n as u16)
3717 .unwrap_or(200);
3718 let response_body = ex.response.get("body").cloned();
3719
3720 Ok(ExamplePair {
3721 method,
3722 path,
3723 request: request_body,
3724 status,
3725 response: response_body,
3726 query_params,
3727 headers,
3728 metadata: {
3729 let mut meta = std::collections::HashMap::new();
3730 meta.insert("source".to_string(), "api".to_string());
3731 meta.insert("example_index".to_string(), idx.to_string());
3732 meta
3733 },
3734 })
3735 })
3736 .collect();
3737
3738 let example_pairs = match example_pairs {
3739 Ok(pairs) => pairs,
3740 Err(e) => {
3741 return (
3742 StatusCode::BAD_REQUEST,
3743 Json(serde_json::json!({
3744 "error": "Invalid examples",
3745 "message": e
3746 })),
3747 )
3748 .into_response();
3749 }
3750 };
3751
3752 let behavior_config = if let Some(config_json) = request.config {
3754 serde_json::from_value(config_json)
3756 .unwrap_or_else(|_| IntelligentBehaviorConfig::default())
3757 .behavior_model
3758 } else {
3759 BehaviorModelConfig::default()
3760 };
3761
3762 let generator = RuleGenerator::new(behavior_config);
3764
3765 let (rules, explanations) =
3767 match generator.generate_rules_with_explanations(example_pairs).await {
3768 Ok(result) => result,
3769 Err(e) => {
3770 return (
3771 StatusCode::INTERNAL_SERVER_ERROR,
3772 Json(serde_json::json!({
3773 "error": "Rule generation failed",
3774 "message": format!("Failed to generate rules: {}", e)
3775 })),
3776 )
3777 .into_response();
3778 }
3779 };
3780
3781 {
3783 let mut stored_explanations = state.rule_explanations.write().await;
3784 for explanation in &explanations {
3785 stored_explanations.insert(explanation.rule_id.clone(), explanation.clone());
3786 }
3787 }
3788
3789 let response = serde_json::json!({
3791 "success": true,
3792 "rules_generated": {
3793 "consistency_rules": rules.consistency_rules.len(),
3794 "schemas": rules.schemas.len(),
3795 "state_machines": rules.state_transitions.len(),
3796 "system_prompt": !rules.system_prompt.is_empty(),
3797 },
3798 "explanations": explanations.iter().map(|e| serde_json::json!({
3799 "rule_id": e.rule_id,
3800 "rule_type": e.rule_type,
3801 "confidence": e.confidence,
3802 "reasoning": e.reasoning,
3803 })).collect::<Vec<_>>(),
3804 "total_explanations": explanations.len(),
3805 });
3806
3807 Json(response).into_response()
3808}
3809
3810#[cfg(feature = "data-faker")]
/// Extracts a YAML specification from LLM output `text`.
///
/// Preference order:
/// 1. the contents of the first complete ```yaml fenced code block,
/// 2. the contents of the first complete generic ``` fenced code block,
/// 3. the whole text, trimmed (this also covers bare `openapi:` /
///    `asyncapi:` documents — the previous explicit check for those prefixes
///    returned the same value as the fallthrough and has been removed as
///    dead code).
fn extract_yaml_spec(text: &str) -> String {
    if let Some(start) = text.find("```yaml") {
        // Skip past the opening fence ("```yaml" is 7 ASCII bytes) plus any
        // whitespace/newline that follows it.
        let yaml_start = text[start + 7..].trim_start();
        if let Some(end) = yaml_start.find("```") {
            return yaml_start[..end].trim().to_string();
        }
    }
    if let Some(start) = text.find("```") {
        let content_start = text[start + 3..].trim_start();
        if let Some(end) = content_start.find("```") {
            return content_start[..end].trim().to_string();
        }
    }

    // No complete fenced block found: return the trimmed text as-is.
    text.trim().to_string()
}
3834
3835#[cfg(feature = "data-faker")]
/// Extracts a GraphQL schema from LLM output `text`.
///
/// Preference order:
/// 1. the contents of the first complete ```graphql fenced code block,
/// 2. the contents of the first complete generic ``` fenced code block,
/// 3. the whole text, trimmed (this also covers bare `type ` / `schema `
///    documents — the previous explicit check for those prefixes returned
///    the same value as the fallthrough and has been removed as dead code).
fn extract_graphql_schema(text: &str) -> String {
    if let Some(start) = text.find("```graphql") {
        // Skip past the opening fence ("```graphql" is 10 ASCII bytes) plus
        // any whitespace/newline that follows it.
        let schema_start = text[start + 10..].trim_start();
        if let Some(end) = schema_start.find("```") {
            return schema_start[..end].trim().to_string();
        }
    }
    if let Some(start) = text.find("```") {
        let content_start = text[start + 3..].trim_start();
        if let Some(end) = content_start.find("```") {
            return content_start[..end].trim().to_string();
        }
    }

    // No complete fenced block found: return the trimmed text as-is.
    text.trim().to_string()
}
3859
/// Returns the current chaos-engineering configuration as JSON.
///
/// With the `chaos` feature compiled in and a chaos API state configured, the
/// live configuration is reported; in every other case a placeholder with
/// `enabled: false` and null sub-configs is returned so callers always get a
/// consistent shape.
async fn get_chaos_config(State(_state): State<ManagementState>) -> impl IntoResponse {
    #[cfg(feature = "chaos")]
    {
        if let Some(chaos_state) = &_state.chaos_api_state {
            let config = chaos_state.config.read().await;
            // Each optional sub-config serializes to JSON null when absent;
            // serialization failures also degrade to null rather than erroring.
            Json(serde_json::json!({
                "enabled": config.enabled,
                "latency": config.latency.as_ref().map(|l| serde_json::to_value(l).unwrap_or(serde_json::Value::Null)),
                "fault_injection": config.fault_injection.as_ref().map(|f| serde_json::to_value(f).unwrap_or(serde_json::Value::Null)),
                "rate_limit": config.rate_limit.as_ref().map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null)),
                "traffic_shaping": config.traffic_shaping.as_ref().map(|t| serde_json::to_value(t).unwrap_or(serde_json::Value::Null)),
            }))
            .into_response()
        } else {
            // Chaos feature compiled in but no chaos API state wired up.
            Json(serde_json::json!({
                "enabled": false,
                "latency": null,
                "fault_injection": null,
                "rate_limit": null,
                "traffic_shaping": null,
            }))
            .into_response()
        }
    }
    #[cfg(not(feature = "chaos"))]
    {
        // Chaos support not compiled in: report a fully-disabled config.
        Json(serde_json::json!({
            "enabled": false,
            "latency": null,
            "fault_injection": null,
            "rate_limit": null,
            "traffic_shaping": null,
        }))
        .into_response()
    }
}
3902
/// Partial update for the chaos configuration; `None` fields leave the
/// corresponding section unchanged.
#[derive(Debug, Deserialize)]
pub struct ChaosConfigUpdate {
    /// Master enable/disable switch.
    pub enabled: Option<bool>,
    /// Latency-injection section as raw JSON (deserialized to `LatencyConfig`).
    pub latency: Option<serde_json::Value>,
    /// Fault-injection section as raw JSON.
    pub fault_injection: Option<serde_json::Value>,
    /// Rate-limiting section as raw JSON.
    pub rate_limit: Option<serde_json::Value>,
    /// Traffic-shaping section as raw JSON.
    pub traffic_shaping: Option<serde_json::Value>,
}
3917
/// Applies a partial chaos-configuration update.
///
/// Each `Some` field in the body overwrites the corresponding section of the
/// live chaos config; sections that fail to deserialize into their typed form
/// are silently skipped (the existing value is kept).
///
/// Responses:
/// - 200 on success
/// - 503 when the chaos API state is not configured
/// - 501 when the `chaos` feature is not compiled into this build
async fn update_chaos_config(
    State(_state): State<ManagementState>,
    Json(_config_update): Json<ChaosConfigUpdate>,
) -> impl IntoResponse {
    #[cfg(feature = "chaos")]
    {
        if let Some(chaos_state) = &_state.chaos_api_state {
            use mockforge_chaos::config::{
                FaultInjectionConfig, LatencyConfig, RateLimitConfig, TrafficShapingConfig,
            };

            let mut config = chaos_state.config.write().await;

            if let Some(enabled) = _config_update.enabled {
                config.enabled = enabled;
            }

            // NOTE(review): invalid JSON for a section is ignored rather than
            // reported; the caller still receives a success response.
            if let Some(latency_json) = _config_update.latency {
                if let Ok(latency) = serde_json::from_value::<LatencyConfig>(latency_json) {
                    config.latency = Some(latency);
                }
            }

            if let Some(fault_json) = _config_update.fault_injection {
                if let Ok(fault) = serde_json::from_value::<FaultInjectionConfig>(fault_json) {
                    config.fault_injection = Some(fault);
                }
            }

            if let Some(rate_json) = _config_update.rate_limit {
                if let Ok(rate) = serde_json::from_value::<RateLimitConfig>(rate_json) {
                    config.rate_limit = Some(rate);
                }
            }

            if let Some(traffic_json) = _config_update.traffic_shaping {
                if let Ok(traffic) = serde_json::from_value::<TrafficShapingConfig>(traffic_json) {
                    config.traffic_shaping = Some(traffic);
                }
            }

            // Release the write lock before logging and responding.
            drop(config);

            info!("Chaos configuration updated successfully");
            Json(serde_json::json!({
                "success": true,
                "message": "Chaos configuration updated and applied"
            }))
            .into_response()
        } else {
            (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "success": false,
                    "error": "Chaos API not available",
                    "message": "Chaos engineering is not enabled or configured"
                })),
            )
                .into_response()
        }
    }
    #[cfg(not(feature = "chaos"))]
    {
        (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "success": false,
                "error": "Chaos feature not enabled",
                "message": "Chaos engineering feature is not compiled into this build"
            })),
        )
            .into_response()
    }
}
4000
4001async fn list_network_profiles() -> impl IntoResponse {
4005 use mockforge_core::network_profiles::NetworkProfileCatalog;
4006
4007 let catalog = NetworkProfileCatalog::default();
4008 let profiles: Vec<serde_json::Value> = catalog
4009 .list_profiles_with_description()
4010 .iter()
4011 .map(|(name, description)| {
4012 serde_json::json!({
4013 "name": name,
4014 "description": description,
4015 })
4016 })
4017 .collect();
4018
4019 Json(serde_json::json!({
4020 "profiles": profiles
4021 }))
4022 .into_response()
4023}
4024
/// Request body naming the built-in network profile to apply.
#[derive(Debug, Deserialize)]
pub struct ApplyNetworkProfileRequest {
    /// Name of a profile from the `NetworkProfileCatalog`.
    pub profile_name: String,
}
4031
/// Applies a named network profile from the built-in catalog.
///
/// The profile's traffic-shaping settings are written into the shared server
/// configuration (when it is available) and, with the `chaos` feature
/// enabled, into the live chaos API state as well. Returns 404 when the
/// profile name is unknown.
async fn apply_network_profile(
    State(state): State<ManagementState>,
    Json(request): Json<ApplyNetworkProfileRequest>,
) -> impl IntoResponse {
    use mockforge_core::network_profiles::NetworkProfileCatalog;

    let catalog = NetworkProfileCatalog::default();
    if let Some(profile) = catalog.get(&request.profile_name) {
        if let Some(server_config) = &state.server_config {
            let mut config = server_config.write().await;

            use mockforge_core::config::NetworkShapingConfig;

            let network_shaping = NetworkShapingConfig {
                // Enabled when either shaping mechanism of the profile is on.
                enabled: profile.traffic_shaping.bandwidth.enabled
                    || profile.traffic_shaping.burst_loss.enabled,
                // max_bytes_per_sec * 8: bytes/sec -> bits/sec for the _bps field.
                bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8,
                packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                // NOTE(review): connection cap is hard-coded here, not taken
                // from the profile — confirm this is intentional.
                max_connections: 1000,
            };

            // Merge into an existing chaos section, or create one if absent.
            if let Some(ref mut chaos) = config.observability.chaos {
                chaos.traffic_shaping = Some(network_shaping);
            } else {
                use mockforge_core::config::ChaosEngConfig;
                config.observability.chaos = Some(ChaosEngConfig {
                    enabled: true,
                    latency: None,
                    fault_injection: None,
                    rate_limit: None,
                    traffic_shaping: Some(network_shaping),
                    scenario: None,
                });
            }

            info!("Network profile '{}' applied to server configuration", request.profile_name);
        } else {
            warn!("Server configuration not available in ManagementState - profile applied but not persisted");
        }

        #[cfg(feature = "chaos")]
        {
            // Also push the profile into the live chaos engine state so it
            // takes effect without a restart.
            if let Some(chaos_state) = &state.chaos_api_state {
                use mockforge_chaos::config::TrafficShapingConfig;

                let mut chaos_config = chaos_state.config.write().await;
                let chaos_traffic_shaping = TrafficShapingConfig {
                    enabled: profile.traffic_shaping.bandwidth.enabled
                        || profile.traffic_shaping.burst_loss.enabled,
                    // bytes/sec -> bits/sec, same conversion as above.
                    bandwidth_limit_bps: profile.traffic_shaping.bandwidth.max_bytes_per_sec * 8,
                    packet_loss_percent: profile.traffic_shaping.burst_loss.loss_rate_during_burst,
                    max_connections: 0,
                    connection_timeout_ms: 30000,
                };
                chaos_config.traffic_shaping = Some(chaos_traffic_shaping);
                chaos_config.enabled = true;
                // Release the write lock before logging.
                drop(chaos_config);
                info!("Network profile '{}' applied to chaos API state", request.profile_name);
            }
        }

        Json(serde_json::json!({
            "success": true,
            "message": format!("Network profile '{}' applied", request.profile_name),
            "profile": {
                "name": profile.name,
                "description": profile.description,
            }
        }))
        .into_response()
    } else {
        (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "Profile not found",
                "message": format!("Network profile '{}' not found", request.profile_name)
            })),
        )
            .into_response()
    }
}
4125
4126pub fn management_router_with_ui_builder(
4128 state: ManagementState,
4129 server_config: mockforge_core::config::ServerConfig,
4130) -> Router {
4131 use crate::ui_builder::{create_ui_builder_router, UIBuilderState};
4132
4133 let management = management_router(state);
4135
4136 let ui_builder_state = UIBuilderState::new(server_config);
4138 let ui_builder = create_ui_builder_router(ui_builder_state);
4139
4140 management.nest("/ui-builder", ui_builder)
4142}
4143
4144pub fn management_router_with_spec_import(state: ManagementState) -> Router {
4146 use crate::spec_import::{spec_import_router, SpecImportState};
4147
4148 let management = management_router(state);
4150
4151 Router::new()
4153 .merge(management)
4154 .merge(spec_import_router(SpecImportState::new()))
4155}
4156
#[cfg(test)]
mod tests {
    use super::*;

    /// A mock pushed into the shared state can be retrieved again by ID.
    #[tokio::test]
    async fn test_create_and_get_mock() {
        let state = ManagementState::new(None, None, 3000);

        let mock = MockConfig {
            id: "test-1".to_string(),
            name: "Test Mock".to_string(),
            method: "GET".to_string(),
            path: "/test".to_string(),
            response: MockResponse {
                body: serde_json::json!({"message": "test"}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        };

        // Scope the write lock so the read below does not deadlock.
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(mock.clone());
        }

        let mocks = state.mocks.read().await;
        let found = mocks.iter().find(|m| m.id == "test-1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Mock");
    }

    /// Counting total and enabled mocks reflects what was inserted.
    #[tokio::test]
    async fn test_server_stats() {
        let state = ManagementState::new(None, None, 3000);

        // Insert one enabled and one disabled mock.
        {
            let mut mocks = state.mocks.write().await;
            mocks.push(MockConfig {
                id: "1".to_string(),
                name: "Mock 1".to_string(),
                method: "GET".to_string(),
                path: "/test1".to_string(),
                response: MockResponse {
                    body: serde_json::json!({}),
                    headers: None,
                },
                enabled: true,
                latency_ms: None,
                status_code: Some(200),
                request_match: None,
                priority: None,
                scenario: None,
                required_scenario_state: None,
                new_scenario_state: None,
            });
            mocks.push(MockConfig {
                id: "2".to_string(),
                name: "Mock 2".to_string(),
                method: "POST".to_string(),
                path: "/test2".to_string(),
                response: MockResponse {
                    body: serde_json::json!({}),
                    headers: None,
                },
                enabled: false,
                latency_ms: None,
                status_code: Some(201),
                request_match: None,
                priority: None,
                scenario: None,
                required_scenario_state: None,
                new_scenario_state: None,
            });
        }

        let mocks = state.mocks.read().await;
        assert_eq!(mocks.len(), 2);
        assert_eq!(mocks.iter().filter(|m| m.enabled).count(), 1);
    }

    /// An absolute XPath expression that exists in the XML body matches.
    #[test]
    fn test_mock_matches_request_with_xpath_absolute_path() {
        let mock = MockConfig {
            id: "xpath-1".to_string(),
            name: "XPath Match".to_string(),
            method: "POST".to_string(),
            path: "/xml".to_string(),
            response: MockResponse {
                body: serde_json::json!({"ok": true}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: Some(RequestMatchCriteria {
                xpath: Some("/root/order/id".to_string()),
                ..Default::default()
            }),
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        };

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }

    /// An XPath with a matching text() predicate matches the body.
    #[test]
    fn test_mock_matches_request_with_xpath_text_predicate() {
        let mock = MockConfig {
            id: "xpath-2".to_string(),
            name: "XPath Predicate Match".to_string(),
            method: "POST".to_string(),
            path: "/xml".to_string(),
            response: MockResponse {
                body: serde_json::json!({"ok": true}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: Some(RequestMatchCriteria {
                xpath: Some("//order/id[text()='123']".to_string()),
                ..Default::default()
            }),
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        };

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }

    /// An XPath whose predicate value differs from the body must not match.
    #[test]
    fn test_mock_matches_request_with_xpath_no_match() {
        let mock = MockConfig {
            id: "xpath-3".to_string(),
            name: "XPath No Match".to_string(),
            method: "POST".to_string(),
            path: "/xml".to_string(),
            response: MockResponse {
                body: serde_json::json!({"ok": true}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: Some(RequestMatchCriteria {
                xpath: Some("//order/id[text()='456']".to_string()),
                ..Default::default()
            }),
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        };

        let body = br#"<root><order><id>123</id></order></root>"#;
        let headers = std::collections::HashMap::new();
        let query = std::collections::HashMap::new();

        assert!(!mock_matches_request(&mock, "POST", "/xml", &headers, &query, Some(body)));
    }
}